Skip to content

Commit 86a6821

Browse files
storage: Add APIs for RawKV GC (#4937)
ref #4865 Add APIs for RawKV GC. Signed-off-by: AmoebaProtozoa <[email protected]> Signed-off-by: David <[email protected]> Co-authored-by: Ti Chi Robot <[email protected]>
1 parent 4a9d7c0 commit 86a6821

File tree

4 files changed

+447
-0
lines changed

4 files changed

+447
-0
lines changed
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
// Copyright 2022 TiKV Project Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package endpoint
16+
17+
import (
18+
"encoding/json"
19+
"math"
20+
"strconv"
21+
"strings"
22+
"time"
23+
24+
"github.com/pingcap/errors"
25+
"github.com/pingcap/failpoint"
26+
"github.com/pingcap/log"
27+
"github.com/tikv/pd/pkg/errs"
28+
"go.etcd.io/etcd/clientv3"
29+
"go.uber.org/zap"
30+
)
31+
32+
// KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space
33+
type KeySpaceGCSafePoint struct {
34+
SpaceID string `json:"space_id"`
35+
SafePoint uint64 `json:"safe_point,omitempty"`
36+
}
37+
38+
// KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points
39+
type KeySpaceGCSafePointStorage interface {
40+
// Service safe point interfaces.
41+
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
42+
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
43+
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
44+
RemoveServiceSafePoint(spaceID, serviceID string) error
45+
// GC safe point interfaces.
46+
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
47+
LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
48+
LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
49+
}
50+
51+
var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)
52+
53+
// SaveServiceSafePoint saves service safe point under given key-space.
54+
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
55+
if ssp.ServiceID == "" {
56+
return errors.New("service id of service safepoint cannot be empty")
57+
}
58+
key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
59+
value, err := json.Marshal(ssp)
60+
if err != nil {
61+
return err
62+
}
63+
return se.Save(key, string(value))
64+
}
65+
66+
// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
67+
// Return nil if no safepoint exist for given service or just expired.
68+
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
69+
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
70+
value, err := se.Load(key)
71+
if err != nil || value == "" {
72+
return nil, err
73+
}
74+
ssp := &ServiceSafePoint{}
75+
if err := json.Unmarshal([]byte(value), ssp); err != nil {
76+
return nil, err
77+
}
78+
if ssp.ExpiredAt < time.Now().Unix() {
79+
go func() {
80+
if err = se.Remove(key); err != nil {
81+
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
82+
}
83+
}()
84+
return nil, nil
85+
}
86+
return ssp, nil
87+
}
88+
89+
// LoadMinServiceSafePoint returns the minimum safepoint for the given key-space.
90+
// Note that gc worker safe point are store separately.
91+
// If no service safe point exist for the given key-space or all the service safe points just expired, return nil.
92+
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
93+
prefix := KeySpaceServiceSafePointPrefix(spaceID)
94+
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
95+
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
96+
if err != nil {
97+
return nil, err
98+
}
99+
min := &ServiceSafePoint{SafePoint: math.MaxUint64}
100+
expiredKeys := make([]string, 0)
101+
for i, key := range keys {
102+
ssp := &ServiceSafePoint{}
103+
if err = json.Unmarshal([]byte(values[i]), ssp); err != nil {
104+
return nil, err
105+
}
106+
107+
// gather expired keys
108+
if ssp.ExpiredAt < now.Unix() {
109+
expiredKeys = append(expiredKeys, key)
110+
continue
111+
}
112+
if ssp.SafePoint < min.SafePoint {
113+
min = ssp
114+
}
115+
}
116+
// failpoint for immediate removal
117+
failpoint.Inject("removeExpiredKeys", func() {
118+
for _, key := range expiredKeys {
119+
if err = se.Remove(key); err != nil {
120+
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
121+
}
122+
}
123+
expiredKeys = []string{}
124+
})
125+
// remove expired keys asynchronously
126+
go func() {
127+
for _, key := range expiredKeys {
128+
if err = se.Remove(key); err != nil {
129+
log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
130+
}
131+
}
132+
}()
133+
if min.SafePoint == math.MaxUint64 {
134+
// no service safe point or all of them are expired.
135+
return nil, nil
136+
}
137+
138+
// successfully found a valid min safe point.
139+
return min, nil
140+
}
141+
142+
// RemoveServiceSafePoint removes target ServiceSafePoint
143+
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
144+
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
145+
return se.Remove(key)
146+
}
147+
148+
// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
149+
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
150+
value := strconv.FormatUint(safePoint, 16)
151+
return se.Save(KeySpaceGCSafePointPath(spaceID), value)
152+
}
153+
154+
// LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space.
155+
// Returns 0 if target safepoint not exist.
156+
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
157+
value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
158+
if err != nil || value == "" {
159+
return 0, err
160+
}
161+
safePoint, err := strconv.ParseUint(value, 16, 64)
162+
if err != nil {
163+
return 0, err
164+
}
165+
return safePoint, nil
166+
}
167+
168+
// LoadAllKeySpaceGCSafePoints returns slice of KeySpaceGCSafePoint.
169+
// If withGCSafePoint set to false, returned safePoints will be 0.
170+
func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) {
171+
prefix := KeySpaceSafePointPrefix()
172+
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
173+
suffix := KeySpaceGCSafePointSuffix()
174+
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
175+
if err != nil {
176+
return nil, err
177+
}
178+
safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
179+
for i := range keys {
180+
// skip non gc safe points
181+
if !strings.HasSuffix(keys[i], suffix) {
182+
continue
183+
}
184+
safePoint := &KeySpaceGCSafePoint{}
185+
spaceID := strings.TrimPrefix(keys[i], prefix)
186+
spaceID = strings.TrimSuffix(spaceID, suffix)
187+
safePoint.SpaceID = spaceID
188+
if withGCSafePoint {
189+
value, err := strconv.ParseUint(values[i], 16, 64)
190+
if err != nil {
191+
return nil, err
192+
}
193+
safePoint.SafePoint = value
194+
}
195+
safePoints = append(safePoints, safePoint)
196+
}
197+
return safePoints, nil
198+
}

server/storage/endpoint/key_path.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ const (
3232
customScheduleConfigPath = "scheduler_config"
3333
gcWorkerServiceSafePointID = "gc_worker"
3434
minResolvedTS = "min_resolved_ts"
35+
keySpaceSafePointPrefix = "key_space/gc_safepoint"
36+
keySpaceGCSafePointSuffix = "gc"
3537
)
3638

3739
// AppendToRootPath appends the given key to the rootPath.
@@ -104,3 +106,33 @@ func gcSafePointServicePath(serviceID string) string {
104106
func MinResolvedTSPath() string {
105107
return path.Join(clusterPath, minResolvedTS)
106108
}
109+
110+
// KeySpaceServiceSafePointPrefix returns the prefix of given service's service safe point.
111+
// Prefix: /key_space/gc_safepoint/{space_id}/service/
112+
func KeySpaceServiceSafePointPrefix(spaceID string) string {
113+
return path.Join(keySpaceSafePointPrefix, spaceID, "service") + "/"
114+
}
115+
116+
// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
117+
// Path: /key_space/gc_safepoint/{space_id}/gc
118+
func KeySpaceGCSafePointPath(spaceID string) string {
119+
return path.Join(keySpaceSafePointPrefix, spaceID, keySpaceGCSafePointSuffix)
120+
}
121+
122+
// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
123+
// Path: /key_space/gc_safepoint/{space_id}/service/{service_id}
124+
func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
125+
return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID)
126+
}
127+
128+
// KeySpaceSafePointPrefix returns prefix for all key-spaces' safe points.
129+
// Path: /key_space/gc_safepoint/
130+
func KeySpaceSafePointPrefix() string {
131+
return keySpaceSafePointPrefix + "/"
132+
}
133+
134+
// KeySpaceGCSafePointSuffix returns the suffix for any gc safepoint.
135+
// Postfix: /gc
136+
func KeySpaceGCSafePointSuffix() string {
137+
return "/" + keySpaceGCSafePointSuffix
138+
}

server/storage/storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type Storage interface {
3939
endpoint.ReplicationStatusStorage
4040
endpoint.GCSafePointStorage
4141
endpoint.MinResolvedTSStorage
42+
endpoint.KeySpaceGCSafePointStorage
4243
}
4344

4445
// NewStorageWithMemoryBackend creates a new storage with memory backend.

0 commit comments

Comments
 (0)