-
Notifications
You must be signed in to change notification settings - Fork 747
client: updating keyspace group must check the previous version #9862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 7 commits
3f7b9ee
66ba782
eb0219b
eec042f
727b351
d7835d5
a05cff6
07950e5
e091432
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -66,16 +66,27 @@ type keyspaceGroupSvcDiscovery struct { | |
| secondaryURLs []string | ||
| // urls are the primary/secondary serving URL | ||
| urls []string | ||
|
|
||
| // version is used to avoid updating stale info | ||
| version uint64 | ||
| } | ||
|
|
||
| func (k *keyspaceGroupSvcDiscovery) getVersion() uint64 { | ||
| return k.version | ||
| } | ||
|
|
||
| func (k *keyspaceGroupSvcDiscovery) update( | ||
| keyspaceGroup *tsopb.KeyspaceGroup, | ||
| newPrimaryURL string, | ||
| secondaryURLs, urls []string, | ||
| ) (oldPrimaryURL string, primarySwitched, secondaryChanged bool) { | ||
| version uint64, | ||
| ) (oldPrimaryURL string, primarySwitched, success bool) { | ||
| k.Lock() | ||
| defer k.Unlock() | ||
|
|
||
| if k.version > version { | ||
| return "", false, false | ||
| } | ||
| success = true | ||
| // If the new primary URL is empty, we don't switch the primary URL. | ||
| oldPrimaryURL = k.primaryURL | ||
| if len(newPrimaryURL) > 0 { | ||
|
|
@@ -85,11 +96,11 @@ func (k *keyspaceGroupSvcDiscovery) update( | |
|
|
||
| if !reflect.DeepEqual(k.secondaryURLs, secondaryURLs) { | ||
| k.secondaryURLs = secondaryURLs | ||
| secondaryChanged = true | ||
| } | ||
|
|
||
| k.group = keyspaceGroup | ||
| k.urls = urls | ||
| k.version = version | ||
| return | ||
| } | ||
|
|
||
|
|
@@ -160,6 +171,7 @@ func NewTSOServiceDiscovery( | |
| primaryURL: "", | ||
| secondaryURLs: make([]string, 0), | ||
| urls: make([]string, 0), | ||
| version: 0, | ||
|
||
| } | ||
| c.tsoServerDiscovery = &tsoServerDiscovery{urls: make([]string, 0)} | ||
| // Start with the default keyspace group. The actual keyspace group, to which the keyspace belongs, | ||
|
|
@@ -413,12 +425,16 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
|
|
||
| keyspaceID := c.GetKeyspaceID() | ||
| var keyspaceGroup *tsopb.KeyspaceGroup | ||
| var version uint64 | ||
| curVersion := c.keyspaceGroupSD.getVersion() | ||
| if len(tsoServerURL) > 0 { | ||
| keyspaceGroup, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout) | ||
| keyspaceGroup, version, err = c.findGroupByKeyspaceID(keyspaceID, tsoServerURL, UpdateMemberTimeout, c.keyspaceGroupSD.getVersion()) | ||
| if err != nil { | ||
| log.Error("[tso] failed to find the keyspace group", | ||
| zap.Uint32("keyspace-id-in-request", keyspaceID), | ||
| zap.String("tso-server-url", tsoServerURL), | ||
| zap.Uint64("response-version", version), | ||
| zap.Uint64("current-version", curVersion), | ||
| errs.ZapError(err)) | ||
| return err | ||
| } | ||
|
|
@@ -451,6 +467,7 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
| Id: constants.DefaultKeyspaceGroupID, | ||
| Members: members, | ||
| } | ||
| version = curVersion + 1 | ||
| } | ||
|
|
||
| oldGroupID := c.GetKeyspaceGroupID() | ||
|
|
@@ -488,8 +505,12 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
| } | ||
| } | ||
|
|
||
| oldPrimary, primarySwitched, _ := | ||
| c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls) | ||
| oldPrimary, primarySwitched, success := | ||
| c.keyspaceGroupSD.update(keyspaceGroup, primaryURL, secondaryURLs, urls, version) | ||
| if !success { | ||
| log.Warn("[tso] failed to update keyspace group, met stale keyspace group info", zap.Uint64("version", version)) | ||
| return errors.Errorf("keyspace group version is stale, current version: %d, new version: %d", c.keyspaceGroupSD.getVersion(), version) | ||
| } | ||
| if primarySwitched { | ||
| log.Info("[tso] updated keyspace group service discovery info", | ||
| zap.Uint32("keyspace-id-in-request", keyspaceID), | ||
|
|
@@ -505,15 +526,14 @@ func (c *tsoServiceDiscovery) updateMember() error { | |
| if len(primaryURL) == 0 { | ||
| return errors.New("no primary URL found") | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Query the keyspace group info from the tso server by the keyspace ID. The server side will return | ||
| // the info of the keyspace group to which this keyspace belongs. | ||
| func (c *tsoServiceDiscovery) findGroupByKeyspaceID( | ||
| keyspaceID uint32, tsoSrvURL string, timeout time.Duration, | ||
| ) (*tsopb.KeyspaceGroup, error) { | ||
| keyspaceID uint32, tsoSrvURL string, timeout time.Duration, version uint64, | ||
| ) (*tsopb.KeyspaceGroup, uint64, error) { | ||
| failpoint.Inject("unexpectedCallOfFindGroupByKeyspaceID", func(val failpoint.Value) { | ||
| keyspaceToCheck, ok := val.(int) | ||
| if ok && keyspaceID == uint32(keyspaceToCheck) { | ||
|
|
@@ -525,7 +545,7 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID( | |
|
|
||
| cc, err := c.GetOrCreateGRPCConn(tsoSrvURL) | ||
| if err != nil { | ||
| return nil, err | ||
| return nil, 0, err | ||
| } | ||
|
|
||
| resp, err := tsopb.NewTSOClient(cc).FindGroupByKeyspaceID( | ||
|
|
@@ -536,24 +556,30 @@ func (c *tsoServiceDiscovery) findGroupByKeyspaceID( | |
| KeyspaceGroupId: constants.DefaultKeyspaceGroupID, | ||
| }, | ||
| KeyspaceId: keyspaceID, | ||
| Version: version, | ||
| }) | ||
| if err != nil { | ||
| attachErr := errors.Errorf("error:%s target:%s status:%s", | ||
| err, cc.Target(), cc.GetState().String()) | ||
| return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
| if resp.GetHeader().GetError() != nil { | ||
| attachErr := errors.Errorf("error:%s target:%s status:%s", | ||
| resp.GetHeader().GetError().String(), cc.Target(), cc.GetState().String()) | ||
| return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
| if resp.KeyspaceGroup == nil { | ||
| attachErr := errors.Errorf("error:%s target:%s status:%s", | ||
| "no keyspace group found", cc.Target(), cc.GetState().String()) | ||
| return nil, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
| if resp.Version < version { | ||
| attachErr := errors.Errorf("error:%s target:%s response version:%d current version:%d", | ||
| "response version less than the given version", cc.Target(), resp.Version, version) | ||
| return nil, 0, errs.ErrClientFindGroupByKeyspaceID.Wrap(attachErr).GenWithStackByCause() | ||
| } | ||
|
|
||
| return resp.KeyspaceGroup, nil | ||
| return resp.KeyspaceGroup, resp.GetVersion(), nil | ||
| } | ||
|
|
||
| func (c *tsoServiceDiscovery) getTSOServer(sd ServiceDiscovery) (string, error) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding a comment to clarify where the version is sourced from and briefly explain its usage?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done