Skip to content

Commit 4174d1d

Browse files
authored
Merge pull request #20709 from serathius/watch-negative-revision-release-3.5
[release-3.5] Reject watch request with -1 revision
2 parents 1cf4bc4 + eb7ef2e commit 4174d1d

File tree

4 files changed

+107
-1
lines changed

4 files changed

+107
-1
lines changed

server/etcdserver/api/v3rpc/watch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,22 @@ func (sws *serverWatchStream) recvLoop() error {
267267
// support >= key queries
268268
creq.RangeEnd = []byte{}
269269
}
270+
if creq.StartRevision < 0 {
271+
wr := &pb.WatchResponse{
272+
Header: sws.newResponseHeader(sws.watchStream.Rev()),
273+
WatchId: clientv3.InvalidWatchID,
274+
Canceled: true,
275+
Created: true,
276+
CancelReason: rpctypes.ErrCompacted.Error(),
277+
}
278+
279+
select {
280+
case sws.ctrlStream <- wr:
281+
continue
282+
case <-sws.closec:
283+
return nil
284+
}
285+
}
270286

271287
err := sws.isWatchPermitted(creq)
272288
if err != nil {

server/mvcc/watchable_store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,10 @@ func (s *watchableStore) syncWatchers() int {
354354
compactionRev := s.store.compactMainRev
355355

356356
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
357+
if minRev < 0 {
358+
s.store.lg.Warn("Unexpected negative revision range start", zap.Int64("minRev", minRev))
359+
minRev = 0
360+
}
357361
minBytes, maxBytes := newRevBytes(), newRevBytes()
358362
revToBytes(revision{main: minRev}, minBytes)
359363
revToBytes(revision{main: curRev + 1}, maxBytes)

server/proxy/grpcproxy/watch.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,17 @@ func (wps *watchProxyStream) recvLoop() error {
235235
case *pb.WatchRequest_CreateRequest:
236236
cr := uv.CreateRequest
237237

238+
if cr.StartRevision < 0 {
239+
wps.watchCh <- &pb.WatchResponse{
240+
Header: &pb.ResponseHeader{},
241+
WatchId: clientv3.InvalidWatchID,
242+
Created: true,
243+
Canceled: true,
244+
CancelReason: rpctypes.ErrCompacted.Error(),
245+
}
246+
continue
247+
}
248+
238249
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
239250
wps.watchCh <- &pb.WatchResponse{
240251
Header: &pb.ResponseHeader{},

tests/integration/clientv3/watch_test.go

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/stretchr/testify/require"
2728
mvccpb "go.etcd.io/etcd/api/v3/mvccpb"
2829
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
2930
"go.etcd.io/etcd/api/v3/version"
30-
"go.etcd.io/etcd/client/v3"
31+
clientv3 "go.etcd.io/etcd/client/v3"
3132
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
3233
"go.etcd.io/etcd/tests/v3/integration"
3334
"google.golang.org/grpc/metadata"
@@ -1214,3 +1215,77 @@ func testWatchClose(t *testing.T, wctx *watchctx) {
12141215
t.Fatalf("read wch got %v; expected closed channel", wresp)
12151216
}
12161217
}
1218+
1219+
func TestWatch(t *testing.T) {
1220+
integration.BeforeTest(t)
1221+
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
1222+
t.Cleanup(func() { clus.Terminate(t) })
1223+
client := clus.Client(0)
1224+
1225+
tcs := []struct {
1226+
name string
1227+
key string
1228+
opts []clientv3.OpOption
1229+
wantError error
1230+
wantEvents []*clientv3.Event
1231+
}{
1232+
{
1233+
name: "Watch with negative revision",
1234+
key: "/",
1235+
opts: []clientv3.OpOption{clientv3.WithRev(-1)},
1236+
wantError: rpctypes.ErrCompacted,
1237+
},
1238+
{
1239+
name: "Watch with zero revision",
1240+
key: "/",
1241+
opts: []clientv3.OpOption{clientv3.WithRev(0)},
1242+
},
1243+
{
1244+
name: "Watch with positive revision",
1245+
key: "/",
1246+
opts: []clientv3.OpOption{clientv3.WithRev(1)},
1247+
},
1248+
}
1249+
ctx := t.Context()
1250+
1251+
t.Log("Open watches")
1252+
watches := make([]clientv3.WatchChan, len(tcs))
1253+
for i, tc := range tcs {
1254+
watchCtx, cancel := context.WithTimeout(ctx, time.Second)
1255+
defer cancel()
1256+
watches[i] = client.Watch(watchCtx, tc.key, tc.opts...)
1257+
}
1258+
1259+
t.Log("Validate")
1260+
for i, tc := range tcs {
1261+
t.Run(tc.name, func(t *testing.T) {
1262+
_, err := collectEvents(ctx, watches[i])
1263+
if tc.wantError == nil {
1264+
require.NoError(t, err)
1265+
} else {
1266+
require.ErrorContains(t, err, tc.wantError.Error())
1267+
}
1268+
})
1269+
}
1270+
}
1271+
1272+
func collectEvents(ctx context.Context, watch clientv3.WatchChan) (events []*clientv3.Event, err error) {
1273+
for {
1274+
select {
1275+
case resp, ok := <-watch:
1276+
if !ok {
1277+
return events, nil
1278+
}
1279+
err := resp.Err()
1280+
if err != nil {
1281+
return events, err
1282+
}
1283+
events = append(events, resp.Events...)
1284+
case <-ctx.Done():
1285+
return events, ctx.Err()
1286+
// Watch resync interval * 1.5
1287+
case <-time.After(150 * time.Millisecond):
1288+
return events, nil
1289+
}
1290+
}
1291+
}

0 commit comments

Comments
 (0)