Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/pd/client

go 1.23.12

replace github.com/pingcap/kvproto => github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60

require (
github.com/BurntSushi/toml v0.3.1
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60 h1:TF0BW0SIJho0sM7d7OK7aHvwUO1YeEVFqQgUnzNP+9Q=
github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g=
github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
Expand Down Expand Up @@ -49,8 +51,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20250616075548-d951fb623bb3 h1:OcZxUJEwZzFIqY8AkRIHuEK8U1X5OyLfqAwVnhaKsag=
github.com/pingcap/kvproto v0.0.0-20250616075548-d951fb623bb3/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
54 changes: 40 additions & 14 deletions client/servicediscovery/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,27 @@ type keyspaceGroupSvcDiscovery struct {
secondaryURLs []string
// urls are the primary/secondary serving URL
urls []string

// version is used to avoid updating stale info
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a comment to clarify where the version is sourced from and briefly explain its usage?

version uint64
}

func (k *keyspaceGroupSvcDiscovery) getVersion() uint64 {
return k.version
}

func (k *keyspaceGroupSvcDiscovery) update(
keyspaceGroup *tsopb.KeyspaceGroup,
newPrimaryURL string,
secondaryURLs, urls []string,
) (oldPrimaryURL string, primarySwitched, secondaryChanged bool) {
version uint64,
) (oldPrimaryURL string, primarySwitched, success bool) {
k.Lock()
defer k.Unlock()

if k.version > version {
return "", false, false
}
success = true
// If the new primary URL is empty, we don't switch the primary URL.
oldPrimaryURL = k.primaryURL
if len(newPrimaryURL) > 0 {
Expand All @@ -85,11 +96,11 @@ func (k *keyspaceGroupSvcDiscovery) update(

if !reflect.DeepEqual(k.secondaryURLs, secondaryURLs) {
k.secondaryURLs = secondaryURLs
secondaryChanged = true
}

k.group = keyspaceGroup
k.urls = urls
k.version = version
return
}

Expand Down Expand Up @@ -160,6 +171,7 @@ func NewTSOServiceDiscovery(
primaryURL: "",
secondaryURLs: make([]string, 0),
urls: make([]string, 0),
version: 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the TSO client starts up, does it use version=0 for the initial request?

Furthermore, if this initial request is routed to a TSO service with a lower version, is that behavior expected?

}
c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)}
// Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs,
Expand Down Expand Up @@ -413,12 +425,16 @@ func (c *tsoServiceDiscovery) updateMember() error {

keyspaceID := c.GetKeyspaceID()
var keyspaceGroup *tsopb.KeyspaceGroup
var version uint64
curVersion := c.keyspaceGroupSD.getVersion()
if len(tsoServerURL) > 0 {
keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout)
keyspaceGroup, version, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout, c.keyspaceGroupSD.getVersion())
if err != nil {
log.Error("[tso] failed to find the keyspace group",
zap.Uint32("keyspace-id-in-request", keyspaceID),
zap.String("tso-server-url", tsoServerURL),
zap.Uint64("response-version", version),
zap.Uint64("current-version", curVersion),
errs.ZapError(err))
return err
}
Expand Down Expand Up @@ -451,6 +467,7 @@ func (c *tsoServiceDiscovery) updateMember() error {
Id: constants.DefaultKeyspaceGroupID,
Members: members,
}
version = curVersion + 1
}

oldGroupID := c.GetKeyspaceGroupID()
Expand Down Expand Up @@ -488,8 +505,12 @@ func (c *tsoServiceDiscovery) updateMember() error {
}
}

oldPrimary, primarySwitched, _ :=
c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls)
oldPrimary, primarySwitched, success :=
c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls, version)
if !success {
log.Warn("[tso] failed to update keyspace group, met stale keyspace group info", zap.Uint64("version", version))
return errors.Errorf("keyspace group version is stale, current version: %d, new version: %d", c.keyspaceGroupSD.getVersion(), version)
}
if primarySwitched {
log.Info("[tso] updated keyspace group service discovery info",
zap.Uint32("keyspace-id-in-request", keyspaceID),
Expand All @@ -505,15 +526,14 @@ func (c *tsoServiceDiscovery) updateMember() error {
if len(primaryURL) == 0 {
return errors.New("no primary URL found")
}

return nil
}

// Query the keyspace group info from the tso server by the keyspace ID. The server side will return
// the info of the keyspace group to which this keyspace belongs.
func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
keyspaceID uint32, tsoSrvURL string, timeout time.Duration,
) (*tsopb.KeyspaceGroup, error) {
keyspaceID uint32, tsoSrvURL string, timeout time.Duration, version uint64,
) (*tsopb.KeyspaceGroup, uint64, error) {
failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) {
keyspaceToCheck, ok := val.(int)
if ok && keyspaceID == uint32(keyspaceToCheck) {
Expand All @@ -525,7 +545,7 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID(

cc, err := c.GetOrCreateGRPCConn(tsoSrvURL)
if err != nil {
return nil, err
return nil, 0, err
}

resp, err := tsopb.NewTSOClient(cc).FindGroupByKeyspaceID(
Expand All @@ -536,24 +556,30 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID(
KeyspaceGroupId: constants.DefaultKeyspaceGroupID,
},
KeyspaceId: keyspaceID,
Version: version,
})
if err != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
err, cc.Target(), cc.GetState().String())
return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
}
if resp.GetHeader().GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
}
if resp.KeyspaceGroup == nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
"no keyspace group found", cc.Target(), cc.GetState().String())
return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
}
if resp.Version < version {
attachErr := errors.Errorf("error:%s target:%s response version:%d current version:%d",
"response version less than the given version", cc.Target(), resp.Version, version)
return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause()
}

return resp.KeyspaceGroup, nil
return resp.KeyspaceGroup, resp.GetVersion(), nil
}

func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) {
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,11 @@ error = '''
primary of keyspace group does not exist
'''

["PD:keyspace:ErrKeyspaceGroupVersionStale"]
error = '''
keyspace group version is stale
'''

["PD:keyspace:ErrKeyspaceGroupWithEmptyKeyspace"]
error = '''
keyspace group with empty keyspace
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ go 1.23.12
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch

replace github.com/pingcap/kvproto => github.com/bufferflies/kvproto v0.0.0-20251021115503-c6971b425f60

require (
github.com/AlekSi/gocov-xml v1.0.0
github.com/BurntSushi/toml v1.5.0
Expand Down
Loading
Loading