Skip to content

watch: add file delete/rename handling #10386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 97 additions & 38 deletions pkg/compose/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package compose
import (
"context"
"fmt"
"io/fs"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -50,9 +53,30 @@ type Trigger struct {

const quietPeriod = 2 * time.Second

func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint:gocyclo
needRebuild := make(chan string)
needSync := make(chan api.CopyOptions, 5)
// fileMapping contains the Compose service and modified host system path.
//
// For file sync, the container path is also included.
// For rebuild, there is no container path, so it is always empty.
type fileMapping struct {
// service that the file event is for.
service string
// hostPath that was created/modified/deleted outside the container.
//
// This is the path as seen from the user's perspective, e.g.
// - C:\Users\moby\Documents\hello-world\main.go
// - /Users/moby/Documents/hello-world/main.go
hostPath string
// containerPath for the target file inside the container (only populated
// for sync events, not rebuild).
//
// This is the path as used in Docker CLI commands, e.g.
// - /workdir/main.go
containerPath string
}

func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint:gocyclo
needRebuild := make(chan fileMapping)
needSync := make(chan fileMapping)

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
Expand Down Expand Up @@ -120,38 +144,37 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
case <-ctx.Done():
return nil
case event := <-watcher.Events():
path := event.Path()
hostPath := event.Path()

for _, trigger := range config.Watch {
logrus.Debugf("change detected on %s - comparing with %s", path, trigger.Path)
if watch.IsChild(trigger.Path, path) {
fmt.Fprintf(s.stderr(), "change detected on %s\n", path)
logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path)
if watch.IsChild(trigger.Path, hostPath) {
fmt.Fprintf(s.stderr(), "change detected on %s\n", hostPath)

f := fileMapping{
hostPath: hostPath,
service: name,
}

switch trigger.Action {
case WatchActionSync:
logrus.Debugf("modified file %s triggered sync", path)
rel, err := filepath.Rel(trigger.Path, path)
logrus.Debugf("modified file %s triggered sync", hostPath)
rel, err := filepath.Rel(trigger.Path, hostPath)
if err != nil {
return err
}
dest := filepath.Join(trigger.Target, rel)
needSync <- api.CopyOptions{
Source: path,
Destination: fmt.Sprintf("%s:%s", name, dest),
}
// always use Unix-style paths for inside the container
f.containerPath = path.Join(trigger.Target, rel)
needSync <- f
case WatchActionRebuild:
logrus.Debugf("modified file %s requires image to be rebuilt", path)
needRebuild <- name
logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
needRebuild <- f
default:
return fmt.Errorf("watch action %q is not supported", trigger)
}
continue WATCH
}
}

// default
needRebuild <- name

case err := <-watcher.Errors():
return err
}
Expand Down Expand Up @@ -183,11 +206,25 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
return config, nil
}

func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services []string) {
return func(services []string) {
fmt.Fprintf(s.stderr(), "Updating %s after changes were detected\n", strings.Join(services, ", "))
func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) {
return func(services rebuildServices) {
serviceNames := make([]string, 0, len(services))
allPaths := make(utils.Set[string])
for serviceName, paths := range services {
serviceNames = append(serviceNames, serviceName)
for p := range paths {
allPaths.Add(p)
}
}

fmt.Fprintf(
s.stderr(),
"Rebuilding %s after changes were detected:%s\n",
strings.Join(serviceNames, ", "),
strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "),
)
imageIds, err := s.build(ctx, project, api.BuildOptions{
Services: services,
Services: serviceNames,
})
if err != nil {
fmt.Fprintf(s.stderr(), "Build failed\n")
Expand All @@ -201,11 +238,11 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje

err = s.Up(ctx, project, api.UpOptions{
Create: api.CreateOptions{
Services: services,
Services: serviceNames,
Inherit: true,
},
Start: api.StartOptions{
Services: services,
Services: serviceNames,
Project: project,
},
})
Expand All @@ -215,39 +252,61 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
}
}

func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync chan api.CopyOptions) func() error {
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
return func() error {
for {
select {
case <-ctx.Done():
return nil
case opt := <-needSync:
err := s.Copy(ctx, project.Name, opt)
if err != nil {
return err
if fi, statErr := os.Stat(opt.hostPath); statErr == nil && !fi.IsDir() {
err := s.Copy(ctx, project.Name, api.CopyOptions{
Source: opt.hostPath,
Destination: fmt.Sprintf("%s:%s", opt.service, opt.containerPath),
})
if err != nil {
return err
}
fmt.Fprintf(s.stderr(), "%s updated\n", opt.containerPath)
} else if errors.Is(statErr, fs.ErrNotExist) {
_, err := s.Exec(ctx, project.Name, api.RunOptions{
Service: opt.service,
Command: []string{"rm", "-rf", opt.containerPath},
Index: 1,
})
if err != nil {
logrus.Warnf("failed to delete %q from %s: %v", opt.containerPath, opt.service, err)
}
fmt.Fprintf(s.stderr(), "%s deleted from container\n", opt.containerPath)
}
fmt.Fprintf(s.stderr(), "%s updated\n", opt.Destination)
}
}
}
}

func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input chan string, fn func(services []string)) {
services := utils.Set[string]{}
type rebuildServices map[string]utils.Set[string]

func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
services := make(rebuildServices)
t := clock.AfterFunc(delay, func() {
if len(services) > 0 {
refresh := services.Elements()
services.Clear()
fn(refresh)
fn(services)
// TODO(milas): this is a data race!
services = make(rebuildServices)
}
})
for {
select {
case <-ctx.Done():
return
case service := <-input:
case e := <-input:
t.Reset(delay)
services.Add(service)
svc, ok := services[e.service]
if !ok {
svc = make(utils.Set[string])
services[e.service] = svc
}
svc.Add(e.hostPath)
}
}
}
13 changes: 8 additions & 5 deletions pkg/compose/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,27 @@ import (
)

func Test_debounce(t *testing.T) {
ch := make(chan string)
ch := make(chan fileMapping)
var (
ran int
got []string
)
clock := clockwork.NewFakeClock()
ctx, stop := context.WithCancel(context.TODO())
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
debounce(ctx, clock, quietPeriod, ch, func(services []string) {
got = append(got, services...)
debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) {
for svc := range services {
got = append(got, svc)
}
ran++
stop()
})
return nil
})
for i := 0; i < 100; i++ {
ch <- "test"
ch <- fileMapping{service: "test"}
}
assert.Equal(t, ran, 0)
clock.Advance(quietPeriod)
Expand Down