Skip to content

Commit be2c13e

Browse files
authored
Set the maximum number of load retries and retry time (#22633)
### **User description**

## What type of PR is this?

- [ ] API-change
- [x] BUG
- [x] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring

## Which issue(s) this PR fixes:

issue matrixorigin/MO-Cloud#6448

## What this PR does / why we need it:

Set the maximum number of load retries and retry time

___

### **PR Type**

Bug fix, Enhancement

___

### **Description**

- Added retry mechanism with max 100 retries and 3-minute timeout for load operations
- Enhanced logging for long-running read/write operations (>1 minute)
- Improved error handling to track consecutive failures and prevent infinite loops
- Refactored timing measurements to capture read/write durations more accurately

___

### Diagram Walkthrough

```mermaid
flowchart LR
  A["Read packet"] --> B["Check read time"]
  B --> C["Write to pipe"]
  C --> D["Check write time"]
  D --> E{"Error occurred?"}
  E -- "Yes" --> F["Increment error counter"]
  F --> G{"Max retries or timeout?"}
  G -- "Yes" --> H["Return error"]
  G -- "No" --> A
  E -- "No" --> I["Reset error counter"]
  I --> A
```

<details> <summary><h3> File Walkthrough</h3></summary>

<table><thead><tr><th></th><th align="left">Relevant files</th></tr></thead><tbody><tr><td><strong>Bug fix</strong></td><td><table>
<tr>
<td>
<details>
<summary><strong>mysql_cmd_executor.go</strong><dd><code>Add retry limits and enhanced error handling for load operations</code></dd></summary>
<hr>

pkg/frontend/mysql_cmd_executor.go

<ul><li>Added retry mechanism with maximum 100 consecutive errors and 3-minute <br>timeout<br>
<li> Enhanced logging for read/write operations exceeding 1 minute duration<br>
<li> Improved error tracking with <code>retError</code> variable to preserve first error<br>
<li> Refactored timing measurements for more accurate read/write duration <br>capture</ul>

</details>

</td>
<td><a href="https://github.com/matrixorigin/matrixone/pull/22633/files#diff-af2611d5fc89704398fe09d09644efa41fec8931b395eda292f2f474f1216275">+43/-14</a>&nbsp; </td>
</tr>
</table></td></tr></tbody></table>

</details>

___
1 parent 48f076d commit be2c13e

File tree

1 file changed

+43
-14
lines changed

1 file changed

+43
-14
lines changed

pkg/frontend/mysql_cmd_executor.go

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2378,13 +2378,17 @@ func canExecuteStatementInUncommittedTransaction(reqCtx context.Context, ses FeS
23782378
func readThenWrite(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam, writer *io.PipeWriter, mysqlRrWr MysqlRrWr, skipWrite bool, epoch uint64) (_ bool, _ time.Duration, _ time.Duration, err error) {
23792379
var readTime, writeTime time.Duration
23802380
var payload []byte
2381-
readStart := time.Now()
2381+
start := time.Now()
23822382
defer func() {
23832383
if err != nil {
23842384
mysqlRrWr.FreeLoadLocal()
23852385
}
23862386
}()
23872387
payload, err = mysqlRrWr.ReadLoadLocalPacket()
2388+
readTime = time.Since(start)
2389+
if readTime > time.Minute {
2390+
logutil.Infof("ReadLoadLocalPacket longtime %v, size %d", readTime, len(payload))
2391+
}
23882392
if err != nil {
23892393
if errors.Is(err, errorInvalidLength0) {
23902394
return skipWrite, readTime, writeTime, err
@@ -2394,23 +2398,26 @@ func readThenWrite(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam, wri
23942398
}
23952399
return skipWrite, readTime, writeTime, err
23962400
}
2397-
readTime = time.Since(readStart)
23982401

23992402
//empty packet means the file is over.
2400-
length := len(payload)
2401-
if length == 0 {
2403+
size := len(payload)
2404+
if size == 0 {
24022405
return skipWrite, readTime, writeTime, errorInvalidLength0
24032406
}
2404-
ses.CountPayload(length)
2407+
ses.CountPayload(size)
24052408

24062409
// If an inner error occurs (unexpected, or expected such as ctrl-c), proc.Base.LoadLocalReader will be closed.
24072410
// The write will then return an error, but we still need to read the rest of the data without writing it to the pipe.
24082411
// So we need a flag (skipWrite) to tell us whether to write the data to the pipe.
24092412
// https://github.com/matrixorigin/matrixone/issues/6665#issuecomment-1422236478
24102413

2411-
writeStart := time.Now()
2414+
start = time.Now()
24122415
if !skipWrite {
24132416
_, err = writer.Write(payload)
2417+
writeTime = time.Since(start)
2418+
if writeTime > time.Minute {
2419+
logutil.Infof("WritePacket longtime %v, size %d", writeTime, len(payload))
2420+
}
24142421
if err != nil {
24152422
ses.Errorf(execCtx.reqCtx,
24162423
"Failed to load local file",
@@ -2419,7 +2426,6 @@ func readThenWrite(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam, wri
24192426
zap.Error(err))
24202427
skipWrite = true
24212428
}
2422-
writeTime = time.Since(writeStart)
24232429

24242430
}
24252431
return skipWrite, readTime, writeTime, err
@@ -2443,34 +2449,36 @@ func processLoadLocal(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam,
24432449
defer func() {
24442450
close(quitC)
24452451
}()
2446-
mysqlRrWr := ses.GetResponser().MysqlRrWr()
2452+
mysqlRwer := ses.GetResponser().MysqlRrWr()
24472453
defer func() {
24482454
err2 := writer.Close()
24492455
if err == nil {
24502456
err = err2
24512457
}
24522458
//free load local buffer anyway
2453-
mysqlRrWr.FreeLoadLocal()
2459+
mysqlRwer.FreeLoadLocal()
24542460
}()
24552461
err = plan2.InitInfileParam(param)
24562462
if err != nil {
24572463
return
24582464
}
2459-
err = mysqlRrWr.WriteLocalInfileRequest(param.Filepath)
2465+
err = mysqlRwer.WriteLocalInfileRequest(param.Filepath)
24602466
if err != nil {
24612467
return
24622468
}
24632469
var skipWrite bool
24642470
skipWrite = false
24652471
var readTime, writeTime time.Duration
2472+
var retError error
24662473
start := time.Now()
2467-
epoch, printEvery, minReadTime, maxReadTime, minWriteTime, maxWriteTime := uint64(0), uint64(1024*60), 24*time.Hour, time.Nanosecond, 24*time.Hour, time.Nanosecond
2474+
epoch, printTime, minReadTime, maxReadTime, minWriteTime, maxWriteTime := uint64(0), uint64(1024*60), 24*time.Hour, time.Nanosecond, 24*time.Hour, time.Nanosecond
24682475

2469-
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRrWr, skipWrite, epoch)
2476+
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRwer, skipWrite, epoch)
24702477
if err != nil {
24712478
if errors.Is(err, errorInvalidLength0) {
24722479
return nil
24732480
}
2481+
retError = err
24742482
}
24752483
if readTime > maxReadTime {
24762484
maxReadTime = readTime
@@ -2486,13 +2494,34 @@ func processLoadLocal(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam,
24862494
minWriteTime = writeTime
24872495
}
24882496

2497+
const maxRetries = 100 // Maximum number of consecutive errors
2498+
const maxTotalTime = 3 * time.Minute // Maximum total consecutive processing time
2499+
var consecutiveErrors int
2500+
consecutiveLoopStartTime := time.Now()
2501+
24892502
for {
2490-
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRrWr, skipWrite, epoch)
2503+
skipWrite, readTime, writeTime, err = readThenWrite(ses, execCtx, param, writer, mysqlRwer, skipWrite, epoch)
24912504
if err != nil {
24922505
if errors.Is(err, errorInvalidLength0) {
2506+
if retError != nil {
2507+
err = retError
2508+
break
2509+
}
24932510
err = nil
24942511
break
24952512
}
2513+
retError = err
2514+
consecutiveErrors++
2515+
ses.Errorf(execCtx.reqCtx, "readThenWrite error (attempt %d): %v", consecutiveErrors, err)
2516+
time.Sleep(10 * time.Millisecond)
2517+
2518+
if consecutiveErrors >= maxRetries || time.Since(consecutiveLoopStartTime) > maxTotalTime {
2519+
return moerr.NewInternalErrorf(execCtx.reqCtx,
2520+
"load local file failed: consecutive errors (%d), timeout after %v", maxRetries, maxTotalTime)
2521+
}
2522+
} else {
2523+
consecutiveErrors = 0
2524+
consecutiveLoopStartTime = time.Now()
24962525
}
24972526

24982527
if readTime > maxReadTime {
@@ -2509,7 +2538,7 @@ func processLoadLocal(ses FeSession, execCtx *ExecCtx, param *tree.ExternParam,
25092538
minWriteTime = writeTime
25102539
}
25112540

2512-
if epoch%printEvery == 0 {
2541+
if epoch%printTime == 0 {
25132542
if execCtx.isIssue3482 {
25142543
ses.Infof(execCtx.reqCtx, "load local '%s', epoch: %d, skipWrite: %v, minReadTime: %s, maxReadTime: %s, minWriteTime: %s, maxWriteTime: %s,\n", param.Filepath, epoch, skipWrite, minReadTime.String(), maxReadTime.String(), minWriteTime.String(), maxWriteTime.String())
25152544
}

0 commit comments

Comments
 (0)