Skip to content

Commit 5e59808

Browse files
committed
watch: add file delete/rename handling
This approach mimics Tilt's behavior[^1]: 1. At sync time, `stat` the path on host 2. If the path does not exist -> `rm` from container 3. If the path exists -> sync to container By handling things this way, we're always syncing based on the true state, regardless of what's happened in the interim. For example, a common pattern in POSIX tools is to create a file and then rename it over an existing file. Based on timing, this could be a sync, delete, sync (every file gets seen & processed) OR a delete, sync (by the the time we process the event, the "temp" file is already gone, so we just delete it from the container, where it never existed, but that's fine since we deletes are idempotent thanks to the `-f` flag on `rm`). Additionally, when syncing, if the `stat` call shows it's for a directory, we ignore it. Otherwise, duplicate, nested copies of the entire path could get synced in. (On some OSes, an event for the directory gets dispatched when a file inside of it is modified. In practice, I think we might want this pushed further down in the watching code, but since we're already `stat`ing the paths here now, it's a good place to handle it.) Lastly, there's some very light changes to the text when it does a full rebuild that will list out the (merged) set of paths that triggered it. We can continue to improve the output, but this is really helpful for understanding why it's rebuilding. [^1]: https://github.com/tilt-dev/tilt/blob/db7f887b0658ed042069dc0ff4cb266fe0596c23/internal/controllers/core/liveupdate/reconciler.go#L911
1 parent 03f0ed1 commit 5e59808

File tree

2 files changed

+105
-43
lines changed

2 files changed

+105
-43
lines changed

pkg/compose/watch.go

Lines changed: 97 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ package compose
1717
import (
1818
"context"
1919
"fmt"
20+
"io/fs"
21+
"os"
22+
"path"
2023
"path/filepath"
2124
"strings"
2225
"time"
@@ -50,9 +53,30 @@ type Trigger struct {
5053

5154
const quietPeriod = 2 * time.Second
5255

53-
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint:gocyclo
54-
needRebuild := make(chan string)
55-
needSync := make(chan api.CopyOptions, 5)
56+
// fileMapping contains the Compose service and modified host system path.
57+
//
58+
// For file sync, the container path is also included.
59+
// For rebuild, there is no container path, so it is always empty.
60+
type fileMapping struct {
61+
// service that the file event is for.
62+
service string
63+
// hostPath that was created/modified/deleted outside the container.
64+
//
65+
// This is the path as seen from the user's perspective, e.g.
66+
// - C:\Users\moby\Documents\hello-world\main.go
67+
// - /Users/moby/Documents/hello-world/main.go
68+
hostPath string
69+
// containerPath for the target file inside the container (only populated
70+
// for sync events, not rebuild).
71+
//
72+
// This is the path as used in Docker CLI commands, e.g.
73+
// - /workdir/main.go
74+
containerPath string
75+
}
76+
77+
func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint:gocyclo
78+
needRebuild := make(chan fileMapping)
79+
needSync := make(chan fileMapping)
5680

5781
eg, ctx := errgroup.WithContext(ctx)
5882
eg.Go(func() error {
@@ -120,38 +144,37 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
120144
case <-ctx.Done():
121145
return nil
122146
case event := <-watcher.Events():
123-
path := event.Path()
147+
hostPath := event.Path()
124148

125149
for _, trigger := range config.Watch {
126-
logrus.Debugf("change detected on %s - comparing with %s", path, trigger.Path)
127-
if watch.IsChild(trigger.Path, path) {
128-
fmt.Fprintf(s.stderr(), "change detected on %s\n", path)
150+
logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path)
151+
if watch.IsChild(trigger.Path, hostPath) {
152+
fmt.Fprintf(s.stderr(), "change detected on %s\n", hostPath)
153+
154+
f := fileMapping{
155+
hostPath: hostPath,
156+
service: name,
157+
}
129158

130159
switch trigger.Action {
131160
case WatchActionSync:
132-
logrus.Debugf("modified file %s triggered sync", path)
133-
rel, err := filepath.Rel(trigger.Path, path)
161+
logrus.Debugf("modified file %s triggered sync", hostPath)
162+
rel, err := filepath.Rel(trigger.Path, hostPath)
134163
if err != nil {
135164
return err
136165
}
137-
dest := filepath.Join(trigger.Target, rel)
138-
needSync <- api.CopyOptions{
139-
Source: path,
140-
Destination: fmt.Sprintf("%s:%s", name, dest),
141-
}
166+
// always use Unix-style paths for inside the container
167+
f.containerPath = path.Join(trigger.Target, rel)
168+
needSync <- f
142169
case WatchActionRebuild:
143-
logrus.Debugf("modified file %s requires image to be rebuilt", path)
144-
needRebuild <- name
170+
logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
171+
needRebuild <- f
145172
default:
146173
return fmt.Errorf("watch action %q is not supported", trigger)
147174
}
148175
continue WATCH
149176
}
150177
}
151-
152-
// default
153-
needRebuild <- name
154-
155178
case err := <-watcher.Errors():
156179
return err
157180
}
@@ -183,11 +206,25 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
183206
return config, nil
184207
}
185208

186-
func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services []string) {
187-
return func(services []string) {
188-
fmt.Fprintf(s.stderr(), "Updating %s after changes were detected\n", strings.Join(services, ", "))
209+
func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) {
210+
return func(services rebuildServices) {
211+
serviceNames := make([]string, 0, len(services))
212+
allPaths := make(utils.Set[string])
213+
for serviceName, paths := range services {
214+
serviceNames = append(serviceNames, serviceName)
215+
for p := range paths {
216+
allPaths.Add(p)
217+
}
218+
}
219+
220+
fmt.Fprintf(
221+
s.stderr(),
222+
"Rebuilding %s after changes were detected:%s\n",
223+
strings.Join(serviceNames, ", "),
224+
strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "),
225+
)
189226
imageIds, err := s.build(ctx, project, api.BuildOptions{
190-
Services: services,
227+
Services: serviceNames,
191228
})
192229
if err != nil {
193230
fmt.Fprintf(s.stderr(), "Build failed\n")
@@ -201,11 +238,11 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
201238

202239
err = s.Up(ctx, project, api.UpOptions{
203240
Create: api.CreateOptions{
204-
Services: services,
241+
Services: serviceNames,
205242
Inherit: true,
206243
},
207244
Start: api.StartOptions{
208-
Services: services,
245+
Services: serviceNames,
209246
Project: project,
210247
},
211248
})
@@ -215,39 +252,61 @@ func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Proje
215252
}
216253
}
217254

218-
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync chan api.CopyOptions) func() error {
255+
func (s *composeService) makeSyncFn(ctx context.Context, project *types.Project, needSync <-chan fileMapping) func() error {
219256
return func() error {
220257
for {
221258
select {
222259
case <-ctx.Done():
223260
return nil
224261
case opt := <-needSync:
225-
err := s.Copy(ctx, project.Name, opt)
226-
if err != nil {
227-
return err
262+
if fi, statErr := os.Stat(opt.hostPath); statErr == nil && !fi.IsDir() {
263+
err := s.Copy(ctx, project.Name, api.CopyOptions{
264+
Source: opt.hostPath,
265+
Destination: fmt.Sprintf("%s:%s", opt.service, opt.containerPath),
266+
})
267+
if err != nil {
268+
return err
269+
}
270+
fmt.Fprintf(s.stderr(), "%s updated\n", opt.containerPath)
271+
} else if errors.Is(statErr, fs.ErrNotExist) {
272+
_, err := s.Exec(ctx, project.Name, api.RunOptions{
273+
Service: opt.service,
274+
Command: []string{"rm", "-rf", opt.containerPath},
275+
Index: 1,
276+
})
277+
if err != nil {
278+
logrus.Warnf("failed to delete %q from %s: %v", opt.containerPath, opt.service, err)
279+
}
280+
fmt.Fprintf(s.stderr(), "%s deleted from container\n", opt.containerPath)
228281
}
229-
fmt.Fprintf(s.stderr(), "%s updated\n", opt.Destination)
230282
}
231283
}
232284
}
233285
}
234286

235-
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input chan string, fn func(services []string)) {
236-
services := utils.Set[string]{}
287+
type rebuildServices map[string]utils.Set[string]
288+
289+
func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileMapping, fn func(services rebuildServices)) {
290+
services := make(rebuildServices)
237291
t := clock.AfterFunc(delay, func() {
238292
if len(services) > 0 {
239-
refresh := services.Elements()
240-
services.Clear()
241-
fn(refresh)
293+
fn(services)
294+
// TODO(milas): this is a data race!
295+
services = make(rebuildServices)
242296
}
243297
})
244298
for {
245299
select {
246300
case <-ctx.Done():
247301
return
248-
case service := <-input:
302+
case e := <-input:
249303
t.Reset(delay)
250-
services.Add(service)
304+
svc, ok := services[e.service]
305+
if !ok {
306+
svc = make(utils.Set[string])
307+
services[e.service] = svc
308+
}
309+
svc.Add(e.hostPath)
251310
}
252311
}
253312
}

pkg/compose/watch_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,27 @@ import (
2424
)
2525

2626
func Test_debounce(t *testing.T) {
27-
ch := make(chan string)
27+
ch := make(chan fileMapping)
2828
var (
2929
ran int
3030
got []string
3131
)
3232
clock := clockwork.NewFakeClock()
33-
ctx, stop := context.WithCancel(context.TODO())
33+
ctx, stop := context.WithCancel(context.Background())
34+
t.Cleanup(stop)
3435
eg, ctx := errgroup.WithContext(ctx)
3536
eg.Go(func() error {
36-
debounce(ctx, clock, quietPeriod, ch, func(services []string) {
37-
got = append(got, services...)
37+
debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) {
38+
for svc := range services {
39+
got = append(got, svc)
40+
}
3841
ran++
3942
stop()
4043
})
4144
return nil
4245
})
4346
for i := 0; i < 100; i++ {
44-
ch <- "test"
47+
ch <- fileMapping{service: "test"}
4548
}
4649
assert.Equal(t, ran, 0)
4750
clock.Advance(quietPeriod)

0 commit comments

Comments
 (0)