-
Notifications
You must be signed in to change notification settings - Fork 747
client: updating keyspace group must check the previous version #9862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
bufferflies
wants to merge
7
commits into
tikv:master
Choose a base branch
from
bufferflies:keyspace/version
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+5,247
−75
Open
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
3f7b9ee
request with version
bufferflies 66ba782
request with version
bufferflies eb0219b
add watch failed test
bufferflies eec042f
resolve conflict
bufferflies 727b351
remove unused code
bufferflies d7835d5
Merge branch 'master' of github.com:tikv/pd into keyspace/version
bufferflies a05cff6
link
bufferflies File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,16 +66,27 @@ type keyspaceGroupSvcDiscovery struct { | |
| secondaryURLs []string | ||
| // urls are the primary/secondary serving URL | ||
| urls []string | ||
|
|
||
| // version is used to avoid updating stale info | ||
| version uint64 | ||
| } | ||
|
|
||
| func (k *keyspaceGroupSvcDiscovery) getVersion() uint64 { | ||
| return k.version | ||
| } | ||
|
|
||
| func (k *keyspaceGroupSvcDiscovery) update( | ||
| keyspaceGroup *tsopb.KeyspaceGroup, | ||
| newPrimaryURL string, | ||
| secondaryURLs, urls []string, | ||
| ) (oldPrimaryURL string, primarySwitched, secondaryChanged bool) { | ||
| version uint64, | ||
| ) (oldPrimaryURL string, primarySwitched, success bool) { | ||
| k.Lock() | ||
| defer k.Unlock() | ||
|
|
||
| if k.version > version { | ||
| return "", false, false | ||
| } | ||
| success = true | ||
| // If the new primary URL is empty, we don't switch the primary URL. | ||
| oldPrimaryURL = k.primaryURL | ||
| if len(newPrimaryURL) > 0 { | ||
|
|
@@ -85,11 +96,11 @@ func (k *keyspaceGroupSvcDiscovery) update( | |
|
|
||
| if !reflect.DeepEqual(k.secondaryURLs, secondaryURLs) { | ||
| k.secondaryURLs = secondaryURLs | ||
| secondaryChanged = true | ||
| } | ||
|
|
||
| k.group = keyspaceGroup | ||
| k.urls = urls | ||
| k.version = version | ||
| return | ||
| } | ||
|
|
||
|
|
@@ -160,6 +171,7 @@ func NewTSOServiceDiscovery( | |
| primaryURL: "", | ||
| secondaryURLs: make([]string, 0), | ||
| urls: make([]string, 0), | ||
| version: 0, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the TSO client starts up, does it use version=0 for the initial request? Furthermore, if this initial request is routed to a TSO service with a lower version, is that behavior expected? |
||
| } | ||
| c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)} | ||
| // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, | ||
|
|
@@ -413,12 +425,16 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
|
|
||
| keyspaceID := c.GetKeyspaceID() | ||
| var keyspaceGroup *tsopb.KeyspaceGroup | ||
| var version uint64 | ||
| curVersion := c.keyspaceGroupSD.getVersion() | ||
| if len(tsoServerURL) > 0 { | ||
| keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout) | ||
| keyspaceGroup, version, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout, c.keyspaceGroupSD.getVersion()) | ||
| if err != nil { | ||
| log.Error("[tso] failed to find the keyspace group", | ||
| zap.Uint32("keyspace-id-in-request", keyspaceID), | ||
| zap.String("tso-server-url", tsoServerURL), | ||
| zap.Uint64("response-version", version), | ||
| zap.Uint64("current-version", curVersion), | ||
| errs.ZapError(err)) | ||
| return err | ||
| } | ||
|
|
@@ -451,6 +467,7 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
| Id: constants.DefaultKeyspaceGroupID, | ||
| Members: members, | ||
| } | ||
| version = curVersion + 1 | ||
| } | ||
|
|
||
| oldGroupID := c.GetKeyspaceGroupID() | ||
|
|
@@ -488,8 +505,12 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
| } | ||
| } | ||
|
|
||
| oldPrimary, primarySwitched, _ := | ||
| c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls) | ||
| oldPrimary, primarySwitched, success := | ||
| c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls, version) | ||
| if !success { | ||
| log.Warn("[tso] failed to update keyspace group, met stale keyspace group info", zap.Uint64("version", version)) | ||
| return errors.Errorf("keyspace group version is stale, current version: %d, new version: %d", c.keyspaceGroupSD.getVersion(), version) | ||
| } | ||
| if primarySwitched { | ||
| log.Info("[tso] updated keyspace group service discovery info", | ||
| zap.Uint32("keyspace-id-in-request", keyspaceID), | ||
|
|
@@ -505,15 +526,14 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
| if len(primaryURL) == 0 { | ||
| return errors.New("no primary URL found") | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Query the keyspace group info from the tso server by the keyspace ID. The server side will return | ||
| // the info of the keyspace group to which this keyspace belongs. | ||
| func (c *tsoServiceDiscovery) findGroupByKeyspaceID( | ||
| keyspaceID uint32, tsoSrvURL string, timeout time.Duration, | ||
| ) (*tsopb.KeyspaceGroup, error) { | ||
| keyspaceID uint32, tsoSrvURL string, timeout time.Duration, version uint64, | ||
| ) (*tsopb.KeyspaceGroup, uint64, error) { | ||
| failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) { | ||
| keyspaceToCheck, ok := val.(int) | ||
| if ok && keyspaceID == uint32(keyspaceToCheck) { | ||
|
|
@@ -525,7 +545,7 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID( | |
|
|
||
| cc, err := c.GetOrCreateGRPCConn(tsoSrvURL) | ||
| if err != nil { | ||
| return nil, err | ||
| return nil, 0, err | ||
| } | ||
|
|
||
| resp, err := tsopb.NewTSOClient(cc).FindGroupByKeyspaceID( | ||
|
|
@@ -536,24 +556,30 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID( | |
| KeyspaceGroupId: constants.DefaultKeyspaceGroupID, | ||
| }, | ||
| KeyspaceId: keyspaceID, | ||
| Version: version, | ||
| }) | ||
| if err != nil { | ||
| attachErr := errors.Errorf("error:%s target:%s status:%s", | ||
| err, cc.Target(), cc.GetState().String()) | ||
| return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
| if resp.GetHeader().GetError() != nil { | ||
| attachErr := errors.Errorf("error:%s target:%s status:%s", | ||
| resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) | ||
| return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
| if resp.KeyspaceGroup == nil { | ||
| attachErr := errors.Errorf("error:%s target:%s status:%s", | ||
| "no keyspace group found", cc.Target(), cc.GetState().String()) | ||
| return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
| if resp.Version < version { | ||
| attachErr := errors.Errorf("error:%s target:%s response version:%d current version:%d", | ||
| "response version less than the given version", cc.Target(), resp.Version, version) | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
|
|
||
| return resp.KeyspaceGroup, nil | ||
| return resp.KeyspaceGroup, resp.GetVersion(), nil | ||
| } | ||
|
|
||
| func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) { | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding a comment to clarify where the version is sourced from and briefly explain its usage?