diff --git a/go.mod b/go.mod index 28d6dd87..5281ce86 100644 --- a/go.mod +++ b/go.mod @@ -152,7 +152,7 @@ require ( golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.39.0 // indirect golang.org/x/text v0.32.0 // indirect - golang.org/x/time v0.9.0 // indirect + golang.org/x/time v0.9.0 golang.org/x/tools v0.40.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2 // indirect diff --git a/server/internal/election/candidate.go b/server/internal/election/candidate.go index 7b34ea61..71806bf5 100644 --- a/server/internal/election/candidate.go +++ b/server/internal/election/candidate.go @@ -60,6 +60,7 @@ func NewCandidate( ttl: ttl, errCh: make(chan error, 1), onClaim: onClaim, + watchOp: store.Watch(electionName), } } @@ -105,7 +106,9 @@ func (c *Candidate) Start(ctx context.Context) error { c.done = done c.ticker = ticker - go c.watch(ctx) + if err := c.watch(ctx, done); err != nil { + return err + } if c.IsLeader() { c.logger.Debug().Msg("i am the current leader") @@ -129,11 +132,7 @@ func (c *Candidate) Stop(ctx context.Context) error { return nil } - if c.watchOp != nil { - c.watchOp.Close() - c.watchOp = nil - } - + c.watchOp.Close() c.done <- struct{}{} c.running = false @@ -221,33 +220,42 @@ func (c *Candidate) attemptClaim(ctx context.Context) error { return nil } -func (c *Candidate) watch(ctx context.Context) { - c.mu.Lock() - defer c.mu.Unlock() - +func (c *Candidate) watch(ctx context.Context, done chan struct{}) error { c.logger.Debug().Msg("starting watch") - c.watchOp = c.store.Watch(c.electionName) - err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) { + err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) error { switch evt.Type { case storage.EventTypeDelete: // The delete event will fire simultaneously with the ticker in some // types of outages, so the claim might have already been created // when this handler runs, even though its for a 'delete' event. if err := c.lockAndCheckClaim(ctx); err != nil { - c.errCh <- err + return err } case storage.EventTypeError: - c.logger.Debug().Err(evt.Err).Msg("encountered a watch error") - if errors.Is(evt.Err, storage.ErrWatchClosed) && c.running { - // Restart the watch if we're still running. - c.watch(ctx) - } + c.logger.Warn().Err(evt.Err).Msg("encountered a watch error") + case storage.EventTypeUnknown: + c.logger.Debug().Msg("encountered unknown watch event type") } + return nil }) if err != nil { - c.errCh <- fmt.Errorf("failed to start watch: %w", err) + return fmt.Errorf("failed to start watch: %w", err) } + go func() { + for { + select { + case <-ctx.Done(): + return + case <-done: + return + case err := <-c.watchOp.Error(): + c.errCh <- err + } + } + }() + + return nil } func (c *Candidate) release(ctx context.Context) error { diff --git a/server/internal/election/candidate_test.go b/server/internal/election/candidate_test.go index 44ef48e5..5d35dadc 100644 --- a/server/internal/election/candidate_test.go +++ b/server/internal/election/candidate_test.go @@ -16,11 +16,12 @@ import ( func TestCandidate(t *testing.T) { server := storagetest.NewEtcdTestServer(t) client := server.Client(t) - loggerFactory := testutils.LoggerFactory(t) store := election.NewElectionStore(client, uuid.NewString()) - electionSvc := election.NewService(store, loggerFactory) t.Run("basic functionality", func(t *testing.T) { + loggerFactory := testutils.LoggerFactory(t) + electionSvc := election.NewService(store, loggerFactory) + ctx := t.Context() name := election.Name(uuid.NewString()) candidate := electionSvc.NewCandidate(name, "host-1", time.Second) @@ -62,6 +63,9 @@ func TestCandidate(t *testing.T) { }) t.Run("multiple candidates", func(t *testing.T) { + loggerFactory := testutils.LoggerFactory(t) + electionSvc := election.NewService(store, loggerFactory) + bElected := make(chan struct{}, 1) ctx := t.Context() diff --git a/server/internal/migrate/runner.go b/server/internal/migrate/runner.go index e0e7cb1b..80d07975 100644 --- a/server/internal/migrate/runner.go +++ b/server/internal/migrate/runner.go @@ -74,7 +74,10 @@ func (r *Runner) Run(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, migrationTimeout) defer cancel() - r.watch(ctx) + r.watchOp = r.store.Revision.Watch() + if err := r.watch(ctx); err != nil { + return err + } defer r.watchOp.Close() r.candidate.AddHandlers(func(_ context.Context) { @@ -99,25 +102,15 @@ func (r *Runner) Run(ctx context.Context) error { } } -func (r *Runner) watch(ctx context.Context) { +func (r *Runner) watch(ctx context.Context) error { r.logger.Debug().Msg("starting watch") if len(r.migrations) == 0 { - r.errCh <- errors.New("watch called with empty migrations list") - return + return errors.New("watch called with empty migrations list") } targetRevision := r.migrations[len(r.migrations)-1].Identifier() - // Ensure that any previous watches were closed. Close thread-safe and - // idempotent. - if r.watchOp != nil { - r.watchOp.Close() - } - - // Since we're not specifying a start version on the watch, this will always - // fire for the current revision. - r.watchOp = r.store.Revision.Watch() - err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) { + err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) error { switch evt.Type { case storage.EventTypePut: if evt.Value.Identifier == targetRevision { @@ -126,15 +119,18 @@ func (r *Runner) watch(ctx context.Context) { }) } case storage.EventTypeError: - r.logger.Debug().Err(evt.Err).Msg("encountered a watch error") - if errors.Is(evt.Err, storage.ErrWatchClosed) { - r.watch(ctx) - } + r.logger.Warn().Err(evt.Err).Msg("encountered error in watch") + case storage.EventTypeUnknown: + r.logger.Debug().Msg("encountered unknown watch event type") } + return nil }) if err != nil { - r.errCh <- fmt.Errorf("failed to start watch: %w", err) + return fmt.Errorf("failed to start watch: %w", err) } + r.watchOp.PropagateErrors(ctx, r.errCh) + + return nil } func (r *Runner) runMigrations(ctx context.Context) error { diff --git a/server/internal/scheduler/service.go b/server/internal/scheduler/service.go index 0ac7deb5..d6b97a0f 100644 --- a/server/internal/scheduler/service.go +++ b/server/internal/scheduler/service.go @@ -2,7 +2,6 @@ package scheduler import ( "context" - "errors" "fmt" "path" "sync" @@ -68,17 +67,9 @@ func (s *Service) Start(ctx context.Context) error { scheduler.WithDistributedElector(s.elector) s.scheduler = scheduler - jobs, err := s.store.GetAll().Exec(ctx) - if err != nil { - s.logger.Debug().Err(err).Msg("failed to retrieve scheduled jobs from store") - } - for _, job := range jobs { - if err := s.registerJob(ctx, job); err != nil { - return fmt.Errorf("failed to register scheduled job: %w", err) - } + if err := s.watchJobChanges(ctx); err != nil { + return err } - - go s.watchJobChanges(ctx) s.scheduler.StartAsync() return nil @@ -163,36 +154,34 @@ func (s *Service) ListScheduledJobs() []string { } return ids } -func (s *Service) watchJobChanges(ctx context.Context) { +func (s *Service) watchJobChanges(ctx context.Context) error { s.logger.Debug().Msg("watching for scheduled job changes") s.watchOp = s.store.WatchJobs() - err := s.watchOp.Watch(ctx, func(e *storage.Event[*StoredScheduledJob]) { + err := s.watchOp.Watch(ctx, func(e *storage.Event[*StoredScheduledJob]) error { switch e.Type { case storage.EventTypePut: s.logger.Debug().Str("job_id", e.Value.ID).Msg("detected job creation or update") if err := s.registerJob(ctx, e.Value); err != nil { - s.errCh <- fmt.Errorf("failed to register job from watch: %w", err) + return fmt.Errorf("failed to register job from watch: %w", err) } case storage.EventTypeDelete: jobID := path.Base(e.Key) s.logger.Debug().Str("job_id", jobID).Msg("detected job deletion") s.UnregisterJob(jobID) case storage.EventTypeError: - s.logger.Debug().Err(e.Err).Msg("encountered a watch error") - if errors.Is(e.Err, storage.ErrWatchClosed) { - defer s.watchJobChanges(ctx) - } - default: - s.logger.Warn(). - Err(e.Err). - Str("event_type", string(e.Type)). - Msg("unhandled event type in scheduled job watch") + s.logger.Warn().Err(e.Err).Msg("encountered error in watch") + case storage.EventTypeUnknown: + s.logger.Debug().Msg("encountered unknown watch event type") } + return nil }) if err != nil { - s.logger.Debug().Err(err).Msg("job watch exited with error") + return fmt.Errorf("failed to initialize job watch: %w", err) } + s.watchOp.PropagateErrors(ctx, s.errCh) + + return nil } func (s *Service) Error() <-chan error { diff --git a/server/internal/storage/errors.go b/server/internal/storage/errors.go index fec94780..50e8c615 100644 --- a/server/internal/storage/errors.go +++ b/server/internal/storage/errors.go @@ -24,11 +24,3 @@ var ErrDuplicateKeysInTransaction = errors.New("duplicate keys in transaction") // ErrWatchAlreadyInProgress indicates that the WatchOp has already been started // and cannot be started again until it's closed. var ErrWatchAlreadyInProgress = errors.New("watch already in progress") - -// ErrWatchUntilTimedOut indicates that the condition given to Watch.Until was -// not met before the given timeout. -var ErrWatchUntilTimedOut = errors.New("timed out waiting for watch condition") - -// ErrWatchClosed indicates that the server has forced the watch to close. -// Callers should either restart or recreate the watch in that case. -var ErrWatchClosed = errors.New("watch closed by server") diff --git a/server/internal/storage/interface.go b/server/internal/storage/interface.go index 5cce40e5..91079d80 100644 --- a/server/internal/storage/interface.go +++ b/server/internal/storage/interface.go @@ -94,17 +94,15 @@ type DeleteValueOp[V Value] interface { type EventType string const ( - EventTypePut = "put" - EventTypeDelete = "delete" - EventTypeError = "error" - EventTypeUnknown = "unknown" + EventTypePut EventType = "put" + EventTypeDelete EventType = "delete" + EventTypeError EventType = "error" + EventTypeUnknown EventType = "unknown" ) // Event is generated by a modification to the watched key. type Event[V Value] struct { - // Err will be non-nil when Type is "error". The error will be an - // ErrWatchClosed if the server forced the watch to close. In that case, - // callers are able to call Watch() again to restart the watch. + // Err will be non-nil when Type is "error". Err error Type EventType Key string @@ -115,16 +113,20 @@ type Event[V Value] struct { // WatchOp watches one or more keys for modifications. type WatchOp[V Value] interface { - // Watch persistently watches for modifications until Close is called or - // until Etcd returns an error. Watch will automatically call Close in case - // of an error. This method is non-blocking. - Watch(ctx context.Context, handle func(e *Event[V])) error - // Until blocks until either the timeout has elapsed or until handle returns - // true. Until automatically calls Close before returning. - Until(ctx context.Context, timeout time.Duration, handle func(e *Event[V]) bool) error - // Close cancels the active watch and enables callers to use Watch or Until - // again. + // Watch performs an initial Get of current items, calls handle for each, + // then persistently watches for modifications, calling handle for each + // event. The initial Get is blocking, and then it starts the watch in a + // goroutine. Errors originating from the watch or handler are reported via + // the Error() channel. + Watch(ctx context.Context, handle func(e *Event[V]) error) error + // Close cancels the active watch and enables callers to use Watch again. Close() + // Error reports errors that originate from the watch or from the handler. + Error() <-chan error + // PropagateErrors will propagate errors from the watch's Error() channel to + // the given error channel in a goroutine until the given context is + // complete. + PropagateErrors(ctx context.Context, ch chan error) } // VersionUpdater are the methods that an operation can implement to support diff --git a/server/internal/storage/watch.go b/server/internal/storage/watch.go index 1b0981c6..449c409d 100644 --- a/server/internal/storage/watch.go +++ b/server/internal/storage/watch.go @@ -2,22 +2,26 @@ package storage import ( "context" - "errors" "fmt" + "slices" "sync" - "time" + "sync/atomic" - "github.com/pgEdge/control-plane/server/internal/utils" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "golang.org/x/time/rate" ) type watchOp[V Value] struct { - mu sync.Mutex - client *clientv3.Client - key string - options []clientv3.OpOption - ch clientv3.WatchChan - cancel context.CancelFunc + mu sync.Mutex + client *clientv3.Client + key string + options []clientv3.OpOption + revision int64 + ch clientv3.WatchChan + cancel context.CancelFunc + errCh chan error + running atomic.Bool } func NewWatchOp[V Value](client *clientv3.Client, key string, options ...clientv3.OpOption) WatchOp[V] { @@ -25,6 +29,8 @@ func NewWatchOp[V Value](client *clientv3.Client, key string, options ...clientv client: client, key: key, options: options, + // An error will terminate the watch, so we only need capacity for 1 + errCh: make(chan error, 1), } } @@ -36,93 +42,174 @@ func NewWatchPrefixOp[V Value](client *clientv3.Client, key string, options ...c client: client, key: ensureTrailingSlash(key), options: allOptions, + // An error will terminate the watch, so we only need capacity for 1 + errCh: make(chan error, 1), } } -func (o *watchOp[V]) start(ctx context.Context) error { +// load performs an initial Get of the current items at the watched key or +// prefix, calls handle for each, stores the revision from the response header. +func (o *watchOp[V]) load(ctx context.Context, handle func(e *Event[V]) error) error { o.mu.Lock() defer o.mu.Unlock() - if o.ch != nil { - return ErrWatchAlreadyInProgress + resp, err := o.client.Get(ctx, o.key, o.options...) + if err != nil { + return fmt.Errorf("failed to get initial items for watch: %w", err) + } + + for _, kv := range resp.Kvs { + if err := handle(convertKVToEvent[V](kv)); err != nil { + return err + } + } + + o.revision = resp.Header.Revision + + return nil +} + +// setupWatch initializes the etcd watch channel, starting from o.revision if +// it has been set. +func (o *watchOp[V]) setupWatch(ctx context.Context) error { + o.mu.Lock() + defer o.mu.Unlock() + + if o.cancel != nil { + // This will be true if we're restarting the watch. + o.cancel() + } + + watchOptions := slices.Clone(o.options) + if o.revision > 0 { + watchOptions = append(watchOptions, clientv3.WithRev(o.revision+1)) } ctx, cancel := context.WithCancel(ctx) o.cancel = cancel - o.ch = o.client.Watch(ctx, o.key, o.options...) + o.ch = o.client.Watch(ctx, o.key, watchOptions...) return nil } -func (o *watchOp[V]) Watch(ctx context.Context, handle func(e *Event[V])) error { - if err := o.start(ctx); err != nil { - return err +func (o *watchOp[V]) reportErr(err error) { + if err != nil && o.running.Load() { + // We avoid reporting errors if the watch was intentionally stopped. + o.errCh <- err + } +} + +func (o *watchOp[V]) Watch(ctx context.Context, handle func(e *Event[V]) error) error { + if o.running.Load() { + return ErrWatchAlreadyInProgress } + o.running.Store(true) + if err := o.load(ctx, handle); err != nil { + return err + } go func() { - o.mu.Lock() - - for resp := range o.ch { - if err := resp.Err(); err != nil { - o.mu.Unlock() - o.Close() - handle(&Event[V]{ - Type: EventTypeError, - Err: fmt.Errorf("%w: %s", ErrWatchClosed, err), - }) + // Allow 1 restart per second + restartLimiter := rate.NewLimiter(1, 1) + + for { + if err := o.setupWatch(ctx); err != nil { + o.reportErr(err) return } + eventLoop: + for { + select { + case resp := <-o.ch: + if resp.Header.Revision > o.revision { + // We always want to bump this revision, even in case of + // an error. + o.revision = resp.Header.Revision + } + if err := resp.Err(); err != nil { + // The watch can be interrupted for a few benign + // reasons. Rather than push that down to the clients, + // we only report an error if we're unable to open the + // watch again. + break eventLoop + } + for _, event := range resp.Events { + if err := handle(convertEvent[V](event)); err != nil { + o.reportErr(err) + o.Close() + return + } + } + case <-ctx.Done(): + o.reportErr(ctx.Err()) + return + } + } - for _, event := range resp.Events { - handle(convertEvent[V](event)) + if !o.running.Load() { + // Exit if Close was called. + return + } + if err := ctx.Err(); err != nil { + o.reportErr(err) + return + } + if err := restartLimiter.Wait(ctx); err != nil { + o.reportErr(fmt.Errorf("failed to wait for next watch restart: %w", err)) + return } } - - o.mu.Unlock() }() return nil } -func (o *watchOp[V]) Until(ctx context.Context, timeout time.Duration, handle func(e *Event[V]) bool) error { - defer o.Close() - - err := utils.WithTimeout(ctx, timeout, func(ctx context.Context) error { - if err := o.start(ctx); err != nil { - return err - } +func (o *watchOp[V]) Close() { + o.mu.Lock() + defer o.mu.Unlock() - o.mu.Lock() - defer o.mu.Unlock() + o.running.Store(false) + if o.cancel != nil { + o.cancel() + o.cancel = nil + } +} - for resp := range o.ch { - if err := resp.Err(); err != nil { - return fmt.Errorf("watch failed: %w", err) - } +func (o *watchOp[V]) Error() <-chan error { + return o.errCh +} - for _, event := range resp.Events { - if handle(convertEvent[V](event)) { - return nil +func (o *watchOp[V]) PropagateErrors(ctx context.Context, ch chan error) { + go func() { + for { + select { + case <-ctx.Done(): + return + case err := <-o.errCh: + // We intentionally drop errors that happen after the + // application context is cancelled. + if ctx.Err() == nil { + ch <- err } } } - - return nil - }) - if errors.Is(err, utils.ErrTimedOut) { - // Convert to a more specific timeout error - return ErrWatchUntilTimedOut - } - return err + }() } -func (o *watchOp[V]) Close() { - if o.cancel != nil { - o.cancel() +func convertKVToEvent[V Value](kv *mvccpb.KeyValue) *Event[V] { + v, err := decodeKV[V](kv) + if err != nil { + return &Event[V]{ + Type: EventTypeError, + Err: err, + } + } + return &Event[V]{ + Type: EventTypePut, + Key: string(kv.Key), + Value: v, + IsCreate: kv.CreateRevision == kv.ModRevision, } - o.mu.Lock() - o.ch = nil - o.mu.Unlock() } func convertEvent[V Value](in *clientv3.Event) *Event[V] { diff --git a/server/internal/storage/watch_test.go b/server/internal/storage/watch_test.go index f7bfd6bf..edfa11d8 100644 --- a/server/internal/storage/watch_test.go +++ b/server/internal/storage/watch_test.go @@ -1,107 +1,103 @@ package storage_test import ( - "context" "testing" - "time" - "github.com/pgEdge/control-plane/server/internal/storage" - "github.com/pgEdge/control-plane/server/internal/storage/storagetest" + "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/pgEdge/control-plane/server/internal/storage" + "github.com/pgEdge/control-plane/server/internal/storage/storagetest" ) func TestWatchOp(t *testing.T) { server := storagetest.NewEtcdTestServer(t) - // defer server.Close() client := server.Client(t) - // defer client.Close() - - t.Run("Until", func(t *testing.T) { - t.Run("success", func(t *testing.T) { - ctx := context.Background() - watch := storage.NewWatchOp[*TestValue](client, "foo") - - done := make(chan error, 1) - go func() { - done <- watch.Until(ctx, 5*time.Second, func(e *storage.Event[*TestValue]) bool { - expectedVal := &TestValue{ - SomeField: "bar", - } - expectedVal.SetVersion(1) - expected := &storage.Event[*TestValue]{ - Type: storage.EventTypePut, - Key: "foo", - IsCreate: true, - Value: expectedVal, - } - - assert.NoError(t, e.Err) - assert.Equal(t, expected, e) - - return true - }) - }() - - err := storage.NewCreateOp(client, "foo", &TestValue{SomeField: "bar"}). - Exec(ctx) - require.NoError(t, err) - - // Block until watch completes - err = <-done - - assert.NoError(t, err) - }) - - t.Run("timeout", func(t *testing.T) { - ctx := context.Background() - watch := storage.NewWatchOp[*TestValue](client, "bar") - - done := make(chan error, 1) - go func() { - done <- watch.Until(ctx, 500*time.Millisecond, func(e *storage.Event[*TestValue]) bool { - // Should not be reached in a successful run - t.Fail() - - return true - }) - }() - - // Block until watch completes - err := <-done - - assert.ErrorIs(t, err, storage.ErrWatchUntilTimedOut) - }) + + t.Run("delivers initial items via Get", func(t *testing.T) { + ctx := t.Context() + key := uuid.NewString() + + // Pre-create a key before starting the watch. + err := storage.NewCreateOp(client, key, &TestValue{SomeField: "existing"}). + Exec(ctx) + require.NoError(t, err) + + watch := storage.NewWatchOp[*TestValue](client, key) + + received := make(chan *storage.Event[*TestValue], 1) + handler := func(e *storage.Event[*TestValue]) error { + received <- e + return nil + } + require.NoError(t, watch.Watch(ctx, handler)) + t.Cleanup(watch.Close) + + e := <-received + assert.Equal(t, storage.EventTypePut, e.Type) + assert.Equal(t, key, e.Key) + assert.Equal(t, "existing", e.Value.SomeField) }) - t.Run("Watch", func(t *testing.T) { - ctx := context.Background() - watch := storage.NewWatchOp[*TestValue](client, "baz") - - done := make(chan bool, 1) - err := watch.Watch(ctx, func(e *storage.Event[*TestValue]) { - expectedVal := &TestValue{ - SomeField: "qux", - } - expectedVal.SetVersion(1) - expected := &storage.Event[*TestValue]{ - Type: storage.EventTypePut, - Key: "baz", - IsCreate: true, - Value: expectedVal, - } - - assert.NoError(t, e.Err) - assert.Equal(t, expected, e) - - done <- true - }) - assert.NoError(t, err) - - err = storage.NewCreateOp(client, "baz", &TestValue{SomeField: "qux"}). + t.Run("delivers subsequent modifications", func(t *testing.T) { + ctx := t.Context() + key := uuid.NewString() + + watch := storage.NewWatchOp[*TestValue](client, key) + + received := make(chan *storage.Event[*TestValue], 1) + handler := func(e *storage.Event[*TestValue]) error { + received <- e + return nil + } + require.NoError(t, watch.Watch(ctx, handler)) + t.Cleanup(watch.Close) + + err := storage.NewCreateOp(client, key, &TestValue{SomeField: "qux"}). + Exec(ctx) + require.NoError(t, err) + + e := <-received + assert.Equal(t, storage.EventTypePut, e.Type) + assert.Equal(t, key, e.Key) + assert.Equal(t, "qux", e.Value.SomeField) + assert.True(t, e.IsCreate) + }) + + t.Run("returns error from initial get", func(t *testing.T) { + ctx := t.Context() + key := uuid.NewString() + + watch := storage.NewWatchOp[*TestValue](client, key) + + // Pre-create a key so the initial Get delivers an event. + err := storage.NewCreateOp(client, key, &TestValue{SomeField: "v"}). + Exec(ctx) + require.NoError(t, err) + + sentinel := assert.AnError + handler := func(e *storage.Event[*TestValue]) error { + return sentinel + } + require.ErrorIs(t, watch.Watch(ctx, handler), sentinel) + }) + + t.Run("delivers error from handler", func(t *testing.T) { + ctx := t.Context() + watch := storage.NewWatchOp[*TestValue](client, "watch-err") + + sentinel := assert.AnError + handler := func(e *storage.Event[*TestValue]) error { + return sentinel + } + require.NoError(t, watch.Watch(ctx, handler)) + t.Cleanup(watch.Close) + + err := storage.NewCreateOp(client, "watch-err", &TestValue{SomeField: "v"}). Exec(ctx) require.NoError(t, err) - assert.True(t, <-done) + require.ErrorIs(t, <-watch.Error(), sentinel) }) } diff --git a/server/internal/testutils/logger.go b/server/internal/testutils/logger.go index 395d9374..7dbc41d5 100644 --- a/server/internal/testutils/logger.go +++ b/server/internal/testutils/logger.go @@ -12,7 +12,9 @@ func Logger(t testing.TB) zerolog.Logger { t.Helper() if testing.Verbose() { - return zerolog.New(zerolog.NewTestWriter(t)) + return zerolog.New(zerolog.NewTestWriter(t)).With(). + Str("test_name", t.Name()). + Logger() } return zerolog.Nop()