Skip to content

Commit 48e7cce

Browse files
committed
Support watch for get
Signed-off-by: Shiming Zhang <[email protected]>
1 parent d345af1 commit 48e7cce

File tree

4 files changed

+110
-6
lines changed

4 files changed

+110
-6
lines changed

augerctl/command/get_command.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type getFlagpole struct {
3131
Output string
3232
ChunkSize int64
3333
Prefix string
34+
35+
Watch bool
36+
WatchOnly bool
3437
}
3538

3639
var (
@@ -89,6 +92,8 @@ func newCtlGetCommand(f *flagpole) *cobra.Command {
8992
cmd.Flags().Int64Var(&flags.ChunkSize, "chunk-size", 500, "chunk size of the list pager")
9093
cmd.Flags().StringVar(&flags.Prefix, "prefix", "/registry", "prefix to prepend to the resource")
9194

95+
cmd.Flags().BoolVarP(&flags.Watch, "watch", "w", false, "after listing/getting the requested object, watch for changes")
96+
cmd.Flags().BoolVar(&flags.WatchOnly, "watch-only", false, "watch for changes to the requested object(s), without listing/getting first")
9297
return cmd
9398
}
9499

@@ -124,13 +129,30 @@ func getCommand(ctx context.Context, etcdclient client.Client, flags *getFlagpol
124129
client.WithResponse(printer.Print),
125130
}
126131

127-
// TODO: Support watch
132+
if flags.Watch {
133+
if !flags.WatchOnly {
134+
rev, err := etcdclient.Get(ctx, flags.Prefix,
135+
opOpts...,
136+
)
137+
if err != nil {
138+
return err
139+
}
140+
opOpts = append(opOpts, client.WithRevision(rev))
141+
}
128142

129-
_, err := etcdclient.Get(ctx, flags.Prefix,
130-
opOpts...,
131-
)
132-
if err != nil {
133-
return err
143+
err := etcdclient.Watch(ctx, flags.Prefix,
144+
opOpts...,
145+
)
146+
if err != nil {
147+
return err
148+
}
149+
} else {
150+
_, err := etcdclient.Get(ctx, flags.Prefix,
151+
opOpts...,
152+
)
153+
if err != nil {
154+
return err
155+
}
134156
}
135157

136158
return nil

augerctl/command/printer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ func NewPrinter(w io.Writer, printerType string) Printer {
4040

4141
func formatResponse(w io.Writer, outMediaType string, kv *client.KeyValue) error {
4242
value := kv.Value
43+
if kv.PrevValue != nil {
44+
value = kv.PrevValue
45+
}
4346
inMediaType, _, err := encoding.DetectAndExtract(value)
4447
if err != nil {
4548
_, err0 := fmt.Fprintf(w, "---\n# %s | raw | %v\n# %s\n", kv.Key, err, value)

pkg/client/client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type Client interface {
3131
// Get is a method that retrieves a key-value pair from the etcd server.
3232
// It returns the revision of the key-value pair
3333
Get(ctx context.Context, prefix string, opOpts ...OpOption) (rev int64, err error)
34+
35+
// Watch is a method that watches for changes to a key-value pair on the etcd server.
36+
Watch(ctx context.Context, prefix string, opOpts ...OpOption) error
3437
}
3538

3639
// client is the etcd client.
@@ -89,6 +92,13 @@ func WithChunkSize(chunkSize int64) OpOption {
8992
}
9093
}
9194

95+
// WithRevision sets the revision for the target.
96+
func WithRevision(revision int64) OpOption {
97+
return func(o *op) {
98+
o.revision = revision
99+
}
100+
}
101+
92102
func opOption(opts []OpOption) op {
93103
var opt op
94104
for _, o := range opts {
@@ -101,6 +111,9 @@ func opOption(opts []OpOption) op {
101111
type KeyValue struct {
102112
Key []byte
103113
Value []byte
114+
115+
// For delete event
116+
PrevValue []byte
104117
}
105118

106119
func iterateList(kvs []*mvccpb.KeyValue, callback func(kv *KeyValue) error) error {

pkg/client/client_watch.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package client
18+
19+
import (
20+
"context"
21+
"fmt"
22+
23+
clientv3 "go.etcd.io/etcd/client/v3"
24+
)
25+
26+
func (c *client) Watch(ctx context.Context, prefix string, opOpts ...OpOption) error {
27+
opt := opOption(opOpts)
28+
if opt.response == nil {
29+
return fmt.Errorf("response is required")
30+
}
31+
32+
path, single, err := getPrefix(prefix, opt.gr, opt.name, opt.namespace)
33+
if err != nil {
34+
return err
35+
}
36+
37+
opts := make([]clientv3.OpOption, 0, 3)
38+
39+
if !single {
40+
opts = append(opts, clientv3.WithPrefix())
41+
}
42+
43+
if opt.revision != 0 {
44+
opts = append(opts, clientv3.WithRev(opt.revision))
45+
}
46+
47+
opts = append(opts, clientv3.WithPrevKV())
48+
49+
watchChan := c.client.Watch(ctx, path, opts...)
50+
for watchResp := range watchChan {
51+
for _, event := range watchResp.Events {
52+
r := &KeyValue{
53+
Key: event.Kv.Key,
54+
Value: event.Kv.Value,
55+
}
56+
if event.PrevKv != nil {
57+
r.PrevValue = event.PrevKv.Value
58+
}
59+
err := opt.response(r)
60+
if err != nil {
61+
return err
62+
}
63+
}
64+
}
65+
return nil
66+
}

0 commit comments

Comments
 (0)