Commit d3481ed

Potential reproduction of #20573

Evolve the code from Marek in https://github.com/serathius/etcd/blob/52673cee64e7da995f4bc700977a88a21a304066/tests/robustness/client/watch.go based on the work we did together on Friday, and on #20234 from Marek before (#20221 (comment)). Please see the discussion starting from #20349 (comment).

Signed-off-by: Chun-Hung Tseng <[email protected]>

1 parent e7d0a6a · commit d3481ed

3 files changed: 100 additions, 43 deletions


tests/robustness/client/watch.go (55 additions, 1 deletion)

@@ -18,6 +18,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -43,14 +44,28 @@ func CollectClusterWatchEvents(ctx context.Context, lg *zap.Logger, endpoints []
 			return err
 		})
 	}
-
+	finish := make(chan struct{})
 	g.Go(func() error {
 		maxRevision := <-maxRevisionChan
 		for _, memberChan := range memberMaxRevisionChans {
 			memberChan <- maxRevision
 		}
+		close(finish)
 		return nil
 	})
+
+	for _, endpoint := range endpoints {
+		g.Go(func() error {
+			c, err := clientSet.NewClient([]string{endpoint})
+			if err != nil {
+				return err
+			}
+			defer c.Close()
+			period := 10 * time.Millisecond
+			return openWatchPeriodically(ctx, lg, &g, c, period, finish)
+		})
+	}
+
 	return g.Wait()
 }
@@ -122,3 +137,42 @@ resetWatch:
 		}
 	}
 }
+
+func openWatchPeriodically(ctx context.Context, lg *zap.Logger, g *errgroup.Group, c *RecordingClient, period time.Duration, finish <-chan struct{}) error {
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-finish:
+			return nil
+		case <-time.After(period):
+		}
+		g.Go(func() error {
+			// targeting commit 866bc0717
+			resp, err := c.Get(ctx, "/key")
+			if err != nil {
+				return err
+			}
+			// rev := resp.Header.Revision + (rand.Int64N(20) - 10) // reproduce OK (<2 min on my machine)
+			rev := resp.Header.Revision - 10 //reproduce OK (<1 min on my machine)
+			// rev := int64(0) // no reproduction
+			// lg.Info("revisions", zap.Int64("rev", rev), zap.Int64("resp rev", resp.Header.Revision))
+
+			watchCtx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			w := c.Watch(watchCtx, "", rev, true, true, true)
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-finish:
+					return nil
+				case _, ok := <-w:
+					if !ok {
+						return nil
+					}
+				}
+			}
+		})
+	}
+}
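The new openWatchPeriodically helper is what drives the reproduction: every period it reads the current revision via a Get and opens another watch starting slightly behind it, while CollectClusterWatchEvents fans this out to every endpoint until the finish channel is closed. For experimenting with the same pattern outside the robustness framework, the following is a minimal sketch against the public clientv3 API; the endpoint, the one-minute bound, the rev guard, and the use of clientv3 instead of the framework's RecordingClient are assumptions, not part of the commit.

```go
// Standalone sketch of the reproduction pattern above, using the public
// clientv3 API instead of the robustness framework's RecordingClient.
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"}, // assumed endpoint, not part of the commit
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Bound the experiment; the diff's comments report reproduction in under a minute.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Millisecond): // same 10 ms period as the diff
		}

		// Any read works; we only need the current revision from the response header.
		resp, err := cli.Get(ctx, "/key")
		if err != nil {
			log.Printf("get: %v", err)
			return
		}
		rev := resp.Header.Revision - 10 // start the watch slightly behind the head, as in the diff
		if rev < 1 {
			rev = 1 // fresh cluster: fall back to the first revision
		}

		go func() {
			watchCtx, watchCancel := context.WithCancel(ctx)
			defer watchCancel()
			// Empty key plus WithPrefix watches the whole keyspace from the chosen revision.
			wch := cli.Watch(watchCtx, "",
				clientv3.WithPrefix(),
				clientv3.WithRev(rev),
				clientv3.WithPrevKV(),
				clientv3.WithProgressNotify())
			for wresp := range wch {
				if err := wresp.Err(); err != nil {
					log.Printf("watch: %v", err)
					return
				}
			}
		}()
	}
}
```

The spawned watch goroutines wind down when the shared context expires, roughly mirroring how the diff's errgroup goroutines stop once finish is closed or the context is cancelled.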

tests/robustness/failpoint/failpoint.go (21 additions, 17 deletions)

@@ -36,23 +36,27 @@ const (
 )
 
 var allFailpoints = []Failpoint{
-	KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
-	DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
-	BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
-	BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
-	CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
-	CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
-	RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
-	RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
-	BeforeApplyOneConfChangeSleep,
-	MemberReplace,
-	MemberDowngrade,
-	MemberDowngradeUpgrade,
-	DropPeerNetwork,
-	RaftBeforeSaveSleep,
-	RaftAfterSaveSleep,
-	ApplyBeforeOpenSnapshot,
-	SleepBeforeSendWatchResponse,
+	// KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic,
+	// DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic,
+	// BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic,
+	// BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
+	// CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic,
+	// CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork,
+	// RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic,
+	// RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot,
+	// BeforeApplyOneConfChangeSleep,
+	// MemberReplace,
+	// MemberDowngrade,
+	// MemberDowngradeUpgrade,
+	// DropPeerNetwork,
+	// RaftBeforeSaveSleep,
+	// RaftAfterSaveSleep,
+	// ApplyBeforeOpenSnapshot,
+	// SleepBeforeSendWatchResponse,
+	RaftAfterSaveSnapPanic,
+	RaftBeforeApplySnapPanic,
+	RaftAfterApplySnapPanic,
+	RaftAfterWALReleasePanic,
 }
 
 func PickRandom(clus *e2e.EtcdProcessCluster, profile traffic.Profile) (Failpoint, error) {
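The failpoint pool is narrowed to the four snapshot-related raft panics so that every run exercises the code path under investigation. These are gofail failpoints that the framework arms over HTTP (the scenarios enable this via e2e.WithGoFailEnabled(true)); the sketch below shows roughly how one could be armed by hand. The listen address 127.0.0.1:22381, the gofail name raftAfterSaveSnap (presumed to back RaftAfterSaveSnapPanic), and the exact success status are all assumptions.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// armFailpoint PUTs a failpoint term to a member's gofail HTTP endpoint.
// Hypothetical helper; the robustness framework has its own failpoint client.
func armFailpoint(gofailAddr, name, term string) error {
	url := fmt.Sprintf("http://%s/%s", gofailAddr, name)
	req, err := http.NewRequest(http.MethodPut, url, strings.NewReader(term))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// gofail is expected to answer with a 2xx status on success (assumption).
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("arming %s failed: %s", name, resp.Status)
	}
	return nil
}

func main() {
	// Assumed gofail address and failpoint name; "panic" is the failpoint term.
	if err := armFailpoint("127.0.0.1:22381", "raftAfterSaveSnap", "panic"); err != nil {
		fmt.Println(err)
	}
}
```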

tests/robustness/scenarios/scenarios.go (24 additions, 25 deletions)

@@ -24,7 +24,6 @@ import (
 
 	"go.etcd.io/etcd/api/v3/version"
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
-	"go.etcd.io/etcd/server/v3/etcdserver"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/client"
 	"go.etcd.io/etcd/tests/v3/robustness/failpoint"
@@ -40,16 +39,16 @@ type TrafficProfile struct {
 }
 
 var trafficProfiles = []TrafficProfile{
-	{
-		Name:    "EtcdHighTraffic",
-		Traffic: traffic.EtcdPut,
-		Profile: traffic.HighTrafficProfile,
-	},
-	{
-		Name:    "EtcdTrafficDeleteLeases",
-		Traffic: traffic.EtcdPutDeleteLease,
-		Profile: traffic.LowTraffic,
-	},
+	// {
+	// 	Name:    "EtcdHighTraffic",
+	// 	Traffic: traffic.EtcdPut,
+	// 	Profile: traffic.HighTrafficProfile,
+	// },
+	// {
+	// 	Name:    "EtcdTrafficDeleteLeases",
+	// 	Traffic: traffic.EtcdPutDeleteLease,
+	// 	Profile: traffic.LowTraffic,
+	// },
 	{
 		Name:    "KubernetesHighTraffic",
 		Traffic: traffic.Kubernetes,
@@ -94,11 +93,11 @@ func Exploratory(_ *testing.T) []TestScenario {
 	mixedVersionOption := options.WithClusterOptionGroups(random.PickRandom[options.ClusterOptions](mixedVersionOptionChoices))
 
 	baseOptions := []e2e.EPClusterOption{
-		options.WithSnapshotCount(50, 100, 1000),
+		options.WithSnapshotCount(50),
 		options.WithSubsetOptions(randomizableOptions...),
 		e2e.WithGoFailEnabled(true),
 		// Set a low minimal compaction batch limit to allow for triggering multi batch compaction failpoints.
-		options.WithCompactionBatchLimit(10, 100, 1000),
+		options.WithCompactionBatchLimit(10),
 		e2e.WithWatchProcessNotifyInterval(100 * time.Millisecond),
 	}
 
@@ -107,20 +106,20 @@ func Exploratory(_ *testing.T) []TestScenario {
 	}
 
 	if e2e.CouldSetSnapshotCatchupEntries(e2e.BinPath.Etcd) {
-		baseOptions = append(baseOptions, options.WithSnapshotCatchUpEntries(100, etcdserver.DefaultSnapshotCatchUpEntries))
+		baseOptions = append(baseOptions, options.WithSnapshotCatchUpEntries(100))
 	}
 	scenarios := []TestScenario{}
-	for _, tp := range trafficProfiles {
-		name := filepath.Join(tp.Name, "ClusterOfSize1")
-		clusterOfSize1Options := baseOptions
-		clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
-		scenarios = append(scenarios, TestScenario{
-			Name:    name,
-			Traffic: tp.Traffic,
-			Profile: tp.Profile,
-			Cluster: *e2e.NewConfig(clusterOfSize1Options...),
-		})
-	}
+	// for _, tp := range trafficProfiles {
+	// 	name := filepath.Join(tp.Name, "ClusterOfSize1")
+	// 	clusterOfSize1Options := baseOptions
+	// 	clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
+	// 	scenarios = append(scenarios, TestScenario{
+	// 		Name:    name,
+	// 		Traffic: tp.Traffic,
+	// 		Profile: tp.Profile,
+	// 		Cluster: *e2e.NewConfig(clusterOfSize1Options...),
+	// 	})
+	// }
 
 	for _, tp := range trafficProfiles {
 		name := filepath.Join(tp.Name, "ClusterOfSize3")