Skip to content

Commit afeab20

Browse files
committed
Reject watch request with -1 revision and make rangeEvents safe against negative revision
Signed-off-by: Marek Siarkowicz <[email protected]>
1 parent b7420c5 commit afeab20

File tree

5 files changed

+115
-5
lines changed

5 files changed

+115
-5
lines changed

server/etcdserver/api/v3rpc/watch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,22 @@ func (sws *serverWatchStream) recvLoop() error {
272272
// support >= key queries
273273
creq.RangeEnd = []byte{}
274274
}
275+
if creq.StartRevision < 0 {
276+
wr := &pb.WatchResponse{
277+
Header: sws.newResponseHeader(sws.watchStream.Rev()),
278+
WatchId: clientv3.InvalidWatchID,
279+
Canceled: true,
280+
Created: true,
281+
CancelReason: rpctypes.ErrCompacted.Error(),
282+
}
283+
284+
select {
285+
case sws.ctrlStream <- wr:
286+
continue
287+
case <-sws.closec:
288+
return nil
289+
}
290+
}
275291

276292
err := sws.isWatchPermitted(creq)
277293
if err != nil {

server/proxy/grpcproxy/watch.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,17 @@ func (wps *watchProxyStream) recvLoop() error {
235235
case *pb.WatchRequest_CreateRequest:
236236
cr := uv.CreateRequest
237237

238+
if cr.StartRevision < 0 {
239+
wps.watchCh <- &pb.WatchResponse{
240+
Header: &pb.ResponseHeader{},
241+
WatchId: clientv3.InvalidWatchID,
242+
Created: true,
243+
Canceled: true,
244+
CancelReason: rpctypes.ErrCompacted.Error(),
245+
}
246+
continue
247+
}
248+
238249
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
239250
wps.watchCh <- &pb.WatchResponse{
240251
Header: &pb.ResponseHeader{},

server/storage/mvcc/watchable_store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,10 @@ func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event,
447447

448448
// rangeEvents returns events in range [minRev, maxRev).
449449
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
450+
if minRev < 0 {
451+
lg.Warn("Unexpected negative revision range start", zap.Int64("minRev", minRev))
452+
minRev = 0
453+
}
450454
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
451455
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
452456
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)

server/storage/mvcc/watchable_store_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -405,13 +405,14 @@ func TestRangeEvents(t *testing.T) {
405405
expectEvents []mvccpb.Event
406406
}{
407407
// maxRev, top to bottom
408-
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
409-
{minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]},
410-
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
411-
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
412-
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
408+
{minRev: -1, maxRev: 6, expectEvents: expectEvents[0:5]},
409+
{minRev: -1, maxRev: 5, expectEvents: expectEvents[0:3]},
410+
{minRev: -1, maxRev: 4, expectEvents: expectEvents[0:2]},
411+
{minRev: -1, maxRev: 3, expectEvents: expectEvents[0:1]},
412+
{minRev: -1, maxRev: 2, expectEvents: expectEvents[0:0]},
413413

414414
// minRev, bottom to top
415+
{minRev: -1, maxRev: 6, expectEvents: expectEvents[0:5]},
415416
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
416417
{minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]},
417418
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},

tests/integration/clientv3/watch_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"testing"
2626
"time"
2727

28+
"github.com/google/go-cmp/cmp"
2829
"github.com/stretchr/testify/require"
2930
"google.golang.org/grpc/metadata"
3031

@@ -1133,3 +1134,80 @@ func testWatchClose(t *testing.T, wctx *watchctx) {
11331134
wresp, ok := <-wch
11341135
require.Falsef(t, ok, "read wch got %v; expected closed channel", wresp)
11351136
}
1137+
1138+
func TestWatch(t *testing.T) {
1139+
integration2.BeforeTest(t)
1140+
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
1141+
t.Cleanup(func() { clus.Terminate(t) })
1142+
client := clus.Client(0)
1143+
1144+
tcs := []struct {
1145+
name string
1146+
key string
1147+
opts []clientv3.OpOption
1148+
wantError error
1149+
wantEvents []*clientv3.Event
1150+
}{
1151+
{
1152+
name: "Watch with negative revision",
1153+
key: "/",
1154+
opts: []clientv3.OpOption{clientv3.WithRev(-1)},
1155+
wantError: rpctypes.ErrCompacted,
1156+
},
1157+
{
1158+
name: "Watch with zero revision",
1159+
key: "/",
1160+
opts: []clientv3.OpOption{clientv3.WithRev(0)},
1161+
},
1162+
{
1163+
name: "Watch with positive revision",
1164+
key: "/",
1165+
opts: []clientv3.OpOption{clientv3.WithRev(1)},
1166+
},
1167+
}
1168+
ctx := t.Context()
1169+
1170+
t.Log("Open watches")
1171+
watches := make([]clientv3.WatchChan, len(tcs))
1172+
for i, tc := range tcs {
1173+
watchCtx, cancel := context.WithTimeout(ctx, time.Second)
1174+
defer cancel()
1175+
watches[i] = client.Watch(watchCtx, tc.key, tc.opts...)
1176+
}
1177+
1178+
t.Log("Validate")
1179+
for i, tc := range tcs {
1180+
t.Run(tc.name, func(t *testing.T) {
1181+
events, err := collectEvents(ctx, watches[i])
1182+
if tc.wantError == nil {
1183+
require.NoError(t, err)
1184+
} else {
1185+
require.ErrorContains(t, err, tc.wantError.Error())
1186+
}
1187+
if diff := cmp.Diff(tc.wantEvents, events); diff != "" {
1188+
t.Errorf("unexpected events (-want +got):\n%s", diff)
1189+
}
1190+
})
1191+
}
1192+
}
1193+
1194+
func collectEvents(ctx context.Context, watch clientv3.WatchChan) (events []*clientv3.Event, err error) {
1195+
for {
1196+
select {
1197+
case resp, ok := <-watch:
1198+
if !ok {
1199+
return events, nil
1200+
}
1201+
err := resp.Err()
1202+
if err != nil {
1203+
return events, err
1204+
}
1205+
events = append(events, resp.Events...)
1206+
case <-ctx.Done():
1207+
return events, ctx.Err()
1208+
// Watch resync interval * 1.5
1209+
case <-time.After(150 * time.Millisecond):
1210+
return events, nil
1211+
}
1212+
}
1213+
}

0 commit comments

Comments
 (0)