Skip to content

Commit b2080cb

Browse files
zhouhaoA1xyz2277OrangeBaowuyingjun-lucky
committed
add a control switch for watch middleware
Co-authored-by: zhangyongxi <[email protected]> Co-authored-by: baoyinghai <[email protected]> Co-authored-by: wuyingjun <[email protected]> Signed-off-by: zhouhaoA1 <[email protected]>
1 parent b8e1f74 commit b2080cb

File tree

15 files changed

+99
-45
lines changed

15 files changed

+99
-45
lines changed

cmd/apiserver/app/options/options.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2727
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
2828
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
2930
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
3031
)
3132

@@ -46,7 +47,7 @@ type ClusterPediaServerOptions struct {
4647

4748
Storage *storageoptions.StorageOptions
4849

49-
Subscriber *watchoptions.MiddlerwareOptions
50+
Subscriber *watchoptions.MiddlewareOptions
5051
}
5152

5253
func NewServerOptions() *ClusterPediaServerOptions {
@@ -129,11 +130,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
129130
StorageFactory: storage,
130131
}
131132

132-
err = watcher.NewSubscriber(o.Subscriber)
133-
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
134-
135-
if err != nil {
136-
return nil, err
133+
middleware.SubscriberEnabled = o.Subscriber.Enabled
134+
if middleware.SubscriberEnabled {
135+
err = watcher.NewSubscriber(o.Subscriber)
136+
if err != nil {
137+
return nil, err
138+
}
139+
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
137140
}
138141

139142
return config, nil

cmd/clustersynchro-manager/app/options/options.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2929
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
3030
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
31+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
3132
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
3233
)
3334

@@ -49,7 +50,7 @@ type Options struct {
4950

5051
WorkerNumber int // WorkerNumber is the number of worker goroutines
5152
PageSizeForResourceSync int64
52-
Publisher *watchoptions.MiddlerwareOptions
53+
Publisher *watchoptions.MiddlewareOptions
5354
}
5455

5556
func NewClusterSynchroManagerOptions() (*Options, error) {
@@ -135,9 +136,12 @@ func (o *Options) Config() (*config.Config, error) {
135136
return nil, err
136137
}
137138

138-
err = watcher.NewPulisher(o.Publisher)
139-
if err != nil {
140-
return nil, err
139+
middleware.PublisherEnabled = o.Publisher.Enabled
140+
if middleware.PublisherEnabled {
141+
err = watcher.NewPulisher(o.Publisher)
142+
if err != nil {
143+
return nil, err
144+
}
141145
}
142146

143147
kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)

cmd/clustersynchro-manager/app/synchro.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
2727
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
2828
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
2931
)
3032

3133
func init() {
@@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error {
98100
}
99101

100102
if !c.LeaderElection.LeaderElect {
103+
if middleware.PublisherEnabled {
104+
err := middleware.GlobalPublisher.InitPublisher(ctx)
105+
if err != nil {
106+
return err
107+
}
108+
}
101109
synchromanager.Run(c.WorkerNumber, ctx.Done())
102110
return nil
103111
}
@@ -138,13 +146,23 @@ func Run(ctx context.Context, c *config.Config) error {
138146
defer close(done)
139147

140148
stopCh := ctx.Done()
149+
if middleware.PublisherEnabled {
150+
err := middleware.GlobalPublisher.InitPublisher(ctx)
151+
if err != nil {
152+
return
153+
}
154+
}
141155
synchromanager.Run(c.WorkerNumber, stopCh)
142156
},
143157
OnStoppedLeading: func() {
144158
klog.Info("leaderelection lost")
145159
if done != nil {
146160
<-done
147161
}
162+
if middleware.PublisherEnabled {
163+
middleware.GlobalPublisher.StopPublisher()
164+
components.EC.CloseChannels()
165+
}
148166
},
149167
},
150168
})

pkg/apiserver/apiserver.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
110110

111111
// init event cache pool
112112
eventStop := make(chan struct{})
113-
watchcomponents.InitEventCachePool(eventStop)
114-
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
115-
if err != nil {
116-
return nil, err
113+
if middleware.SubscriberEnabled {
114+
watchcomponents.InitEventCachePool(eventStop)
115+
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
116+
if err != nil {
117+
return nil, err
118+
}
117119
}
118120

119121
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)

pkg/generated/openapi/zz_generated.openapi.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/storage/internalstorage/resource_storage_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -195,13 +195,13 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
195195
}{
196196
{
197197
"empty",
198-
schema.GroupVersionResource{},
198+
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
199199
"",
200200
"",
201201
"",
202202
expected{
203-
`SELECT "object" FROM "resources" WHERE "cluster" = '' AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
204-
"SELECT `object` FROM `resources` WHERE `cluster` = '' AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
203+
`SELECT cluster_resource_version, object FROM "apps_v1_deployments" WHERE "deleted" = false AND "name" = '' AND "namespace" = '' ORDER BY "apps_v1_deployments"."id" LIMIT 1`,
204+
"SELECT cluster_resource_version, object FROM `apps_v1_deployments` WHERE `deleted` = false AND `name` = '' AND `namespace` = '' ORDER BY `apps_v1_deployments`.`id` LIMIT 1",
205205
"",
206206
},
207207
},
@@ -212,8 +212,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
212212
"ns-1",
213213
"resource-1",
214214
expected{
215-
`SELECT "object" FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
216-
"SELECT `object` FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
215+
`SELECT cluster_resource_version, object FROM "apps_v1_deployments" WHERE "cluster" = 'cluster-1' AND "deleted" = false AND "name" = 'resource-1' AND "namespace" = 'ns-1' ORDER BY "apps_v1_deployments"."id" LIMIT 1`,
216+
"SELECT cluster_resource_version, object FROM `apps_v1_deployments` WHERE `cluster` = 'cluster-1' AND `deleted` = false AND `name` = 'resource-1' AND `namespace` = 'ns-1' ORDER BY `apps_v1_deployments`.`id` LIMIT 1",
217217
"",
218218
},
219219
},
@@ -257,8 +257,8 @@ func TestResourceStorage_genListObjectQuery(t *testing.T) {
257257
appsv1.SchemeGroupVersion.WithResource("deployments"),
258258
&internal.ListOptions{},
259259
expected{
260-
`SELECT * FROM "resources" WHERE "group" = 'apps' AND "resource" = 'deployments' AND "version" = 'v1'`,
261-
"SELECT * FROM `resources` WHERE `group` = 'apps' AND `resource` = 'deployments' AND `version` = 'v1'",
260+
`SELECT * FROM "apps_v1_deployments"`,
261+
"SELECT * FROM `apps_v1_deployments`",
262262
"",
263263
},
264264
},
@@ -314,8 +314,8 @@ func TestResourceStorage_deleteObject(t *testing.T) {
314314
"",
315315
"",
316316
expected{
317-
`DELETE FROM "resources" WHERE "cluster" = '' AND "group" = 'apps' AND "name" = '' AND "namespace" = '' AND "resource" = 'deployments' AND "version" = 'v1'`,
318-
"DELETE FROM `resources` WHERE `cluster` = '' AND `group` = 'apps' AND `name` = '' AND `namespace` = '' AND `resource` = 'deployments' AND `version` = 'v1'",
317+
`DELETE FROM "apps_v1_deployments" WHERE "cluster" = '' AND "name" = '' AND "namespace" = ''`,
318+
"DELETE FROM `apps_v1_deployments` WHERE `cluster` = '' AND `name` = '' AND `namespace` = ''",
319319
"",
320320
},
321321
},
@@ -326,8 +326,8 @@ func TestResourceStorage_deleteObject(t *testing.T) {
326326
"ns-1",
327327
"resource-1",
328328
expected{
329-
`DELETE FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1'`,
330-
"DELETE FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1'",
329+
`DELETE FROM "apps_v1_deployments" WHERE "cluster" = 'cluster-1' AND "name" = 'resource-1' AND "namespace" = 'ns-1'`,
330+
"DELETE FROM `apps_v1_deployments` WHERE `cluster` = 'cluster-1' AND `name` = 'resource-1' AND `namespace` = 'ns-1'",
331331
"",
332332
},
333333
},

pkg/storage/internalstorage/storage.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
5959
KeyFunc: utils.GetKeyFunc(gvr, config.Namespaced),
6060
}
6161

62-
// initEventCache is true when Apiserver starts, false when clustersynchro-manager starts
63-
if initEventCache {
62+
// SubscriberEnabled is true when Apiserver starts and middleware enabled
63+
if middleware.SubscriberEnabled {
6464
var cache *watchcomponents.EventCache
6565
buffer := watchcomponents.GetMultiClusterEventPool().GetClusterBufferByGVR(gvr)
6666
cachePool := watchcomponents.GetInitEventCachePool()
@@ -88,7 +88,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
8888

8989
resourceStorage.buffer = buffer
9090
resourceStorage.eventCache = cache
91-
} else {
91+
} else if middleware.PublisherEnabled { // PublisherEnabled is true when clustersynchro-manager starts and middleware enabled
9292
err := middleware.GlobalPublisher.PublishTopic(gvr, config.Codec)
9393
if err != nil {
9494
return nil, err

pkg/synchromanager/clustersynchro/resource_synchro.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
2828
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
2929
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
3031
)
3132

3233
type ResourceSynchroConfig struct {
@@ -426,19 +427,23 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
426427
Published: true,
427428
}
428429
synchro.rvsLock.Unlock()
429-
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
430-
if err != nil {
431-
return
430+
if middleware.PublisherEnabled {
431+
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
432+
if err != nil {
433+
return
434+
}
432435
}
433436
}
434437
} else {
435438
handler, callback = synchro.deleteResource, func(_ runtime.Object, eventType watch.EventType) {
436439
synchro.rvsLock.Lock()
437440
delete(synchro.rvs, key)
438441
synchro.rvsLock.Unlock()
439-
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
440-
if err != nil {
441-
return
442+
if middleware.PublisherEnabled {
443+
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
444+
if err != nil {
445+
return
446+
}
442447
}
443448
}
444449
}

pkg/watcher/middleware/publisher.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
)
1212

1313
var GlobalPublisher Publisher
14+
var PublisherEnabled bool = false
1415

1516
type Publisher interface {
1617
InitPublisher(ctx context.Context) error

pkg/watcher/middleware/rabbitmq/register.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const (
1515
SubscribeerName = "rabbitmq"
1616
)
1717

18-
func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) {
18+
func NewPulisher(mo *options.MiddlewareOptions) (middleware.Publisher, error) {
1919
if mo.MaxConnections <= 0 {
2020
mo.MaxConnections = 3
2121
}
@@ -34,7 +34,7 @@ func NewPulisher(mo *options.MiddlerwareOptions) (middleware.Publisher, error) {
3434
return publisher, nil
3535
}
3636

37-
func NewSubscriber(mo *options.MiddlerwareOptions) (middleware.Subscriber, error) {
37+
func NewSubscriber(mo *options.MiddlewareOptions) (middleware.Subscriber, error) {
3838
if mo.MaxConnections <= 0 {
3939
mo.MaxConnections = 3
4040
}

0 commit comments

Comments
 (0)