diff --git a/testdata/watch/defer/Taskfile.yml b/testdata/watch/defer/Taskfile.yml new file mode 100644 index 0000000000..dcc1761f07 --- /dev/null +++ b/testdata/watch/defer/Taskfile.yml @@ -0,0 +1,22 @@ +version: '3' + +tasks: + server: + sources: + - 'foo.txt' + cmds: + - echo "server start" + - defer: sleep 0.2; echo "server end" + - sleep 10 + + client: + cmds: + - sleep 0.2 + - echo "client start" + - defer: sleep 0.6; echo "client end" + - sleep 10 + + default: + deps: + - server + - client diff --git a/testdata/watch/port/Taskfile.yml b/testdata/watch/port/Taskfile.yml new file mode 100644 index 0000000000..0d32edfa93 --- /dev/null +++ b/testdata/watch/port/Taskfile.yml @@ -0,0 +1,25 @@ +version: '3' + +vars: + PORT: 8042 + +tasks: + server: + run: always + sources: + - 'foo.txt' + cmds: + - echo "server started" + - socat TCP-LISTEN:{{.PORT}},fork,reuseaddr - || [ $? -eq 130 ] + - defer: kill $(lsof -t -i:{{.PORT}}) && while lsof -i :{{.PORT}} > /dev/null; do sleep 1; done + + client: + cmds: + - until lsof -Pi :{{.PORT}} -sTCP:LISTEN | tail -n +2; do sleep 0.5; done + - echo "sending message" + - echo "Hello world" | socat - TCP:localhost:{{.PORT}} + + default: + deps: + - server + - client diff --git a/testdata/watch/sources/Taskfile.yaml b/testdata/watch/sources/Taskfile.yaml new file mode 100644 index 0000000000..918c0f8e06 --- /dev/null +++ b/testdata/watch/sources/Taskfile.yaml @@ -0,0 +1,8 @@ +version: '3' + +tasks: + default: + sources: + - "./**/*.txt" + cmds: + - echo "Task running!" diff --git a/watch.go b/watch.go index 8e7f7ccf7d..4987824f70 100644 --- a/watch.go +++ b/watch.go @@ -8,6 +8,7 @@ import ( "path/filepath" "slices" "strings" + "sync" "syscall" "time" @@ -15,7 +16,6 @@ import ( "github.com/puzpuzpuz/xsync/v4" "github.com/go-task/task/v3/errors" - "github.com/go-task/task/v3/internal/filepathext" "github.com/go-task/task/v3/internal/fingerprint" "github.com/go-task/task/v3/internal/fsnotifyext" "github.com/go-task/task/v3/internal/logger" @@ -25,8 +25,12 @@ import ( const defaultWaitTime = 100 * time.Millisecond +var refreshChan = make(chan string) + // watchTasks start watching the given tasks func (e *Executor) watchTasks(calls ...*Call) error { + var wg sync.WaitGroup + tasks := make([]string, len(calls)) for i, c := range calls { tasks[i] = c.Task @@ -35,15 +39,19 @@ func (e *Executor) watchTasks(calls ...*Call) error { e.Logger.Errf(logger.Green, "task: Started watching for tasks: %s\n", strings.Join(tasks, ", ")) ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for _, c := range calls { - go func() { + wg.Add(1) + go func(c *Call) { + defer wg.Done() + err := e.RunTask(ctx, c) if err == nil { e.Logger.Errf(logger.Green, "task: task \"%s\" finished running\n", c.Task) } else if !isContextError(err) { e.Logger.Errf(logger.Red, "%v\n", err) } - }() + }(c) } var waitTime time.Duration @@ -68,9 +76,24 @@ func (e *Executor) watchTasks(calls ...*Call) error { closeOnInterrupt(w) + watchFiles, err := e.collectSources(calls) + if err != nil { + cancel() + return err + } go func() { for { select { + case path := <-refreshChan: + // If a path is added its necessary to refresh the sources, otherwise the + // watcher may not pick up any changes in that new path. + _ = path + watchFiles, err = e.collectSources(calls) + if err != nil { + e.Logger.Errf(logger.Red, "%v\n", err) + continue + } + case event, ok := <-eventsChan: if !ok { cancel() @@ -78,41 +101,70 @@ func (e *Executor) watchTasks(calls ...*Call) error { } e.Logger.VerboseErrf(logger.Magenta, "task: received watch event: %v\n", event) - cancel() - ctx, cancel = context.WithCancel(context.Background()) - - e.Compiler.ResetCache() - - for _, c := range calls { - go func() { - if ShouldIgnore(event.Name) { - e.Logger.VerboseErrf(logger.Magenta, "task: event skipped for being an ignored dir: %s\n", event.Name) - return + // Check if this watch event should be ignored. + if ShouldIgnore(event.Name) { + e.Logger.VerboseErrf(logger.Magenta, "task: event skipped for being an ignored dir: %s\n", event.Name) + continue + } + if event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) || event.Has(fsnotify.Write) { + if !slices.Contains(watchFiles, event.Name) { + relPath := event.Name + if rel, err := filepath.Rel(e.Dir, event.Name); err == nil { + relPath = rel } - t, err := e.GetTask(c) - if err != nil { - e.Logger.Errf(logger.Red, "%v\n", err) - return + e.Logger.VerboseErrf(logger.Magenta, "task: skipped for file not in sources: %s\n", relPath) + continue + } + } + if event.Has(fsnotify.Create) { + createDir := false + if info, err := os.Stat(event.Name); err == nil { + if info.IsDir() { + createDir = true } - baseDir := filepathext.SmartJoin(e.Dir, t.Dir) - files, err := e.collectSources(calls) - if err != nil { + } + watchFiles, err = e.collectSources(calls) + if err != nil { + e.Logger.Errf(logger.Red, "%v\n", err) + continue + } + + if createDir { + // If the CREATE relates to a folder, update the registered watch dirs (immediately). + if err := e.registerWatchedDirs(w, calls...); err != nil { e.Logger.Errf(logger.Red, "%v\n", err) - return } - - if !event.Has(fsnotify.Remove) && !slices.Contains(files, event.Name) { - relPath, _ := filepath.Rel(baseDir, event.Name) + } else { + if !slices.Contains(watchFiles, event.Name) { + relPath := event.Name + if rel, err := filepath.Rel(e.Dir, event.Name); err == nil { + relPath = rel + } e.Logger.VerboseErrf(logger.Magenta, "task: skipped for file not in sources: %s\n", relPath) - return + continue } + } + } + + // The watch event is good, restart the task calls. + cancel() + wg.Wait() // This is the single wait point, for the entry loop and the following loop. + + e.Compiler.ResetCache() + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + for _, c := range calls { + wg.Add(1) + go func(c *Call) { + defer wg.Done() + err = e.RunTask(ctx, c) if err == nil { e.Logger.Errf(logger.Green, "task: task \"%s\" finished running\n", c.Task) } else if !isContextError(err) { e.Logger.Errf(logger.Red, "%v\n", err) } - }() + }(c) } case err, ok := <-w.Errors: switch { @@ -167,8 +219,25 @@ func (e *Executor) registerWatchedDirs(w *fsnotify.Watcher, calls ...*Call) erro if err != nil { return err } + dirs := []string{} for _, f := range files { - d := filepath.Dir(f) + dir := filepath.Dir(f) + if !slices.Contains(dirs, dir) { + dirs = append(dirs, dir) + } + } + + // Remove dirs from the watch, otherwise the watched dir may become stale and + // if the dir is recreated, it will not trigger any watch events. + e.watchedDirs.Range(func(dir string, value bool) bool { + if !slices.Contains(dirs, dir) { + e.watchedDirs.Delete(dir) + } + return true + }) + + // Add new dirs to the watch. + for _, d := range dirs { if isSet, ok := e.watchedDirs.Load(d); ok && isSet { continue } @@ -181,6 +250,9 @@ func (e *Executor) registerWatchedDirs(w *fsnotify.Watcher, calls ...*Call) erro e.watchedDirs.Store(d, true) relPath, _ := filepath.Rel(e.Dir, d) e.Logger.VerboseOutf(logger.Green, "task: watching new dir: %v\n", relPath) + + // Signal that the watcher should refresh its watch file list. + refreshChan <- d } return nil } diff --git a/watch_test.go b/watch_test.go index 8bef86ed49..d8bd0e8e22 100644 --- a/watch_test.go +++ b/watch_test.go @@ -8,6 +8,10 @@ import ( "context" "fmt" "os" + "os/exec" + "path/filepath" + "runtime" + "slices" "strings" "testing" "time" @@ -100,3 +104,281 @@ func TestShouldIgnore(t *testing.T) { }) } } + +func TestWatchSources(t *testing.T) { + t.Parallel() + + tests := []struct { + action string + path string + expectRestart bool + }{ + // Entry condition: file fubar/foo.txt exists. + {"create", "fubar/bar.txt", true}, + {"remove", "fubar/foo.txt", true}, + {"rename", "fubar/foo.txt", true}, + {"write", "fubar/foo.txt", true}, + {"create", "fubar/bar.text", false}, + {"remove", "fubar/foo.text", false}, + {"rename", "fubar/foo.text", false}, + {"write", "fubar/foo.text", false}, + } + + for _, tc := range tests { + tc := tc + t.Run(fmt.Sprintf("%s-%s", tc.action, tc.path), func(t *testing.T) { + t.Parallel() + + checks := []string{`Started watching for tasks: default`, `echo "Task running!"`} + + // Setup the watch dir. + tmpDir := t.TempDir() + data, _ := os.ReadFile("testdata/watch/sources/Taskfile.yaml") + os.WriteFile(filepath.Join(tmpDir, "Taskfile.yaml"), data, 0644) + testFile := filepath.Join(tmpDir, "fubar/foo.txt") + os.MkdirAll(filepath.Dir(testFile), 0755) + os.WriteFile(testFile, []byte("hello world"), 0644) + + // Correct test case paths. + tc.path = filepath.Join(tmpDir, tc.path) + + // Start the Task. + var buffer SyncBuffer + e := task.NewExecutor( + task.WithDir(tmpDir), + task.WithStdout(&buffer), + task.WithStderr(&buffer), + task.WithWatch(true), + task.WithVerbose(true), + ) + require.NoError(t, e.Setup()) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + err := e.Run(ctx, &task.Call{Task: "default"}) + if err != nil { + panic(err) + } + } + } + }() + + // Introduce the test condition. + time.Sleep(200 * time.Millisecond) + switch tc.action { + case "create": + f, _ := os.OpenFile(tc.path, os.O_CREATE|os.O_WRONLY, 0644) + defer f.Close() + f.WriteString("watch test") + checks = append(checks, `watch event: CREATE`) + + case "remove": + if !tc.expectRestart { + f, _ := os.OpenFile(tc.path, os.O_CREATE|os.O_WRONLY, 0644) + f.Close() + time.Sleep(100 * time.Millisecond) + checks = append(checks, `watch event: CREATE`) + } + os.Remove(tc.path) + checks = append(checks, `watch event: REMOVE`) + + case "rename": + if !tc.expectRestart { + f, _ := os.OpenFile(tc.path, os.O_CREATE|os.O_WRONLY, 0644) + f.Close() + time.Sleep(100 * time.Millisecond) + checks = append(checks, `watch event: CREATE`) + } + dir := filepath.Dir(tc.path) + base := filepath.Base(tc.path) + ext := filepath.Ext(base) + name := base[:len(base)-len(ext)] + _b := []byte(name) + slices.Reverse(_b) + name = string(_b) + os.Rename(tc.path, filepath.Join(dir, name+ext)) + checks = append(checks, `watch event: RENAME`) + + case "write": + f, _ := os.OpenFile(tc.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + defer f.Close() + f.WriteString("watch test") + checks = append(checks, `watch event: WRITE`) + } + + // Observe the expected conditions. + time.Sleep(200 * time.Millisecond) + cancel() + if tc.expectRestart { + checks = append(checks, `echo "Task running!"`) + } else { + checks = append(checks, `skipped for file not in sources:`) + } + + output := buffer.buf.String() + t.Log(output) + for _, check := range checks { + if idx := strings.Index(output, check); idx == -1 { + t.Log(output) + t.Log(checks) + t.Fatalf("Expected output not observed in sequence: %s", check) + } else { + output = output[idx+len(check):] + } + } + }) + } +} + +func TestWatchDefer(t *testing.T) { + t.Parallel() + + // Setup the watch dir. + tmpDir := t.TempDir() + data, _ := os.ReadFile("testdata/watch/defer/Taskfile.yml") + os.WriteFile(filepath.Join(tmpDir, "Taskfile.yml"), data, 0644) + testFile := filepath.Join(tmpDir, "foo.txt") + os.MkdirAll(filepath.Dir(testFile), 0755) + os.WriteFile(testFile, []byte("hello world"), 0644) + + // Start the Task. + var buffer SyncBuffer + e := task.NewExecutor( + task.WithDir(tmpDir), + task.WithStdout(&buffer), + task.WithStderr(&buffer), + task.WithWatch(true), + task.WithVerbose(true), + ) + require.NoError(t, e.Setup()) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + err := e.Run(ctx, &task.Call{Task: "default"}) + if err != nil { + panic(err) + } + } + } + }() + + // Trigger the watch. + time.Sleep(1000 * time.Millisecond) + f, _ := os.OpenFile(testFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + defer f.Close() + f.WriteString("watch test") + + // Observe the expected conditions. + time.Sleep(1000 * time.Millisecond) + + // End the test. + cancel() + + checks := []string{ + `server start`, + `client start`, + // Watch triggers. + `received watch event: WRITE`, + `server end`, + `client end`, + // Defer complete, tasks restarted. + `server start`, + `client start`, + } + output := buffer.buf.String() + t.Log(output) + t.Log(checks) + for i, check := range checks { + if idx := strings.Index(output, check); idx == -1 { + t.Fatalf("Expected output not observed in sequence: [%d] %s", i, check) + } else { + output = output[idx+len(check):] + } + } +} + +func TestWatchPort(t *testing.T) { + if runtime.GOOS != "linux" && runtime.GOOS != "darwin" { + t.Skip("Skipping test: only supported on Linux") + } + _, err := exec.LookPath("socat") + if err != nil { + t.Skip("socat not found in PATH, skipping test") + } + + t.Parallel() + + // Setup the watch dir. + tmpDir := t.TempDir() + data, _ := os.ReadFile("testdata/watch/port/Taskfile.yml") + os.WriteFile(filepath.Join(tmpDir, "Taskfile.yml"), data, 0644) + testFile := filepath.Join(tmpDir, "foo.txt") + os.MkdirAll(filepath.Dir(testFile), 0755) + os.WriteFile(testFile, []byte("hello world"), 0644) + + // Start the Task. + var buffer SyncBuffer + e := task.NewExecutor( + task.WithDir(tmpDir), + task.WithStdout(&buffer), + task.WithStderr(&buffer), + task.WithWatch(true), + task.WithVerbose(true), + ) + require.NoError(t, e.Setup()) + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for { + select { + case <-ctx.Done(): + return + default: + err := e.Run(ctx, &task.Call{Task: "default"}) + if err != nil { + panic(err) + } + } + } + }() + + // Trigger the watch. + time.Sleep(1000 * time.Millisecond) + f, _ := os.OpenFile(testFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + defer f.Close() + f.WriteString("watch test") + + // Observe the expected conditions. + time.Sleep(1000 * time.Millisecond) + + // End the test. + cancel() + + checks := []string{ + `server start`, + `sending message`, + `Hello world`, + // Watch triggers. + `received watch event: WRITE`, + `server start`, + `sending message`, + `Hello world`, + } + output := buffer.buf.String() + t.Log(output) + t.Log(checks) + for i, check := range checks { + if idx := strings.Index(output, check); idx == -1 { + t.Fatalf("Expected output not observed in sequence: [%d] %s", i, check) + } else { + output = output[idx+len(check):] + } + } +} diff --git a/website/src/docs/faq.md b/website/src/docs/faq.md index 1efdbda771..a1c8014aa8 100644 --- a/website/src/docs/faq.md +++ b/website/src/docs/faq.md @@ -133,3 +133,41 @@ This is the list of core utils that are currently available: * `touch` * `xargs` * (more might be added in the future) + + +## Watcher unreliable restarts after watch-events + +Task uses Go contexts to manage the lifecycle of watched tasks. When a watch-event occurs, the current context of Task is cancelled which propagates to all watched tasks. Each watched task, upon cancellation, sends a signal to all running processes started by that task. Those processes then normally exit promptly. + +In most circumstances, this mechanism is reliable. However, if a task is running sophisticated application (e.g., a web server), some resources may not be immediately released by the application (or the operating system). As a result, restarting a watched task may fail if those resources are still in use. + +This can be resolved by using the task `defer` command to manage the shutdown of applications. Additionally, a task can include commands that block execution until its required resources have been fully released. The following example demonstrates both of these techniques: + +```yaml +version: '3' + +vars: + PORT: 8080 + +tasks: + server: + run: always + sources: + - 'foo.txt' + cmds: + - echo "server started" + - socat TCP-LISTEN:{{.PORT}},fork,reuseaddr - || [ $? -eq 130 ] + - defer: kill $(lsof -t -i:{{.PORT}}) && while lsof -i :{{.PORT}} > /dev/null; do sleep 1; done + + client: + cmds: + - until lsof -Pi :{{.PORT}} -sTCP:LISTEN | tail -n +2; do sleep 0.5; done + - echo "sending message" + - echo "Hello world" | socat - TCP:localhost:{{.PORT}} + + default: + deps: + - server + - client + +``` \ No newline at end of file