Skip to content

Commit f7d6346

Browse files
server side code
Signed-off-by: AmoebaProtozoa <[email protected]>
1 parent c0ee4d7 commit f7d6346

File tree

8 files changed

+257
-134
lines changed

8 files changed

+257
-134
lines changed

client/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/tikv/pd/client
33
go 1.16
44

55
// TODO: Remove this once kvproto has been updated
6-
replace github.com/pingcap/kvproto => github.com/ystaticy/kvproto v0.0.0-20220419035825-6bb5c11da23d
6+
replace github.com/pingcap/kvproto => github.com/AmoebaProtozoa/kvproto v0.0.0-20220427045408-abeb7dbc9f22
77

88
require (
99
github.com/opentracing/opentracing-go v1.2.0

client/go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
22
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
3+
github.com/AmoebaProtozoa/kvproto v0.0.0-20220427045408-abeb7dbc9f22/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
34
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
45
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
56
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/tikv/pd
33
go 1.16
44

55
// TODO: Remove this once kvproto has been updated
6-
replace github.com/pingcap/kvproto => github.com/ystaticy/kvproto v0.0.0-20220419035825-6bb5c11da23d
6+
replace github.com/pingcap/kvproto => github.com/AmoebaProtozoa/kvproto v0.0.0-20220427045408-abeb7dbc9f22
77

88
require (
99
github.com/AlekSi/gocov-xml v1.0.0

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
22
github.com/AlekSi/gocov-xml v1.0.0 h1:4QctJBgXEkbzeKz6PJy6bt3JSPNSN4I2mITYW+eKUoQ=
33
github.com/AlekSi/gocov-xml v1.0.0/go.mod h1:J0qYeZ6tDg4oZubW9mAAgxlqw39PDfoEkzB3HXSbEuA=
4+
github.com/AmoebaProtozoa/kvproto v0.0.0-20220427044528-668e540bb708/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
45
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
56
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
67
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=

server/grpc_service.go

Lines changed: 144 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,38 +1437,80 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb
14371437
}, nil
14381438
}
14391439

1440-
// GetAllServiceGroupGcSafePoint used by RawKV
1441-
// returns GCSafePoint for all service groups as well as min GCSafePoint
1442-
func (s *GrpcServer) GetAllServiceGroupGcSafePoint(ctx context.Context, request *pdpb.GetAllServiceGroupGcSafePointRequest) (*pdpb.GetAllServiceGroupGcSafePointResponse, error) {
1440+
// GetServiceGroup return all service group ids
1441+
func (s *GrpcServer) GetServiceGroup(ctx context.Context, request *pdpb.GetServiceGroupRequest) (*pdpb.GetServiceGroupResponse, error) {
14431442
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
1444-
return pdpb.NewPDClient(client).GetAllServiceGroupGcSafePoint(ctx, request)
1443+
return pdpb.NewPDClient(client).GetServiceGroup(ctx, request)
14451444
}
14461445
if rsp, err := s.unaryMiddleware(ctx, request.GetHeader(), fn); err != nil {
14471446
return nil, err
14481447
} else if rsp != nil {
1449-
return rsp.(*pdpb.GetAllServiceGroupGcSafePointResponse), err
1448+
return rsp.(*pdpb.GetServiceGroupResponse), err
14501449
}
14511450

14521451
rc := s.GetRaftCluster()
14531452
if rc == nil {
1454-
return &pdpb.GetAllServiceGroupGcSafePointResponse{Header: s.notBootstrappedHeader()}, nil
1453+
return &pdpb.GetServiceGroupResponse{Header: s.notBootstrappedHeader()}, nil
14551454
}
14561455

14571456
var storage endpoint.GCSafePointStorage = s.storage
1458-
safePoints, err := storage.LoadAllServiceGroupGCSafePoints()
1459-
1457+
serviceGroupList, err := storage.LoadAllServiceGroup()
14601458
if err != nil {
14611459
return nil, err
14621460
}
14631461

1464-
return &pdpb.GetAllServiceGroupGcSafePointResponse{
1465-
Header: s.header(),
1466-
ServiceGroupSafePoint: safePoints,
1462+
return &pdpb.GetServiceGroupResponse{
1463+
Header: s.header(),
1464+
ServiceGroupId: serviceGroupList,
1465+
}, nil
1466+
}
1467+
1468+
// GetMinServiceSafePointByServiceGroup returns given service group's min service safe point
1469+
func (s *GrpcServer) GetMinServiceSafePointByServiceGroup(ctx context.Context, request *pdpb.GetMinServiceSafePointByServiceGroupRequest) (*pdpb.GetMinServiceSafePointByServiceGroupResponse, error) {
1470+
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
1471+
return pdpb.NewPDClient(client).GetMinServiceSafePointByServiceGroup(ctx, request)
1472+
}
1473+
if rsp, err := s.unaryMiddleware(ctx, request.GetHeader(), fn); err != nil {
1474+
return nil, err
1475+
} else if rsp != nil {
1476+
return rsp.(*pdpb.GetMinServiceSafePointByServiceGroupResponse), err
1477+
}
1478+
1479+
rc := s.GetRaftCluster()
1480+
if rc == nil {
1481+
return &pdpb.GetMinServiceSafePointByServiceGroupResponse{Header: s.notBootstrappedHeader()}, nil
1482+
}
1483+
1484+
var storage endpoint.GCSafePointStorage = s.storage
1485+
serviceGroupID := string(request.ServiceGroupId)
1486+
nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1)
1487+
if err != nil {
1488+
return nil, err
1489+
}
1490+
now, _ := tsoutil.ParseTimestamp(nowTSO)
1491+
min, err := storage.LoadMinServiceSafePointByServiceGroup(serviceGroupID, now)
1492+
if err != nil {
1493+
return nil, err
1494+
}
1495+
var returnSafePoint uint64
1496+
if min != nil {
1497+
returnSafePoint = min.SafePoint
1498+
}
1499+
// perform a get operation on a non-existing key to obtain current etcd revision number from response header
1500+
rsp, _ := s.client.Get(ctx, "NA")
1501+
currentRevision := rsp.Header.GetRevision()
1502+
return &pdpb.GetMinServiceSafePointByServiceGroupResponse{
1503+
Header: s.header(),
1504+
SafePoint: returnSafePoint,
1505+
Revision: currentRevision,
14671506
}, nil
14681507
}
14691508

14701509
// UpdateGCSafePointByServiceGroup used by gc_worker to update their gc safe points
14711510
func (s *GrpcServer) UpdateGCSafePointByServiceGroup(ctx context.Context, request *pdpb.UpdateGCSafePointByServiceGroupRequest) (*pdpb.UpdateGCSafePointByServiceGroupResponse, error) {
1511+
s.updateSafePointByServiceGroupLock.Lock()
1512+
defer s.updateSafePointByServiceGroupLock.Unlock()
1513+
14721514
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
14731515
return pdpb.NewPDClient(client).UpdateGCSafePointByServiceGroup(ctx, request)
14741516
}
@@ -1484,42 +1526,59 @@ func (s *GrpcServer) UpdateGCSafePointByServiceGroup(ctx context.Context, reques
14841526
}
14851527

14861528
var storage endpoint.GCSafePointStorage = s.storage
1487-
serviceGroupID := string(request.ServiceGroupId)
1488-
oldSafePoint, err := storage.LoadGCWorkerSafePoint(serviceGroupID)
1489-
if err != nil {
1490-
return nil, err
1529+
1530+
// check if revision changed since last min calculation
1531+
rsp, _ := s.client.Get(ctx, "NA")
1532+
currentRevision := rsp.Header.GetRevision()
1533+
if currentRevision != request.GetRevision() {
1534+
return &pdpb.UpdateGCSafePointByServiceGroupResponse{
1535+
Header: s.header(),
1536+
NewSafePoint: 0,
1537+
ValidRevision: false,
1538+
}, nil
14911539
}
1540+
serviceGroupID := string(request.ServiceGroupId)
14921541
newSafePoint := &endpoint.GCSafePoint{
14931542
ServiceGroupID: serviceGroupID,
14941543
SafePoint: request.SafePoint,
14951544
}
14961545

1497-
// Only save the safe point if it's greater than the previous one
1498-
if newSafePoint.SafePoint > oldSafePoint.SafePoint {
1499-
if err := storage.SaveGCWorkerSafePoint(serviceGroupID, newSafePoint); err != nil {
1546+
prev, err := storage.LoadGCWorkerSafePoint(serviceGroupID)
1547+
if err != nil {
1548+
return nil, err
1549+
}
1550+
// if no previous safepoint, treat it as 0
1551+
var oldSafePoint uint64 = 0
1552+
if prev != nil {
1553+
oldSafePoint = prev.SafePoint
1554+
}
1555+
1556+
// Only save the safe point if it's greater than the previous one, or if no previous one exist
1557+
if request.SafePoint > oldSafePoint {
1558+
if err := storage.SaveGCWorkerSafePoint(newSafePoint); err != nil {
15001559
return nil, err
15011560
}
15021561
log.Info("updated gc_worker safe point",
15031562
zap.String("service-group-id", serviceGroupID),
15041563
zap.Uint64("safe-point", newSafePoint.SafePoint))
1505-
} else if newSafePoint.SafePoint < oldSafePoint.SafePoint {
1564+
} else if newSafePoint.SafePoint < request.SafePoint {
15061565
log.Warn("trying to update gc_worker safe point",
15071566
zap.String("service-group-id", serviceGroupID),
1508-
zap.Uint64("old-safe-point", oldSafePoint.SafePoint),
1567+
zap.Uint64("old-safe-point", request.SafePoint),
15091568
zap.Uint64("new-safe-point", newSafePoint.SafePoint))
1510-
newSafePoint = oldSafePoint
1569+
newSafePoint.SafePoint = oldSafePoint
15111570
}
15121571
return &pdpb.UpdateGCSafePointByServiceGroupResponse{
1513-
Header: s.header(),
1514-
ServiceGroupId: request.ServiceGroupId,
1515-
NewSafePoint: newSafePoint.SafePoint,
1572+
Header: s.header(),
1573+
NewSafePoint: newSafePoint.SafePoint,
1574+
ValidRevision: true,
15161575
}, nil
15171576
}
15181577

15191578
// UpdateServiceSafePointByServiceGroup for services like CDC/BR/Lightning to update gc safe points in PD
15201579
func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, request *pdpb.UpdateServiceSafePointByServiceGroupRequest) (*pdpb.UpdateServiceSafePointByServiceGroupResponse, error) {
1521-
s.serviceGroupSafePointLock.Lock()
1522-
defer s.serviceGroupSafePointLock.Unlock()
1580+
s.updateSafePointByServiceGroupLock.Lock()
1581+
defer s.updateSafePointByServiceGroupLock.Unlock()
15231582

15241583
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
15251584
return pdpb.NewPDClient(client).UpdateServiceSafePointByServiceGroup(ctx, request)
@@ -1538,26 +1597,46 @@ func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, r
15381597
var storage endpoint.GCSafePointStorage = s.storage
15391598
serviceGroupID := string(request.ServiceGroupId)
15401599
serviceID := string(request.ServiceId)
1541-
// a less than 0 ttl means to remove the safe point
1600+
// a less than 0 ttl means to remove the safe point, immediately return after the deletion request
15421601
if request.TTL <= 0 {
15431602
if err := storage.RemoveServiceSafePointByServiceGroup(serviceGroupID, serviceID); err != nil {
15441603
return nil, err
15451604
}
1605+
return &pdpb.UpdateServiceSafePointByServiceGroupResponse{
1606+
Header: s.header(),
1607+
}, nil
15461608
}
15471609

15481610
nowTSO, err := s.tsoAllocatorManager.HandleTSORequest(tso.GlobalDCLocation, 1)
15491611
if err != nil {
15501612
return nil, err
15511613
}
15521614
now, _ := tsoutil.ParseTimestamp(nowTSO)
1553-
min, err := storage.LoadMinServiceSafePointByServiceGroup(serviceGroupID, now)
1615+
1616+
sspOld, err := storage.LoadServiceSafePoint(serviceGroupID, serviceID)
15541617
if err != nil {
15551618
return nil, err
15561619
}
1620+
gcsp, err := storage.LoadGCWorkerSafePoint(serviceGroupID)
1621+
if err != nil {
1622+
return nil, err
1623+
}
1624+
var oldServiceSafePoint, gcSafePoint, newServiceSafePoint uint64 = 0, 0, 0
1625+
if sspOld != nil {
1626+
oldServiceSafePoint = sspOld.SafePoint
1627+
newServiceSafePoint = oldServiceSafePoint // case where update denied
1628+
}
1629+
if gcsp != nil {
1630+
gcSafePoint = gcsp.SafePoint
1631+
}
1632+
1633+
// case where there is an old safepoint for the given service, we have to check that
1634+
// new safepoint >= old safepoint
1635+
caseUpdate := oldServiceSafePoint != 0 && request.SafePoint >= oldServiceSafePoint
1636+
// Or if no old safepoint and new safepoint >= gc safepoint
1637+
caseInit := oldServiceSafePoint == 0 && request.SafePoint >= gcSafePoint
15571638

1558-
// ToDO: Should this requirement be stronger? Like say request.SafePoint > old.SafePoint
1559-
// ToDO: Add proper requirements for new service
1560-
if request.TTL > 0 && (min == nil || request.SafePoint >= min.SafePoint) {
1639+
if caseUpdate || caseInit {
15611640
ssp := &endpoint.ServiceSafePoint{
15621641
ServiceID: serviceID,
15631642
ExpiredAt: now.Unix() + request.TTL,
@@ -1570,30 +1649,48 @@ func (s *GrpcServer) UpdateServiceSafePointByServiceGroup(ctx context.Context, r
15701649
if err := storage.SaveServiceSafePointByServiceGroup(serviceGroupID, ssp); err != nil {
15711650
return nil, err
15721651
}
1652+
newServiceSafePoint = request.SafePoint // case where update performed
15731653
log.Info("update service safe point by service group",
15741654
zap.String("service-group-id", serviceGroupID),
15751655
zap.String("service-id", ssp.ServiceID),
15761656
zap.Int64("expire-at", ssp.ExpiredAt),
15771657
zap.Uint64("safepoint", ssp.SafePoint))
1578-
// If the updated service is the original min, or if originally it's empty, look for min again
1579-
// note that this guarantees that min is not nil
1580-
if min == nil || serviceID == min.ServiceID {
1581-
min, err = storage.LoadMinServiceSafePointByServiceGroup(serviceGroupID, now)
1582-
if err != nil {
1583-
return nil, err
1584-
}
1585-
}
15861658
}
15871659

1588-
// the case where we just deleted the last service safe point
1589-
if min == nil {
1590-
min = &endpoint.ServiceSafePoint{}
1591-
}
15921660
return &pdpb.UpdateServiceSafePointByServiceGroupResponse{
1593-
Header: s.header(),
1594-
ServiceId: []byte(min.ServiceID),
1595-
TTL: min.ExpiredAt - now.Unix(),
1596-
MinSafePoint: min.SafePoint,
1661+
Header: s.header(),
1662+
GcSafePoint: gcSafePoint,
1663+
OldServiceSafePoint: oldServiceSafePoint,
1664+
NewServiceSafePoint: newServiceSafePoint,
1665+
}, nil
1666+
}
1667+
1668+
// GetAllServiceGroupGCSafePoint returns all service group's gc safe point
1669+
func (s *GrpcServer) GetAllServiceGroupGCSafePoint(ctx context.Context, request *pdpb.GetAllServiceGroupGCSafePointRequest) (*pdpb.GetAllServiceGroupGCSafePointResponse, error) {
1670+
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
1671+
return pdpb.NewPDClient(client).GetAllServiceGroupGCSafePoint(ctx, request)
1672+
}
1673+
if rsp, err := s.unaryMiddleware(ctx, request.GetHeader(), fn); err != nil {
1674+
return nil, err
1675+
} else if rsp != nil {
1676+
return rsp.(*pdpb.GetAllServiceGroupGCSafePointResponse), err
1677+
}
1678+
1679+
rc := s.GetRaftCluster()
1680+
if rc == nil {
1681+
return &pdpb.GetAllServiceGroupGCSafePointResponse{Header: s.notBootstrappedHeader()}, nil
1682+
}
1683+
1684+
var storage endpoint.GCSafePointStorage = s.storage
1685+
safePoints, err := storage.LoadAllServiceGroupGCSafePoints()
1686+
1687+
if err != nil {
1688+
return nil, err
1689+
}
1690+
1691+
return &pdpb.GetAllServiceGroupGCSafePointResponse{
1692+
Header: s.header(),
1693+
ServiceGroupSafePoint: safePoints,
15971694
}, nil
15981695
}
15991696

server/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,8 @@ type Server struct {
146146

147147
// serviceSafePointLock is a lock for UpdateServiceGCSafePoint
148148
serviceSafePointLock sync.Mutex
149-
// Lock for UpdateServiceSafePointByServiceGroup
150-
serviceGroupSafePointLock sync.Mutex
149+
// updateSafePointByServiceGroupLock protects two updateSafePoint methods in API V2
150+
updateSafePointByServiceGroupLock sync.Mutex
151151

152152
// hot region history info storage
153153
hotRegionStorage *storage.HotRegionStorage

0 commit comments

Comments
 (0)