From 79280357180005959be3aec8dd4aaf4195102103 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 21 Sep 2025 11:43:48 +0200 Subject: [PATCH] Reject watch request with -1 revision Signed-off-by: Marek Siarkowicz --- clientv3/integration/watch_test.go | 75 ++++++++++++++++++++++++++++++ etcdserver/api/v3rpc/watch.go | 16 +++++++ mvcc/watchable_store.go | 4 ++ proxy/grpcproxy/watch.go | 11 +++++ 4 files changed, 106 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index a5ac7beb10a8..05049d0630c7 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -30,6 +30,7 @@ import ( mvccpb "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/testutil" + "github.com/stretchr/testify/require" "google.golang.org/grpc/metadata" ) @@ -1184,3 +1185,77 @@ func testWatchClose(t *testing.T, wctx *watchctx) { t.Fatalf("read wch got %v; expected closed channel", wresp) } } + +func TestWatch(t *testing.T) { + defer testutil.AfterTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + client := clus.Client(0) + + tcs := []struct { + name string + key string + opts []clientv3.OpOption + wantError error + wantEvents []*clientv3.Event + }{ + { + name: "Watch with negative revision", + key: "/", + opts: []clientv3.OpOption{clientv3.WithRev(-1)}, + wantError: rpctypes.ErrCompacted, + }, + { + name: "Watch with zero revision", + key: "/", + opts: []clientv3.OpOption{clientv3.WithRev(0)}, + }, + { + name: "Watch with positive revision", + key: "/", + opts: []clientv3.OpOption{clientv3.WithRev(1)}, + }, + } + ctx := t.Context() + + t.Log("Open watches") + watches := make([]clientv3.WatchChan, len(tcs)) + for i, tc := range tcs { + watchCtx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + watches[i] = client.Watch(watchCtx, tc.key, tc.opts...) + } + + t.Log("Validate") + for i, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + _, err := collectEvents(ctx, watches[i]) + if tc.wantError == nil { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, tc.wantError.Error()) + } + }) + } +} + +func collectEvents(ctx context.Context, watch clientv3.WatchChan) (events []*clientv3.Event, err error) { + for { + select { + case resp, ok := <-watch: + if !ok { + return events, nil + } + err := resp.Err() + if err != nil { + return events, err + } + events = append(events, resp.Events...) + case <-ctx.Done(): + return events, ctx.Err() + // Watch resync interval * 1.5 + case <-time.After(150 * time.Millisecond): + return events, nil + } + } +} diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 8242c8460aa8..92efbf3cb782 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -276,6 +276,22 @@ func (sws *serverWatchStream) recvLoop() error { // support >= key queries creq.RangeEnd = []byte{} } + if creq.StartRevision < 0 { + wr := &pb.WatchResponse{ + Header: sws.newResponseHeader(sws.watchStream.Rev()), + WatchId: clientv3.InvalidWatchID, + Canceled: true, + Created: true, + CancelReason: rpctypes.ErrCompacted.Error(), + } + + select { + case sws.ctrlStream <- wr: + continue + case <-sws.closec: + return nil + } + } err := sws.isWatchPermitted(creq) if err != nil { diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 567b004f9cab..1f327ee51af3 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -354,6 +354,10 @@ func (s *watchableStore) syncWatchers() int { compactionRev := s.store.compactMainRev wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev) + if minRev < 0 { + s.store.lg.Warn("Unexpected negative revision range start", zap.Int64("minRev", minRev)) + minRev = 0 + } minBytes, maxBytes := newRevBytes(), newRevBytes() revToBytes(revision{main: minRev}, minBytes) revToBytes(revision{main: curRev + 1}, maxBytes) diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index c62f07416a4c..6b875d473ec1 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -230,6 +230,17 @@ func (wps *watchProxyStream) recvLoop() error { case *pb.WatchRequest_CreateRequest: cr := uv.CreateRequest + if cr.StartRevision < 0 { + wps.watchCh <- &pb.WatchResponse{ + Header: &pb.ResponseHeader{}, + WatchId: clientv3.InvalidWatchID, + Created: true, + Canceled: true, + CancelReason: rpctypes.ErrCompacted.Error(), + } + continue + } + if err := wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil { wps.watchCh <- &pb.WatchResponse{ Header: &pb.ResponseHeader{},