Skip to content

Commit 0427f80

Browse files
committed
Reject watch request with -1 revision and make rangeEvents safe against negative revision
1 parent b7420c5 commit 0427f80

File tree

4 files changed

+105
-5
lines changed

4 files changed

+105
-5
lines changed

server/etcdserver/api/v3rpc/watch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,22 @@ func (sws *serverWatchStream) recvLoop() error {
272272
// support >= key queries
273273
creq.RangeEnd = []byte{}
274274
}
275+
if creq.StartRevision < 0 {
276+
wr := &pb.WatchResponse{
277+
Header: sws.newResponseHeader(sws.watchStream.Rev()),
278+
WatchId: clientv3.InvalidWatchID,
279+
Canceled: true,
280+
Created: true,
281+
CancelReason: rpctypes.ErrCompacted.Error(),
282+
}
283+
284+
select {
285+
case sws.ctrlStream <- wr:
286+
continue
287+
case <-sws.closec:
288+
return nil
289+
}
290+
}
275291

276292
err := sws.isWatchPermitted(creq)
277293
if err != nil {

server/storage/mvcc/watchable_store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,9 @@ func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event,
447447

448448
// rangeEvents returns events in range [minRev, maxRev).
449449
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
450+
if minRev < 0 {
451+
minRev = 0
452+
}
450453
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
451454
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
452455
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)

server/storage/mvcc/watchable_store_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,13 +405,14 @@ func TestRangeEvents(t *testing.T) {
405405
expectEvents []mvccpb.Event
406406
}{
407407
// maxRev, top to bottom
408-
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
409-
{minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]},
410-
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
411-
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
412-
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
408+
{minRev: -1, maxRev: 6, expectEvents: expectEvents[0:5]},
409+
{minRev: -1, maxRev: 5, expectEvents: expectEvents[0:3]},
410+
{minRev: -1, maxRev: 4, expectEvents: expectEvents[0:2]},
411+
{minRev: -1, maxRev: 3, expectEvents: expectEvents[0:1]},
412+
{minRev: -1, maxRev: 2, expectEvents: expectEvents[0:0]},
413413

414414
// minRev, bottom to top
415+
{minRev: -1, maxRev: 6, expectEvents: expectEvents[0:5]},
415416
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
416417
{minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]},
417418
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},

tests/integration/clientv3/watch_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"testing"
2626
"time"
2727

28+
"github.com/google/go-cmp/cmp"
29+
"github.com/stretchr/testify/assert"
2830
"github.com/stretchr/testify/require"
2931
"google.golang.org/grpc/metadata"
3032

@@ -33,6 +35,7 @@ import (
3335
"go.etcd.io/etcd/api/v3/version"
3436
clientv3 "go.etcd.io/etcd/client/v3"
3537
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
38+
"go.etcd.io/etcd/tests/v3/framework/integration"
3639
integration2 "go.etcd.io/etcd/tests/v3/framework/integration"
3740
)
3841

@@ -1133,3 +1136,80 @@ func testWatchClose(t *testing.T, wctx *watchctx) {
11331136
wresp, ok := <-wch
11341137
require.Falsef(t, ok, "read wch got %v; expected closed channel", wresp)
11351138
}
1139+
1140+
func TestWatch(t *testing.T) {
1141+
integration.BeforeTest(t)
1142+
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
1143+
t.Cleanup(func() { clus.Terminate(t) })
1144+
client := clus.Client(0)
1145+
ctx := t.Context()
1146+
1147+
tcs := []struct {
1148+
name string
1149+
key string
1150+
opts []clientv3.OpOption
1151+
wantError error
1152+
wantEvents []*clientv3.Event
1153+
}{
1154+
{
1155+
name: "Watch with negative revision",
1156+
key: "/",
1157+
opts: []clientv3.OpOption{clientv3.WithRev(-1)},
1158+
wantError: rpctypes.ErrCompacted,
1159+
},
1160+
{
1161+
name: "Watch with zero revision",
1162+
key: "/",
1163+
opts: []clientv3.OpOption{clientv3.WithRev(0)},
1164+
},
1165+
{
1166+
name: "Watch with positive revision",
1167+
key: "/",
1168+
opts: []clientv3.OpOption{clientv3.WithRev(1)},
1169+
},
1170+
}
1171+
1172+
t.Log("Open watches")
1173+
watches := make([]clientv3.WatchChan, len(tcs))
1174+
for i, tc := range tcs {
1175+
watches[i] = client.Watch(ctx, tc.key, tc.opts...)
1176+
}
1177+
1178+
t.Log("Validate")
1179+
for i, tc := range tcs {
1180+
t.Run(tc.name, func(t *testing.T) {
1181+
events, err := collectEvents(ctx, watches[i])
1182+
if tc.wantError == nil {
1183+
assert.NoError(t, err)
1184+
} else {
1185+
assert.ErrorContains(t, err, tc.wantError.Error())
1186+
}
1187+
if diff := cmp.Diff(tc.wantEvents, events); diff != "" {
1188+
t.Errorf("unexpected events (-want +got):\n%s", diff)
1189+
}
1190+
})
1191+
}
1192+
}
1193+
1194+
func collectEvents(ctx context.Context, watch clientv3.WatchChan) (events []*clientv3.Event, err error) {
1195+
for {
1196+
select {
1197+
case resp, ok := <-watch:
1198+
if !ok {
1199+
return events, nil
1200+
}
1201+
err := resp.Err()
1202+
if err != nil {
1203+
return events, err
1204+
}
1205+
for _, ev := range resp.Events {
1206+
events = append(events, ev)
1207+
}
1208+
case <-ctx.Done():
1209+
return events, ctx.Err()
1210+
// Watch resync interval * 1.5
1211+
case <-time.After(150 * time.Millisecond):
1212+
return events, nil
1213+
}
1214+
}
1215+
}

0 commit comments

Comments
 (0)