Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ require (
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/time v0.9.0
golang.org/x/tools v0.40.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2 // indirect
Expand Down
46 changes: 27 additions & 19 deletions server/internal/election/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewCandidate(
ttl: ttl,
errCh: make(chan error, 1),
onClaim: onClaim,
watchOp: store.Watch(electionName),
}
}

Expand Down Expand Up @@ -105,7 +106,9 @@ func (c *Candidate) Start(ctx context.Context) error {
c.done = done
c.ticker = ticker

go c.watch(ctx)
if err := c.watch(ctx, done); err != nil {
return err
}

if c.IsLeader() {
c.logger.Debug().Msg("i am the current leader")
Expand All @@ -129,11 +132,7 @@ func (c *Candidate) Stop(ctx context.Context) error {
return nil
}

if c.watchOp != nil {
c.watchOp.Close()
c.watchOp = nil
}

c.watchOp.Close()
c.done <- struct{}{}
c.running = false

Expand Down Expand Up @@ -221,33 +220,42 @@ func (c *Candidate) attemptClaim(ctx context.Context) error {
return nil
}

func (c *Candidate) watch(ctx context.Context) {
c.mu.Lock()
defer c.mu.Unlock()

func (c *Candidate) watch(ctx context.Context, done chan struct{}) error {
c.logger.Debug().Msg("starting watch")

c.watchOp = c.store.Watch(c.electionName)
err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) {
err := c.watchOp.Watch(ctx, func(evt *storage.Event[*StoredElection]) error {
switch evt.Type {
case storage.EventTypeDelete:
// The delete event will fire simultaneously with the ticker in some
// types of outages, so the claim might have already been created
// when this handler runs, even though it's for a 'delete' event.
if err := c.lockAndCheckClaim(ctx); err != nil {
c.errCh <- err
return err
}
case storage.EventTypeError:
c.logger.Debug().Err(evt.Err).Msg("encountered a watch error")
if errors.Is(evt.Err, storage.ErrWatchClosed) && c.running {
// Restart the watch if we're still running.
c.watch(ctx)
}
c.logger.Warn().Err(evt.Err).Msg("encountered a watch error")
case storage.EventTypeUnknown:
c.logger.Debug().Msg("encountered unknown watch event type")
}
return nil
})
if err != nil {
c.errCh <- fmt.Errorf("failed to start watch: %w", err)
return fmt.Errorf("failed to start watch: %w", err)
}
go func() {
for {
select {
case <-ctx.Done():
return
case <-done:
return
case err := <-c.watchOp.Error():
c.errCh <- err
}
}
}()

return nil
}

func (c *Candidate) release(ctx context.Context) error {
Expand Down
8 changes: 6 additions & 2 deletions server/internal/election/candidate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
func TestCandidate(t *testing.T) {
server := storagetest.NewEtcdTestServer(t)
client := server.Client(t)
loggerFactory := testutils.LoggerFactory(t)
store := election.NewElectionStore(client, uuid.NewString())
electionSvc := election.NewService(store, loggerFactory)

t.Run("basic functionality", func(t *testing.T) {
loggerFactory := testutils.LoggerFactory(t)
electionSvc := election.NewService(store, loggerFactory)

ctx := t.Context()
name := election.Name(uuid.NewString())
candidate := electionSvc.NewCandidate(name, "host-1", time.Second)
Expand Down Expand Up @@ -62,6 +63,9 @@ func TestCandidate(t *testing.T) {
})

t.Run("multiple candidates", func(t *testing.T) {
loggerFactory := testutils.LoggerFactory(t)
electionSvc := election.NewService(store, loggerFactory)

bElected := make(chan struct{}, 1)

ctx := t.Context()
Expand Down
34 changes: 15 additions & 19 deletions server/internal/migrate/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ func (r *Runner) Run(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, migrationTimeout)
defer cancel()

r.watch(ctx)
r.watchOp = r.store.Revision.Watch()
if err := r.watch(ctx); err != nil {
return err
}
defer r.watchOp.Close()

r.candidate.AddHandlers(func(_ context.Context) {
Expand All @@ -99,25 +102,15 @@ func (r *Runner) Run(ctx context.Context) error {
}
}

func (r *Runner) watch(ctx context.Context) {
func (r *Runner) watch(ctx context.Context) error {
r.logger.Debug().Msg("starting watch")

if len(r.migrations) == 0 {
r.errCh <- errors.New("watch called with empty migrations list")
return
return errors.New("watch called with empty migrations list")
}
targetRevision := r.migrations[len(r.migrations)-1].Identifier()

// Ensure that any previous watches were closed. Close is thread-safe and
// idempotent.
if r.watchOp != nil {
r.watchOp.Close()
}

// Since we're not specifying a start version on the watch, this will always
// fire for the current revision.
r.watchOp = r.store.Revision.Watch()
err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) {
err := r.watchOp.Watch(ctx, func(evt *storage.Event[*StoredRevision]) error {
switch evt.Type {
case storage.EventTypePut:
if evt.Value.Identifier == targetRevision {
Expand All @@ -126,15 +119,18 @@ func (r *Runner) watch(ctx context.Context) {
})
}
case storage.EventTypeError:
r.logger.Debug().Err(evt.Err).Msg("encountered a watch error")
if errors.Is(evt.Err, storage.ErrWatchClosed) {
r.watch(ctx)
}
r.logger.Warn().Err(evt.Err).Msg("encountered error in watch")
case storage.EventTypeUnknown:
r.logger.Debug().Msg("encountered unknown watch event type")
}
return nil
})
if err != nil {
r.errCh <- fmt.Errorf("failed to start watch: %w", err)
return fmt.Errorf("failed to start watch: %w", err)
}
r.watchOp.PropagateErrors(ctx, r.errCh)

return nil
}

func (r *Runner) runMigrations(ctx context.Context) error {
Expand Down
37 changes: 13 additions & 24 deletions server/internal/scheduler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package scheduler

import (
"context"
"errors"
"fmt"
"path"
"sync"
Expand Down Expand Up @@ -68,17 +67,9 @@ func (s *Service) Start(ctx context.Context) error {
scheduler.WithDistributedElector(s.elector)
s.scheduler = scheduler

jobs, err := s.store.GetAll().Exec(ctx)
if err != nil {
s.logger.Debug().Err(err).Msg("failed to retrieve scheduled jobs from store")
}
for _, job := range jobs {
if err := s.registerJob(ctx, job); err != nil {
return fmt.Errorf("failed to register scheduled job: %w", err)
}
if err := s.watchJobChanges(ctx); err != nil {
return err
}

go s.watchJobChanges(ctx)
s.scheduler.StartAsync()

return nil
Expand Down Expand Up @@ -163,36 +154,34 @@ func (s *Service) ListScheduledJobs() []string {
}
return ids
}
func (s *Service) watchJobChanges(ctx context.Context) {
func (s *Service) watchJobChanges(ctx context.Context) error {
s.logger.Debug().Msg("watching for scheduled job changes")

s.watchOp = s.store.WatchJobs()
err := s.watchOp.Watch(ctx, func(e *storage.Event[*StoredScheduledJob]) {
err := s.watchOp.Watch(ctx, func(e *storage.Event[*StoredScheduledJob]) error {
switch e.Type {
case storage.EventTypePut:
s.logger.Debug().Str("job_id", e.Value.ID).Msg("detected job creation or update")
if err := s.registerJob(ctx, e.Value); err != nil {
s.errCh <- fmt.Errorf("failed to register job from watch: %w", err)
return fmt.Errorf("failed to register job from watch: %w", err)
}
case storage.EventTypeDelete:
jobID := path.Base(e.Key)
s.logger.Debug().Str("job_id", jobID).Msg("detected job deletion")
s.UnregisterJob(jobID)
case storage.EventTypeError:
s.logger.Debug().Err(e.Err).Msg("encountered a watch error")
if errors.Is(e.Err, storage.ErrWatchClosed) {
defer s.watchJobChanges(ctx)
}
default:
s.logger.Warn().
Err(e.Err).
Str("event_type", string(e.Type)).
Msg("unhandled event type in scheduled job watch")
s.logger.Warn().Err(e.Err).Msg("encountered error in watch")
case storage.EventTypeUnknown:
s.logger.Debug().Msg("encountered unknown watch event type")
}
return nil
})
if err != nil {
s.logger.Debug().Err(err).Msg("job watch exited with error")
return fmt.Errorf("failed to initialize job watch: %w", err)
}
s.watchOp.PropagateErrors(ctx, s.errCh)

return nil
}

func (s *Service) Error() <-chan error {
Expand Down
8 changes: 0 additions & 8 deletions server/internal/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,3 @@ var ErrDuplicateKeysInTransaction = errors.New("duplicate keys in transaction")
// ErrWatchAlreadyInProgress indicates that the WatchOp has already been started
// and cannot be started again until it's closed.
var ErrWatchAlreadyInProgress = errors.New("watch already in progress")

// ErrWatchUntilTimedOut indicates that the condition given to Watch.Until was
// not met before the given timeout.
var ErrWatchUntilTimedOut = errors.New("timed out waiting for watch condition")

// ErrWatchClosed indicates that the server has forced the watch to close.
// Callers should either restart or recreate the watch in that case.
var ErrWatchClosed = errors.New("watch closed by server")
34 changes: 18 additions & 16 deletions server/internal/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,15 @@ type DeleteValueOp[V Value] interface {
type EventType string

const (
EventTypePut = "put"
EventTypeDelete = "delete"
EventTypeError = "error"
EventTypeUnknown = "unknown"
EventTypePut EventType = "put"
EventTypeDelete EventType = "delete"
EventTypeError EventType = "error"
EventTypeUnknown EventType = "unknown"
)

// Event is generated by a modification to the watched key.
type Event[V Value] struct {
// Err will be non-nil when Type is "error". The error will be an
// ErrWatchClosed if the server forced the watch to close. In that case,
// callers are able to call Watch() again to restart the watch.
// Err will be non-nil when Type is "error".
Err error
Type EventType
Key string
Expand All @@ -115,16 +113,20 @@ type Event[V Value] struct {

// WatchOp watches one or more keys for modifications.
type WatchOp[V Value] interface {
// Watch persistently watches for modifications until Close is called or
// until Etcd returns an error. Watch will automatically call Close in case
// of an error. This method is non-blocking.
Watch(ctx context.Context, handle func(e *Event[V])) error
// Until blocks until either the timeout has elapsed or until handle returns
// true. Until automatically calls Close before returning.
Until(ctx context.Context, timeout time.Duration, handle func(e *Event[V]) bool) error
// Close cancels the active watch and enables callers to use Watch or Until
// again.
// Watch performs an initial Get of current items, calls handle for each,
// then persistently watches for modifications, calling handle for each
// event. The initial Get is blocking, and then it starts the watch in a
// goroutine. Errors originating from the watch or handler are reported via
// the Error() channel.
Watch(ctx context.Context, handle func(e *Event[V]) error) error
// Close cancels the active watch and enables callers to use Watch again.
Close()
// Error reports errors that originate from the watch or from the handler.
Error() <-chan error
// PropagateErrors will propagate errors from the watch's Error() channel to
// the given error channel in a goroutine until the given context is
// complete.
PropagateErrors(ctx context.Context, ch chan error)
}

// VersionUpdater are the methods that an operation can implement to support
Expand Down
Loading