Skip to content

Commit 3f7b9ee

Browse files
committed
request with version
Signed-off-by: 童剑 <[email protected]>
1 parent 538a218 commit 3f7b9ee

File tree

16 files changed

+5138
-75
lines changed

16 files changed

+5138
-75
lines changed

client/go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ module github.com/tikv/pd/client
22

33
go 1.23.12
44

5+
replace github.com/pingcap/kvproto => github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60
6+
57
require (
68
github.com/BurntSushi/toml v0.3.1
79
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5

client/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT
55
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
66
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
77
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
8+
github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60 h1:TF0BW0SIJho0sM7d7OK7aHvwUO1YeEVFqQgUnzNP+9Q=
9+
github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
810
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
911
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
1012
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
@@ -49,8 +51,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
4951
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
5052
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
5153
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
52-
github.com/pingcap/kvproto v0.0.0-20250616075548-d951fb623bb3 h1:OcZxUJEwZzFIqY8AkRIHuEK8U1X5OyLfqAwVnhaKsag=
53-
github.com/pingcap/kvproto v0.0.0-20250616075548-d951fb623bb3/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
5454
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
5555
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
5656
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

client/servicediscovery/service_discovery_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
"github.com/pingcap/failpoint"
3939
"github.com/pingcap/kvproto/pkg/pdpb"
40+
"github.com/pingcap/kvproto/pkg/tsopb"
4041

4142
"github.com/tikv/pd/client/errs"
4243
"github.com/tikv/pd/client/opt"
@@ -440,3 +441,26 @@ func TestGRPCDialOption(t *testing.T) {
440441
re.Error(err)
441442
re.Greater(time.Since(start), 500*time.Millisecond)
442443
}
444+
445+
func TestUpdateKeyspaceGroup(t *testing.T) {
446+
re := require.New(t)
447+
version := uint64(10)
448+
dis := keyspaceGroupSvcDiscovery{
449+
version: version,
450+
}
451+
ks := &tsopb.KeyspaceGroup{
452+
Id: 1,
453+
UserKind: "basic",
454+
Members: []*tsopb.KeyspaceGroupMember{
455+
{Address: "mock-1", IsPrimary: true},
456+
{Address: "mock-2", IsPrimary: false},
457+
},
458+
}
459+
_, _, success := dis.update(ks, "mock-2", []string{"mock-1"}, []string{"mock-1", "mock-2"}, version)
460+
re.False(success)
461+
re.Empty(dis.primaryURL)
462+
// leader changed
463+
_, _, success = dis.update(ks, "mock-2", []string{"mock-1"}, []string{"mock-1", "mock-2"}, version+1)
464+
re.True(success)
465+
re.Equal("mock-2", dis.primaryURL)
466+
}

client/servicediscovery/tso_service_discovery.go

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,27 @@ type keyspaceGroupSvcDiscovery struct {
6666
secondaryURLs []string
6767
// urls are the primary/secondary serving URL
6868
urls []string
69+
70+
// version is used to avoid updating stale info
71+
version uint64
72+
}
73+
74+
func (k *keyspaceGroupSvcDiscovery) getVersion() uint64 {
75+
return k.version
6976
}
7077

7178
func (k *keyspaceGroupSvcDiscovery) update(
7279
keyspaceGroup *tsopb.KeyspaceGroup,
7380
newPrimaryURL string,
7481
secondaryURLs, urls []string,
75-
) (oldPrimaryURL string, primarySwitched, secondaryChanged bool) {
82+
version uint64,
83+
) (oldPrimaryURL string, primarySwitched, success bool) {
7684
k.Lock()
7785
defer k.Unlock()
78-
86+
if k.version >= version {
87+
return "", false, false
88+
}
89+
success = true
7990
// If the new primary URL is empty, we don't switch the primary URL.
8091
oldPrimaryURL = k.primaryURL
8192
if len(newPrimaryURL) > 0 {
@@ -85,11 +96,11 @@ func (k *keyspaceGroupSvcDiscovery) update(
8596

8697
if !reflect.DeepEqual(k.secondaryURLs, secondaryURLs) {
8798
k.secondaryURLs = secondaryURLs
88-
secondaryChanged = true
8999
}
90100

91101
k.group = keyspaceGroup
92102
k.urls = urls
103+
k.version = version
93104
return
94105
}
95106

@@ -160,6 +171,7 @@ func NewTSOServiceDiscovery(
160171
primaryURL: "",
161172
secondaryURLs: make([]string, 0),
162173
urls: make([]string, 0),
174+
version: 0,
163175
}
164176
c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)}
165177
// Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs,
@@ -413,12 +425,16 @@ func (c *tsoServiceDiscovery) updateMember() error {
413425

414426
keyspaceID := c.GetKeyspaceID()
415427
var keyspaceGroup *tsopb.KeyspaceGroup
428+
var version uint64
429+
curVersion := c.keyspaceGroupSD.getVersion()
416430
if len(tsoServerURL) > 0 {
417-
keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout)
431+
keyspaceGroup, version, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout, c.keyspaceGroupSD.getVersion())
418432
if err != nil {
419433
log.Error("[tso] failed to find the keyspace group",
420434
zap.Uint32("keyspace-id-in-request", keyspaceID),
421435
zap.String("tso-server-url", tsoServerURL),
436+
zap.Uint64("response-version", version),
437+
zap.Uint64("current-version", curVersion),
422438
errs.ZapError(err))
423439
return err
424440
}
@@ -451,6 +467,7 @@ func (c *tsoServiceDiscovery) updateMember() error {
451467
Id: constants.DefaultKeyspaceGroupID,
452468
Members: members,
453469
}
470+
version = curVersion + 1
454471
}
455472

456473
oldGroupID := c.GetKeyspaceGroupID()
@@ -488,8 +505,12 @@ func (c *tsoServiceDiscovery) updateMember() error {
488505
}
489506
}
490507

491-
oldPrimary, primarySwitched, _ :=
492-
c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls)
508+
oldPrimary, primarySwitched, success :=
509+
c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls, version)
510+
if !success {
511+
log.Warn("[tso] failed to update keyspace group, met stale keyspace group info", zap.Uint64("version", version))
512+
return errors.Errorf("keyspace group version is stale, current version: %d, new version: %d", c.keyspaceGroupSD.getVersion(), version)
513+
}
493514
if primarySwitched {
494515
log.Info("[tso] updated keyspace group service discovery info",
495516
zap.Uint32("keyspace-id-in-request", keyspaceID),
@@ -505,15 +526,14 @@ func (c *tsoServiceDiscovery) updateMember() error {
505526
if len(primaryURL) == 0 {
506527
return errors.New("no primary URL found")
507528
}
508-
509529
return nil
510530
}
511531

512532
// Query the keyspace group info from the tso server by the keyspace ID. The server side will return
513533
// the info of the keyspace group to which this keyspace belongs.
514534
func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
515-
keyspaceID uint32, tsoSrvURL string, timeout time.Duration,
516-
) (*tsopb.KeyspaceGroup, error) {
535+
keyspaceID uint32, tsoSrvURL string, timeout time.Duration, version uint64,
536+
) (*tsopb.KeyspaceGroup, uint64, error) {
517537
failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) {
518538
keyspaceToCheck, ok := val.(int)
519539
if ok && keyspaceID == uint32(keyspaceToCheck) {
@@ -525,7 +545,7 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
525545

526546
cc, err := c.GetOrCreateGRPCConn(tsoSrvURL)
527547
if err != nil {
528-
return nil, err
548+
return nil, 0, err
529549
}
530550

531551
resp, err := tsopb.NewTSOClient(cc).FindGroupByKeyspaceID(
@@ -536,24 +556,30 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
536556
KeyspaceGroupId: constants.DefaultKeyspaceGroupID,
537557
},
538558
KeyspaceId: keyspaceID,
559+
Version: version,
539560
})
540561
if err != nil {
541562
attachErr := errors.Errorf("error:%s target:%s status:%s",
542563
err, cc.Target(), cc.GetState().String())
543-
return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
564+
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
544565
}
545566
if resp.GetHeader().GetError() != nil {
546567
attachErr := errors.Errorf("error:%s target:%s status:%s",
547568
resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
548-
return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
569+
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
549570
}
550571
if resp.KeyspaceGroup == nil {
551572
attachErr := errors.Errorf("error:%s target:%s status:%s",
552573
"no keyspace group found", cc.Target(), cc.GetState().String())
553-
return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
574+
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
575+
}
576+
if resp.Version < version {
577+
attachErr := errors.Errorf("error:%s target:%s response version:%d current version:%d",
578+
"response version less than the given version", cc.Target(), resp.Version, version)
579+
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
554580
}
555581

556-
return resp.KeyspaceGroup, nil
582+
return resp.KeyspaceGroup, resp.GetVersion(), nil
557583
}
558584

559585
func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) {

errors.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -566,6 +566,11 @@ error = '''
566566
keyspace group %v does not exist
567567
'''
568568

569+
["PD:keyspace:ErrKeyspaceGroupVersionStale"]
570+
error = '''
571+
keyspace group version is stale
572+
'''
573+
569574
["PD:keyspace:ErrKeyspaceGroupNotInMerging"]
570575
error = '''
571576
keyspace group %v is not in merging state

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ go 1.23.12
77
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
88
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
99

10+
replace github.com/pingcap/kvproto => github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60
11+
1012
require (
1113
github.com/AlekSi/gocov-xml v1.0.0
1214
github.com/BurntSushi/toml v1.5.0

0 commit comments

Comments
 (0)