Skip to content

Commit 8c4d3fc

Browse files
committed
Add e2e test to reproduce issue #19406
Signed-off-by: Miancheng Lin <[email protected]>
1 parent 5122d43 commit 8c4d3fc

File tree

2 files changed

+95
-0
lines changed

2 files changed

+95
-0
lines changed

server/storage/mvcc/kvstore_compaction.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyVal
4949

5050
tx := s.b.BatchTx()
5151
tx.LockOutsideApply()
52+
// gofail: var compactAfterAcquiredBatchTxLock struct{}
5253
keys, values := tx.UnsafeRange(schema.Key, last, end, int64(batchNum))
5354
for i := range keys {
5455
rev = BytesToRev(keys[i])

tests/e2e/reproduce_19406_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package e2e
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"testing"
8+
"time"
9+
10+
clientv3 "go.etcd.io/etcd/client/v3"
11+
"go.etcd.io/etcd/pkg/v3/stringutil"
12+
"go.etcd.io/etcd/tests/v3/framework/e2e"
13+
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
// TestReproduce19406 reproduces the issue: https://github.com/etcd-io/etcd/issues/19406
18+
func TestReproduce19406(t *testing.T) {
19+
e2e.BeforeTest(t)
20+
21+
compactionBatchLimit := 1
22+
23+
ctx := context.TODO()
24+
CompactionSleepInterval := 100 * time.Millisecond
25+
26+
clus, cerr := e2e.NewEtcdProcessCluster(ctx, t,
27+
e2e.WithClusterSize(1),
28+
e2e.WithGoFailEnabled(true),
29+
e2e.WithCompactionBatchLimit(compactionBatchLimit),
30+
e2e.WithCompactionSleepInterval(CompactionSleepInterval),
31+
)
32+
33+
require.NoError(t, cerr)
34+
t.Cleanup(func() { require.NoError(t, clus.Stop()) })
35+
36+
// produce some data
37+
cli := newClient(t, clus.EndpointsGRPC(), e2e.ClientConfig{})
38+
valueSize := 10
39+
var latestRevision int64
40+
for i := 0; i <= 50; i++ {
41+
resp, err := cli.Put(ctx, fmt.Sprintf("%d", i), stringutil.RandString(uint(valueSize)))
42+
require.NoError(t, err)
43+
latestRevision = resp.Header.Revision
44+
}
45+
46+
// sleep for CompactionSleepInterval
47+
require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactAfterAcquiredBatchTxLock",
48+
fmt.Sprintf(`sleep("%s")`, CompactionSleepInterval)))
49+
50+
// start compaction
51+
compactionFinished := make(chan struct{}) // Channel to signal completion of compaction
52+
var wg sync.WaitGroup
53+
wg.Add(1)
54+
go func() {
55+
defer wg.Done() // Decrement counter when done
56+
defer close(compactionFinished)
57+
t.Log("start compaction...")
58+
_, err := cli.Compact(context.Background(), latestRevision, clientv3.WithCompactPhysical())
59+
require.NoError(t, err)
60+
t.Log("finished compaction...")
61+
}()
62+
63+
// Compaction runs in batches. During each batch, it acquires a lock, releases it at the end,
64+
// and then waits for a CompactionSleepInterval before starting the next batch. This pause
65+
// allows PUT requests to be processed.
66+
//
67+
// Therefore, the expected PUT request latency should fall between
68+
// [each batch execution time, each batch execution time + CompactionSleepInterval],
69+
// which in this case is [CompactionSleepInterval, CompactionSleepInterval × 2].
70+
numOfWriters := 10
71+
putLatencyLimit := CompactionSleepInterval * 2
72+
for i := 1; i <= numOfWriters; i++ {
73+
wg.Add(1) // Increment WaitGroup counter
74+
75+
go func() {
76+
defer wg.Done() // Decrement counter when done
77+
for {
78+
select {
79+
case <-compactionFinished: // compaction finished
80+
return
81+
default:
82+
start := time.Now()
83+
_, err := cli.Put(ctx, "foo", "far")
84+
require.NoError(t, err)
85+
86+
if time.Since(start) > putLatencyLimit {
87+
t.Fatalf("Test failed: put latency is larger than %s", putLatencyLimit)
88+
}
89+
}
90+
}
91+
}()
92+
}
93+
wg.Wait()
94+
}

0 commit comments

Comments
 (0)