Skip to content

Commit c5944e3

Browse files
client side code
Signed-off-by: AmoebaProtozoa <[email protected]>
1 parent f7d6346 commit c5944e3

File tree

2 files changed

+81
-29
lines changed

2 files changed

+81
-29
lines changed

client/client.go

Lines changed: 75 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,14 @@ type Client interface {
106106
// determine the safepoint for multiple services, it does not trigger a GC
107107
// job. Use UpdateGCSafePoint to trigger the GC job if needed.
108108
UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error)
109-
// UpdateGCSafePointByServiceGroup update GC safe point, the update will only be successful if proposed
110-
// safe point is later than the old one
111-
// returns the new safePoint after the update attempt (may return the old safe point if update rejected)
112-
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64) (uint64, error)
113-
// UpdateServiceSafePointByServiceGroup update service safe point for specific service under given service group
114-
// pass in a ttl less than 0 to remove the target service safe point instead
115-
// will return the min safePoint of the serviceGroup after the update,
116-
// if no service safePoint exists after the given operation, return 0
117-
UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (uint64, error)
118-
// GetAllServiceGroupGcSafePoint returns a list containing gc safe point for each service group
119-
GetAllServiceGroupGcSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafepoint, error)
109+
110+
// GC API V2
111+
GetServiceGroup(ctx context.Context) ([]string, error)
112+
GetMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error)
113+
UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (isSuccessful bool, newSafePoint uint64, validRevision bool, err error)
114+
UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (isSuccessful bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error)
115+
GetAllServiceGroupGCSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error)
116+
120117
// ScatterRegion scatters the specified region. Should use it for a batch of regions,
121118
// and the distribution of these regions will be dispersed.
122119
// NOTICE: This method is the old version of ScatterRegions, you should use the later one as your first choice.
@@ -1668,19 +1665,71 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
16681665
return resp.GetMinSafePoint(), nil
16691666
}
16701667

1671-
func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64) (uint64, error) {
1668+
func (c *client) GetServiceGroup(ctx context.Context) ([]string, error) {
1669+
if span := opentracing.SpanFromContext(ctx); span != nil {
1670+
span = opentracing.StartSpan("pdclient.GetServiceGroup", opentracing.ChildOf(span.Context()))
1671+
defer span.Finish()
1672+
}
1673+
start := time.Now()
1674+
defer func() { cmdDurationGetServiceGroup.Observe(time.Since(start).Seconds()) }()
1675+
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
1676+
req := &pdpb.GetServiceGroupRequest{
1677+
Header: c.requestHeader(),
1678+
}
1679+
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
1680+
resp, err := c.getClient().GetServiceGroup(ctx, req)
1681+
cancel()
1682+
1683+
if err != nil {
1684+
cmdFailedDurationGetServiceGroup.Observe(time.Since(start).Seconds())
1685+
c.ScheduleCheckLeader()
1686+
return nil, errors.WithStack(err)
1687+
}
1688+
1689+
// have to return a slice of string
1690+
returnSlice := make([]string, len(resp.ServiceGroupId))
1691+
for _, serviceGroupID := range resp.ServiceGroupId {
1692+
returnSlice = append(returnSlice, string(serviceGroupID))
1693+
}
1694+
return returnSlice, nil
1695+
}
1696+
func (c *client) GetMinServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID string) (safePoint uint64, revision int64, err error) {
1697+
if span := opentracing.SpanFromContext(ctx); span != nil {
1698+
span = opentracing.StartSpan("pdclient.GetMinServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
1699+
defer span.Finish()
1700+
}
1701+
start := time.Now()
1702+
defer func() { cmdDurationGetMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()
1703+
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
1704+
req := &pdpb.GetMinServiceSafePointByServiceGroupRequest{
1705+
Header: c.requestHeader(),
1706+
ServiceGroupId: []byte(serviceGroupID),
1707+
}
1708+
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
1709+
resp, err := c.getClient().GetMinServiceSafePointByServiceGroup(ctx, req)
1710+
cancel()
1711+
1712+
if err != nil {
1713+
cmdFailedDurationGetMinServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
1714+
c.ScheduleCheckLeader()
1715+
return 0, 0, errors.WithStack(err)
1716+
}
1717+
1718+
return resp.SafePoint, resp.Revision, nil
1719+
}
1720+
func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGroupID string, safePoint uint64, revision int64) (isSuccessful bool, newSafePoint uint64, validRevision bool, err error) {
16721721
if span := opentracing.SpanFromContext(ctx); span != nil {
16731722
span = opentracing.StartSpan("pdclient.UpdateGCSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
16741723
defer span.Finish()
16751724
}
16761725
start := time.Now()
16771726
defer func() { cmdDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds()) }()
1678-
16791727
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
16801728
req := &pdpb.UpdateGCSafePointByServiceGroupRequest{
16811729
Header: c.requestHeader(),
16821730
ServiceGroupId: []byte(serviceGroupID),
16831731
SafePoint: safePoint,
1732+
Revision: revision,
16841733
}
16851734
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
16861735
resp, err := c.getClient().UpdateGCSafePointByServiceGroup(ctx, req)
@@ -1689,11 +1738,12 @@ func (c *client) UpdateGCSafePointByServiceGroup(ctx context.Context, serviceGro
16891738
if err != nil {
16901739
cmdFailedDurationUpdateGCSafePointByServiceGroup.Observe(time.Since(start).Seconds())
16911740
c.ScheduleCheckLeader()
1692-
return 0, errors.WithStack(err)
1741+
return false, 0, false, errors.WithStack(err)
16931742
}
1694-
return resp.GetNewSafePoint(), nil
1743+
// if requested safepoint is the new safepoint, then update succeeded
1744+
return safePoint == resp.NewSafePoint, resp.NewSafePoint, resp.ValidRevision, nil
16951745
}
1696-
func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
1746+
func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, serviceGroupID, serviceID string, ttl int64, safePoint uint64) (isSuccessful bool, gcSafePoint, oldSafePoint, newSafePoint uint64, err error) {
16971747
if span := opentracing.SpanFromContext(ctx); span != nil {
16981748
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointByServiceGroup", opentracing.ChildOf(span.Context()))
16991749
defer span.Finish()
@@ -1715,35 +1765,33 @@ func (c *client) UpdateServiceSafePointByServiceGroup(ctx context.Context, servi
17151765
if err != nil {
17161766
cmdFailedDurationUpdateServiceSafePointByServiceGroup.Observe(time.Since(start).Seconds())
17171767
c.ScheduleCheckLeader()
1718-
return 0, errors.WithStack(err)
1768+
return false, 0, 0, 0, errors.WithStack(err)
17191769
}
17201770

1721-
return resp.GetMinSafePoint(), nil
1771+
return resp.NewServiceSafePoint == safePoint, resp.GcSafePoint, resp.OldServiceSafePoint, resp.NewServiceSafePoint, nil
17221772
}
1723-
1724-
func (c *client) GetAllServiceGroupGcSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafepoint, error) {
1773+
func (c *client) GetAllServiceGroupGCSafePoint(ctx context.Context) ([]*pdpb.ServiceGroupSafePoint, error) {
17251774
if span := opentracing.SpanFromContext(ctx); span != nil {
1726-
span = opentracing.StartSpan("pdclient.GetAllServiceGroupGcSafePoint", opentracing.ChildOf(span.Context()))
1775+
span = opentracing.StartSpan("pdclient.GetAllServiceGroupGCSafePoint", opentracing.ChildOf(span.Context()))
17271776
defer span.Finish()
17281777
}
1729-
17301778
start := time.Now()
1731-
defer func() { cmdDurationGetAllServiceGroupGcSafePoint.Observe(time.Since(start).Seconds()) }()
1779+
defer func() { cmdDurationGetAllServiceGroupGCSafePoint.Observe(time.Since(start).Seconds()) }()
17321780
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
1733-
req := &pdpb.GetAllServiceGroupGcSafePointRequest{
1781+
req := &pdpb.GetAllServiceGroupGCSafePointRequest{
17341782
Header: c.requestHeader(),
17351783
}
17361784
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
1737-
resp, err := c.getClient().GetAllServiceGroupGcSafePoint(ctx, req)
1785+
resp, err := c.getClient().GetAllServiceGroupGCSafePoint(ctx, req)
17381786
cancel()
17391787

17401788
if err != nil {
1741-
cmdFailedDurationGetAllServiceGroupGcSafePoint.Observe(time.Since(start).Seconds())
1789+
cmdFailedDurationGetAllServiceGroupGCSafePoint.Observe(time.Since(start).Seconds())
17421790
c.ScheduleCheckLeader()
17431791
return nil, errors.WithStack(err)
17441792
}
17451793

1746-
return resp.GetServiceGroupSafePoint(), nil
1794+
return resp.ServiceGroupSafePoint, nil
17471795
}
17481796

17491797
func (c *client) ScatterRegion(ctx context.Context, regionID uint64) error {

client/metrics.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,11 @@ var (
9999
cmdDurationGetOperator = cmdDuration.WithLabelValues("get_operator")
100100
cmdDurationSplitRegions = cmdDuration.WithLabelValues("split_regions")
101101
cmdDurationSplitAndScatterRegions = cmdDuration.WithLabelValues("split_and_scatter_regions")
102+
cmdDurationGetServiceGroup = cmdDuration.WithLabelValues("get_service_group")
103+
cmdDurationGetMinServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("get_min_service_safe_point_by_service_group")
102104
cmdDurationUpdateGCSafePointByServiceGroup = cmdDuration.WithLabelValues("update_gc_safe_point_by_service_group")
103105
cmdDurationUpdateServiceSafePointByServiceGroup = cmdDuration.WithLabelValues("update_service_safe_point_by_service_group")
104-
cmdDurationGetAllServiceGroupGcSafePoint = cmdDuration.WithLabelValues("get_all_service_group_gc_safe_point")
106+
cmdDurationGetAllServiceGroupGCSafePoint = cmdDuration.WithLabelValues("get_all_service_group_gc_safe_point")
105107

106108
cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
107109
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
@@ -113,9 +115,11 @@ var (
113115
cmdFailedDurationGetAllStores = cmdFailedDuration.WithLabelValues("get_all_stores")
114116
cmdFailedDurationUpdateGCSafePoint = cmdFailedDuration.WithLabelValues("update_gc_safe_point")
115117
cmdFailedDurationUpdateServiceGCSafePoint = cmdFailedDuration.WithLabelValues("update_service_gc_safe_point")
118+
cmdFailedDurationGetServiceGroup = cmdFailedDuration.WithLabelValues("get_service_group")
119+
cmdFailedDurationGetMinServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("get_min_service_safe_point_by_service_group")
116120
cmdFailedDurationUpdateGCSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_gc_safe_point_by_service_group")
117121
cmdFailedDurationUpdateServiceSafePointByServiceGroup = cmdFailedDuration.WithLabelValues("update_service_safe_point_by_service_group")
118-
cmdFailedDurationGetAllServiceGroupGcSafePoint = cmdFailedDuration.WithLabelValues("get_all_service_group_gc_safe_point")
122+
cmdFailedDurationGetAllServiceGroupGCSafePoint = cmdFailedDuration.WithLabelValues("get_all_service_group_gc_safe_point")
119123
requestDurationTSO = requestDuration.WithLabelValues("tso")
120124
)
121125

0 commit comments

Comments
 (0)