5 changes: 4 additions & 1 deletion br/pkg/restore/split/BUILD.bazel
@@ -65,7 +65,7 @@ go_test(
],
embed = [":split"],
flaky = True,
shard_count = 28,
shard_count = 30,
deps = [
"//br/pkg/errors",
"//br/pkg/restore/utils",
@@ -84,6 +84,9 @@ go_test(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/pdpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//clients/router",
"@com_github_tikv_pd_client//opt",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
2 changes: 1 addition & 1 deletion br/pkg/restore/split/client_test.go
@@ -230,7 +230,7 @@ func TestScanRegionEmptyResult(t *testing.T) {
mockPDClient := NewMockPDClientForSplit()
keys := [][]byte{[]byte(""), []byte("")}
mockPDClient.SetRegions(keys)
mockPDClient.scanRegions.errors = []error{nil, nil, nil, nil}
mockPDClient.scanRegions.errors = []error{nil, nil, nil, nil, nil, nil, nil, nil}
mockClient := &pdClient{
client: mockPDClient,
splitBatchKeyCnt: 100,
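For context, the mock's scheduled-error list grows from four entries to eight because each ScanRegions call consumes one entry, and the nested retry introduced in split.go (scanRegionsLimitWithRetry, below) makes the same scenario issue more scan calls. A minimal, runnable sketch of that scheduled-error mock pattern (a hypothetical helper, not the repo's actual mock):

```go
package main

import (
	"errors"
	"fmt"
)

// errSchedule sketches the scheduled-error mock pattern behind the test
// change above: every ScanRegions call pops one entry, so nesting a retry
// loop inside the pagination loop means a test must schedule more entries
// (eight nils here instead of four).
type errSchedule struct{ errs []error }

// next pops the next scheduled error, or fails loudly when the test
// under-schedules calls.
func (s *errSchedule) next() error {
	if len(s.errs) == 0 {
		return errors.New("unexpected extra ScanRegions call")
	}
	e := s.errs[0]
	s.errs = s.errs[1:]
	return e
}

func main() {
	s := &errSchedule{errs: []error{nil, nil}}
	fmt.Println(s.next(), s.next(), s.next()) // <nil> <nil> unexpected extra ScanRegions call
}
```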
59 changes: 44 additions & 15 deletions br/pkg/restore/split/split.go
@@ -152,7 +152,7 @@ func (rs *RegionSplitter) WaitForScatterRegionsTimeout(ctx context.Context, regi
return leftRegions
}

func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) error {
func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo, limitted bool) error {
Review comment (Member): Can you add a comment for limited?
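For context on the question above: judging from this diff, the flag gates the last-region coverage check, because a scan capped by `limit` may legitimately stop before endKey, so a short end key is expected there rather than an inconsistency. A minimal sketch of that assumed semantics (illustrative names, not the PR's code):

```go
package main

import (
	"bytes"
	"fmt"
)

// coversRange sketches the assumed meaning of the new limitted flag: when a
// single scan was capped by `limit`, stopping before endKey is expected, so
// the "last region reaches endKey" check is skipped. Illustrative only.
func coversRange(lastEndKey, endKey []byte, limitted bool) bool {
	if limitted {
		return true // a capped scan may legitimately stop short of endKey
	}
	// An empty end key means "to infinity", which always covers the range.
	return len(lastEndKey) == 0 || bytes.Compare(lastEndKey, endKey) >= 0
}

func main() {
	fmt.Println(coversRange([]byte("c"), []byte("e"), true))  // true: capped scan, gap allowed
	fmt.Println(coversRange([]byte("c"), []byte("e"), false)) // false: real coverage gap
}
```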

// current pd can't guarantee the consistency of returned regions
if len(regions) == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion, "scan region return empty result, startKey: %s, endKey: %s",
@@ -165,7 +165,7 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
regions[0].Region.Id,
redact.Key(regions[0].Region.StartKey), redact.Key(startKey),
regions[0].Region.RegionEpoch.String())
} else if len(regions[len(regions)-1].Region.EndKey) != 0 &&
} else if !limitted && len(regions[len(regions)-1].Region.EndKey) != 0 &&
bytes.Compare(regions[len(regions)-1].Region.EndKey, endKey) < 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"last region %d's endKey(%s) < endKey(%s), region epoch: %s",
@@ -179,11 +179,19 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", cur.Region.Id)
}
if cur.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", cur.Region.Id)
}
for _, r := range regions[1:] {
if r.Leader == nil {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader is nil", r.Region.Id)
}
if r.Leader.StoreId == 0 {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's leader's store id is 0", r.Region.Id)
}
if !bytes.Equal(cur.Region.EndKey, r.Region.StartKey) {
return errors.Annotatef(berrors.ErrPDBatchScanRegion,
"region %d's endKey not equal to next region %d's startKey, endKey: %s, startKey: %s, region epoch: %s %s",
@@ -197,6 +205,34 @@ func checkRegionConsistency(startKey, endKey []byte, regions []*RegionInfo) erro
return nil
}

func scanRegionsLimitWithRetry(
ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, mustLeader bool,
) ([]*RegionInfo, bool, error) {
var (
batch []*RegionInfo
err error
)
_ = utils.WithRetry(ctx, func() error {
defer func() { mustLeader = mustLeader || err != nil }()
if mustLeader {
batch, err = client.ScanRegions(ctx, startKey, endKey, limit)
} else {
batch, err = client.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
}
if err != nil {
return err
}
if err = checkRegionConsistency(startKey, endKey, batch, true); err != nil {
log.Warn("failed to scan region, retrying",
logutil.ShortError(err),
zap.Int("regionLength", len(batch)))
return err
}
return nil
}, NewWaitRegionOnlineBackoffer())
return batch, mustLeader, err
}

// PaginateScanRegion scans regions with limit-based pagination and returns all
// regions at once. The returned regions are continuous and cover the key range;
// if they do not, or an error occurs, it retries internally.
@@ -210,24 +246,17 @@ func PaginateScanRegion(

var (
lastRegions []*RegionInfo
lastErr error
err error
mustLeader = false
backoffer = NewWaitRegionOnlineBackoffer()
)
_ = utils.WithRetry(ctx, func() error {
var err error
defer func() {
lastErr = err
}()
defer func() { mustLeader = true }()
regions := make([]*RegionInfo, 0, 16)
scanStartKey := startKey
for {
var batch []*RegionInfo
if lastErr != nil {
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit)
} else {
batch, err = client.ScanRegions(ctx, scanStartKey, endKey, limit, opt.WithAllowFollowerHandle())
}

batch, mustLeader, err = scanRegionsLimitWithRetry(ctx, client, scanStartKey, endKey, limit, mustLeader)
if err != nil {
err = errors.Annotatef(berrors.ErrPDBatchScanRegion.Wrap(err), "scan regions from start-key:%s, err: %s",
redact.Key(scanStartKey), err.Error())
@@ -252,7 +281,7 @@
}
lastRegions = regions

if err = checkRegionConsistency(startKey, endKey, regions); err != nil {
if err = checkRegionConsistency(startKey, endKey, regions, false); err != nil {
Review comment (Member): How about moving this to here and not call checkRegionConsistency? [attached screenshot]

log.Warn("failed to scan region, retrying",
logutil.ShortError(err),
zap.Int("regionLength", len(regions)))
@@ -261,7 +290,7 @@
return nil
}, backoffer)

return lastRegions, lastErr
return lastRegions, err
}

// checkPartRegionConsistency only checks the continuity of regions and the first region consistency.
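The core of this change is an escalating retry: scans start with opt.WithAllowFollowerHandle() to spread read load across PD followers, and fall back to leader-only scans once any attempt fails, since a follower may serve stale region data. A self-contained sketch of that pattern (types, names, and signature are assumptions for illustration, not the PR's API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Region stands in for the PR's *RegionInfo; illustrative only.
type Region struct{ ID uint64 }

// scanWithLeaderFallback sketches the retry pattern scanRegionsLimitWithRetry
// introduces: begin with follower-handled scans to spread read load, and once
// any attempt fails, pin every later attempt to the leader, since followers
// may serve stale region data.
func scanWithLeaderFallback(
	ctx context.Context,
	scan func(ctx context.Context, allowFollower bool) ([]Region, error),
	check func([]Region) error,
	attempts int,
) (regions []Region, mustLeader bool, err error) {
	for i := 0; i < attempts; i++ {
		regions, err = scan(ctx, !mustLeader)
		if err == nil {
			err = check(regions)
		}
		if err == nil {
			return regions, mustLeader, nil
		}
		mustLeader = true // any failure escalates later attempts to the leader
	}
	return nil, mustLeader, err
}

func main() {
	calls := 0
	scan := func(_ context.Context, allowFollower bool) ([]Region, error) {
		calls++
		fmt.Printf("call %d, allowFollower=%v\n", calls, allowFollower)
		if calls == 1 {
			return nil, errors.New("stale follower read")
		}
		return []Region{{ID: 1}, {ID: 2}}, nil
	}
	check := func([]Region) error { return nil }
	regions, mustLeader, err := scanWithLeaderFallback(context.Background(), scan, check, 3)
	fmt.Println(len(regions), mustLeader, err) // 2 true <nil>
}
```

Escalating to the leader permanently rather than toggling back keeps the retry budget from being spent on repeatedly stale follower reads.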
194 changes: 193 additions & 1 deletion br/pkg/restore/split/split_test.go
@@ -26,6 +26,9 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -592,6 +595,165 @@ func TestPaginateScanRegion(t *testing.T) {
checkRegionsBoundaries(t, got, [][]byte{{1}, {2}, {3}, {4}, {5}})
}

type mockPDErrorClient struct {
pd.Client
t *testing.T
testCases []scanRegionTestCase
}

type scanRegionTestCase struct {
allowFollowerHandle bool
caseError error
caseRegion []*router.Region
}

func (s *mockPDErrorClient) ScanRegions(_ context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
testCase := s.testCases[0]
s.testCases = s.testCases[1:]
op := &opt.GetRegionOp{}
for _, opt := range opts {
opt(op)
}
require.Equal(s.t, testCase.allowFollowerHandle, op.AllowFollowerHandle)
return testCase.caseRegion, testCase.caseError
}

func newCaseRegion(kids []uint64) []*router.Region {
regions := make([]*router.Region, 0, len(kids))
for _, kid := range kids {
regions = append(regions, &router.Region{
Meta: &metapb.Region{
Id: kid,
StartKey: fmt.Appendf(nil, "%03d", kid),
EndKey: fmt.Appendf(nil, "%03d", kid+1),
},
Leader: &metapb.Peer{
Id: kid,
StoreId: kid,
},
})
}
return regions
}

func rk(kid int) []byte {
return fmt.Appendf(nil, "%03d5", kid)
}

func TestScanRegionsLimitWithRetry(t *testing.T) {
ctx := context.Background()
mockPDClient := &mockPDErrorClient{t: t}
mockClient := &pdClient{client: mockPDClient}

backup := WaitRegionOnlineAttemptTimes
WaitRegionOnlineAttemptTimes = 3
t.Cleanup(func() {
WaitRegionOnlineAttemptTimes = backup
})

caseError := errors.Annotate(berrors.ErrPDBatchScanRegion, "case error")
// Case 1: mustLeader is false, and the first try fails
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err := scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, false)
require.True(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)

// Case 2: mustLeader is false, and the first try succeeds
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, false)
require.False(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)

// Case 3: mustLeader is true, and the first try fails
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: false, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, true)
require.True(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)

// Case 4: mustLeader is true, and the first try succeeds
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
}
_, mustLeader, err = scanRegionsLimitWithRetry(ctx, mockClient, rk(1), rk(2), 128, true)
require.True(t, mustLeader)
require.NoError(t, err)
require.Len(t, mockPDClient.testCases, 0)
}

func checkRegions(t *testing.T, startKid, endKid uint64, regions []*RegionInfo) {
require.Len(t, regions, int(endKid-startKid+1))
i := 0
for kid := startKid; kid <= endKid; kid += 1 {
require.Equal(t, kid, regions[i].Leader.Id)
require.Equal(t, kid, regions[i].Leader.StoreId)
require.Equal(t, kid, regions[i].Region.Id)
require.Equal(t, fmt.Appendf(nil, "%03d", kid), regions[i].Region.StartKey)
require.Equal(t, fmt.Appendf(nil, "%03d", kid+1), regions[i].Region.EndKey)
i += 1
}
}

func TestPaginateScanRegion2(t *testing.T) {
ctx := context.Background()
mockPDClient := &mockPDErrorClient{t: t}
mockClient := &pdClient{client: mockPDClient}

backup := WaitRegionOnlineAttemptTimes
WaitRegionOnlineAttemptTimes = 3
t.Cleanup(func() {
WaitRegionOnlineAttemptTimes = backup
})

caseError := errors.Annotate(berrors.ErrPDBatchScanRegion, "case error")
// Case 1: mustLeader is false, and the first try fails
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5})},
}
regions, err := PaginateScanRegion(ctx, mockClient, rk(1), rk(5), 3)
require.NoError(t, err)
checkRegions(t, 1, 5, regions)

// Case 2: mustLeader is false, and the first try succeeds
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: true, caseError: caseError},
{allowFollowerHandle: false, caseError: caseError},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5, 6})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{7, 8, 9})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{10})},
}
regions, err = PaginateScanRegion(ctx, mockClient, rk(1), rk(10), 3)
require.NoError(t, err)
checkRegions(t, 1, 10, regions)

// Case 3: mustLeader is false, and the first pagination pass fails the final consistency check
mockPDClient.testCases = []scanRegionTestCase{
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: true, caseRegion: newCaseRegion([]uint64{4, 5})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{1, 2, 3})},
{allowFollowerHandle: false, caseRegion: newCaseRegion([]uint64{4, 5, 6})},
}
regions, err = PaginateScanRegion(ctx, mockClient, rk(1), rk(6), 3)
require.NoError(t, err)
checkRegions(t, 1, 6, regions)
}

func TestRegionConsistency(t *testing.T) {
cases := []struct {
startKey []byte
@@ -685,9 +847,39 @@ func TestRegionConsistency(t *testing.T) {
},
},
},
{
codec.EncodeBytes([]byte{}, []byte("c")),
codec.EncodeBytes([]byte{}, []byte("e")),
"region 6's leader's store id is 0(.*?)",
[]*RegionInfo{
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 6,
StartKey: codec.EncodeBytes([]byte{}, []byte("c")),
EndKey: codec.EncodeBytes([]byte{}, []byte("d")),
RegionEpoch: nil,
},
},
{
Leader: &metapb.Peer{
Id: 6,
StoreId: 0,
},
Region: &metapb.Region{
Id: 8,
StartKey: codec.EncodeBytes([]byte{}, []byte("d")),
EndKey: codec.EncodeBytes([]byte{}, []byte("e")),
},
},
},
},
}
for _, ca := range cases {
err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions)
err := checkRegionConsistency(ca.startKey, ca.endKey, ca.regions, false)
require.Error(t, err)
require.Regexp(t, ca.err, err.Error())
}
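As a closing note on the test helpers: mockPDErrorClient recovers AllowFollowerHandle by applying each received option to a fresh opt.GetRegionOp and asserting on the resulting fields. A minimal, runnable sketch of that functional-options inspection trick (simplified names that only loosely mirror the pd client's opt package):

```go
package main

import "fmt"

// scanOp is a simplified stand-in for the options struct a client builds
// from variadic functional options.
type scanOp struct{ allowFollowerHandle bool }

type scanOption func(*scanOp)

// withAllowFollowerHandle mimics opt.WithAllowFollowerHandle in shape only.
func withAllowFollowerHandle() scanOption {
	return func(o *scanOp) { o.allowFollowerHandle = true }
}

// resolve materializes the received options so a test can check exactly
// what the caller requested, as the mock above does.
func resolve(opts ...scanOption) scanOp {
	var op scanOp
	for _, f := range opts {
		f(&op)
	}
	return op
}

func main() {
	fmt.Println(resolve().allowFollowerHandle)                          // false
	fmt.Println(resolve(withAllowFollowerHandle()).allowFollowerHandle) // true
}
```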