Skip to content

Commit 05da669

Browse files
OrangeBao, xyz2277, wuyingjun-lucky, zhouhaoA1
committed
add a control switch and fix watch bug
Co-authored-by: zhangyongxi <[email protected]> Co-authored-by: wuyingjun <[email protected]> Co-authored-by: zhouhao <[email protected]> Signed-off-by: baoyinghai <[email protected]>
1 parent 21b15f7 commit 05da669

File tree

20 files changed

+161
-125
lines changed

20 files changed

+161
-125
lines changed

cmd/apiserver/app/options/options.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2727
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
2828
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
2930
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
3031
)
3132

@@ -46,7 +47,7 @@ type ClusterPediaServerOptions struct {
4647

4748
Storage *storageoptions.StorageOptions
4849

49-
Subscriber *watchoptions.MiddlerwareOptions
50+
Subscriber *watchoptions.MiddlewareOptions
5051
}
5152

5253
func NewServerOptions() *ClusterPediaServerOptions {
@@ -129,11 +130,13 @@ func (o *ClusterPediaServerOptions) Config() (*apiserver.Config, error) {
129130
StorageFactory: storage,
130131
}
131132

132-
err = watcher.NewSubscriber(o.Subscriber)
133-
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
134-
135-
if err != nil {
136-
return nil, err
133+
middleware.SubscriberEnabled = o.Subscriber.Enabled
134+
if middleware.SubscriberEnabled {
135+
err = watcher.NewSubscriber(o.Subscriber)
136+
if err != nil {
137+
return nil, err
138+
}
139+
watchcomponents.InitEventCacheSize(o.Subscriber.CacheSize)
137140
}
138141

139142
return config, nil

cmd/clustersynchro-manager/app/options/options.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
2929
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
3030
"github.com/clusterpedia-io/clusterpedia/pkg/watcher"
31+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
3132
watchoptions "github.com/clusterpedia-io/clusterpedia/pkg/watcher/options"
3233
)
3334

@@ -50,7 +51,7 @@ type Options struct {
5051
WorkerNumber int // WorkerNumber is the number of worker goroutines
5152
PageSizeForResourceSync int64
5253
ShardingName string
53-
Publisher *watchoptions.MiddlerwareOptions
54+
Publisher *watchoptions.MiddlewareOptions
5455
}
5556

5657
func NewClusterSynchroManagerOptions() (*Options, error) {
@@ -137,9 +138,12 @@ func (o *Options) Config() (*config.Config, error) {
137138
return nil, err
138139
}
139140

140-
err = watcher.NewPulisher(o.Publisher)
141-
if err != nil {
142-
return nil, err
141+
middleware.PublisherEnabled = o.Publisher.Enabled
142+
if middleware.PublisherEnabled {
143+
err = watcher.NewPulisher(o.Publisher)
144+
if err != nil {
145+
return nil, err
146+
}
143147
}
144148

145149
kubeconfig, err := clientcmd.BuildConfigFromFlags(o.Master, o.Kubeconfig)

cmd/clustersynchro-manager/app/synchro.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
2727
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
2828
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
29+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
2931
)
3032

3133
func init() {
@@ -98,6 +100,12 @@ func Run(ctx context.Context, c *config.Config) error {
98100
}
99101

100102
if !c.LeaderElection.LeaderElect {
103+
if middleware.PublisherEnabled {
104+
err := middleware.GlobalPublisher.InitPublisher(ctx)
105+
if err != nil {
106+
return err
107+
}
108+
}
101109
synchromanager.Run(c.WorkerNumber, ctx.Done())
102110
return nil
103111
}
@@ -138,13 +146,23 @@ func Run(ctx context.Context, c *config.Config) error {
138146
defer close(done)
139147

140148
stopCh := ctx.Done()
149+
if middleware.PublisherEnabled {
150+
err := middleware.GlobalPublisher.InitPublisher(ctx)
151+
if err != nil {
152+
return
153+
}
154+
}
141155
synchromanager.Run(c.WorkerNumber, stopCh)
142156
},
143157
OnStoppedLeading: func() {
144158
klog.Info("leaderelection lost")
145159
if done != nil {
146160
<-done
147161
}
162+
if middleware.PublisherEnabled {
163+
middleware.GlobalPublisher.StopPublisher()
164+
components.EC.CloseChannels()
165+
}
148166
},
149167
},
150168
})

pkg/apiserver/apiserver.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,12 @@ func (config completedConfig) New() (*ClusterPediaServer, error) {
110110

111111
// init event cache pool
112112
eventStop := make(chan struct{})
113-
watchcomponents.InitEventCachePool(eventStop)
114-
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
115-
if err != nil {
116-
return nil, err
113+
if middleware.SubscriberEnabled {
114+
watchcomponents.InitEventCachePool(eventStop)
115+
err := middleware.GlobalSubscriber.InitSubscriber(eventStop)
116+
if err != nil {
117+
return nil, err
118+
}
117119
}
118120

119121
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config.ClientConfig)

pkg/generated/openapi/zz_generated.openapi.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/storage/internalstorage/resource_storage.go

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -269,19 +269,40 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
269269
return obj, nil
270270
}
271271

272+
func (s *ResourceStorage) GenGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
273+
condition := map[string]interface{}{
274+
"namespace": namespace,
275+
"name": name,
276+
"group": s.storageGroupResource.Group,
277+
"version": s.storageVersion.Version,
278+
"resource": s.storageGroupResource.Resource,
279+
"deleted": false,
280+
}
281+
282+
if cluster != "" {
283+
condition["cluster"] = cluster
284+
}
285+
return s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
286+
}
287+
272288
func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
273-
var objects [][]byte
274-
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
289+
var resource Resource
290+
if result := s.GenGetObjectQuery(ctx, cluster, namespace, name).First(&resource); result.Error != nil {
275291
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
276292
}
277293

278-
obj, _, err := s.codec.Decode(objects[0], nil, into)
294+
obj, _, err := s.codec.Decode(resource.Object, nil, into)
279295
if err != nil {
280296
return err
281297
}
282298
if obj != into {
283299
return fmt.Errorf("Failed to decode resource, into is %T", into)
284300
}
301+
metaObj, err := meta.Accessor(obj)
302+
if err != nil {
303+
return err
304+
}
305+
metaObj.SetResourceVersion(utils.ParseInt642Str(resource.ClusterResourceVersion))
285306
return nil
286307
}
287308

@@ -291,14 +312,13 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
291312
result = &ResourceMetadataList{}
292313
}
293314

294-
var condition map[string]interface{}
315+
condition := map[string]interface{}{
316+
"group": s.storageGroupResource.Group,
317+
"version": s.storageVersion.Version,
318+
"resource": s.storageGroupResource.Resource,
319+
}
295320
if !isAll {
296-
condition = map[string]interface{}{
297-
"group": s.storageGroupResource.Group,
298-
"version": s.storageVersion.Version,
299-
"resource": s.storageGroupResource.Resource,
300-
"deleted": false,
301-
}
321+
condition["deleted"] = false
302322
}
303323

304324
query := s.db.WithContext(ctx).Model(&Resource{}).Where(condition)
@@ -313,6 +333,7 @@ func (s *ResourceStorage) genListQuery(ctx context.Context, newfunc func() runti
313333
"group": s.storageGroupResource.Group,
314334
"version": s.storageVersion.Version,
315335
"resource": s.storageGroupResource.Resource,
336+
"deleted": false,
316337
}
317338
query := s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(condition)
318339
_, _, query, err := applyListOptionsToResourceQuery(s.db, query, opts)

pkg/storage/internalstorage/resource_storage_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
200200
"",
201201
"",
202202
expected{
203-
`SELECT "object" FROM "resources" WHERE "cluster" = '' AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
204-
"SELECT `object` FROM `resources` WHERE `cluster` = '' AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
203+
`SELECT cluster_resource_version, object FROM "resources" WHERE "deleted" = false AND "group" = '' AND "name" = '' AND "namespace" = '' AND "resource" = '' AND "version" = '' ORDER BY "resources"."id" LIMIT 1`,
204+
"SELECT cluster_resource_version, object FROM `resources` WHERE `deleted` = false AND `group` = '' AND `name` = '' AND `namespace` = '' AND `resource` = '' AND `version` = '' ORDER BY `resources`.`id` LIMIT 1",
205205
"",
206206
},
207207
},
@@ -212,8 +212,8 @@ func TestResourceStorage_genGetObjectQuery(t *testing.T) {
212212
"ns-1",
213213
"resource-1",
214214
expected{
215-
`SELECT "object" FROM "resources" WHERE "cluster" = 'cluster-1' AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
216-
"SELECT `object` FROM `resources` WHERE `cluster` = 'cluster-1' AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
215+
`SELECT cluster_resource_version, object FROM "resources" WHERE "cluster" = 'cluster-1' AND "deleted" = false AND "group" = 'apps' AND "name" = 'resource-1' AND "namespace" = 'ns-1' AND "resource" = 'deployments' AND "version" = 'v1' ORDER BY "resources"."id" LIMIT 1`,
216+
"SELECT cluster_resource_version, object FROM `resources` WHERE `cluster` = 'cluster-1' AND `deleted` = false AND `group` = 'apps' AND `name` = 'resource-1' AND `namespace` = 'ns-1' AND `resource` = 'deployments' AND `version` = 'v1' ORDER BY `resources`.`id` LIMIT 1",
217217
"",
218218
},
219219
},

pkg/storage/internalstorage/storage.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
internal "github.com/clusterpedia-io/api/clusterpedia"
1212
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
13+
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
1314
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
1415
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
1516
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
@@ -43,8 +44,8 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
4344
KeyFunc: utils.GetKeyFunc(gvr, config.Namespaced),
4445
}
4546

46-
// initEventCache is true when Apiserver starts, false when clustersynchro-manager starts
47-
if initEventCache {
47+
// SubscriberEnabled is true when the apiserver starts with the middleware enabled
48+
if middleware.SubscriberEnabled {
4849
var cache *watchcomponents.EventCache
4950
buffer := watchcomponents.GetMultiClusterEventPool().GetClusterBufferByGVR(gvr)
5051
cachePool := watchcomponents.GetInitEventCachePool()
@@ -72,7 +73,7 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
7273

7374
resourceStorage.buffer = buffer
7475
resourceStorage.eventCache = cache
75-
} else {
76+
} else if middleware.PublisherEnabled { // PublisherEnabled is true when clustersynchro-manager starts and middleware enabled
7677
err := middleware.GlobalPublisher.PublishTopic(gvr, config.Codec)
7778
if err != nil {
7879
return nil, err
@@ -99,8 +100,11 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
99100

100101
func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
101102
var resources []Resource
102-
result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
103-
Where(map[string]interface{}{"cluster": cluster}).
103+
result := f.db.WithContext(ctx).Select("group", "version", "resource",
104+
"namespace", "name", "resource_version", "deleted", "published").
105+
Where(map[string]interface{}{"cluster": cluster, "deleted": false}).
106+
// In case a deleted event is lost when the synchro manager performs a leader election or reboots
107+
Or(map[string]interface{}{"cluster": cluster, "deleted": true, "published": false}).
104108
Find(&resources)
105109
if result.Error != nil {
106110
return nil, InterpretDBError(cluster, result.Error)
@@ -119,7 +123,13 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
119123
if resource.Namespace != "" {
120124
key = resource.Namespace + "/" + resource.Name
121125
}
122-
versions[key] = resource.ResourceVersion
126+
versions[key] = informer.StorageElement{
127+
Version: resource.ResourceVersion,
128+
Deleted: resource.Deleted,
129+
Published: resource.Published,
130+
Name: resource.Name,
131+
Namespace: resource.Namespace,
132+
}
123133
}
124134
return resourceversions, nil
125135
}

pkg/synchromanager/clustersynchro/informer/resourceversion_storage.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,21 @@ func (c *ResourceVersionStorage) Add(obj interface{}) error {
3333
if err != nil {
3434
return cache.KeyError{Obj: obj, Err: err}
3535
}
36+
37+
c.cacheStorage.Delete(key)
38+
3639
accessor, err := meta.Accessor(obj)
3740
if err != nil {
3841
return err
3942
}
4043

41-
c.cacheStorage.Add(key, accessor.GetResourceVersion())
44+
c.cacheStorage.Add(key, StorageElement{
45+
Version: accessor.GetResourceVersion(),
46+
Deleted: false,
47+
Published: true,
48+
Name: accessor.GetName(),
49+
Namespace: accessor.GetNamespace(),
50+
})
4251
return nil
4352
}
4453

@@ -52,7 +61,13 @@ func (c *ResourceVersionStorage) Update(obj interface{}) error {
5261
return err
5362
}
5463

55-
c.cacheStorage.Update(key, accessor.GetResourceVersion())
64+
c.cacheStorage.Update(key, StorageElement{
65+
Version: accessor.GetResourceVersion(),
66+
Deleted: false,
67+
Published: true,
68+
Name: accessor.GetName(),
69+
Namespace: accessor.GetNamespace(),
70+
})
5671
return nil
5772
}
5873

pkg/synchromanager/clustersynchro/resource_synchro.go

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
2828
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
2929
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
30+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
3031
)
3132

3233
type ResourceSynchroConfig struct {
@@ -394,22 +395,6 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
394395
if !ok {
395396
if _, ok = event.Object.(cache.DeletedFinalStateUnknown); !ok {
396397
return
397-
} else {
398-
dfs := event.Object.(cache.DeletedFinalStateUnknown)
399-
var se informer.StorageElement
400-
if se, ok = dfs.Obj.(informer.StorageElement); !ok {
401-
return
402-
}
403-
var err error
404-
obj, err = synchro.storage.GetObj(synchro.ctx, synchro.cluster, se.Namespace, se.Name)
405-
if err != nil {
406-
return
407-
}
408-
metaObj, err := meta.Accessor(obj)
409-
if err == nil {
410-
klog.Warning("DeletedFinalStateUnknown, name: ", metaObj.GetName(), ", time: ", metaObj.GetDeletionTimestamp(),
411-
", kind: ", obj.GetObjectKind().GroupVersionKind().Kind, ", cluster: ", synchro.cluster)
412-
}
413398
}
414399
}
415400
key, _ := cache.MetaNamespaceKeyFunc(obj)
@@ -442,19 +427,23 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
442427
Published: true,
443428
}
444429
synchro.rvsLock.Unlock()
445-
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
446-
if err != nil {
447-
return
430+
if middleware.PublisherEnabled {
431+
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
432+
if err != nil {
433+
return
434+
}
448435
}
449436
}
450437
} else {
451438
handler, callback = synchro.deleteResource, func(_ runtime.Object, eventType watch.EventType) {
452439
synchro.rvsLock.Lock()
453440
delete(synchro.rvs, key)
454441
synchro.rvsLock.Unlock()
455-
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
456-
if err != nil {
457-
return
442+
if middleware.PublisherEnabled {
443+
err := synchro.storage.ProcessEvent(context.TODO(), eventType, obj, synchro.cluster)
444+
if err != nil {
445+
return
446+
}
458447
}
459448
}
460449
}

0 commit comments

Comments
 (0)