Skip to content

Commit a62af50

Browse files
OrangeBaoxyz2277wuyingjun-luckyzhouhaoA1
committed
divide multiple tables by gvr and fix watch bug
Co-authored-by: zhangyongxi <[email protected]> Co-authored-by: wuyingjun <[email protected]> Co-authored-by: zhouhao <[email protected]> Signed-off-by: baoyinghai <[email protected]>
1 parent e426f11 commit a62af50

File tree

9 files changed

+139
-140
lines changed

9 files changed

+139
-140
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package internalstorage
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
)
9+
10+
// GetTable return table name using gvr string
11+
func GetTable(gvr schema.GroupVersionResource) string {
12+
group := strings.ReplaceAll(gvr.Group, ".", "_")
13+
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
14+
}

pkg/storage/internalstorage/register.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
9393
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
9494
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)
9595

96-
if err := db.AutoMigrate(&Resource{}); err != nil {
97-
return nil, err
98-
}
99-
10096
return &StorageFactory{db}, nil
10197
}
10298

pkg/storage/internalstorage/resource_storage.go

Lines changed: 42 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,12 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
7474
condition := map[string]interface{}{
7575
"namespace": metaobj.GetNamespace(),
7676
"name": metaobj.GetName(),
77-
"group": s.storageGroupResource.Group,
78-
"version": s.storageVersion.Version,
79-
"resource": s.storageGroupResource.Resource,
8077
"deleted": true,
8178
}
8279
if cluster != "" {
8380
condition["cluster"] = cluster
8481
}
85-
dbResult := s.db.Model(&Resource{}).Where(condition).Delete(&Resource{})
82+
dbResult := s.db.Model(&Resource{}).Table(GetTable(newGvr(s))).Where(condition).Delete(&Resource{})
8683
if dbResult.Error != nil {
8784
err = InterpretResourceDBError(cluster, metaobj.GetName(), dbResult.Error)
8885
return fmt.Errorf("[Create]: Object %s/%s has been created failed in step one, err: %v", metaobj.GetName(), metaobj.GetNamespace(), err)
@@ -117,7 +114,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
117114
resource.DeletedAt = sql.NullTime{Time: deletedAt.Time, Valid: true}
118115
}
119116

120-
result := s.db.WithContext(ctx).Create(&resource)
117+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Create(&resource)
121118
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
122119
}
123120

@@ -162,23 +159,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
162159
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
163160
}
164161

165-
result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
162+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(map[string]interface{}{
166163
"cluster": cluster,
167-
"group": s.storageGroupResource.Group,
168-
"version": s.storageVersion.Version,
169-
"resource": s.storageGroupResource.Resource,
170164
"namespace": metaobj.GetNamespace(),
171165
"name": metaobj.GetName(),
172166
}).Updates(updatedResource)
173167
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
174168
}
175169

176170
func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
177-
return s.db.Model(&Resource{}).Where(map[string]interface{}{
171+
return s.db.Table(GetTable(newGvr(s))).Model(&Resource{}).Where(map[string]interface{}{
178172
"cluster": cluster,
179-
"group": s.storageGroupResource.Group,
180-
"version": s.storageVersion.Version,
181-
"resource": s.storageGroupResource.Resource,
182173
"namespace": namespace,
183174
"name": name,
184175
}).Delete(&Resource{})
@@ -211,32 +202,26 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
211202
condition := map[string]interface{}{
212203
"cluster": cluster,
213204
"namespace": metaobj.GetNamespace(),
214-
"group": s.storageGroupResource.Group,
215-
"version": s.storageVersion.Version,
216-
"resource": s.storageGroupResource.Resource,
217205
}
218206
if metaobj.GetName() != "" {
219207
condition["name"] = metaobj.GetName()
220208
}
221209

222-
result := s.db.WithContext(ctx).Model(&Resource{}).Where(condition).Updates(updatedResource)
210+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition).Updates(updatedResource)
223211
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
224212
}
225213

226214
func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
227215
condition := map[string]interface{}{
228216
"namespace": namespace,
229217
"name": name,
230-
"group": s.storageGroupResource.Group,
231-
"version": s.storageVersion.Version,
232-
"resource": s.storageGroupResource.Resource,
233218
"deleted": false,
234219
}
235220

236221
if cluster != "" {
237222
condition["cluster"] = cluster
238223
}
239-
return s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
224+
return s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
240225
}
241226

242227
func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name string) (runtime.Object, error) {
@@ -245,12 +230,9 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
245230
"namespace": namespace,
246231
"name": name,
247232
"cluster": cluster,
248-
"group": s.storageGroupResource.Group,
249-
"version": s.storageVersion.Version,
250-
"resource": s.storageGroupResource.Resource,
251233
}
252234

253-
result := s.db.WithContext(ctx).Model(&Resource{}).
235+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).
254236
Select("cluster_resource_version, object").Where(condition).First(&resource)
255237
if result.Error != nil {
256238
return nil, InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
@@ -268,19 +250,37 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
268250
return obj, nil
269251
}
270252

253+
func (s *ResourceStorage) GenGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
254+
condition := map[string]interface{}{
255+
"namespace": namespace,
256+
"name": name,
257+
"deleted": false,
258+
}
259+
260+
if cluster != "" {
261+
condition["cluster"] = cluster
262+
}
263+
return s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
264+
}
265+
271266
func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
272-
var objects [][]byte
273-
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
267+
var resource Resource
268+
if result := s.GenGetObjectQuery(ctx, cluster, namespace, name).First(&resource); result.Error != nil {
274269
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
275270
}
276271

277-
obj, _, err := s.codec.Decode(objects[0], nil, into)
272+
obj, _, err := s.codec.Decode(resource.Object, nil, into)
278273
if err != nil {
279274
return err
280275
}
281276
if obj != into {
282277
return fmt.Errorf("Failed to decode resource, into is %T", into)
283278
}
279+
metaObj, err := meta.Accessor(obj)
280+
if err != nil {
281+
return err
282+
}
283+
metaObj.SetResourceVersion(utils.ParseInt642Str(resource.ClusterResourceVersion))
284284
return nil
285285
}
286286

@@ -293,14 +293,11 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
293293
var condition map[string]interface{}
294294
if !isAll {
295295
condition = map[string]interface{}{
296-
"group": s.storageGroupResource.Group,
297-
"version": s.storageVersion.Version,
298-
"resource": s.storageGroupResource.Resource,
299-
"deleted": false,
296+
"deleted": false,
300297
}
301298
}
302299

303-
query := s.db.WithContext(ctx).Model(&Resource{}).Where(condition)
300+
query := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition)
304301
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
305302
return offset, amount, query, result, err
306303
}
@@ -309,11 +306,9 @@ func (s *ResourceStorage) genListQuery(ctx context.Context, newfunc func() runti
309306
var result [][]byte
310307

311308
condition := map[string]interface{}{
312-
"group": s.storageGroupResource.Group,
313-
"version": s.storageVersion.Version,
314-
"resource": s.storageGroupResource.Resource,
309+
"deleted": false,
315310
}
316-
query := s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(condition)
311+
query := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("object").Where(condition)
317312
_, _, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
318313
if err != nil {
319314
return nil, err
@@ -530,6 +525,14 @@ func (s *ResourceStorage) fetchInitEvents(ctx context.Context, rv string, newfun
530525
}
531526
}
532527

528+
func newGvr(s *ResourceStorage) schema.GroupVersionResource {
529+
return schema.GroupVersionResource{
530+
Group: s.storageGroupResource.Group,
531+
Version: s.storageVersion.Version,
532+
Resource: s.storageGroupResource.Resource,
533+
}
534+
}
535+
533536
func getObjectListAndMaxCrv(objList []Object, onlyMetada bool) ([]Object, []string, string, error) {
534537
crvs := make([]string, 0, len(objList))
535538
var maxCrv int64 = 0
@@ -575,12 +578,7 @@ func getObjectListAndMaxCrv(objList []Object, onlyMetada bool) ([]Object, []stri
575578
func (s *ResourceStorage) GetMaxCrv(ctx context.Context) (string, error) {
576579
maxCrv := "0"
577580
var metadataList ResourceMetadataList
578-
condition := map[string]interface{}{
579-
"group": s.storageGroupResource.Group,
580-
"version": s.storageVersion.Version,
581-
"resource": s.storageGroupResource.Resource,
582-
}
583-
result := s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version").Where(condition).Order("cluster_resource_version DESC").Limit(1).Find(&metadataList)
581+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version").Order("cluster_resource_version DESC").Limit(1).Find(&metadataList)
584582
if result.Error != nil {
585583
return maxCrv, InterpretResourceDBError("", s.storageGroupResource.Resource, result.Error)
586584
}
@@ -610,15 +608,12 @@ func (s *ResourceStorage) PublishEvent(ctx context.Context, wc *watchcomponents.
610608
}
611609

612610
condition := map[string]interface{}{
613-
"group": s.storageGroupResource.Group,
614-
"version": s.storageVersion.Version,
615-
"resource": s.storageGroupResource.Resource,
616611
"cluster": wc.Cluster,
617612
"namespace": metaObj.GetNamespace(),
618613
"name": metaObj.GetName(),
619614
}
620615

621-
s.db.WithContext(ctx).Model(&Resource{}).Where(condition).Updates(updatedResource)
616+
s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition).Updates(updatedResource)
622617
}
623618

624619
func (s *ResourceStorage) GenCrv2Event(event *watch.Event) {

pkg/storage/internalstorage/storage.go

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,22 @@ package internalstorage
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
"gorm.io/gorm"
89
"k8s.io/apimachinery/pkg/runtime/schema"
910
"k8s.io/apimachinery/pkg/watch"
1011

1112
internal "github.com/clusterpedia-io/api/clusterpedia"
1213
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
14+
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
1315
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
1416
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
1517
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
1618
)
1719

20+
var mutex sync.Mutex
21+
1822
type StorageFactory struct {
1923
db *gorm.DB
2024
}
@@ -30,6 +34,18 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
3034
Resource: config.StorageGroupResource.Resource,
3135
}
3236

37+
mutex.Lock()
38+
defer mutex.Unlock()
39+
if !s.db.Migrator().HasTable(GetTable(gvr)) {
40+
if err := s.db.AutoMigrate(&Resource{}); err != nil {
41+
return nil, err
42+
}
43+
err := s.db.Migrator().RenameTable("resources", GetTable(gvr))
44+
if err != nil {
45+
return nil, err
46+
}
47+
}
48+
3349
resourceStorage := &ResourceStorage{
3450
db: s.db,
3551
codec: config.Codec,
@@ -99,11 +115,26 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
99115

100116
func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
101117
var resources []Resource
102-
result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
103-
Where(map[string]interface{}{"cluster": cluster}).
104-
Find(&resources)
105-
if result.Error != nil {
106-
return nil, InterpretDBError(cluster, result.Error)
118+
mutex.Lock()
119+
tables, err := f.db.Migrator().GetTables()
120+
if err != nil {
121+
mutex.Unlock()
122+
return nil, err
123+
}
124+
mutex.Unlock()
125+
for _, table := range tables {
126+
var tableResources []Resource
127+
result := f.db.WithContext(ctx).Table(table).Select("group", "version", "resource",
128+
"namespace", "name", "resource_version", "deleted", "published").
129+
Where(map[string]interface{}{"cluster": cluster, "deleted": false}).
130+
//In case deleted event be losted when synchro manager do a leaderelection or reboot
131+
Or(map[string]interface{}{"cluster": cluster, "deleted": true, "published": false}).
132+
Find(&tableResources)
133+
if result.Error != nil {
134+
return nil, InterpretDBError(cluster, result.Error)
135+
}
136+
137+
resources = append(resources, tableResources...)
107138
}
108139

109140
resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
@@ -119,22 +150,37 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
119150
if resource.Namespace != "" {
120151
key = resource.Namespace + "/" + resource.Name
121152
}
122-
versions[key] = resource.ResourceVersion
153+
versions[key] = informer.StorageElement{
154+
Version: resource.ResourceVersion,
155+
Deleted: resource.Deleted,
156+
Published: resource.Published,
157+
Name: resource.Name,
158+
Namespace: resource.Namespace,
159+
}
123160
}
124161
return resourceversions, nil
125162
}
126163

127164
func (f *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
128-
result := f.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
129-
return InterpretDBError(cluster, result.Error)
165+
mutex.Lock()
166+
tables, err := f.db.Migrator().GetTables()
167+
if err != nil {
168+
mutex.Unlock()
169+
return err
170+
}
171+
mutex.Unlock()
172+
for _, table := range tables {
173+
result := f.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
174+
if result.Error != nil {
175+
return InterpretDBError(cluster, result.Error)
176+
}
177+
}
178+
return nil
130179
}
131180

132181
func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
133-
result := s.db.Where(map[string]interface{}{
134-
"cluster": cluster,
135-
"group": gvr.Group,
136-
"version": gvr.Version,
137-
"resource": gvr.Resource,
182+
result := s.db.Table(GetTable(gvr)).Where(map[string]interface{}{
183+
"cluster": cluster,
138184
}).Delete(&Resource{})
139185
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
140186
}

pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ func (informer *resourceVersionInformer) HandleDeltas(deltas cache.Deltas, isInI
7979
}
8080

8181
informer.handler.OnAdd(d.Object, isInInitialList)
82-
continue
82+
break
8383
}
8484

8585
if d.Type == cache.Replaced {
8686
if v := compareResourceVersion(d.Object, version); v <= 0 {
8787
if v == 0 {
8888
informer.handler.OnSync(d.Object)
8989
}
90-
continue
90+
break
9191
}
9292
}
9393

0 commit comments

Comments
 (0)