Skip to content

Commit f9d68b5

Browse files
authored
Merge pull request #20711 from serathius/watch-negative-revision-release-3.4
[release-3.4] Reject watch request with -1 revision
2 parents ccad432 + 7928035 commit f9d68b5

File tree

4 files changed

+106
-0
lines changed

4 files changed

+106
-0
lines changed

clientv3/integration/watch_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
mvccpb "go.etcd.io/etcd/mvcc/mvccpb"
3131
"go.etcd.io/etcd/pkg/testutil"
3232

33+
"github.com/stretchr/testify/require"
3334
"google.golang.org/grpc/metadata"
3435
)
3536

@@ -1184,3 +1185,77 @@ func testWatchClose(t *testing.T, wctx *watchctx) {
11841185
t.Fatalf("read wch got %v; expected closed channel", wresp)
11851186
}
11861187
}
1188+
1189+
func TestWatch(t *testing.T) {
1190+
defer testutil.AfterTest(t)
1191+
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
1192+
defer clus.Terminate(t)
1193+
client := clus.Client(0)
1194+
1195+
tcs := []struct {
1196+
name string
1197+
key string
1198+
opts []clientv3.OpOption
1199+
wantError error
1200+
wantEvents []*clientv3.Event
1201+
}{
1202+
{
1203+
name: "Watch with negative revision",
1204+
key: "/",
1205+
opts: []clientv3.OpOption{clientv3.WithRev(-1)},
1206+
wantError: rpctypes.ErrCompacted,
1207+
},
1208+
{
1209+
name: "Watch with zero revision",
1210+
key: "/",
1211+
opts: []clientv3.OpOption{clientv3.WithRev(0)},
1212+
},
1213+
{
1214+
name: "Watch with positive revision",
1215+
key: "/",
1216+
opts: []clientv3.OpOption{clientv3.WithRev(1)},
1217+
},
1218+
}
1219+
ctx := t.Context()
1220+
1221+
t.Log("Open watches")
1222+
watches := make([]clientv3.WatchChan, len(tcs))
1223+
for i, tc := range tcs {
1224+
watchCtx, cancel := context.WithTimeout(ctx, time.Second)
1225+
defer cancel()
1226+
watches[i] = client.Watch(watchCtx, tc.key, tc.opts...)
1227+
}
1228+
1229+
t.Log("Validate")
1230+
for i, tc := range tcs {
1231+
t.Run(tc.name, func(t *testing.T) {
1232+
_, err := collectEvents(ctx, watches[i])
1233+
if tc.wantError == nil {
1234+
require.NoError(t, err)
1235+
} else {
1236+
require.ErrorContains(t, err, tc.wantError.Error())
1237+
}
1238+
})
1239+
}
1240+
}
1241+
1242+
func collectEvents(ctx context.Context, watch clientv3.WatchChan) (events []*clientv3.Event, err error) {
1243+
for {
1244+
select {
1245+
case resp, ok := <-watch:
1246+
if !ok {
1247+
return events, nil
1248+
}
1249+
err := resp.Err()
1250+
if err != nil {
1251+
return events, err
1252+
}
1253+
events = append(events, resp.Events...)
1254+
case <-ctx.Done():
1255+
return events, ctx.Err()
1256+
// Watch resync interval * 1.5
1257+
case <-time.After(150 * time.Millisecond):
1258+
return events, nil
1259+
}
1260+
}
1261+
}

etcdserver/api/v3rpc/watch.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,22 @@ func (sws *serverWatchStream) recvLoop() error {
276276
// support >= key queries
277277
creq.RangeEnd = []byte{}
278278
}
279+
if creq.StartRevision < 0 {
280+
wr := &pb.WatchResponse{
281+
Header: sws.newResponseHeader(sws.watchStream.Rev()),
282+
WatchId: clientv3.InvalidWatchID,
283+
Canceled: true,
284+
Created: true,
285+
CancelReason: rpctypes.ErrCompacted.Error(),
286+
}
287+
288+
select {
289+
case sws.ctrlStream <- wr:
290+
continue
291+
case <-sws.closec:
292+
return nil
293+
}
294+
}
279295

280296
err := sws.isWatchPermitted(creq)
281297
if err != nil {

mvcc/watchable_store.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,10 @@ func (s *watchableStore) syncWatchers() int {
354354
compactionRev := s.store.compactMainRev
355355

356356
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
357+
if minRev < 0 {
358+
s.store.lg.Warn("Unexpected negative revision range start", zap.Int64("minRev", minRev))
359+
minRev = 0
360+
}
357361
minBytes, maxBytes := newRevBytes(), newRevBytes()
358362
revToBytes(revision{main: minRev}, minBytes)
359363
revToBytes(revision{main: curRev + 1}, maxBytes)

proxy/grpcproxy/watch.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ func (wps *watchProxyStream) recvLoop() error {
230230
case *pb.WatchRequest_CreateRequest:
231231
cr := uv.CreateRequest
232232

233+
if cr.StartRevision < 0 {
234+
wps.watchCh <- &pb.WatchResponse{
235+
Header: &pb.ResponseHeader{},
236+
WatchId: clientv3.InvalidWatchID,
237+
Created: true,
238+
Canceled: true,
239+
CancelReason: rpctypes.ErrCompacted.Error(),
240+
}
241+
continue
242+
}
243+
233244
if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil {
234245
wps.watchCh <- &pb.WatchResponse{
235246
Header: &pb.ResponseHeader{},

0 commit comments

Comments
 (0)