From 2b68160431a3f4a3720968afc249cd7371f21975 Mon Sep 17 00:00:00 2001 From: Chao Zhang Date: Tue, 4 Jan 2022 17:52:59 +0800 Subject: [PATCH] chore: override the ETCD put and delete methods Signed-off-by: Chao Zhang --- .gitignore | 1 + Makefile | 8 ++ backends/mysql/mysql.go | 23 ++++- etcd.go | 118 ++++++++++++++++++++++- go.mod | 3 + go.sum | 5 + {examples => integrations}/mysql/go.mod | 4 +- {examples => integrations}/mysql/go.sum | 1 + {examples => integrations}/mysql/main.go | 6 +- server.go | 2 + 10 files changed, 161 insertions(+), 10 deletions(-) rename {examples => integrations}/mysql/go.mod (55%) rename {examples => integrations}/mysql/go.sum (99%) rename {examples => integrations}/mysql/main.go (93%) diff --git a/.gitignore b/.gitignore index 485dee6..c542349 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .idea +apisix-mysql diff --git a/Makefile b/Makefile index e3390c5..75753c2 100644 --- a/Makefile +++ b/Makefile @@ -13,14 +13,22 @@ # limitations under the License. # +.PHONY: test test: @go test ./... +.PHONY: bench bench: @go test -bench '^Benchmark' ./... +.PHONY: gofmt gofmt: @find . -name "*.go" | xargs gofmt -w +.PHONY: lint lint: @golangci-lint run + +.PHONY: build-apisix-mysql +build-apisix-mysql: + @cd integrations/mysql && go build -o ../../apisix-mysql github.com/api7/etcd-adapter/integrations/mysql diff --git a/backends/mysql/mysql.go b/backends/mysql/mysql.go index 7dcc475..20f651f 100644 --- a/backends/mysql/mysql.go +++ b/backends/mysql/mysql.go @@ -17,6 +17,7 @@ package mysql import ( "context" + "strings" "github.com/k3s-io/kine/pkg/drivers/generic" mysqldriver "github.com/k3s-io/kine/pkg/drivers/mysql" @@ -26,12 +27,22 @@ import ( // Options contains settings for controlling the connection to MySQL. type Options struct { - DSN string - ConnPool generic.ConnectionPoolConfig + // Whether append the slash after the key when watching on it. + // For instance, if the key is `/apisix/routes` and this field is + // true, then the watch key will be `/apisix/routes/`, this field + // is required as the prefix watch mechanism of kine only works + // if the key is ended with the slash. + // + // Note slash won't be added again if the key is already ended with it. + AppendSlashOnWatch bool + DSN string + ConnPool generic.ConnectionPoolConfig } type mysqlCache struct { server.Backend + + opts *Options } // NewMySQLCache returns a server.Backend interface which was implemented with @@ -44,6 +55,7 @@ func NewMySQLCache(ctx context.Context, options *Options) (server.Backend, error } mc := &mysqlCache{ Backend: backend, + opts: options, } return mc, nil } @@ -51,3 +63,10 @@ func NewMySQLCache(ctx context.Context, options *Options) (server.Backend, error func (m *mysqlCache) Start(ctx context.Context) error { return m.Backend.Start(ctx) } + +func (m *mysqlCache) Watch(ctx context.Context, key string, revision int64) <-chan []*server.Event { + if m.opts.AppendSlashOnWatch && !strings.HasSuffix(key, "/") { + key += "/" + } + return m.Backend.Watch(ctx, key, revision) +} diff --git a/etcd.go b/etcd.go index 8b38220..f3181ee 100644 --- a/etcd.go +++ b/etcd.go @@ -8,8 +8,12 @@ import ( "net/http" "github.com/k3s-io/kine/pkg/server" + "go.etcd.io/etcd/api/v3/etcdserverpb" + clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" "github.com/api7/etcd-adapter/backends/btree" "github.com/api7/etcd-adapter/backends/mysql" @@ -67,15 +71,22 @@ type adapter struct { eventsCh chan []*Event backend server.Backend - bridge *server.KVServerBridge + bridge *bridge } +// AdapterOptions contains fields that can control the adapter behaviors. type AdapterOptions struct { Logger *zap.Logger Backend BackendKind MySQLOptions *mysql.Options } +// bridge wraps the server.KVServerBridge so that we can overrides some +// methods of it to overcome the constraints. +type bridge struct { + *server.KVServerBridge +} + // NewEtcdAdapter new an etcd adapter instance. func NewEtcdAdapter(opts *AdapterOptions) Adapter { var ( @@ -100,7 +111,10 @@ func NewEtcdAdapter(opts *AdapterOptions) Adapter { panic("unknown backend") } - bridge := server.New(backend, "") + bridge := &bridge{ + KVServerBridge: server.New(backend, ""), + } + a := &adapter{ logger: logger, eventsCh: make(chan []*Event), @@ -256,3 +270,103 @@ func (a *adapter) showVersion(w http.ResponseWriter, _ *http.Request) { ) } } + +func (b *bridge) Put(ctx context.Context, r *etcdserverpb.PutRequest) (*etcdserverpb.PutResponse, error) { + rangeResp, err := b.Range(ctx, &etcdserverpb.RangeRequest{ + Key: r.Key, + }) + if err != nil { + return nil, err + } + + cmp := clientv3.ModRevision(string(r.Key)) + if len(rangeResp.Kvs) > 0 { + cmp = clientv3.Compare(cmp, "=", rangeResp.Kvs[0].ModRevision) + } else { + cmp = clientv3.Compare(cmp, "=", 0) + } + + txn := &etcdserverpb.TxnRequest{ + Compare: []*etcdserverpb.Compare{ + (*etcdserverpb.Compare)(&cmp), + }, + Success: []*etcdserverpb.RequestOp{ + { + Request: &etcdserverpb.RequestOp_RequestPut{ + RequestPut: r, + }, + }, + }, + Failure: []*etcdserverpb.RequestOp{ + { + Request: &etcdserverpb.RequestOp_RequestRange{ + RequestRange: &etcdserverpb.RangeRequest{ + Key: r.Key, + }, + }, + }, + }, + } + + resp, err := b.KVServerBridge.Txn(ctx, txn) + if err != nil { + return nil, err + } + if len(resp.Responses) == 0 { + return nil, fmt.Errorf("broken internal put implementation") + } + return resp.Responses[0].GetResponsePut(), nil +} + +func (b *bridge) DeleteRange(ctx context.Context, r *etcdserverpb.DeleteRangeRequest) (*etcdserverpb.DeleteRangeResponse, error) { + // See https://github.com/k3s-io/kine/blob/c1edece777/pkg/server/delete.go#L9 + // to learn how kine decides whether this is a DELETE request. + txn := &etcdserverpb.TxnRequest{ + Compare: []*etcdserverpb.Compare{}, + Success: []*etcdserverpb.RequestOp{ + { + Request: &etcdserverpb.RequestOp_RequestRange{ + RequestRange: &etcdserverpb.RangeRequest{ + Key: r.Key, + RangeEnd: r.RangeEnd, + }, + }, + }, + { + Request: &etcdserverpb.RequestOp_RequestDeleteRange{ + RequestDeleteRange: r, + }, + }, + }, + } + + resp, err := b.KVServerBridge.Txn(ctx, txn) + if err != nil { + return nil, err + } + // TODO fix the kine bug that the response type is not DeleteRange. + rangeResp := resp.Responses[0].GetResponseRange() + if len(resp.Responses) == 0 { + return nil, fmt.Errorf("broken internal delete_range implementation") + } + + deleteRangeResp := &etcdserverpb.DeleteRangeResponse{ + Header: rangeResp.Header, + Deleted: int64(len(rangeResp.Kvs)), + } + return deleteRangeResp, nil +} + +// Register copies the behaviors of the b.KVServerBridge as we want to override +// some RPC implementation of ETCD. +func (b *bridge) Register(server *grpc.Server) { + etcdserverpb.RegisterLeaseServer(server, b) + etcdserverpb.RegisterWatchServer(server, b) + etcdserverpb.RegisterKVServer(server, b) + etcdserverpb.RegisterClusterServer(server, b) + etcdserverpb.RegisterMaintenanceServer(server, b) + + hsrv := health.NewServer() + hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + healthpb.RegisterHealthServer(server, hsrv) +} diff --git a/go.mod b/go.mod index 7e59194..fe28696 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/api7/etcd-adapter go 1.16 require ( + github.com/coreos/etcd v3.3.15+incompatible github.com/google/btree v1.0.1 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/k3s-io/kine v0.8.1 @@ -10,9 +11,11 @@ require ( github.com/soheilhy/cmux v0.1.5 github.com/stretchr/testify v1.7.0 github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 + go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 go.uber.org/zap v1.18.1 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 google.golang.org/grpc v1.38.0 + grpc.go4.org v0.0.0-20170609214715-11d0a25b4919 ) diff --git a/go.sum b/go.sum index b04a3e8..7f94d49 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,7 @@ github.com/coreos/bbolt v1.3.1-coreos.6/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE55 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/etcd v3.3.15+incompatible h1:+9RjdC18gMxNQVvSiXvObLu29mOFmkgdsB4cRTlV+EE= github.com/coreos/etcd v3.3.15+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= @@ -91,6 +92,7 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -253,6 +255,7 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= @@ -813,6 +816,8 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= +grpc.go4.org v0.0.0-20170609214715-11d0a25b4919 h1:tmXTu+dfa+d9Evp8NpJdgOy6+rt8/x4yG7qPBrtNfLY= +grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/examples/mysql/go.mod b/integrations/mysql/go.mod similarity index 55% rename from examples/mysql/go.mod rename to integrations/mysql/go.mod index bba13b5..71e5c39 100644 --- a/examples/mysql/go.mod +++ b/integrations/mysql/go.mod @@ -1,10 +1,10 @@ -module github.com/api7/etcd-adapter/examples/mysql +module github.com/api7/etcd-adapter/integrations/mysql go 1.16 require ( github.com/api7/etcd-adapter v0.1.1 - go.uber.org/zap v1.19.1 + go.uber.org/zap v1.19.1 // indirect ) replace github.com/api7/etcd-adapter v0.1.1 => ../../ diff --git a/examples/mysql/go.sum b/integrations/mysql/go.sum similarity index 99% rename from examples/mysql/go.sum rename to integrations/mysql/go.sum index 74bc7fc..635ff86 100644 --- a/examples/mysql/go.sum +++ b/integrations/mysql/go.sum @@ -808,6 +808,7 @@ gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= +grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/examples/mysql/main.go b/integrations/mysql/main.go similarity index 93% rename from examples/mysql/main.go rename to integrations/mysql/main.go index 3bc9b91..ff9ad2d 100644 --- a/examples/mysql/main.go +++ b/integrations/mysql/main.go @@ -19,18 +19,16 @@ import ( "net" "time" - "go.uber.org/zap" - adapter "github.com/api7/etcd-adapter" "github.com/api7/etcd-adapter/backends/mysql" ) func main() { opts := &adapter.AdapterOptions{ - Logger: zap.NewExample(), Backend: adapter.BackendMySQL, MySQLOptions: &mysql.Options{ - DSN: "root@tcp(127.0.0.1:4000)/apisix", + AppendSlashOnWatch: true, + DSN: "root@tcp(127.0.0.1:4000)/apisix", }, } a := adapter.NewEtcdAdapter(opts) diff --git a/server.go b/server.go index 316ef18..9cd7619 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,8 @@ func (a *adapter) Serve(ctx context.Context, l net.Listener) error { grpc.KeepaliveParams(kp), ) a.grpcSrv = grpcSrv + + // Copy the codes from KVServerBridge.Register. a.bridge.Register(grpcSrv) if gwmux, err := a.registerGateway(l.Addr().String()); err != nil {