Skip to content

Commit 7b7fbe3

Browse files
fix: update Requeue and Reassign logic to fix performance degradation when many events are queued (#310)
Logic for requeueing and reassigning did not limit the number of step runs to requeue, so when events accumulate with no worker present it causes memory to spike along with a very high query latency on the database. This commit limits the number of step runs returned in the requeue and reassign queries, and also properly locks step run rows for these queries so only a step run in a PENDING or PENDING_ASSIGNMENT state can be requeued. It also improves performance of the `AssignStepRunToWorker` query and ensures that `maxRuns` on workers are always respected through the introduction of a `WorkerSemaphore` model. The value gets decremented when a step run is assigned and incremented when a step run is in a final state. Co-authored-by: Luca Steeb <[email protected]> * Update controller.go --------- Co-authored-by: steebchen <[email protected]>
1 parent e385e09 commit 7b7fbe3

File tree

25 files changed

+801
-327
lines changed

25 files changed

+801
-327
lines changed

examples/loadtest/cli/cli_e2e_test.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111

1212
"go.uber.org/goleak"
1313

14+
"github.com/hatchet-dev/hatchet/internal/config/shared"
15+
"github.com/hatchet-dev/hatchet/internal/logger"
1416
"github.com/hatchet-dev/hatchet/internal/testutils"
1517
)
1618

@@ -22,8 +24,18 @@ func TestLoadCLI(t *testing.T) {
2224
eventsPerSecond int
2325
delay time.Duration
2426
wait time.Duration
27+
workerDelay time.Duration
2528
concurrency int
2629
}
30+
31+
l = logger.NewStdErr(
32+
&shared.LoggerConfigFile{
33+
Level: "warn",
34+
Format: "console",
35+
},
36+
"loadtest",
37+
)
38+
2739
tests := []struct {
2840
name string
2941
args args
@@ -46,6 +58,16 @@ func TestLoadCLI(t *testing.T) {
4658
wait: 30 * time.Second,
4759
concurrency: 0,
4860
},
61+
}, {
62+
name: "test for many queued events and little worker throughput",
63+
args: args{
64+
duration: 60 * time.Second,
65+
eventsPerSecond: 100,
66+
delay: 0 * time.Second,
67+
workerDelay: 60 * time.Second,
68+
wait: 240 * time.Second,
69+
concurrency: 0,
70+
},
4971
}}
5072

5173
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
@@ -65,8 +87,7 @@ func TestLoadCLI(t *testing.T) {
6587

6688
for _, tt := range tests {
6789
t.Run(tt.name, func(t *testing.T) {
68-
69-
if err := do(tt.args.duration, tt.args.eventsPerSecond, tt.args.delay, tt.args.wait, tt.args.concurrency); (err != nil) != tt.wantErr {
90+
if err := do(tt.args.duration, tt.args.eventsPerSecond, tt.args.delay, tt.args.wait, tt.args.concurrency, tt.args.workerDelay); (err != nil) != tt.wantErr {
7091
t.Errorf("do() error = %v, wantErr %v", err, tt.wantErr)
7192
}
7293
})

examples/loadtest/cli/do.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"time"
88
)
99

10-
func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait time.Duration, concurrency int) error {
11-
log.Printf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", duration, eventsPerSecond, delay, wait, concurrency)
10+
func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait time.Duration, concurrency int, workerDelay time.Duration) error {
11+
l.Info().Msgf("testing with duration=%s, eventsPerSecond=%d, delay=%s, wait=%s, concurrency=%d", duration, eventsPerSecond, delay, wait, concurrency)
1212

1313
ctx, cancel := context.WithCancel(context.Background())
1414
defer cancel()
@@ -23,6 +23,11 @@ func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait t
2323
ch := make(chan int64, 2)
2424
durations := make(chan time.Duration, eventsPerSecond*int(duration.Seconds())*3)
2525
go func() {
26+
if workerDelay.Seconds() > 0 {
27+
l.Info().Msgf("wait %s before starting the worker", workerDelay)
28+
time.Sleep(workerDelay)
29+
}
30+
l.Info().Msg("starting worker now")
2631
count, uniques := run(ctx, delay, durations, concurrency)
2732
ch <- count
2833
ch <- uniques
@@ -35,7 +40,7 @@ func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait t
3540
executed := <-ch
3641
uniques := <-ch
3742

38-
log.Printf("ℹ️ emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, eventsPerSecond)
43+
l.Info().Msgf("emitted %d, executed %d, uniques %d, using %d events/s", emitted, executed, uniques, eventsPerSecond)
3944

4045
if executed == 0 {
4146
return fmt.Errorf("❌ no events executed")
@@ -53,6 +58,7 @@ func do(duration time.Duration, eventsPerSecond int, delay time.Duration, wait t
5358
totalDurationScheduled += <-scheduled
5459
}
5560
scheduleTimePerEvent := totalDurationScheduled / time.Duration(emitted)
61+
5662
log.Printf("ℹ️ average scheduling time per event: %s", scheduleTimePerEvent)
5763

5864
if emitted != executed {

examples/loadtest/cli/emit.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package main
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"sync"
87
"time"
98

@@ -34,25 +33,27 @@ func emit(ctx context.Context, amountPerSecond int, duration time.Duration, sche
3433
select {
3534
case <-ticker.C:
3635
mx.Lock()
37-
id += 1
38-
mx.Unlock()
36+
id++
3937

4038
go func(id int64) {
39+
var err error
4140
ev := Event{CreatedAt: time.Now(), ID: id}
42-
fmt.Println("pushed event", ev.ID)
41+
l.Info().Msgf("pushed event %d", ev.ID)
4342
err = c.Event().Push(context.Background(), "load-test:event", ev)
4443
if err != nil {
4544
panic(fmt.Errorf("error pushing event: %w", err))
4645
}
4746
took := time.Since(ev.CreatedAt)
48-
fmt.Println("pushed event", ev.ID, "took", took)
47+
l.Info().Msgf("pushed event %d took %s", ev.ID, took)
4948
scheduled <- took
5049
}(id)
50+
51+
mx.Unlock()
5152
case <-timer:
52-
log.Println("done emitting events due to timer at", id)
53+
l.Info().Msg("done emitting events due to timer")
5354
return
5455
case <-ctx.Done():
55-
log.Println("done emitting events due to interruption at", id)
56+
l.Info().Msgf("done emitting events due to interruption at %d", id)
5657
return
5758
}
5859
}

examples/loadtest/cli/main.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,23 @@ import (
55
"time"
66

77
"github.com/joho/godotenv"
8+
"github.com/rs/zerolog"
89
"github.com/spf13/cobra"
10+
11+
"github.com/hatchet-dev/hatchet/internal/config/shared"
12+
"github.com/hatchet-dev/hatchet/internal/logger"
913
)
1014

15+
var l zerolog.Logger
16+
1117
func main() {
1218
var events int
1319
var concurrency int
1420
var duration time.Duration
1521
var wait time.Duration
1622
var delay time.Duration
23+
var workerDelay time.Duration
24+
var logLevel string
1725

1826
var loadtest = &cobra.Command{
1927
Use: "loadtest",
@@ -23,7 +31,15 @@ func main() {
2331
panic(err)
2432
}
2533

26-
if err := do(duration, events, delay, wait, concurrency); err != nil {
34+
l = logger.NewStdErr(
35+
&shared.LoggerConfigFile{
36+
Level: logLevel,
37+
Format: "console",
38+
},
39+
"loadtest",
40+
)
41+
42+
if err := do(duration, events, delay, wait, concurrency, workerDelay); err != nil {
2743
log.Println(err)
2844
panic("load test failed")
2945
}
@@ -35,6 +51,8 @@ func main() {
3551
loadtest.Flags().DurationVarP(&duration, "duration", "d", 10*time.Second, "duration specifies the total time to run the load test")
3652
loadtest.Flags().DurationVarP(&delay, "delay", "D", 0, "delay specifies the time to wait in each event to simulate slow tasks")
3753
loadtest.Flags().DurationVarP(&wait, "wait", "w", 10*time.Second, "wait specifies the total time to wait until events complete")
54+
loadtest.Flags().DurationVarP(&workerDelay, "workerDelay", "p", 0*time.Second, "workerDelay specifies the time to wait before starting the worker")
55+
loadtest.Flags().StringVarP(&logLevel, "level", "l", "info", "logLevel specifies the log level (debug, info, warn, error)")
3856

3957
cmd := &cobra.Command{Use: "app"}
4058
cmd.AddCommand(loadtest)

examples/loadtest/cli/run.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
1919
}
2020

2121
func run(ctx context.Context, delay time.Duration, executions chan<- time.Duration, concurrency int) (int64, int64) {
22-
c, err := client.New()
22+
c, err := client.New(
23+
client.WithLogLevel("warn"),
24+
)
2325

2426
if err != nil {
2527
panic(err)
@@ -29,6 +31,8 @@ func run(ctx context.Context, delay time.Duration, executions chan<- time.Durati
2931
worker.WithClient(
3032
c,
3133
),
34+
worker.WithLogLevel("warn"),
35+
worker.WithMaxRuns(200),
3236
)
3337

3438
if err != nil {
@@ -60,7 +64,7 @@ func run(ctx context.Context, delay time.Duration, executions chan<- time.Durati
6064
}
6165

6266
took := time.Since(input.CreatedAt)
63-
fmt.Println("executing", input.ID, "took", took)
67+
l.Info().Msgf("executing %d took %s", input.ID, took)
6468

6569
mx.Lock()
6670
executions <- took
@@ -69,13 +73,16 @@ func run(ctx context.Context, delay time.Duration, executions chan<- time.Durati
6973
for i := 0; i < len(executed)-1; i++ {
7074
if executed[i] == input.ID {
7175
duplicate = true
72-
fmt.Println("DUPLICATE:", input.ID)
76+
break
7377
}
7478
}
79+
if duplicate {
80+
l.Warn().Str("step-run-id", ctx.StepRunId()).Msgf("duplicate %d", input.ID)
81+
}
7582
if !duplicate {
76-
uniques += 1
83+
uniques++
7784
}
78-
count += 1
85+
count++
7986
executed = append(executed, input.ID)
8087
mx.Unlock()
8188

examples/loadtest/rampup/do.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@ import (
77
"slices"
88
"sync"
99
"time"
10+
11+
"github.com/rs/zerolog"
1012
)
1113

14+
var l zerolog.Logger
15+
1216
func do(duration time.Duration, startEventsPerSecond, amount int, increase, delay, wait, maxAcceptableDuration, maxAcceptableSchedule time.Duration, includeDroppedEvents bool, concurrency int) error {
13-
log.Printf("testing with duration=%s, amount=%d, increase=%d, delay=%s, wait=%s, concurrency=%d", duration, amount, increase, delay, wait, concurrency)
17+
l.Debug().Msgf("testing with duration=%s, amount=%d, increase=%d, delay=%s, wait=%s, concurrency=%d", duration, amount, increase, delay, wait, concurrency)
1418

1519
ctx, cancel := context.WithCancel(context.Background())
1620
defer cancel()
@@ -32,7 +36,7 @@ func do(duration time.Duration, startEventsPerSecond, amount int, increase, dela
3236

3337
go func() {
3438
for s := range scheduled {
35-
log.Printf("scheduled %d", s)
39+
l.Debug().Msgf("scheduled %d", s)
3640
idLock.Lock()
3741
ids = append(ids, s)
3842
idLock.Unlock()
@@ -46,7 +50,7 @@ func do(duration time.Duration, startEventsPerSecond, amount int, increase, dela
4650
if includeDroppedEvents {
4751
panic(fmt.Errorf("event %d did not execute in time", s))
4852
} else {
49-
log.Printf("warning: event %d did not execute in time", s)
53+
l.Warn().Msgf("event %d did not execute in time", s)
5054
}
5155
}
5256
}
@@ -56,7 +60,7 @@ func do(duration time.Duration, startEventsPerSecond, amount int, increase, dela
5660

5761
go func() {
5862
for e := range executed {
59-
log.Printf("executed %d", e)
63+
l.Debug().Msgf("executed %d", e)
6064
idLock.Lock()
6165
ids = slices.DeleteFunc(ids, func(s int64) bool {
6266
return s == e

examples/loadtest/rampup/emit.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rampup
33
import (
44
"context"
55
"fmt"
6-
"log"
76
"sync"
87
"time"
98

@@ -40,34 +39,36 @@ func emit(ctx context.Context, startEventsPerSecond, amount int, increase, durat
4039
if eventsPerSecond < 1 {
4140
eventsPerSecond = 1
4241
}
43-
log.Printf("emitting %d events per second", eventsPerSecond)
42+
l.Debug().Msgf("emitting %d events per second", eventsPerSecond)
4443
select {
4544
case <-time.After(time.Second / time.Duration(eventsPerSecond)):
4645
mx.Lock()
4746
id += 1
48-
mx.Unlock()
4947

5048
go func(id int64) {
49+
var err error
5150
ev := Event{CreatedAt: time.Now(), ID: id}
52-
fmt.Println("pushed event", ev.ID)
51+
l.Debug().Msgf("pushed event %d", ev.ID)
5352
err = c.Event().Push(context.Background(), "load-test:event", ev)
5453
if err != nil {
5554
panic(fmt.Errorf("error pushing event: %w", err))
5655
}
5756
took := time.Since(ev.CreatedAt)
58-
fmt.Println("pushed event", ev.ID, "took", took)
57+
l.Debug().Msgf("pushed event %d took %s", ev.ID, took)
5958

6059
if took > maxAcceptableSchedule {
6160
panic(fmt.Errorf("event took too long to schedule: %s at %d events/s", took, eventsPerSecond))
6261
}
6362

6463
scheduled <- id
6564
}(id)
65+
66+
mx.Unlock()
6667
case <-timer:
67-
log.Println("done emitting events due to timer at", id)
68+
l.Debug().Msgf("done emitting events due to timer at %d", id)
6869
return
6970
case <-ctx.Done():
70-
log.Println("done emitting events due to interruption at", id)
71+
l.Debug().Msgf("done emitting events due to interruption at %d", id)
7172
return
7273
}
7374
}

examples/loadtest/rampup/ramp_up_e2e_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"testing"
1010
"time"
1111

12+
"github.com/hatchet-dev/hatchet/internal/config/shared"
13+
"github.com/hatchet-dev/hatchet/internal/logger"
1214
"github.com/hatchet-dev/hatchet/internal/testutils"
1315
)
1416

@@ -30,6 +32,15 @@ func TestRampUp(t *testing.T) {
3032
concurrency int
3133
startEventsPerSecond int
3234
}
35+
36+
l = logger.NewStdErr(
37+
&shared.LoggerConfigFile{
38+
Level: "warn",
39+
Format: "console",
40+
},
41+
"loadtest",
42+
)
43+
3344
tests := []struct {
3445
name string
3546
args args

examples/loadtest/rampup/run.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
1919
}
2020

2121
func run(ctx context.Context, delay time.Duration, concurrency int, maxAcceptableDuration time.Duration, hook chan<- time.Duration, executedCh chan<- int64) (int64, int64) {
22-
c, err := client.New()
22+
c, err := client.New(
23+
client.WithLogLevel("warn"),
24+
)
2325

2426
if err != nil {
2527
panic(err)
@@ -29,6 +31,8 @@ func run(ctx context.Context, delay time.Duration, concurrency int, maxAcceptabl
2931
worker.WithClient(
3032
c,
3133
),
34+
worker.WithLogLevel("warn"),
35+
worker.WithMaxRuns(200),
3236
)
3337

3438
if err != nil {
@@ -60,7 +64,8 @@ func run(ctx context.Context, delay time.Duration, concurrency int, maxAcceptabl
6064
}
6165

6266
took := time.Since(input.CreatedAt)
63-
fmt.Println("executing", input.ID, "took", took)
67+
68+
l.Debug().Msgf("executing %d took %s", input.ID, took)
6469

6570
if took > maxAcceptableDuration {
6671
hook <- took
@@ -78,7 +83,7 @@ func run(ctx context.Context, delay time.Duration, concurrency int, maxAcceptabl
7883
}
7984
}
8085
if duplicate {
81-
fmt.Println("DUPLICATE:", input.ID)
86+
l.Warn().Str("step-run-id", ctx.StepRunId()).Msgf("duplicate %d", input.ID)
8287
} else {
8388
uniques += 1
8489
}

0 commit comments

Comments
 (0)