Skip to content

Commit a2e7068

Browse files
zhouhaoA1xyz2277OrangeBaowuyingjun-lucky
committed
add a control switch for watch middleware
Co-authored-by: zhangyongxi <[email protected]> Co-authored-by: baoyinghai <[email protected]> Co-authored-by: wuyingjun <[email protected]> Signed-off-by: zhouhaoA1 <[email protected]>
1 parent e44176f commit a2e7068

File tree

15 files changed

+99
-45
lines changed

15 files changed

+99
-45
lines changed

cmd/apiserver/app/options/options.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2727
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
2828
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
2930
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
3031
)
3132

@@ -46,7 +47,7 @@ type ClusterPediaServerOptions struct {
4647

4748
Storage *storageoptions.StorageOptions
4849

49-
Subscriber *watchoptions.MiddlerwareOptions
50+
Subscriber *watchoptions.MiddlewareOptions
5051
}
5152

5253
func NewServerOptions() *ClusterPediaServerOptions {
@@ -128,11 +129,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
128129
StorageFactory: storage,
129130
}
130131

131-
err = watcher.NewSubscriber(o.Subscriber)
132-
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
133-
134-
if err != nil {
135-
return nil, err
132+
middleware.SubscriberEnabled = o.Subscriber.Enabled
133+
if middleware.SubscriberEnabled {
134+
err = watcher.NewSubscriber(o.Subscriber)
135+
if err != nil {
136+
return nil, err
137+
}
138+
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
136139
}
137140

138141
return config, nil

cmd/clustersynchro-manager/app/options/options.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
2828
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2929
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
3031
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
3132
)
3233

@@ -47,7 +48,7 @@ type Options struct {
4748
KubeStateMetrics *kubestatemetrics.Options
4849

4950
WorkerNumber int // WorkerNumber is the number of worker goroutines
50-
Publisher *watchoptions.MiddlerwareOptions
51+
Publisher *watchoptions.MiddlewareOptions
5152
}
5253

5354
func NewClusterSynchroManagerOptions() (*Options, error) {
@@ -130,9 +131,12 @@ func (o *Options) Config() (*config.Config, error) {
130131
return nil, err
131132
}
132133

133-
err = watcher.NewPulisher(o.Publisher)
134-
if err != nil {
135-
return nil, err
134+
middleware.PublisherEnabled = o.Publisher.Enabled
135+
if middleware.PublisherEnabled {
136+
err = watcher.NewPulisher(o.Publisher)
137+
if err != nil {
138+
return nil, err
139+
}
136140
}
137141

138142
kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)

cmd/clustersynchro-manager/app/synchro.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
2727
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
2828
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
2931
)
3032

3133
func init() {
@@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error {
98100
}
99101

100102
if !c.LeaderElection.LeaderElect {
103+
if middleware.PublisherEnabled {
104+
err := middleware.GlobalPublisher.InitPublisher(ctx)
105+
if err != nil {
106+
return err
107+
}
108+
}
101109
synchromanager.Run(c.WorkerNumber, ctx.Done())
102110
return nil
103111
}
@@ -138,13 +146,23 @@ func Run(ctx context.Context, c *config.Config) error {
138146
defer close(done)
139147

140148
stopCh := ctx.Done()
149+
if middleware.PublisherEnabled {
150+
err := middleware.GlobalPublisher.InitPublisher(ctx)
151+
if err != nil {
152+
return
153+
}
154+
}
141155
synchromanager.Run(c.WorkerNumber, stopCh)
142156
},
143157
OnStoppedLeading: func() {
144158
klog.Info("leaderelection lost")
145159
if done != nil {
146160
<-done
147161
}
162+
if middleware.PublisherEnabled {
163+
middleware.GlobalPublisher.StopPublisher()
164+
components.EC.CloseChannels()
165+
}
148166
},
149167
},
150168
})

pkg/apiserver/apiserver.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
110110

111111
// init event cache pool
112112
eventStop := make(chan struct{})
113-
watchcomponents.InitEventCachePool(eventStop)
114-
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
115-
if err != nil {
116-
return nil, err
113+
if middleware.SubscriberEnabled {
114+
watchcomponents.InitEventCachePool(eventStop)
115+
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
116+
if err != nil {
117+
return nil, err
118+
}
117119
}
118120

119121
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)

pkg/generated/openapi/zz_generated.openapi.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/storage/internalstorage/resource_storage_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -188,13 +188,13 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
188188
}{
189189
{
190190
"empty",
191-
schema.GroupVersionResource{},
191+
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
192192
"",
193193
"",
194194
"",
195195
expected{
196-
`SELECT "object" FROM "resources" WHERE "cluster" = '' AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
197-
"SELECT `object` FROM `resources` WHERE `cluster` = '' AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
196+
`SELECT cluster_resource_version, object FROM "apps_v1_deployments" WHERE "deleted" = false AND "name" = '' AND "namespace" = '' ORDER BY "apps_v1_deployments"."id" LIMIT 1`,
197+
"SELECT cluster_resource_version, object FROM `apps_v1_deployments` WHERE `deleted` = false AND `name` = '' AND `namespace` = '' ORDER BY `apps_v1_deployments`.`id` LIMIT 1",
198198
"",
199199
},
200200
},
@@ -205,8 +205,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
205205
"ns-1",
206206
"resource-1",
207207
expected{
208-
`SELECT "object" FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
209-
"SELECT `object` FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
208+
`SELECT cluster_resource_version, object FROM "apps_v1_deployments" WHERE "cluster" = 'cluster-1' AND "deleted" = false AND "name" = 'resource-1' AND "namespace" = 'ns-1' ORDER BY "apps_v1_deployments"."id" LIMIT 1`,
209+
"SELECT cluster_resource_version, object FROM `apps_v1_deployments` WHERE `cluster` = 'cluster-1' AND `deleted` = false AND `name` = 'resource-1' AND `namespace` = 'ns-1' ORDER BY `apps_v1_deployments`.`id` LIMIT 1",
210210
"",
211211
},
212212
},
@@ -250,8 +250,8 @@ func TestResourceStorage_genListObjectQuery(t *testing.T) {
250250
appsv1.SchemeGroupVersion.WithResource("deployments"),
251251
&internal.ListOptions{},
252252
expected{
253-
`SELECT * FROM "resources" WHERE "group" = 'apps' AND "resource" = 'deployments' AND "version" = 'v1'`,
254-
"SELECT * FROM `resources` WHERE `group` = 'apps' AND `resource` = 'deployments' AND `version` = 'v1'",
253+
`SELECT * FROM "apps_v1_deployments"`,
254+
"SELECT * FROM `apps_v1_deployments`",
255255
"",
256256
},
257257
},
@@ -307,8 +307,8 @@ func TestResourceStorage_deleteObject(t *testing.T) {
307307
"",
308308
"",
309309
expected{
310-
`DELETE FROM "resources" WHERE "cluster" = '' AND "group" = 'apps' AND "name" = '' AND "namespace" = '' AND "resource" = 'deployments' AND "version" = 'v1'`,
311-
"DELETE FROM `resources` WHERE `cluster` = '' AND `group` = 'apps' AND `name` = '' AND `namespace` = '' AND `resource` = 'deployments' AND `version` = 'v1'",
310+
`DELETE FROM "apps_v1_deployments" WHERE "cluster" = '' AND "name" = '' AND "namespace" = ''`,
311+
"DELETE FROM `apps_v1_deployments` WHERE `cluster` = '' AND `name` = '' AND `namespace` = ''",
312312
"",
313313
},
314314
},
@@ -319,8 +319,8 @@ func TestResourceStorage_deleteObject(t *testing.T) {
319319
"ns-1",
320320
"resource-1",
321321
expected{
322-
`DELETE FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1'`,
323-
"DELETE FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1'",
322+
`DELETE FROM "apps_v1_deployments" WHERE "cluster" = 'cluster-1' AND "name" = 'resource-1' AND "namespace" = 'ns-1'`,
323+
"DELETE FROM `apps_v1_deployments` WHERE `cluster` = 'cluster-1' AND `name` = 'resource-1' AND `namespace` = 'ns-1'",
324324
"",
325325
},
326326
},

pkg/storage/internalstorage/storage.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
5959
KeyFunc: utils.GetKeyFunc(gvr, config.Namespaced),
6060
}
6161

62-
// initEventCache is true when Apiserver starts, false when clustersynchro-manager starts
63-
if initEventCache {
62+
// SubscriberEnabled is true when Apiserver starts and middleware enabled
63+
if middleware.SubscriberEnabled {
6464
var cache *watchcomponents.EventCache
6565
buffer := watchcomponents.GetMultiClusterEventPool().GetClusterBufferByGVR(gvr)
6666
cachePool := watchcomponents.GetInitEventCachePool()
@@ -88,7 +88,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
8888

8989
resourceStorage.buffer = buffer
9090
resourceStorage.eventCache = cache
91-
} else {
91+
} else if middleware.PublisherEnabled { // PublisherEnabled is true when clustersynchro-manager starts and middleware enabled
9292
err := middleware.GlobalPublisher.PublishTopic(gvr, config.Codec)
9393
if err != nil {
9494
return nil, err

pkg/synchromanager/clustersynchro/resource_synchro.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
2828
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
2929
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
3031
)
3132

3233
type ResourceSynchro struct {
@@ -384,19 +385,23 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
384385
Published: true,
385386
}
386387
synchro.rvsLock.Unlock()
387-
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
388-
if err != nil {
389-
return
388+
if middleware.PublisherEnabled {
389+
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
390+
if err != nil {
391+
return
392+
}
390393
}
391394
}
392395
} else {
393396
handler, callback = synchro.deleteResource, func(_ runtime.Object, eventType watch.EventType) {
394397
synchro.rvsLock.Lock()
395398
delete(synchro.rvs, key)
396399
synchro.rvsLock.Unlock()
397-
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
398-
if err != nil {
399-
return
400+
if middleware.PublisherEnabled {
401+
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
402+
if err != nil {
403+
return
404+
}
400405
}
401406
}
402407
}

pkg/watcher/middleware/publisher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
)
1212

1313
var GlobalPublisher Publisher
14+
var PublisherEnabled bool = false
1415

1516
type Publisher interface {
1617
InitPublisher(ctx context.Context) error

pkg/watcher/middleware/rabbitmq/register.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const (
1515
SubscribeerName = "rabbitmq"
1616
)
1717

18-
func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) {
18+
func NewPulisher(mo *options.MiddlewareOptions) (middleware.Publisher, error) {
1919
if mo.MaxConnections <= 0 {
2020
mo.MaxConnections = 3
2121
}
@@ -34,7 +34,7 @@ func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) {
3434
return publisher, nil
3535
}
3636

37-
func NewSubscriber(mo *options.MiddlerwareOptions) (middleware.Subscriber, error) {
37+
func NewSubscriber(mo *options.MiddlewareOptions) (middleware.Subscriber, error) {
3838
if mo.MaxConnections <= 0 {
3939
mo.MaxConnections = 3
4040
}

0 commit comments

Comments
 (0)