Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions testdata/watch/defer/Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: '3'

tasks:
server:
sources:
- 'foo.txt'
cmds:
- echo "server start"
- defer: sleep 0.2; echo "server end"
- sleep 10

client:
cmds:
- sleep 0.2
- echo "client start"
- defer: sleep 0.6; echo "client end"
- sleep 10

default:
deps:
- server
- client
25 changes: 25 additions & 0 deletions testdata/watch/port/Taskfile.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
version: '3'

vars:
PORT: 8042

tasks:
server:
run: always
sources:
- 'foo.txt'
cmds:
- echo "server started"
- socat TCP-LISTEN:{{.PORT}},fork,reuseaddr - || [ $? -eq 130 ]
- defer: kill $(lsof -t -i:{{.PORT}}) && while lsof -i :{{.PORT}} > /dev/null; do sleep 1; done

client:
cmds:
- until lsof -Pi :{{.PORT}} -sTCP:LISTEN | tail -n +2; do sleep 0.5; done
- echo "sending message"
- echo "Hello world" | socat - TCP:localhost:{{.PORT}}

default:
deps:
- server
- client
8 changes: 8 additions & 0 deletions testdata/watch/sources/Taskfile.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: '3'

tasks:
default:
sources:
- "./**/*.txt"
cmds:
- echo "Task running!"
126 changes: 99 additions & 27 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"path/filepath"
"slices"
"strings"
"sync"
"syscall"
"time"

"github.com/fsnotify/fsnotify"
"github.com/puzpuzpuz/xsync/v4"

"github.com/go-task/task/v3/errors"
"github.com/go-task/task/v3/internal/filepathext"
"github.com/go-task/task/v3/internal/fingerprint"
"github.com/go-task/task/v3/internal/fsnotifyext"
"github.com/go-task/task/v3/internal/logger"
Expand All @@ -25,8 +25,12 @@ import (

const defaultWaitTime = 100 * time.Millisecond

var refreshChan = make(chan string)

// watchTasks start watching the given tasks
func (e *Executor) watchTasks(calls ...*Call) error {
var wg sync.WaitGroup

tasks := make([]string, len(calls))
for i, c := range calls {
tasks[i] = c.Task
Expand All @@ -35,15 +39,19 @@ func (e *Executor) watchTasks(calls ...*Call) error {
e.Logger.Errf(logger.Green, "task: Started watching for tasks: %s\n", strings.Join(tasks, ", "))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, c := range calls {
go func() {
wg.Add(1)
go func(c *Call) {
defer wg.Done()

err := e.RunTask(ctx, c)
if err == nil {
e.Logger.Errf(logger.Green, "task: task \"%s\" finished running\n", c.Task)
} else if !isContextError(err) {
e.Logger.Errf(logger.Red, "%v\n", err)
}
}()
}(c)
}

var waitTime time.Duration
Expand All @@ -68,51 +76,95 @@ func (e *Executor) watchTasks(calls ...*Call) error {

closeOnInterrupt(w)

watchFiles, err := e.collectSources(calls)
if err != nil {
cancel()
return err
}
go func() {
for {
select {
case path := <-refreshChan:
// If a path is added its necessary to refresh the sources, otherwise the
// watcher may not pick up any changes in that new path.
_ = path
watchFiles, err = e.collectSources(calls)
if err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
continue
}

case event, ok := <-eventsChan:
if !ok {
cancel()
return
}
e.Logger.VerboseErrf(logger.Magenta, "task: received watch event: %v\n", event)

cancel()
ctx, cancel = context.WithCancel(context.Background())

e.Compiler.ResetCache()

for _, c := range calls {
go func() {
if ShouldIgnore(event.Name) {
e.Logger.VerboseErrf(logger.Magenta, "task: event skipped for being an ignored dir: %s\n", event.Name)
return
// Check if this watch event should be ignored.
if ShouldIgnore(event.Name) {
e.Logger.VerboseErrf(logger.Magenta, "task: event skipped for being an ignored dir: %s\n", event.Name)
continue
}
if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) || event.Has(fsnotify.Write) {
if !slices.Contains(watchFiles, event.Name) {
relPath := event.Name
if rel, err := filepath.Rel(e.Dir, event.Name); err == nil {
relPath = rel
}
t, err := e.GetTask(c)
if err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
return
e.Logger.VerboseErrf(logger.Magenta, "task: skipped for file not in sources: %s\n", relPath)
continue
}
}
if event.Has(fsnotify.Create) {
createDir := false
if info, err := os.Stat(event.Name); err == nil {
if info.IsDir() {
createDir = true
}
baseDir := filepathext.SmartJoin(e.Dir, t.Dir)
files, err := e.collectSources(calls)
if err != nil {
}
watchFiles, err = e.collectSources(calls)
if err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
continue
}

if createDir {
// If the CREATE relates to a folder, update the registered watch dirs (immediately).
if err := e.registerWatchedDirs(w, calls...); err != nil {
e.Logger.Errf(logger.Red, "%v\n", err)
return
}

if !event.Has(fsnotify.Remove) && !slices.Contains(files, event.Name) {
relPath, _ := filepath.Rel(baseDir, event.Name)
} else {
if !slices.Contains(watchFiles, event.Name) {
relPath := event.Name
if rel, err := filepath.Rel(e.Dir, event.Name); err == nil {
relPath = rel
}
e.Logger.VerboseErrf(logger.Magenta, "task: skipped for file not in sources: %s\n", relPath)
return
continue
}
}
}

// The watch event is good, restart the task calls.
cancel()
wg.Wait() // This is the single wait point, for the entry loop and the following loop.

e.Compiler.ResetCache()
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
for _, c := range calls {
wg.Add(1)
go func(c *Call) {
defer wg.Done()

err = e.RunTask(ctx, c)
if err == nil {
e.Logger.Errf(logger.Green, "task: task \"%s\" finished running\n", c.Task)
} else if !isContextError(err) {
e.Logger.Errf(logger.Red, "%v\n", err)
}
}()
}(c)
}
case err, ok := <-w.Errors:
switch {
Expand Down Expand Up @@ -167,8 +219,25 @@ func (e *Executor) registerWatchedDirs(w *fsnotify.Watcher, calls ...*Call) erro
if err != nil {
return err
}
dirs := []string{}
for _, f := range files {
d := filepath.Dir(f)
dir := filepath.Dir(f)
if !slices.Contains(dirs, dir) {
dirs = append(dirs, dir)
}
}

// Remove dirs from the watch, otherwise the watched dir may become stale and
// if the dir is recreated, it will not trigger any watch events.
e.watchedDirs.Range(func(dir string, value bool) bool {
if !slices.Contains(dirs, dir) {
e.watchedDirs.Delete(dir)
}
return true
})

// Add new dirs to the watch.
for _, d := range dirs {
if isSet, ok := e.watchedDirs.Load(d); ok && isSet {
continue
}
Expand All @@ -181,6 +250,9 @@ func (e *Executor) registerWatchedDirs(w *fsnotify.Watcher, calls ...*Call) erro
e.watchedDirs.Store(d, true)
relPath, _ := filepath.Rel(e.Dir, d)
e.Logger.VerboseOutf(logger.Green, "task: watching new dir: %v\n", relPath)

// Signal that the watcher should refresh its watch file list.
refreshChan <- d
}
return nil
}
Expand Down
Loading
Loading