Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
32c44ee
added storage methods for RawKV GC
AmoebaProtozoa May 12, 2022
b12eb9b
push back on updating kvproto in go.mod
AmoebaProtozoa May 12, 2022
d0b4f3b
linting
AmoebaProtozoa May 12, 2022
a2ba0e7
changed storage path structure
AmoebaProtozoa May 12, 2022
b9bb3e4
update comments
AmoebaProtozoa May 12, 2022
a7e3ece
added ByKeySpace suffix for disambiguity
AmoebaProtozoa May 12, 2022
7ff125e
removed default key spaces
AmoebaProtozoa May 13, 2022
1bdb642
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 13, 2022
1ebb51d
renaming, move delete expired safepoints to a goroutine
AmoebaProtozoa May 16, 2022
312704a
update tests
AmoebaProtozoa May 16, 2022
766b344
lint
AmoebaProtozoa May 16, 2022
de77dfc
added back KeySpaceGCSafePoint
AmoebaProtozoa May 16, 2022
d6eda93
remove expired all at once
AmoebaProtozoa May 16, 2022
cbe1ed0
address comments
AmoebaProtozoa May 16, 2022
9b27bcd
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 16, 2022
88610a0
move sleep to failpoint
AmoebaProtozoa May 16, 2022
995033e
modified failpoint to eliminate sleep
AmoebaProtozoa May 17, 2022
3099939
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 17, 2022
a825d37
log error when failed to remove expired service safe point
AmoebaProtozoa May 18, 2022
c1cb1c0
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 25, 2022
769a40b
Merge branch 'master' into RawKV_GC_API_storage
AmoebaProtozoa May 26, 2022
d8dd130
Merge branch 'master' into RawKV_GC_API_storage
ti-chi-bot May 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 198 additions & 0 deletions server/storage/endpoint/gc_key_space.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2022 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endpoint

import (
"encoding/json"
"math"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// KeySpaceGCSafePoint is gcWorker's safe point for a specific key-space.
type KeySpaceGCSafePoint struct {
// SpaceID is the ID of the key-space this safe point belongs to.
SpaceID string `json:"space_id"`
// SafePoint is the GC safe point value; it is omitted from the JSON encoding when zero.
SafePoint uint64 `json:"safe_point,omitempty"`
}

// KeySpaceGCSafePointStorage defines the storage operations on key-spaces' safe points.
// Every method is scoped by a key-space ID except LoadAllKeySpaceGCSafePoints,
// which covers all key-spaces.
type KeySpaceGCSafePointStorage interface {
// Service safe point interfaces: one safe point per (spaceID, serviceID) pair.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
// GC safe point interfaces: one GC safe point per key-space.
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
}

// Compile-time check that *StorageEndpoint implements KeySpaceGCSafePointStorage.
var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)

// SaveServiceSafePoint stores ssp, encoded as JSON, at the service safe point
// path built from spaceID and ssp.ServiceID.
// It returns an error if ssp.ServiceID is empty, if JSON encoding fails, or if
// the underlying save fails.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
	if len(ssp.ServiceID) == 0 {
		return errors.New("service id of service safepoint cannot be empty")
	}
	encoded, err := json.Marshal(ssp)
	if err != nil {
		return err
	}
	target := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
	return se.Save(target, string(encoded))
}

// LoadServiceSafePoint reads the ServiceSafePoint stored for the given key-space ID
// and service ID.
//
// It returns (nil, nil) when nothing is stored under the key, and also when the
// stored safe point's ExpiredAt is earlier than the current Unix time. In the
// expired case the key is removed in a background goroutine; a failure of that
// removal is only logged and never reported to the caller.
// A load or JSON decoding failure is returned as the error.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
	key := KeySpaceServiceSafePointPath(spaceID, serviceID)
	value, err := se.Load(key)
	if err != nil || value == "" {
		return nil, err
	}
	ssp := &ServiceSafePoint{}
	if err := json.Unmarshal([]byte(value), ssp); err != nil {
		return nil, err
	}
	if ssp.ExpiredAt < time.Now().Unix() {
		// Best-effort cleanup off the caller's path.
		go func() {
			// Keep the error local to the goroutine instead of writing to the
			// enclosing function's err through the closure.
			if err := se.Remove(key); err != nil {
				log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
			}
		}()
		return nil, nil
	}
	return ssp, nil
}

// LoadMinServiceSafePoint returns the service safe point with the smallest
// SafePoint among the non-expired service safe points of the given key-space.
// Note that the key-space's GC safe point is stored under a separate path and is
// not considered here.
//
// A service safe point counts as expired when its ExpiredAt is earlier than
// now.Unix(). Expired keys are collected and removed in a background goroutine;
// removal failures are only logged. When the "removeExpiredKeys" failpoint is
// enabled they are removed synchronously instead.
//
// It returns (nil, nil) if the key-space has no service safe point or all of them
// are expired. A range-load or JSON decoding failure is returned as the error.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
	prefix := KeySpaceServiceSafePointPrefix(spaceID)
	prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
	keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
	if err != nil {
		return nil, err
	}

	nowUnix := now.Unix()
	// found is tracked separately from the MaxUint64 starting value so that a live
	// safe point whose SafePoint is exactly math.MaxUint64 is still returned.
	minSSP := &ServiceSafePoint{SafePoint: math.MaxUint64}
	found := false
	var expiredKeys []string
	for i, key := range keys {
		ssp := &ServiceSafePoint{}
		if err := json.Unmarshal([]byte(values[i]), ssp); err != nil {
			return nil, err
		}

		// gather expired keys
		if ssp.ExpiredAt < nowUnix {
			expiredKeys = append(expiredKeys, key)
			continue
		}
		// Strict comparison keeps the first entry among equal safe points.
		if !found || ssp.SafePoint < minSSP.SafePoint {
			minSSP, found = ssp, true
		}
	}

	// failpoint for immediate removal
	failpoint.Inject("removeExpiredKeys", func() {
		for _, key := range expiredKeys {
			if err := se.Remove(key); err != nil {
				log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
			}
		}
		expiredKeys = nil
	})

	// remove expired keys asynchronously; skip the goroutine when there is nothing to do
	if len(expiredKeys) > 0 {
		go func() {
			for _, key := range expiredKeys {
				// Goroutine-local error: do not write to the enclosing function's err.
				if err := se.Remove(key); err != nil {
					log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
				}
			}
		}()
	}

	if !found {
		// no service safe point or all of them are expired.
		return nil, nil
	}

	// successfully found a valid min safe point.
	return minSSP, nil
}

// RemoveServiceSafePoint deletes the service safe point stored for the given
// key-space ID and service ID, returning the error of the underlying removal.
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
	return se.Remove(KeySpaceServiceSafePointPath(spaceID, serviceID))
}

// SaveKeySpaceGCSafePoint stores safePoint, formatted as a base-16 string, at the
// GC safe point path of the given key-space.
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
	target := KeySpaceGCSafePointPath(spaceID)
	return se.Save(target, strconv.FormatUint(safePoint, 16))
}

// LoadKeySpaceGCSafePoint loads the GC safe point of the given key-space and
// parses it as a base-16 unsigned integer.
// It returns (0, nil) when no value is stored, and (0, err) when loading or
// parsing fails.
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
	raw, err := se.Load(KeySpaceGCSafePointPath(spaceID))
	if err != nil {
		return 0, err
	}
	if raw == "" {
		return 0, nil
	}
	parsed, parseErr := strconv.ParseUint(raw, 16, 64)
	if parseErr != nil {
		// Always report 0 on failure, regardless of what ParseUint returned.
		return 0, parseErr
	}
	return parsed, nil
}

// LoadAllKeySpaceGCSafePoints returns one KeySpaceGCSafePoint per key-space that
// has a GC safe point stored.
//
// It range-loads every key under the key-space safe point prefix and keeps only
// keys of the form {prefix}{space_id}{suffix}. If withGCSafePoint is false the
// stored values are not parsed and every returned SafePoint is 0; otherwise each
// value is parsed as a base-16 unsigned integer and a parse failure aborts the
// call with that error.
func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) {
	prefix := KeySpaceSafePointPrefix()
	prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
	suffix := KeySpaceGCSafePointSuffix()
	keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
	if err != nil {
		return nil, err
	}
	safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
	for i, key := range keys {
		// skip non gc safe points
		if !strings.HasSuffix(key, suffix) {
			continue
		}
		spaceID := strings.TrimSuffix(strings.TrimPrefix(key, prefix), suffix)
		// The range also contains service safe points. One whose service ID equals
		// the GC suffix name (".../{space_id}/service/gc") passes the suffix check
		// above but is not a GC safe point, and its value is JSON rather than hex.
		// NOTE(review): this assumes a space ID is a single path segment without
		// '/' — confirm against how space IDs are allocated.
		if strings.Contains(spaceID, "/") {
			continue
		}
		safePoint := &KeySpaceGCSafePoint{SpaceID: spaceID}
		if withGCSafePoint {
			value, err := strconv.ParseUint(values[i], 16, 64)
			if err != nil {
				return nil, err
			}
			safePoint.SafePoint = value
		}
		safePoints = append(safePoints, safePoint)
	}
	return safePoints, nil
}
32 changes: 32 additions & 0 deletions server/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
customScheduleConfigPath = "scheduler_config"
gcWorkerServiceSafePointID = "gc_worker"
minResolvedTS = "min_resolved_ts"
keySpaceSafePointPrefix = "key_space/gc_safepoint"
keySpaceGCSafePointSuffix = "gc"
)

// AppendToRootPath appends the given key to the rootPath.
Expand Down Expand Up @@ -104,3 +106,33 @@ func gcSafePointServicePath(serviceID string) string {
func MinResolvedTSPath() string {
// Joins clusterPath with the minResolvedTS key name ("min_resolved_ts").
return path.Join(clusterPath, minResolvedTS)
}

// KeySpaceServiceSafePointPrefix returns the common prefix of all service safe
// points that belong to the given key-space. The result ends with a slash.
// Prefix: key_space/gc_safepoint/{space_id}/service/
func KeySpaceServiceSafePointPrefix(spaceID string) string {
	serviceDir := path.Join(keySpaceSafePointPrefix, spaceID, "service")
	return serviceDir + "/"
}

// KeySpaceGCSafePointPath returns the path under which the GC safe point of the
// given key-space is stored.
// Path: key_space/gc_safepoint/{space_id}/gc
func KeySpaceGCSafePointPath(spaceID string) string {
	segments := []string{keySpaceSafePointPrefix, spaceID, keySpaceGCSafePointSuffix}
	return path.Join(segments...)
}

// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
// Path: /key_space/gc_safepoint/{space_id}/service/{service_id}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we tell the endpoint type from service_id, e.g. whether it's from TiDB, a raw_kv client, CDC, or something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can tell by service_id alone,
for example, both TiDB and raw_kv's gc_worker may use gc_worker as their service_id

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... does it differ by space_id, like cdc_xxx, tidb_xxx, client_xxxx? I would like to know who uses the path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we use spaceID to differentiate key spaces, e.g.
default_rawkv, default_txnkv, default_tidb.
serviceID is used to identify a specific service under that key space, e.g.
cdc, br, etc.

TiDB CDC and RawKV CDC may have the same serviceID but different spaceIDs;
RawKV CDC and RawKV BR have the same spaceID but different serviceIDs.

func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
	// path.Join cleans the result, so the trailing slash of the prefix does not
	// produce a doubled separator.
	servicePrefix := KeySpaceServiceSafePointPrefix(spaceID)
	return path.Join(servicePrefix, serviceID)
}

// KeySpaceSafePointPrefix returns the prefix shared by the safe point keys of
// all key-spaces. The result ends with a slash.
// Prefix: key_space/gc_safepoint/
func KeySpaceSafePointPrefix() string {
	const separator = "/"
	return keySpaceSafePointPrefix + separator
}

// KeySpaceGCSafePointSuffix returns the suffix that ends every key-space GC safe
// point path. The result starts with a slash.
// Suffix: /gc
func KeySpaceGCSafePointSuffix() string {
	const separator = "/"
	return separator + keySpaceGCSafePointSuffix
}
1 change: 1 addition & 0 deletions server/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Storage interface {
endpoint.ReplicationStatusStorage
endpoint.GCSafePointStorage
endpoint.MinResolvedTSStorage
endpoint.KeySpaceGCSafePointStorage
}

// NewStorageWithMemoryBackend creates a new storage with memory backend.
Expand Down
Loading