Skip to content

Commit b8e1f74

Browse files
OrangeBaoxyz2277wuyingjun-luckyzhouhaoA1
committed
divide multiple tables by gvr and fix watch bug
Co-authored-by: zhangyongxi <[email protected]> Co-authored-by: wuyingjun <[email protected]> Co-authored-by: zhouhao <[email protected]> Signed-off-by: baoyinghai <[email protected]>
1 parent 57466f7 commit b8e1f74

File tree

9 files changed

+140
-138
lines changed

9 files changed

+140
-138
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package internalstorage
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"k8s.io/apimachinery/pkg/runtime/schema"
8+
)
9+
10+
// GetTable return table name using gvr string
11+
func GetTable(gvr schema.GroupVersionResource) string {
12+
group := strings.ReplaceAll(gvr.Group, ".", "_")
13+
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
14+
}

pkg/storage/internalstorage/register.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,6 @@ func NewStorageFactory(configPath string) (storage.StorageFactory, error) {
9393
sqlDB.SetMaxOpenConns(connPool.MaxOpenConns)
9494
sqlDB.SetConnMaxLifetime(connPool.ConnMaxLifetime)
9595

96-
if err := db.AutoMigrate(&Resource{}); err != nil {
97-
return nil, err
98-
}
99-
10096
return &StorageFactory{db}, nil
10197
}
10298

pkg/storage/internalstorage/resource_storage.go

Lines changed: 42 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -75,15 +75,12 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
7575
condition := map[string]interface{}{
7676
"namespace": metaobj.GetNamespace(),
7777
"name": metaobj.GetName(),
78-
"group": s.storageGroupResource.Group,
79-
"version": s.storageVersion.Version,
80-
"resource": s.storageGroupResource.Resource,
8178
"deleted": true,
8279
}
8380
if cluster != "" {
8481
condition["cluster"] = cluster
8582
}
86-
dbResult := s.db.Model(&Resource{}).Where(condition).Delete(&Resource{})
83+
dbResult := s.db.Model(&Resource{}).Table(GetTable(newGvr(s))).Where(condition).Delete(&Resource{})
8784
if dbResult.Error != nil {
8885
err = InterpretResourceDBError(cluster, metaobj.GetName(), dbResult.Error)
8986
return fmt.Errorf("[Create]: Object %s/%s has been created failed in step one, err: %v", metaobj.GetName(), metaobj.GetNamespace(), err)
@@ -118,7 +115,7 @@ func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtim
118115
resource.DeletedAt = sql.NullTime{Time: deletedAt.Time, Valid: true}
119116
}
120117

121-
result := s.db.WithContext(ctx).Create(&resource)
118+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Create(&resource)
122119
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
123120
}
124121

@@ -163,23 +160,17 @@ func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtim
163160
updatedResource["deleted_at"] = sql.NullTime{Time: deletedAt.Time, Valid: true}
164161
}
165162

166-
result := s.db.WithContext(ctx).Model(&Resource{}).Where(map[string]interface{}{
163+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(map[string]interface{}{
167164
"cluster": cluster,
168-
"group": s.storageGroupResource.Group,
169-
"version": s.storageVersion.Version,
170-
"resource": s.storageGroupResource.Resource,
171165
"namespace": metaobj.GetNamespace(),
172166
"name": metaobj.GetName(),
173167
}).Updates(updatedResource)
174168
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
175169
}
176170

177171
func (s *ResourceStorage) deleteObject(cluster, namespace, name string) *gorm.DB {
178-
return s.db.Model(&Resource{}).Where(map[string]interface{}{
172+
return s.db.Table(GetTable(newGvr(s))).Model(&Resource{}).Where(map[string]interface{}{
179173
"cluster": cluster,
180-
"group": s.storageGroupResource.Group,
181-
"version": s.storageVersion.Version,
182-
"resource": s.storageGroupResource.Resource,
183174
"namespace": namespace,
184175
"name": name,
185176
}).Delete(&Resource{})
@@ -212,32 +203,26 @@ func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtim
212203
condition := map[string]interface{}{
213204
"cluster": cluster,
214205
"namespace": metaobj.GetNamespace(),
215-
"group": s.storageGroupResource.Group,
216-
"version": s.storageVersion.Version,
217-
"resource": s.storageGroupResource.Resource,
218206
}
219207
if metaobj.GetName() != "" {
220208
condition["name"] = metaobj.GetName()
221209
}
222210

223-
result := s.db.WithContext(ctx).Model(&Resource{}).Where(condition).Updates(updatedResource)
211+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition).Updates(updatedResource)
224212
return InterpretResourceDBError(cluster, metaobj.GetName(), result.Error)
225213
}
226214

227215
func (s *ResourceStorage) genGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
228216
condition := map[string]interface{}{
229217
"namespace": namespace,
230218
"name": name,
231-
"group": s.storageGroupResource.Group,
232-
"version": s.storageVersion.Version,
233-
"resource": s.storageGroupResource.Resource,
234219
"deleted": false,
235220
}
236221

237222
if cluster != "" {
238223
condition["cluster"] = cluster
239224
}
240-
return s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
225+
return s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
241226
}
242227

243228
func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name string) (runtime.Object, error) {
@@ -246,12 +231,9 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
246231
"namespace": namespace,
247232
"name": name,
248233
"cluster": cluster,
249-
"group": s.storageGroupResource.Group,
250-
"version": s.storageVersion.Version,
251-
"resource": s.storageGroupResource.Resource,
252234
}
253235

254-
result := s.db.WithContext(ctx).Model(&Resource{}).
236+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).
255237
Select("cluster_resource_version, object").Where(condition).First(&resource)
256238
if result.Error != nil {
257239
return nil, InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
@@ -269,19 +251,37 @@ func (s *ResourceStorage) GetObj(ctx context.Context, cluster, namespace, name s
269251
return obj, nil
270252
}
271253

254+
func (s *ResourceStorage) GenGetObjectQuery(ctx context.Context, cluster, namespace, name string) *gorm.DB {
255+
condition := map[string]interface{}{
256+
"namespace": namespace,
257+
"name": name,
258+
"deleted": false,
259+
}
260+
261+
if cluster != "" {
262+
condition["cluster"] = cluster
263+
}
264+
return s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version, object").Where(condition)
265+
}
266+
272267
func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error {
273-
var objects [][]byte
274-
if result := s.genGetObjectQuery(ctx, cluster, namespace, name).First(&objects); result.Error != nil {
268+
var resource Resource
269+
if result := s.GenGetObjectQuery(ctx, cluster, namespace, name).First(&resource); result.Error != nil {
275270
return InterpretResourceDBError(cluster, namespace+"/"+name, result.Error)
276271
}
277272

278-
obj, _, err := s.codec.Decode(objects[0], nil, into)
273+
obj, _, err := s.codec.Decode(resource.Object, nil, into)
279274
if err != nil {
280275
return err
281276
}
282277
if obj != into {
283278
return fmt.Errorf("Failed to decode resource, into is %T", into)
284279
}
280+
metaObj, err := meta.Accessor(obj)
281+
if err != nil {
282+
return err
283+
}
284+
metaObj.SetResourceVersion(utils.ParseInt642Str(resource.ClusterResourceVersion))
285285
return nil
286286
}
287287

@@ -294,14 +294,11 @@ func (s *ResourceStorage) genListObjectsQuery(ctx context.Context, opts *interna
294294
var condition map[string]interface{}
295295
if !isAll {
296296
condition = map[string]interface{}{
297-
"group": s.storageGroupResource.Group,
298-
"version": s.storageVersion.Version,
299-
"resource": s.storageGroupResource.Resource,
300-
"deleted": false,
297+
"deleted": false,
301298
}
302299
}
303300

304-
query := s.db.WithContext(ctx).Model(&Resource{}).Where(condition)
301+
query := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition)
305302
offset, amount, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
306303
return offset, amount, query, result, err
307304
}
@@ -310,11 +307,9 @@ func (s *ResourceStorage) genListQuery(ctx context.Context, newfunc func() runti
310307
var result [][]byte
311308

312309
condition := map[string]interface{}{
313-
"group": s.storageGroupResource.Group,
314-
"version": s.storageVersion.Version,
315-
"resource": s.storageGroupResource.Resource,
310+
"deleted": false,
316311
}
317-
query := s.db.WithContext(ctx).Model(&Resource{}).Select("object").Where(condition)
312+
query := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("object").Where(condition)
318313
_, _, query, err := applyListOptionsToResourceQuery(s.db, query, opts)
319314
if err != nil {
320315
return nil, err
@@ -531,6 +526,14 @@ func (s *ResourceStorage) fetchInitEvents(ctx context.Context, rv string, newfun
531526
}
532527
}
533528

529+
func newGvr(s *ResourceStorage) schema.GroupVersionResource {
530+
return schema.GroupVersionResource{
531+
Group: s.storageGroupResource.Group,
532+
Version: s.storageVersion.Version,
533+
Resource: s.storageGroupResource.Resource,
534+
}
535+
}
536+
534537
func getObjectListAndMaxCrv(objList []Object, onlyMetada bool) ([]Object, []string, string, error) {
535538
crvs := make([]string, 0, len(objList))
536539
var maxCrv int64 = 0
@@ -576,12 +579,7 @@ func getObjectListAndMaxCrv(objList []Object, onlyMetada bool) ([]Object, []stri
576579
func (s *ResourceStorage) GetMaxCrv(ctx context.Context) (string, error) {
577580
maxCrv := "0"
578581
var metadataList ResourceMetadataList
579-
condition := map[string]interface{}{
580-
"group": s.storageGroupResource.Group,
581-
"version": s.storageVersion.Version,
582-
"resource": s.storageGroupResource.Resource,
583-
}
584-
result := s.db.WithContext(ctx).Model(&Resource{}).Select("cluster_resource_version").Where(condition).Order("cluster_resource_version DESC").Limit(1).Find(&metadataList)
582+
result := s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Select("cluster_resource_version").Order("cluster_resource_version DESC").Limit(1).Find(&metadataList)
585583
if result.Error != nil {
586584
return maxCrv, InterpretResourceDBError("", s.storageGroupResource.Resource, result.Error)
587585
}
@@ -611,15 +609,12 @@ func (s *ResourceStorage) PublishEvent(ctx context.Context, wc *watchcomponents.
611609
}
612610

613611
condition := map[string]interface{}{
614-
"group": s.storageGroupResource.Group,
615-
"version": s.storageVersion.Version,
616-
"resource": s.storageGroupResource.Resource,
617612
"cluster": wc.Cluster,
618613
"namespace": metaObj.GetNamespace(),
619614
"name": metaObj.GetName(),
620615
}
621616

622-
s.db.WithContext(ctx).Model(&Resource{}).Where(condition).Updates(updatedResource)
617+
s.db.WithContext(ctx).Table(GetTable(newGvr(s))).Model(&Resource{}).Where(condition).Updates(updatedResource)
623618
}
624619

625620
func (s *ResourceStorage) GenCrv2Event(event *watch.Event) {

pkg/storage/internalstorage/storage.go

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,22 @@ package internalstorage
33
import (
44
"context"
55
"fmt"
6+
"sync"
67

78
"gorm.io/gorm"
89
"k8s.io/apimachinery/pkg/runtime/schema"
910
"k8s.io/apimachinery/pkg/watch"
1011

1112
internal "github.com/clusterpedia-io/api/clusterpedia"
1213
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
14+
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
1315
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
1416
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
1517
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/middleware"
1618
)
1719

20+
var mutex sync.Mutex
21+
1822
type StorageFactory struct {
1923
db *gorm.DB
2024
}
@@ -30,6 +34,18 @@ func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfi
3034
Resource: config.StorageGroupResource.Resource,
3135
}
3236

37+
mutex.Lock()
38+
defer mutex.Unlock()
39+
if !s.db.Migrator().HasTable(GetTable(gvr)) {
40+
if err := s.db.AutoMigrate(&Resource{}); err != nil {
41+
return nil, err
42+
}
43+
err := s.db.Migrator().RenameTable("resources", GetTable(gvr))
44+
if err != nil {
45+
return nil, err
46+
}
47+
}
48+
3349
resourceStorage := &ResourceStorage{
3450
db: s.db,
3551
codec: config.Codec,
@@ -99,11 +115,26 @@ func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionRes
99115

100116
func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) {
101117
var resources []Resource
102-
result := f.db.WithContext(ctx).Select("group", "version", "resource", "namespace", "name", "resource_version").
103-
Where(map[string]interface{}{"cluster": cluster}).
104-
Find(&resources)
105-
if result.Error != nil {
106-
return nil, InterpretDBError(cluster, result.Error)
118+
mutex.Lock()
119+
tables, err := f.db.Migrator().GetTables()
120+
if err != nil {
121+
mutex.Unlock()
122+
return nil, err
123+
}
124+
mutex.Unlock()
125+
for _, table := range tables {
126+
var tableResources []Resource
127+
result := f.db.WithContext(ctx).Table(table).Select("group", "version", "resource",
128+
"namespace", "name", "resource_version", "deleted", "published").
129+
Where(map[string]interface{}{"cluster": cluster, "deleted": false}).
130+
//In case deleted event be losted when synchro manager do a leaderelection or reboot
131+
Or(map[string]interface{}{"cluster": cluster, "deleted": true, "published": false}).
132+
Find(&tableResources)
133+
if result.Error != nil {
134+
return nil, InterpretDBError(cluster, result.Error)
135+
}
136+
137+
resources = append(resources, tableResources...)
107138
}
108139

109140
resourceversions := make(map[schema.GroupVersionResource]map[string]interface{})
@@ -119,22 +150,37 @@ func (f *StorageFactory) GetResourceVersions(ctx context.Context, cluster string
119150
if resource.Namespace != "" {
120151
key = resource.Namespace + "/" + resource.Name
121152
}
122-
versions[key] = resource.ResourceVersion
153+
versions[key] = informer.StorageElement{
154+
Version: resource.ResourceVersion,
155+
Deleted: resource.Deleted,
156+
Published: resource.Published,
157+
Name: resource.Name,
158+
Namespace: resource.Namespace,
159+
}
123160
}
124161
return resourceversions, nil
125162
}
126163

127164
func (f *StorageFactory) CleanCluster(ctx context.Context, cluster string) error {
128-
result := f.db.WithContext(ctx).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
129-
return InterpretDBError(cluster, result.Error)
165+
mutex.Lock()
166+
tables, err := f.db.Migrator().GetTables()
167+
if err != nil {
168+
mutex.Unlock()
169+
return err
170+
}
171+
mutex.Unlock()
172+
for _, table := range tables {
173+
result := f.db.WithContext(ctx).Table(table).Where(map[string]interface{}{"cluster": cluster}).Delete(&Resource{})
174+
if result.Error != nil {
175+
return InterpretDBError(cluster, result.Error)
176+
}
177+
}
178+
return nil
130179
}
131180

132181
func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error {
133-
result := s.db.Where(map[string]interface{}{
134-
"cluster": cluster,
135-
"group": gvr.Group,
136-
"version": gvr.Version,
137-
"resource": gvr.Resource,
182+
result := s.db.Table(GetTable(gvr)).Where(map[string]interface{}{
183+
"cluster": cluster,
138184
}).Delete(&Resource{})
139185
return InterpretDBError(fmt.Sprintf("%s/%s", cluster, gvr), result.Error)
140186
}

0 commit comments

Comments
 (0)