diff --git a/docs/en/14-reference/03-taos-sql/22-meta.md b/docs/en/14-reference/03-taos-sql/22-meta.md index d4499cd77478..4a2afb9b736a 100644 --- a/docs/en/14-reference/03-taos-sql/22-meta.md +++ b/docs/en/14-reference/03-taos-sql/22-meta.md @@ -331,7 +331,7 @@ Note: Users with SYSINFO property set to 0 cannot view this table. |:----|:-----------|:-----------|:--------------------| | 1 | db_name | VARCHAR(32) | Database name | | 2 | vgroup_id | INT | vgroup ID | -| 3 | wal | BIGINT | WAL file size, in KB | +| 3 | wal_size | BIGINT | WAL file size, in KB | | 4 | data1 | BIGINT | Data file size on primary storage, in KB | | 5 | data2 | BIGINT | Data file size on secondary storage, in KB | | 6 | data3 | BIGINT | Data file size on tertiary storage, in KB | diff --git a/docs/zh/14-reference/03-taos-sql/22-meta.md b/docs/zh/14-reference/03-taos-sql/22-meta.md index 130332a19aa3..7897362a10c5 100644 --- a/docs/zh/14-reference/03-taos-sql/22-meta.md +++ b/docs/zh/14-reference/03-taos-sql/22-meta.md @@ -335,7 +335,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数 |:----|:-----------|:------------|:--------| | 1 | db_name | VARCHAR(32) | 数据库名称 | | 2 | vgroup_id | INT | vgroup 的 ID | -| 3 | wal | BIGINT | wal 文件大小,单位为 KB | +| 3 | wal_size | BIGINT | wal 文件大小,单位为 KB | | 4 | data1 | BIGINT | 一级存储上数据文件的大小,单位为 KB | | 5 | data2 | BIGINT | 二级存储上数据文件的大小,单位为 KB | | 6 | data3 | BIGINT | 三级存储上数据文件的大小,单位为 KB | diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4efead7199fc..16c1c62f7b07 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -333,6 +333,7 @@ typedef enum ENodeType { QUERY_NODE_REVOKE_STMT, QUERY_NODE_ALTER_CLUSTER_STMT, QUERY_NODE_S3MIGRATE_DATABASE_STMT, + QUERY_NODE_TRIM_DATABASE_WAL_STMT, QUERY_NODE_CREATE_TSMA_STMT, QUERY_NODE_DROP_TSMA_STMT, QUERY_NODE_CREATE_VIRTUAL_TABLE_STMT, @@ -375,6 +376,7 @@ typedef enum ENodeType { QUERY_NODE_ASSIGN_LEADER_STMT, QUERY_NODE_SHOW_CREATE_TSMA_STMT, QUERY_NODE_SHOW_CREATE_VTABLE_STMT, + QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT, // show statement nodes // see 'sysTableShowAdapter', 'SYSTABLE_SHOW_TYPE_OFFSET' @@ -1711,6 +1713,14 @@ typedef struct { int32_t tSerializeSVS3MigrateDbReq(void* buf, int32_t bufLen, SVS3MigrateDbReq* pReq); int32_t tDeserializeSVS3MigrateDbReq(void* buf, int32_t bufLen, SVS3MigrateDbReq* pReq); +typedef struct { + int32_t vgId; + int64_t keepVersion; +} SMndSetVgroupKeepVersionReq; + +int32_t tSerializeSMndSetVgroupKeepVersionReq(void* buf, int32_t bufLen, SMndSetVgroupKeepVersionReq* pReq); +int32_t tDeserializeSMndSetVgroupKeepVersionReq(void* buf, int32_t bufLen, SMndSetVgroupKeepVersionReq* pReq); + typedef struct { int32_t timestampSec; int32_t ttlDropMaxCount; @@ -4094,6 +4104,13 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); void tFreeMDropStreamReq(SMDropStreamReq* pReq); +typedef struct SVndSetKeepVersionReq { + int64_t keepVersion; +} SVndSetKeepVersionReq; + +int32_t tSerializeSVndSetKeepVersionReq(void* buf, int32_t bufLen, SVndSetKeepVersionReq* pReq); +int32_t tDeserializeSVndSetKeepVersionReq(void* buf, int32_t bufLen, SVndSetKeepVersionReq* pReq); + typedef struct SVUpdateCheckpointInfoReq { SMsgHead head; int64_t streamId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9928656b82bd..ebb2d3e6224e 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -135,6 +135,8 @@ TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_INFO, "get-db-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_WHITELIST_DUAL, "get-user-whitelist-dual", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_IP_WHITE_DUAL, "retrieve-ip-white-dual", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_SET_VGROUP_KEEP_VERSION, "set-vgroup-keep-version", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TRIM_DB_WAL, "trim-db-wal", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_DND_MSG) TD_NEW_MSG_SEG(TDMT_MND_MSG) // 1<<8 @@ -327,6 +329,8 @@ TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_VSUBTABLES_META, "vnode-virtual_stables-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_VSTB_REF_DBS, "vnode-virtual-stables-ref-dbs", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SET_KEEP_VERSION, "vnode-set-keep-version", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TRIM_WAL, "vnode-trim-wal", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index ded8ac49c69e..ed9c1c51707c 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -742,6 +742,17 @@ typedef struct SBalanceVgroupLeaderStmt { char dbName[TSDB_DB_NAME_LEN]; } SBalanceVgroupLeaderStmt; +typedef struct SSetVgroupKeepVersionStmt { + ENodeType type; + int32_t vgId; + int64_t keepVersion; +} SSetVgroupKeepVersionStmt; + +typedef struct STrimDbWalStmt { + ENodeType type; + char dbName[TSDB_DB_FNAME_LEN]; +} STrimDbWalStmt; + typedef struct SMergeVgroupStmt { ENodeType type; int32_t vgId1; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7bbdb629caf0..6ca3d7d3bcbe 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -281,7 +281,7 @@ int64_t syncGetTerm(int64_t rid); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); -int32_t syncEndSnapshot(int64_t rid); +int32_t syncEndSnapshot(int64_t rid, bool forceTrim); int32_t syncLeaderTransfer(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); bool syncIsReadyForRead(int64_t rid); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 5af07f4deb56..209dc18c9866 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -119,6 +119,8 @@ typedef struct SWal { TdThreadRwlock mutex; // ref SHashObj *pRefHash; // refId -> SWalRef + // keep version for preventing auto deletion + int64_t keepVersion; // path char path[WAL_PATH_LEN]; @@ -180,7 +182,7 @@ int32_t walCommit(SWal *, int64_t ver); int32_t walRollback(SWal *, int64_t ver); // notify that previous logs can be pruned safely int32_t walBeginSnapshot(SWal *, int64_t ver, int64_t logRetention); -int32_t walEndSnapshot(SWal *); +int32_t walEndSnapshot(SWal *, bool forceTrim); int32_t walRestoreFromSnapshot(SWal *, int64_t ver); void walApplyVer(SWal *, int64_t ver); @@ -219,6 +221,7 @@ int64_t walGetLastVer(SWal *); int64_t walGetVerRetention(SWal *pWal, int64_t bytes); int64_t walGetCommittedVer(SWal *); int64_t walGetAppliedVer(SWal *); +int32_t walSetKeepVersion(SWal *pWal, int64_t ver); #ifdef __cplusplus } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a7c841fbc95c..9f43c968719c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -88,6 +88,7 @@ struct tmq_conf_t { char clientId[TSDB_CLIENT_ID_LEN]; char groupId[TSDB_CGROUP_LEN]; int8_t autoCommit; + int8_t enableWalMarker; int8_t resetOffset; int8_t withTbName; int8_t snapEnable; @@ -118,6 +119,7 @@ struct tmq_t { int8_t withTbName; int8_t useSnapshot; int8_t autoCommit; + int8_t enableWalMarker; int32_t autoCommitInterval; int32_t sessionTimeoutMs; int32_t heartBeatIntervalMs; @@ -304,6 +306,7 @@ tmq_conf_t* tmq_conf_new() { conf->withTbName = false; conf->autoCommit = true; + conf->enableWalMarker = false; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; conf->resetOffset = TMQ_OFFSET__RESET_LATEST; conf->enableBatchMeta = false; @@ -364,6 +367,19 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } + if (strcasecmp(key, "enable.wal.marker") == 0) { + if (strcasecmp(value, "true") == 0) { + conf->enableWalMarker = true; + return TMQ_CONF_OK; + } else if (strcasecmp(value, "false") == 0) { + conf->enableWalMarker = false; + return TMQ_CONF_OK; + } else { + tqErrorC("invalid value for enable.wal.marker:%s", value); + return TMQ_CONF_INVALID; + } + } + if (strcasecmp(key, "auto.commit.interval.ms") == 0) { int64_t tmp; code = taosStr2int64(value, &tmp); @@ -659,6 +675,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse SMqVgOffset pOffset = {0}; pOffset.consumerId = tmq->consumerId; + // pOffset.markWal = tmq->enableWalMarker; pOffset.offset.val = *offset; (void)snprintf(pOffset.offset.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopicName); int32_t len = 0; @@ -785,6 +802,52 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient return *pVg == NULL ? TSDB_CODE_TMQ_INVALID_VGID : TSDB_CODE_SUCCESS; } +static int32_t sendWalMarkMsgToMnodeCb(void* param, SDataBuf* pMsg, int32_t code) { + if (pMsg) { + taosMemoryFreeClear(pMsg->pEpSet); + taosMemoryFreeClear(pMsg->pData); + } + tqDebugC("sendWalMarkMsgToMnodeCb code:%d", code); + return 0; +} + +static void asyncSendWalMarkMsgToMnode(tmq_t* tmq, int32_t vgId, int64_t keepVersion) { + if (tmq == NULL) return ; + void* buf = NULL; + SMsgSendInfo* sendInfo = NULL; + SMndSetVgroupKeepVersionReq req = {0}; + + tqDebugC("consumer:0x%" PRIx64 " send vgId:%d keepVersion:%"PRId64, tmq->consumerId, vgId, keepVersion); + req.vgId = vgId; + req.keepVersion = keepVersion; + + int32_t tlen = tSerializeSMndSetVgroupKeepVersionReq(NULL, 0, &req); + buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + return; + } + tlen = tSerializeSMndSetVgroupKeepVersionReq(buf, tlen, &req); + + sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + taosMemoryFree(buf); + return; + } + + sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL}; + sendInfo->requestId = generateRequestId(); + sendInfo->fp = sendWalMarkMsgToMnodeCb; + sendInfo->msgType = TDMT_MND_SET_VGROUP_KEEP_VERSION; + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); + if (code != 0) { + tqErrorC("consumer:0x%" PRIx64 " send wal mark msg to mnode failed, code:%s", tmq->consumerId, + tstrerror(terrno)); + } +} + static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal, SMqClientVg* pVg, SMqCommitCbParamSet* pParamSet){ if (tmq == NULL || pTopicName == NULL || offsetVal == NULL || pVg == NULL || pParamSet == NULL) { return TSDB_CODE_INVALID_PARA; @@ -811,6 +874,9 @@ static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal return code; } + if (tmq->enableWalMarker && offsetVal->type == TMQ_OFFSET__LOG) { + asyncSendWalMarkMsgToMnode(tmq, pVg->vgId, offsetVal->version); + } tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal); @@ -1538,6 +1604,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { if (pMsg) { taosMemoryFreeClear(pMsg->pEpSet); + taosMemoryFreeClear(pMsg->pData); } if (param == NULL) { @@ -1739,6 +1806,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tstrncpy(pTmq->groupId, conf->groupId, TSDB_CGROUP_LEN); pTmq->withTbName = conf->withTbName; pTmq->useSnapshot = conf->snapEnable; + pTmq->enableWalMarker = conf->enableWalMarker; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->sessionTimeoutMs = conf->sessionTimeoutMs; diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 90b60f7f927f..7cc6b8779cc1 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -6085,6 +6085,42 @@ int32_t tDeserializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) { TAOS_CHECK_EXIT(tStartDecode(&decoder)); TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->db)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->maxSpeed)); + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +int32_t tSerializeSVndSetKeepVersionReq(void *buf, int32_t bufLen, SVndSetKeepVersionReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->keepVersion)); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSVndSetKeepVersionReq(void *buf, int32_t bufLen, SVndSetKeepVersionReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->keepVersion)); tEndDecode(&decoder); _exit: @@ -6202,6 +6238,45 @@ int32_t tDeserializeSVS3MigrateDbReq(void *buf, int32_t bufLen, SVS3MigrateDbReq return code; } +int32_t tSerializeSMndSetVgroupKeepVersionReq(void *buf, int32_t bufLen, SMndSetVgroupKeepVersionReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->vgId)); + TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->keepVersion)); + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMndSetVgroupKeepVersionReq(void *buf, int32_t bufLen, SMndSetVgroupKeepVersionReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->vgId)); + TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->keepVersion)); + + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; @@ -12552,6 +12627,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { int32_t tEncodeMqVgOffset(SEncoder *pEncoder, const SMqVgOffset *pOffset) { TAOS_CHECK_RETURN(tEncodeSTqOffset(pEncoder, &pOffset->offset)); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pOffset->consumerId)); + // TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pOffset->markWal)); return 0; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index a5c4f93f66d3..5edd11544737 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -311,6 +311,8 @@ static const SSysDbTableSchema vgroupsSchema[] = { {.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "cacheelements", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true}, + {.name = "keep_version", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true}, + {.name = "keep_version_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, // {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; @@ -501,7 +503,7 @@ static const SSysDbTableSchema encryptionsSchema[] = { static const SSysDbTableSchema diskUsageSchema[] = { {.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "wal", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "wal_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "data1", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "data2", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "data3", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index b25f19823514..d0da5d257598 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -159,6 +159,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_TRIM_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TRIM_DB_WAL, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_S3MIGRATE_DB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; @@ -168,6 +169,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ARB_ASSIGN_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_BALANCE_VGROUP_LEADER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SET_VGROUP_KEEP_VERSION, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -249,6 +251,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TTL_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -281,6 +284,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index bbe3a31fb56d..cd3e53b72aa1 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -112,6 +112,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmProcessCheckLearnCatchupReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0280f55a54a2..9485cc3b21c1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -649,6 +649,44 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } +int32_t vmProcessSetKeepVersionReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + SMsgHead *pHead = pMsg->pCont; + pHead->contLen = ntohl(pHead->contLen); + pHead->vgId = ntohl(pHead->vgId); + + SVndSetKeepVersionReq req = {0}; + if (tDeserializeSVndSetKeepVersionReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead), + &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + dInfo("vgId:%d, set wal keep version to %" PRId64, pHead->vgId, req.keepVersion); + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); + if (pVnode == NULL) { + dError("vgId:%d, failed to set keep version since %s", pHead->vgId, terrstr()); + terrno = TSDB_CODE_VND_NOT_EXIST; + return -1; + } + + // Directly call vnodeSetWalKeepVersion for immediate effect (< 1ms) + // This bypasses Raft to avoid timing issues where WAL might be deleted + // before keepVersion is set through the Raft consensus process + int32_t code = vnodeSetWalKeepVersion(pVnode->pImpl, req.keepVersion); + if (code != TSDB_CODE_SUCCESS) { + dError("vgId:%d, failed to set keepVersion to %" PRId64 " since %s", pHead->vgId, req.keepVersion, tstrerror(code)); + terrno = code; + vmReleaseVnode(pMgmt, pVnode); + return -1; + } + + dInfo("vgId:%d, successfully set keepVersion to %" PRId64, pHead->vgId, req.keepVersion); + + vmReleaseVnode(pMgmt, pVnode); + return 0; +} + int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SAlterVnodeHashRangeReq req = {0}; if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) { @@ -1062,9 +1100,11 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_SET_KEEP_VERSION, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM_WAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 80d2d32f4a47..c76e1a48b8c9 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -72,6 +72,9 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_VND_DISABLE_WRITE: code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg); break; + case TDMT_VND_SET_KEEP_VERSION: + code = vmProcessSetKeepVersionReq(pMgmt, pMsg); + break; case TDMT_VND_ALTER_HASHRANGE: code = vmProcessAlterHashRangeReq(pMgmt, pMsg); break; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 6f316f5cb210..5b944db51312 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -526,6 +526,8 @@ typedef struct { void* pTsma; int32_t numOfCachedTables; int32_t syncConfChangeVer; + int64_t keepVersion; // WAL keep version, -1 for disabled + int64_t keepVersionTime; // WAL keep version time } SVgObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 367ce1d6153a..a072dcc10143 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -51,6 +51,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq); static int32_t mndProcessUseDbReq(SRpcMsg *pReq); static int32_t mndProcessTrimDbReq(SRpcMsg *pReq); +static int32_t mndProcessTrimDbWalReq(SRpcMsg *pReq); static int32_t mndProcessS3MigrateDbReq(SRpcMsg *pReq); static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); @@ -78,6 +79,7 @@ int32_t mndInitDb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_USE_DB, mndProcessUseDbReq); mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq); mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB, mndProcessTrimDbReq); + mndSetMsgHandle(pMnode, TDMT_MND_TRIM_DB_WAL, mndProcessTrimDbWalReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq); mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB, mndProcessS3MigrateDbReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_INFO, mndProcessUseDbReq); @@ -2201,6 +2203,80 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) { TAOS_RETURN(code); } +static int32_t mndTrimDbWal(SMnode *pMnode, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + int32_t code = 0; + + // Iterate through all vgroups in this database + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + // Check if this vgroup belongs to the database + if (pVgroup->dbUid != pDb->uid) { + sdbRelease(pSdb, pVgroup); + continue; + } + + // Prepare message to send to vnode + int32_t contLen = sizeof(SMsgHead); + SMsgHead *pHead = rpcMallocCont(contLen); + if (pHead == NULL) { + sdbCancelFetch(pSdb, pVgroup); + sdbRelease(pSdb, pVgroup); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + // Send TRIM WAL request to vnode + SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM_WAL, .pCont = pHead, .contLen = contLen}; + SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); + code = tmsgSendReq(&epSet, &rpcMsg); + if (code != 0) { + mError("vgId:%d, failed to send vnode-trim-wal request since 0x%x", pVgroup->vgId, code); + } else { + mInfo("vgId:%d, send vnode-trim-wal request to vnode", pVgroup->vgId); + } + sdbRelease(pSdb, pVgroup); + } + + return code; +} + +static int32_t mndProcessTrimDbWalReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SDbObj *pDb = NULL; + STrimDbReq trimReq = {0}; + + TAOS_CHECK_GOTO(tDeserializeSTrimDbReq(pReq->pCont, pReq->contLen, &trimReq), NULL, _OVER); + + mInfo("db:%s, start to trim WAL", trimReq.db); + + pDb = mndAcquireDb(pMnode, trimReq.db); + if (pDb == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } + + TAOS_CHECK_GOTO(mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_TRIM_DB, pDb), NULL, _OVER); + + code = mndTrimDbWal(pMnode, pDb); + +_OVER: + if (code != 0) { + mError("db:%s, failed to process trim db wal req since %s", trimReq.db, terrstr()); + } + + mndReleaseDb(pMnode, pDb); + TAOS_RETURN(code); +} + static int32_t mndS3MigrateDb(SMnode *pMnode, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index d934ac0d5314..f2d4a5ba1cd7 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -49,6 +49,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessDropTtltbRsp(SRpcMsg *pReq); static int32_t mndProcessTrimDbRsp(SRpcMsg *pReq); +static int32_t mndProcessTrimDbWalRsp(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -83,6 +84,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TTL_TABLE_RSP, mndProcessDropTtltbRsp); mndSetMsgHandle(pMnode, TDMT_VND_TRIM_RSP, mndProcessTrimDbRsp); + mndSetMsgHandle(pMnode, TDMT_VND_TRIM_WAL_RSP, mndProcessTrimDbWalRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq); @@ -3039,6 +3041,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, static int32_t mndProcessDropTtltbRsp(SRpcMsg *pRsp) { return 0; } static int32_t mndProcessTrimDbRsp(SRpcMsg *pRsp) { return 0; } +static int32_t mndProcessTrimDbWalRsp(SRpcMsg *pRsp) { return 0; } static int32_t mndProcessS3MigrateDbRsp(SRpcMsg *pRsp) { return 0; } static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 18c7bcca6ce8..45190b7c7097 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -46,6 +46,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq); static int32_t mndProcessVgroupBalanceLeaderMsg(SRpcMsg *pReq); +static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq); int32_t mndInitVgroup(SMnode *pMnode) { SSdbTable table = { @@ -63,6 +64,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_SET_KEEP_VERSION_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp); @@ -77,6 +79,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessVgroupBalanceLeaderMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP, mndProcessBalanceVgroupMsg); mndSetMsgHandle(pMnode, TDMT_MND_BALANCE_VGROUP_LEADER, mndProcessVgroupBalanceLeaderMsg); + mndSetMsgHandle(pMnode, TDMT_MND_SET_VGROUP_KEEP_VERSION, mndProcessSetVgroupKeepVersionReq); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup); @@ -112,6 +115,8 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SDB_SET_INT32(pRaw, dataPos, pVgid->dnodeId, _OVER) } SDB_SET_INT32(pRaw, dataPos, pVgroup->syncConfChangeVer, _OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersion, _OVER) + SDB_SET_INT64(pRaw, dataPos, pVgroup->keepVersionTime, _OVER) SDB_SET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -170,7 +175,12 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { if (dataPos + sizeof(int32_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { SDB_GET_INT32(pRaw, dataPos, &pVgroup->syncConfChangeVer, _OVER) } - + if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { + SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersion, _OVER) + } + if (dataPos + sizeof(int64_t) + VGROUP_RESERVE_SIZE <= pRaw->dataLen) { + SDB_GET_INT64(pRaw, dataPos, &pVgroup->keepVersionTime, _OVER) + } SDB_GET_RESERVE(pRaw, dataPos, VGROUP_RESERVE_SIZE, _OVER) terrno = 0; @@ -236,6 +246,8 @@ static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew) { pOld->hashEnd = pNew->hashEnd; pOld->replica = pNew->replica; pOld->isTsma = pNew->isTsma; + pOld->keepVersion = pNew->keepVersion; + pOld->keepVersionTime = pNew->keepVersionTime; for (int32_t i = 0; i < pNew->replica; ++i) { SVnodeGid *pNewGid = &pNew->vnodeGid[i]; for (int32_t j = 0; j < pOld->replica; ++j) { @@ -1003,6 +1015,8 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN); pVgroup->dbUid = pDb->uid; pVgroup->replica = 1; + pVgroup->keepVersion = -1; // default: WAL keep version disabled + pVgroup->keepVersionTime = 0; if (mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray) != 0) return -1; taosArrayDestroy(pArray); @@ -1056,6 +1070,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray * memcpy(pVgroup->dbName, pDb->name, TSDB_DB_FNAME_LEN); pVgroup->dbUid = pDb->uid; pVgroup->replica = pDb->cfg.replications; + pVgroup->keepVersion = -1; // default: WAL keep version disabled + pVgroup->keepVersionTime = 0; if ((code = mndGetAvailableDnode(pMnode, pDb, pVgroup, pArray)) != 0) { goto _OVER; @@ -1291,6 +1307,20 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p mError("vgId:%d, failed to set isTsma, since %s", pVgroup->vgId, tstrerror(code)); return code; } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->keepVersion, false); + if (code != 0) { + mError("vgId:%d, failed to set keepVersion, since %s", pVgroup->vgId, tstrerror(code)); + return code; + } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->keepVersionTime, false); + if (code != 0) { + mError("vgId:%d, failed to set keepVersionTime, since %s", pVgroup->vgId, tstrerror(code)); + return code; + } + numOfRows++; sdbRelease(pSdb, pVgroup); } @@ -3856,3 +3886,132 @@ int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, TAOS_CHECK_RETURN(mndAddCompactVnodeAction(pMnode, pTrans, pDb, pVgroup, compactTs, tw, metaOnly)); return 0; } + +static int32_t mndProcessSetVgroupKeepVersionReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = TSDB_CODE_SUCCESS; + STrans *pTrans = NULL; + SVgObj *pVgroup = NULL; + + SMndSetVgroupKeepVersionReq req = {0}; + if (tDeserializeSMndSetVgroupKeepVersionReq(pReq->pCont, pReq->contLen, &req) != 0) { + code = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mInfo("start to set vgroup keep version, vgId:%d, keepVersion:%" PRId64, req.vgId, req.keepVersion); + + // Check permission + if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB)) != 0) { + goto _OVER; + } + + // Get vgroup + pVgroup = mndAcquireVgroup(pMnode, req.vgId); + if (pVgroup == NULL) { + code = TSDB_CODE_MND_VGROUP_NOT_EXIST; + mError("vgId:%d not exist, failed to set keep version", req.vgId); + goto _OVER; + } + + // Create transaction + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "set-vgroup-keep-version"); + if (pTrans == NULL) { + code = terrno != 0 ? terrno : TSDB_CODE_MND_RETURN_VALUE_NULL; + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + + mndTransSetSerial(pTrans); + mInfo("trans:%d, used to set vgroup keep version, vgId:%d keepVersion:%" PRId64, pTrans->id, req.vgId, + req.keepVersion); + + // Update SVgObj's keepVersion in mnode + SVgObj newVgroup = {0}; + memcpy(&newVgroup, pVgroup, sizeof(SVgObj)); + newVgroup.keepVersion = req.keepVersion; + newVgroup.keepVersionTime = taosGetTimestampMs(); + + // Add prepare log for SDB vgroup update (execute in PREPARE stage, before redo actions) + SSdbRaw *pCommitRaw = mndVgroupActionEncode(&newVgroup); + if (pCommitRaw == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + code = terrno; + sdbFreeRaw(pCommitRaw); + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) { + mError("vgId:%d, failed to set raw status to ready, error:%s, line:%d", pVgroup->vgId, tstrerror(code), __LINE__); + sdbFreeRaw(pCommitRaw); + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + + // Prepare message for vnodes + SVndSetKeepVersionReq vndReq = {.keepVersion = req.keepVersion}; + int32_t reqLen = tSerializeSVndSetKeepVersionReq(NULL, 0, &vndReq); + int32_t contLen = reqLen + sizeof(SMsgHead); + + // Send to all replicas of the vgroup + for (int32_t i = 0; i < pVgroup->replica; ++i) { + SMsgHead *pHead = taosMemoryMalloc(contLen); + if (pHead == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + if (tSerializeSVndSetKeepVersionReq((char *)pHead + sizeof(SMsgHead), reqLen, &vndReq) < 0) { + taosMemoryFree(pHead); + code = TSDB_CODE_OUT_OF_MEMORY; + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + + // Get dnode and add action to transaction + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId); + if (pDnode == NULL) { + taosMemoryFree(pHead); + code = TSDB_CODE_MND_DNODE_NOT_EXIST; + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + action.pCont = pHead; + action.contLen = contLen; + action.msgType = TDMT_VND_SET_KEEP_VERSION; + action.acceptableCode = TSDB_CODE_VND_STOPPED; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pHead); + code = terrno; + mndReleaseVgroup(pMnode, pVgroup); + goto _OVER; + } + } + + mndReleaseVgroup(pMnode, pVgroup); + + // Prepare and execute transaction + if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { + goto _OVER; + } + + code = TSDB_CODE_ACTION_IN_PROGRESS; + +_OVER: + if (pTrans != NULL) mndTransDrop(pTrans); + + return code; +} diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 03a680b9cf96..6fb9095e38fc 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -629,7 +629,7 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { if (code == 0) { if (pSdb->pWal != NULL) { if (pSdb->sync > 0) { - code = syncEndSnapshot(pSdb->sync); + code = syncEndSnapshot(pSdb->sync, false); } } } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5fec6ec46586..7c723fe1edcc 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -73,6 +73,7 @@ int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode); int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); +int32_t vnodeSetWalKeepVersion(SVnode *pVnode, int64_t keepVersion); void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 87fb49011d26..1308e6f43834 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -140,6 +140,7 @@ int32_t vnodeCommitInfo(const char* dir); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeSyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode); +int32_t vnodeAsyncCommitEx(SVnode* pVnode, bool forceTrim); bool vnodeShouldRollback(SVnode* pVnode); // vnodeSync.c diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7a7840bfc54e..2dcf63fe9bb9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -599,6 +599,7 @@ struct SCommitInfo { SVnodeInfo info; SVnode* pVnode; TXN* txn; + bool forceTrim; // force trim WAL, ignore keepVersion constraint }; struct SCompactInfo { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b17cc63b9962..258c0bccaaa3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -271,6 +271,11 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t goto end; // no need to update the offset value } + // if (pOffset->val.type == TMQ_OFFSET__LOG && vgOffset.markWal) { + // int32_t ret = walSetKeepVersion(pTq->pVnode->pWal, pOffset->val.version); + // tqDebug("set wal reader keep version to %" PRId64 " for vgId:%d sub:%s, code:%d", pOffset->val.version, vgId, + // pOffset->subKey, ret); + // } // save the new offset value code = taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); if (code != 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4d5dbda97334..ab7ac7a9ef60 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -375,7 +375,7 @@ static int32_t vnodeCommit(void *arg) { static void vnodeCommitCancel(void *arg) { taosMemoryFree(arg); } -int vnodeAsyncCommit(SVnode *pVnode) { +int vnodeAsyncCommitEx(SVnode *pVnode, bool forceTrim) { int32_t code = 0; int32_t lino = 0; @@ -384,6 +384,8 @@ int vnodeAsyncCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code = terrno, lino, _exit); } + pInfo->forceTrim = forceTrim; + // prepare to commit code = vnodePrepareCommit(pVnode, pInfo); TSDB_CHECK_CODE(code, lino, _exit); @@ -397,12 +399,14 @@ int vnodeAsyncCommit(SVnode *pVnode) { taosMemoryFree(pInfo); vError("vgId:%d %s failed at line %d since %s" PRId64, TD_VID(pVnode), __func__, lino, tstrerror(code)); } else { - vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64, TD_VID(pVnode), - pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied); + vInfo("vgId:%d, vnode async commit done, commitId:%" PRId64 " term:%" PRId64 " applied:%" PRId64 " forceTrim:%d", + TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applyTerm, pVnode->state.applied, forceTrim); } return code; } +int vnodeAsyncCommit(SVnode *pVnode) { return vnodeAsyncCommitEx(pVnode, false); } + int32_t vnodeSyncCommit(SVnode *pVnode) { int32_t lino; int32_t code = vnodeAsyncCommit(pVnode); @@ -475,7 +479,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { return -1; } - code = syncEndSnapshot(pVnode->sync); + code = syncEndSnapshot(pVnode->sync, pInfo->forceTrim); TSDB_CHECK_CODE(code, lino, _exit); code = tqCommitOffset(pVnode->pTq); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 7d2eb5ea9d63..c8f792eadf33 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -616,6 +616,13 @@ ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); } int32_t vnodeUpdateArbTerm(SVnode *pVnode, int64_t arbTerm) { return syncUpdateArbTerm(pVnode->sync, arbTerm); } int32_t vnodeGetArbToken(SVnode *pVnode, char *outToken) { return syncGetArbToken(pVnode->sync, outToken); } +int32_t vnodeSetWalKeepVersion(SVnode *pVnode, int64_t keepVersion) { + if (pVnode == NULL || pVnode->pWal == NULL) { + return TSDB_CODE_INVALID_PARA; + } + return walSetKeepVersion(pVnode->pWal, keepVersion); +} + void vnodeStop(SVnode *pVnode) {} int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 0c95c5a0023e..73b80360d101 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -41,6 +41,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pR static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessTrimWalReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, SRpcMsg *pOriginalMsg); @@ -692,6 +693,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_TRIM: if (vnodeProcessTrimReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; + case TDMT_VND_TRIM_WAL: + if (vnodeProcessTrimWalReq(pVnode, pReq, len, pRsp) < 0) goto _err; + break; case TDMT_VND_S3MIGRATE: if (vnodeProcessS3MigrateReq(pVnode, ver, pReq, len, pRsp) < 0) goto _err; break; @@ -1109,6 +1113,24 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int3 return code; } +static int32_t vnodeProcessTrimWalReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp) { + int32_t code = 0; + + vInfo("vgId:%d, process trim wal request, force clean expired WAL files by triggering commit with forceTrim", + pVnode->config.vgId); + + // Trigger a commit with forceTrim flag + // This will properly calculate ver through sync layer and apply forceTrim during snapshot + code = vnodeAsyncCommitEx(pVnode, true); + if (code != TSDB_CODE_SUCCESS) { + vError("vgId:%d, failed to trigger trim wal commit since %s", pVnode->config.vgId, tstrerror(code)); + } else { + vInfo("vgId:%d, successfully triggered trim wal commit", pVnode->config.vgId); + } + + return code; +} + extern int32_t vnodeAsyncS3Migrate(SVnode *pVnode, int64_t now); static int32_t vnodeProcessS3MigrateReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 3bb213e741fa..68772c43b7ce 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -216,6 +216,10 @@ const char* nodesNodeName(ENodeType type) { case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: return "BalanceVgroupLeaderStmt"; + case QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT: + return "SetVgroupKeepVersionStmt"; + case QUERY_NODE_TRIM_DATABASE_WAL_STMT: + return "TrimDbWalStmt"; case QUERY_NODE_MERGE_VGROUP_STMT: return "MergeVgroupStmt"; case QUERY_NODE_SHOW_DB_ALIVE_STMT: @@ -9002,6 +9006,10 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to serialize. case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: return TSDB_CODE_SUCCESS; + case QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT: + return TSDB_CODE_SUCCESS; // SSetVgroupKeepVersionStmt has simple fields, no need to serialize. + case QUERY_NODE_TRIM_DATABASE_WAL_STMT: + return TSDB_CODE_SUCCESS; // STrimDbWalStmt has simple fields, no need to serialize. case QUERY_NODE_MERGE_VGROUP_STMT: return mergeVgroupStmtToJson(pObj, pJson); case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: @@ -9398,6 +9406,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return TSDB_CODE_SUCCESS; case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: return TSDB_CODE_SUCCESS; // SBalanceVgroupLeaderStmt has no fields to deserialize. + case QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT: + return TSDB_CODE_SUCCESS; // SSetVgroupKeepVersionStmt has simple fields, no need to deserialize. + case QUERY_NODE_TRIM_DATABASE_WAL_STMT: + return TSDB_CODE_SUCCESS; // STrimDbWalStmt has simple fields, no need to deserialize. case QUERY_NODE_MERGE_VGROUP_STMT: return jsonToMergeVgroupStmt(pJson, pObj); case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 09e4f3ef4a3d..ac578aa3f230 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -657,6 +657,12 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: code = makeNode(type, sizeof(SBalanceVgroupLeaderStmt), &pNode); break; + case QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT: + code = makeNode(type, sizeof(SSetVgroupKeepVersionStmt), &pNode); + break; + case QUERY_NODE_TRIM_DATABASE_WAL_STMT: + code = makeNode(type, sizeof(STrimDbWalStmt), &pNode); + break; case QUERY_NODE_MERGE_VGROUP_STMT: code = makeNode(type, sizeof(SMergeVgroupStmt), &pNode); break; @@ -1582,6 +1588,8 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_ASSIGN_LEADER_STMT: case QUERY_NODE_BALANCE_VGROUP_LEADER_STMT: // no pointer field case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: // no pointer field + case QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT: // no pointer field + case QUERY_NODE_TRIM_DATABASE_WAL_STMT: // no pointer field case QUERY_NODE_MERGE_VGROUP_STMT: // no pointer field break; case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT: diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 8703b579eb58..dec58ebf2b4d 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -210,6 +210,7 @@ SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, STo SNode* createAlterDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pOptions); SNode* createFlushDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName); SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t maxSpeed); +SNode* createTrimDbWalStmt(SAstCreateContext* pCxt, SToken* pDbName); SNode* createS3MigrateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName); SNode* createCompactStmt(SAstCreateContext* pCxt, SToken* pDbName, SNode* pStart, SNode* pEnd, bool metaOnly); SNode* createCompactVgroupsStmt(SAstCreateContext* pCxt, SNode* pDbName, SNodeList* vgidList, SNode* pStart, @@ -343,6 +344,7 @@ SNode* createBalanceVgroupStmt(SAstCreateContext* pCxt); SNode* createAssignLeaderStmt(SAstCreateContext* pCxt); SNode* createBalanceVgroupLeaderStmt(SAstCreateContext* pCxt, const SToken* pVgId); SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken* pDbName); +SNode* createSetVgroupKeepVersionStmt(SAstCreateContext* pCxt, const SToken* pVgId, const SToken* pKeepVersion); SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2); SNode* createRedistributeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId, SNodeList* pDnodes); SNode* createSplitVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 7cb5cdb5a81a..ab2474c67294 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -241,6 +241,7 @@ cmd ::= USE db_name(A). cmd ::= ALTER DATABASE db_name(A) alter_db_options(B). { pCxt->pRootNode = createAlterDatabaseStmt(pCxt, &A, B); } cmd ::= FLUSH DATABASE db_name(A). { pCxt->pRootNode = createFlushDatabaseStmt(pCxt, &A); } cmd ::= TRIM DATABASE db_name(A) speed_opt(B). { pCxt->pRootNode = createTrimDatabaseStmt(pCxt, &A, B); } +cmd ::= TRIM DATABASE db_name(A) WAL. { pCxt->pRootNode = createTrimDbWalStmt(pCxt, &A); } cmd ::= S3MIGRATE DATABASE db_name(A). { pCxt->pRootNode = createS3MigrateDatabaseStmt(pCxt, &A); } cmd ::= COMPACT DATABASE db_name(A) start_opt(B) end_opt(C) meta_only(D). { pCxt->pRootNode = createCompactStmt(pCxt, &A, B, C, D); } cmd ::= COMPACT db_name_cond_opt(A) VGROUPS IN NK_LP integer_list(B) NK_RP start_opt(C) end_opt(D) meta_only(E). { pCxt->pRootNode = createCompactVgroupsStmt(pCxt, A, B, C, D, E); } @@ -928,6 +929,7 @@ cmd ::= ASSIGN LEADER FORCE. cmd ::= BALANCE VGROUP LEADER on_vgroup_id(A). { pCxt->pRootNode = createBalanceVgroupLeaderStmt(pCxt, &A); } cmd ::= BALANCE VGROUP LEADER DATABASE db_name(A). { pCxt->pRootNode = createBalanceVgroupLeaderDBNameStmt(pCxt, &A); } +cmd ::= ALTER VGROUP NK_INTEGER(A) SET KEEP NK_INTEGER(B). { pCxt->pRootNode = createSetVgroupKeepVersionStmt(pCxt, &A, &B); } cmd ::= MERGE VGROUP NK_INTEGER(A) NK_INTEGER(B). { pCxt->pRootNode = createMergeVgroupStmt(pCxt, &A, &B); } cmd ::= REDISTRIBUTE VGROUP NK_INTEGER(A) dnode_list(B). { pCxt->pRootNode = createRedistributeVgroupStmt(pCxt, &A, B); } cmd ::= SPLIT VGROUP NK_INTEGER(A). { pCxt->pRootNode = createSplitVgroupStmt(pCxt, &A); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index b25cbba354b3..28b7dd82ccf9 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -2292,6 +2292,18 @@ SNode* createTrimDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName, int32_t return NULL; } +SNode* createTrimDbWalStmt(SAstCreateContext* pCxt, SToken* pDbName) { + CHECK_PARSER_STATUS(pCxt); + CHECK_NAME(checkDbName(pCxt, pDbName, false)); + STrimDbWalStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_TRIM_DATABASE_WAL_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + COPY_STRING_FORM_ID_TOKEN(pStmt->dbName, pDbName); + return (SNode*)pStmt; +_err: + return NULL; +} + SNode* createS3MigrateDatabaseStmt(SAstCreateContext* pCxt, SToken* pDbName) { CHECK_PARSER_STATUS(pCxt); CHECK_NAME(checkDbName(pCxt, pDbName, false)); @@ -4423,6 +4435,22 @@ SNode* createBalanceVgroupLeaderDBNameStmt(SAstCreateContext* pCxt, const SToken return NULL; } +SNode* createSetVgroupKeepVersionStmt(SAstCreateContext* pCxt, const SToken* pVgId, const SToken* pKeepVersion) { + CHECK_PARSER_STATUS(pCxt); + SSetVgroupKeepVersionStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + if (NULL != pVgId && NULL != pVgId->z) { + pStmt->vgId = taosStr2Int32(pVgId->z, NULL, 10); + } + if (NULL != pKeepVersion && NULL != pKeepVersion->z) { + pStmt->keepVersion = taosStr2Int64(pKeepVersion->z, NULL, 10); + } + return (SNode*)pStmt; +_err: + return NULL; +} + SNode* createMergeVgroupStmt(SAstCreateContext* pCxt, const SToken* pVgId1, const SToken* pVgId2) { CHECK_PARSER_STATUS(pCxt); SMergeVgroupStmt* pStmt = NULL; diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index e5252b7ff5c2..41bfa2602a1e 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -320,6 +320,7 @@ static SKeyword keywordTable[] = { {"VNODE", TK_VNODE}, {"VNODES", TK_VNODES}, {"VTABLE", TK_VTABLE}, + {"WAL", TK_WAL}, {"WAL_FSYNC_PERIOD", TK_WAL_FSYNC_PERIOD}, {"WAL_LEVEL", TK_WAL_LEVEL}, {"WAL_RETENTION_PERIOD", TK_WAL_RETENTION_PERIOD}, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index fb18ea153aae..e41819adc272 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -9781,6 +9781,15 @@ static int32_t translateTrimDatabase(STranslateContext* pCxt, STrimDatabaseStmt* return buildCmdMsg(pCxt, TDMT_MND_TRIM_DB, (FSerializeFunc)tSerializeSTrimDbReq, &req); } +static int32_t translateTrimDbWal(STranslateContext* pCxt, STrimDbWalStmt* pStmt) { + STrimDbReq req = {.maxSpeed = 0}; // WAL trim doesn't need maxSpeed + SName name = {0}; + int32_t code = tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); + if (TSDB_CODE_SUCCESS != code) return code; + (void)tNameGetFullDbName(&name, req.db); + return buildCmdMsg(pCxt, TDMT_MND_TRIM_DB_WAL, (FSerializeFunc)tSerializeSTrimDbReq, &req); +} + static int32_t checkColumnOptions(SNodeList* pList) { SNode* pNode; FOREACH(pNode, pList) { @@ -14289,6 +14298,15 @@ static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgr return code; } +static int32_t translateSetVgroupKeepVersion(STranslateContext* pCxt, SSetVgroupKeepVersionStmt* pStmt) { + SMndSetVgroupKeepVersionReq req = {0}; + req.vgId = pStmt->vgId; + req.keepVersion = pStmt->keepVersion; + int32_t code = + buildCmdMsg(pCxt, TDMT_MND_SET_VGROUP_KEEP_VERSION, (FSerializeFunc)tSerializeSMndSetVgroupKeepVersionReq, &req); + return code; +} + static int32_t translateMergeVgroup(STranslateContext* pCxt, SMergeVgroupStmt* pStmt) { SMergeVgroupReq req = {.vgId1 = pStmt->vgId1, .vgId2 = pStmt->vgId2}; return buildCmdMsg(pCxt, TDMT_MND_MERGE_VGROUP, (FSerializeFunc)tSerializeSMergeVgroupReq, &req); @@ -14916,6 +14934,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_TRIM_DATABASE_STMT: code = translateTrimDatabase(pCxt, (STrimDatabaseStmt*)pNode); break; + case QUERY_NODE_TRIM_DATABASE_WAL_STMT: + code = translateTrimDbWal(pCxt, (STrimDbWalStmt*)pNode); + break; case QUERY_NODE_S3MIGRATE_DATABASE_STMT: code = translateS3MigrateDatabase(pCxt, (SS3MigrateDatabaseStmt*)pNode); break; @@ -15056,6 +15077,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_BALANCE_VGROUP_LEADER_DATABASE_STMT: code = translateBalanceVgroupLeader(pCxt, (SBalanceVgroupLeaderStmt*)pNode); break; + case QUERY_NODE_SET_VGROUP_KEEP_VERSION_STMT: + code = translateSetVgroupKeepVersion(pCxt, (SSetVgroupKeepVersionStmt*)pNode); + break; case QUERY_NODE_MERGE_VGROUP_STMT: code = translateMergeVgroup(pCxt, (SMergeVgroupStmt*)pNode); break; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 36a7a6bf2ad1..6b415f548345 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -525,7 +525,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { TAOS_RETURN(code); } -int32_t syncEndSnapshot(int64_t rid) { +int32_t syncEndSnapshot(int64_t rid, bool forceTrim) { int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -537,7 +537,7 @@ int32_t syncEndSnapshot(int64_t rid) { if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; - code = walEndSnapshot(pData->pWal); + code = walEndSnapshot(pData->pWal, forceTrim); if (code != 0) { sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code)); syncNodeRelease(pSyncNode); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 93b51d52d086..84f0c0e93550 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -38,6 +38,41 @@ int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVe int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; } +int32_t walSetKeepVersion(SWal *pWal, int64_t ver) { + int32_t code = 0; + + if (pWal == NULL) { + wError("failed to set keep version, pWal is NULL"); + return TSDB_CODE_INVALID_PARA; + } + + if (ver < 0) { + wError("vgId:%d, failed to set keep version, invalid ver:%" PRId64, pWal->cfg.vgId, ver); + return TSDB_CODE_INVALID_PARA; + } + + TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); + + int64_t oldKeepVersion = pWal->keepVersion; + pWal->keepVersion = ver; + + // Save metadata to persist keepVersion + code = walSaveMeta(pWal); + + TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); + + if (code != TSDB_CODE_SUCCESS) { + wError("vgId:%d, failed to save wal meta after setting keep version to %" PRId64 " since %s", pWal->cfg.vgId, ver, + tstrerror(code)); + return code; + } + + wInfo("vgId:%d, wal keep version set from %" PRId64 " to %" PRId64 " and persisted", pWal->cfg.vgId, oldKeepVersion, + ver); + + return TSDB_CODE_SUCCESS; +} + static FORCE_INLINE int walBuildMetaName(SWal* pWal, int64_t metaVer, char* buf) { return snprintf(buf, WAL_FILE_LEN, "%s%smeta-ver%" PRIi64, pWal->path, TD_DIRSEP, metaVer); } @@ -809,6 +844,8 @@ int32_t walMetaSerialize(SWal* pWal, char** serialized) { if (cJSON_AddStringToObject(pMeta, "commitVer", buf) == NULL) goto _err; (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->vers.lastVer); if (cJSON_AddStringToObject(pMeta, "lastVer", buf) == NULL) goto _err; + (void)snprintf(buf, WAL_JSON_BUF_SIZE, "%" PRId64, pWal->keepVersion); + if (cJSON_AddStringToObject(pMeta, "keepVersion", buf) == NULL) goto _err; if (!cJSON_AddItemToObject(pRoot, "files", pFiles)) goto _err; SWalFileInfo* pData = pWal->fileInfoSet->pData; @@ -864,6 +901,13 @@ int32_t walMetaDeserialize(SWal* pWal, const char* bytes) { pField = cJSON_GetObjectItem(pMeta, "lastVer"); if (!pField) goto _err; pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField)); + // Load keepVersion, default to -1 if not present (backward compatibility) + pField = cJSON_GetObjectItem(pMeta, "keepVersion"); + if (pField) { + pWal->keepVersion = atoll(cJSON_GetStringValue(pField)); + } else { + pWal->keepVersion = -1; + } pFiles = cJSON_GetObjectItem(pRoot, "files"); int sz = cJSON_GetArraySize(pFiles); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 0deeda0463e5..e1a523aeefce 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -175,6 +175,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->pLogFile = NULL; pWal->pIdxFile = NULL; pWal->writeCur = -1; + pWal->keepVersion = -1; pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); if (pWal->fileInfoSet == NULL) { wError("vgId:%d, failed to init taosArray of fileInfoSet since %s, path:%s", pWal->cfg.vgId, strerror(ERRNO), diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index c28be3d70fa3..8fd6a81c6a90 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -362,7 +362,7 @@ int32_t walBeginSnapshot(SWal *pWal, int64_t ver, int64_t logRetention) { TAOS_RETURN(code); } -int32_t walEndSnapshot(SWal *pWal) { +int32_t walEndSnapshot(SWal *pWal, bool forceTrim) { int32_t code = 0, lino = 0; if (pWal->cfg.level == TAOS_WAL_SKIP) { @@ -373,8 +373,8 @@ int32_t walEndSnapshot(SWal *pWal) { int64_t ver = pWal->vers.verInSnapshotting; wDebug("vgId:%d, wal end snapshot for index:%" PRId64 ", log retention:%" PRId64 " first index:%" PRId64 - ", last index:%" PRId64, - pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer); + ", last index:%" PRId64 ", keep version:%" PRId64 ", forceTrim:%d", + pWal->cfg.vgId, ver, pWal->vers.logRetention, pWal->vers.firstVer, pWal->vers.lastVer, pWal->keepVersion, forceTrim); if (ver == -1) { TAOS_CHECK_GOTO(TSDB_CODE_FAILED, &lino, _exit); @@ -401,6 +401,16 @@ int32_t walEndSnapshot(SWal *pWal) { ver = TMIN(ver, refVer); } + // check keepVersion constraint (skip if forceTrim is true) + if (!forceTrim && pWal->keepVersion >= 0) { + wInfo("vgId:%d, wal keep version constraint, keep version:%" PRId64 ", calculated ver:%" PRId64, pWal->cfg.vgId, + pWal->keepVersion, ver); + ver = TMIN(ver, pWal->keepVersion - 1); + } else if (forceTrim && pWal->keepVersion >= 0) { + wInfo("vgId:%d, wal force trim, ignore keep version constraint:%" PRId64 ", calculated ver:%" PRId64, + pWal->cfg.vgId, pWal->keepVersion, ver); + } + // find files safe to delete int deleteCnt = 0; int64_t newTotSize = pWal->totSize; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index f62f234092d6..a642db750e53 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -2,6 +2,9 @@ #include #include #include +#include +#include +#include #include "walInt.h" @@ -354,7 +357,7 @@ TEST_F(WalCleanEnv, rollbackMultiFile) { ASSERT_EQ(pWal->vers.lastVer, i); if (i == 5) { walBeginSnapshot(pWal, i, 0); - walEndSnapshot(pWal); + walEndSnapshot(pWal, false); } } code = walRollback(pWal, 12); @@ -392,7 +395,7 @@ TEST_F(WalCleanDeleteEnv, roll) { walBeginSnapshot(pWal, i - 1, 0); ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1); - walEndSnapshot(pWal); + walEndSnapshot(pWal, false); ASSERT_EQ(pWal->vers.snapshotVer, i - 1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1); @@ -408,7 +411,7 @@ TEST_F(WalCleanDeleteEnv, roll) { code = walBeginSnapshot(pWal, i - 1, 0); ASSERT_EQ(code, 0); - code = walEndSnapshot(pWal); + code = walEndSnapshot(pWal, false); ASSERT_EQ(code, 0); } @@ -799,7 +802,7 @@ TEST_F(WalSkipLevel, roll) { code = walCommit(pWal, i); } walBeginSnapshot(pWal, i - 1, 0); - walEndSnapshot(pWal); + walEndSnapshot(pWal, false); code = walAppendLog(pWal, 5, 0, syncMeta, (void*)ranStr, ranStrLen, NULL); ASSERT_NE(code, 0); for (; i < 200; i++) { @@ -809,7 +812,7 @@ TEST_F(WalSkipLevel, roll) { } code = walBeginSnapshot(pWal, i - 1, 0); ASSERT_EQ(code, 0); - code = walEndSnapshot(pWal); + code = walEndSnapshot(pWal, false); ASSERT_EQ(code, 0); } @@ -822,4 +825,122 @@ TEST_F(WalEncrypted, write) { } code = walSaveMeta(pWal); ASSERT_EQ(code, 0); +} + +TEST_F(WalKeepEnv, walSetKeepVersionBasic) { + walResetEnv(); + int code; + + // Test setting keep version + code = walSetKeepVersion(pWal, 50); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->keepVersion, 50); + + // Test setting different keep version + code = walSetKeepVersion(pWal, 100); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->keepVersion, 100); + + // Test invalid parameter (negative version) + code = walSetKeepVersion(pWal, -5); + ASSERT_NE(code, 0); + + // Test NULL pointer + code = walSetKeepVersion(NULL, 50); + ASSERT_NE(code, 0); +} + +TEST_F(WalCleanDeleteEnv, walSetKeepVersionWithDeletion) { + int code; + int i; + + // Write 200 logs + for (i = 0; i < 200; i++) { + code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen, NULL); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->vers.lastVer, i); + code = walCommit(pWal, i); + ASSERT_EQ(pWal->vers.commitVer, i); + } + + // Set keep version to 50, so versions >= 50 should not be deleted + code = walSetKeepVersion(pWal, 50); + ASSERT_EQ(code, 0); + + // Trigger snapshot, this should not delete logs with version >= 50 + walBeginSnapshot(pWal, i - 1, 0); + ASSERT_EQ(pWal->vers.verInSnapshotting, i - 1); + walEndSnapshot(pWal, false); + ASSERT_EQ(pWal->vers.snapshotVer, i - 1); + ASSERT_EQ(pWal->vers.verInSnapshotting, -1); + + // The firstVer should be no greater than 50 + ASSERT_LE(pWal->vers.firstVer, 50); + + // Continue writing more logs + for (; i < 300; i++) { + code = walAppendLog(pWal, i, 0, syncMeta, (void*)ranStr, ranStrLen, NULL); + ASSERT_EQ(code, 0); + code = walCommit(pWal, i); + ASSERT_EQ(pWal->vers.commitVer, i); + } + + // Update keep version to 150 + code = walSetKeepVersion(pWal, 150); + ASSERT_EQ(code, 0); + + // Trigger another snapshot + code = walBeginSnapshot(pWal, i - 1, 0); + ASSERT_EQ(code, 0); + code = walEndSnapshot(pWal, false); + ASSERT_EQ(code, 0); + + // The firstVer should be no greater than 150 + ASSERT_LE(pWal->vers.firstVer, 150); +} + +TEST_F(WalKeepEnv, walSetKeepVersionConcurrent) { + walResetEnv(); + int code; + + // Write some logs first + for (int i = 0; i < 100; i++) { + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, i); + int len = strlen(newStr); + code = walAppendLog(pWal, i, 0, syncMeta, newStr, len, NULL); + ASSERT_EQ(code, 0); + } + + // Test concurrent calls to walSetKeepVersion with multiple threads + const int numThreads = 10; + const int callsPerThread = 100; + std::vector threads; + std::atomic successCount(0); + + for (int i = 0; i < numThreads; i++) { + threads.push_back(std::thread([this, i, &successCount, callsPerThread]() { + for (int j = 0; j < callsPerThread; j++) { + int64_t ver = i * callsPerThread + j; + int code = walSetKeepVersion(pWal, ver); + if (code == 0) { + successCount.fetch_add(1); + } + } + })); + } + + // Wait for all threads to complete + for (auto& thread : threads) { + thread.join(); + } + + // All calls should succeed + ASSERT_EQ(successCount.load(), numThreads * callsPerThread); + + // The final keepVersion should be one of the values set by the threads + // We can't predict which one exactly due to race conditions, but it should be valid + int64_t finalVersion = pWal->keepVersion; + ASSERT_GE(finalVersion, 0); + ASSERT_LT(finalVersion, numThreads * callsPerThread); } \ No newline at end of file diff --git a/test/cases/06-DataIngestion/05-Others/test_wal_keep_version_high_availability.py b/test/cases/06-DataIngestion/05-Others/test_wal_keep_version_high_availability.py new file mode 100644 index 000000000000..427787659d96 --- /dev/null +++ b/test/cases/06-DataIngestion/05-Others/test_wal_keep_version_high_availability.py @@ -0,0 +1,126 @@ +import os +import time +import json +import subprocess +import threading + +from new_test_framework.utils import tdLog, tdSql, sc, tdDnodes,clusterComCheck + + +class TestWalKeepVersionTrim: + + def setup_class(cls): + tdLog.debug(f"start to execute {__file__}") + + + def test_wal_keep_version_high_availability(self): + """Test WAL keep version and trim functionality + + This test verifies: + 1. prepare data + 2. stop dnode 3 + 3. set keep version 0 + 4. start dnode 3 + 5. check wal keep version + + Catalog: + - Database:WAL + + Since: v3.3.6.31 + + Labels: common,ci + + Jira: TS-7567 + + History: + - 2025-11-06 beryl bao created + """ + + clusterComCheck.checkDnodes(3) + + subprocess.run("taosBenchmark -t 100 -n 100000 -a 3 -y", shell=True, check=True) + tdSql.execute("alter database test WAL_RETENTION_PERIOD 0") + + # stop dnode 3 + tdDnodes.stoptaosd(3) + time.sleep(5) + + # set keep version 0 in a separate thread + def alter_vgroup(): + tdSql.execute("alter vgroup 2 set keep 0") + + thread = threading.Thread(target=alter_vgroup) + thread.start() + + tdDnodes.starttaosd(3) + time.sleep(5) + + thread.join() + + tdSql.execute("flush database test") + + tdSql.query("show test.vgroups") + tdSql.checkData(0, 18, 0) + tdSql.checkData(1, 18, -1) + + max_retry = 30 + # check wal vgId 3 firstVer is greater than 0 means flush finished + for dnode_id in [1,2,3]: + check_ver = False + for _ in range(max_retry): + ver = self.get_wal_file_first_version(dnode_id, 3) + tdLog.info(f"dnode{dnode_id} vg3 firstVer: {ver}") + if ver > 0: + check_ver = True + break + time.sleep(1) + + assert check_ver, f"dnode{dnode_id} vg3 firstVer is not greater than 0 after {max_retry} seconds" + + # check wal vgId 2 firstVer is 0 + for dnode_id in [1,2,3]: + check_ver = False + for _ in range(max_retry): + ver = self.get_wal_file_first_version(dnode_id, 2) + tdLog.info(f"dnode{dnode_id} vg2 firstVer: {ver}") + if ver == 0: + check_ver = True + break + time.sleep(1) + assert check_ver, f"dnode{dnode_id} vg2 firstVer is not greater than 0 after {max_retry} seconds" + + + def get_wal_file_first_version(self, dnode_id, vgId): + """Get firstVer from WAL meta-ver file + + Args: + dnode_id: dnode ID + vgId: vgroup ID + + Returns: + int: firstVer value from meta-ver file + """ + rootDir = tdDnodes.getDnodeDir(dnode_id) + walPath = os.path.join(rootDir, "data", "vnode", f"vnode{vgId}", "wal") + + tdLog.info(f"walPath: {walPath}") + + # Find meta-ver file (exclude .tmp files, match pattern meta-ver or meta-ver*) + meta_ver_file = None + if os.path.exists(walPath): + for filename in os.listdir(walPath): + # Match meta-ver or meta-ver, but not meta-ver.tmp + if filename.startswith("meta-ver") and not filename.endswith(".tmp"): + meta_ver_file = os.path.join(walPath, filename) + tdLog.info(f"Found meta-ver file: {filename}") + break + + if not meta_ver_file: + tdLog.exit(f"meta-ver file not found in {walPath}") + + + with open(meta_ver_file, 'r') as f: + meta_data = json.load(f) + first_ver = int(meta_data['meta']['firstVer']) + tdLog.info(f"firstVer from {os.path.basename(meta_ver_file)}: {first_ver}") + return first_ver diff --git a/test/cases/06-DataIngestion/05-Others/test_wal_keep_version_trim.py b/test/cases/06-DataIngestion/05-Others/test_wal_keep_version_trim.py new file mode 100644 index 000000000000..3a0f0f4307c1 --- /dev/null +++ b/test/cases/06-DataIngestion/05-Others/test_wal_keep_version_trim.py @@ -0,0 +1,126 @@ +import os +import time +import json +import subprocess + +from new_test_framework.utils import tdLog, tdSql, sc, tdDnodes,clusterComCheck + + +class TestWalKeepVersionTrim: + def setup_class(cls): + tdLog.debug(f"start to execute {__file__}") + + + def test_wal_keep_version_and_trim(self): + """Test WAL keep version and trim functionality + + This test verifies: + 1. prepare data + 2. set keep version 0 + 3. check wal keep version + 4. trim database wal + 5. check wal log dropped after trim + + Catalog: + - Database:WAL + + Since: v3.3.6.31 + + Labels: common,ci + + Jira: TS-7567 + + History: + - 2025-11-06 beryl bao created + """ + clusterComCheck.checkDnodes(3) + + subprocess.run("taosBenchmark -t 100 -n 100000 -a 3 -y", shell=True, check=True) + + tdSql.execute("alter database test WAL_RETENTION_PERIOD 0") + + # set keep version 0 + tdSql.execute("alter vgroup 2 set keep 0") + tdSql.execute("flush database test") + + tdSql.query("show test.vgroups") + tdSql.checkData(0, 18, 0) + tdSql.checkData(1, 18, -1) + + max_retry = 30 + # check wal vgId 3 firstVer is greater than 0 means flush finished + for dnode_id in [1,2,3]: + check_ver = False + for _ in range(max_retry): + ver = self.get_wal_file_first_version(dnode_id, 3) + tdLog.info(f"dnode{dnode_id} vg3 firstVer: {ver}") + if ver > 0: + check_ver = True + break + time.sleep(1) + + assert check_ver, f"dnode{dnode_id} vg3 firstVer is not greater than 0 after {max_retry} seconds" + + # check wal vgId 2 firstVer is 0 + for dnode_id in [1,2,3]: + check_ver = False + for _ in range(max_retry): + ver = self.get_wal_file_first_version(dnode_id, 2) + tdLog.info(f"dnode{dnode_id} vg2 firstVer: {ver}") + if ver == 0: + check_ver = True + break + time.sleep(1) + assert check_ver, f"dnode{dnode_id} vg2 firstVer is not greater than 0 after {max_retry} seconds" + + + # trim database wal + tdSql.execute("trim database test wal") + + # check wal vgId 2 firstVer is greater than 0 after trim + for dnode_id in [1,2,3]: + check_ver = False + for _ in range(max_retry): + ver = self.get_wal_file_first_version(dnode_id, 2) + tdLog.info(f"dnode{dnode_id} vg2 firstVer: {ver}") + if ver > 0: + check_ver = True + break + time.sleep(1) + assert check_ver, f"dnode{dnode_id} vg2 firstVer is not greater than 0 after {max_retry} seconds after trim" + + + def get_wal_file_first_version(self, dnode_id, vgId): + """Get firstVer from WAL meta-ver file + + Args: + dnode_id: dnode ID + vgId: vgroup ID + + Returns: + int: firstVer value from meta-ver file + """ + rootDir = tdDnodes.getDnodeDir(dnode_id) + walPath = os.path.join(rootDir, "data", "vnode", f"vnode{vgId}", "wal") + + tdLog.info(f"walPath: {walPath}") + + # Find meta-ver file (exclude .tmp files, match pattern meta-ver or meta-ver*) + meta_ver_file = None + if os.path.exists(walPath): + for filename in os.listdir(walPath): + # Match meta-ver or meta-ver, but not meta-ver.tmp + if filename.startswith("meta-ver") and not filename.endswith(".tmp"): + meta_ver_file = os.path.join(walPath, filename) + tdLog.info(f"Found meta-ver file: {filename}") + break + + if not meta_ver_file: + tdLog.exit(f"meta-ver file not found in {walPath}") + + + with open(meta_ver_file, 'r') as f: + meta_data = json.load(f) + first_ver = int(meta_data['meta']['firstVer']) + tdLog.info(f"firstVer from {os.path.basename(meta_ver_file)}: {first_ver}") + return first_ver diff --git a/test/cases/22-Show/test_show_diskinfo.py b/test/cases/22-Show/test_show_diskinfo.py index 24263690d1cc..483d75b78c9f 100644 --- a/test/cases/22-Show/test_show_diskinfo.py +++ b/test/cases/22-Show/test_show_diskinfo.py @@ -69,7 +69,7 @@ def test_show_disk_info(self): tdSql.query(f"select * from information_schema.ins_disk_usage") tdSql.query(f"select sum(vgroup_id) from information_schema.ins_disk_usage") - tdSql.query(f"select sum(wal) from information_schema.ins_disk_usage") + tdSql.query(f"select sum(wal_size) from information_schema.ins_disk_usage") tdSql.query(f"select sum(data1) from information_schema.ins_disk_usage") tdSql.query(f"select sum(data2) from information_schema.ins_disk_usage") tdSql.query(f"select sum(data3) from information_schema.ins_disk_usage") diff --git a/test/ci/cases.task b/test/ci/cases.task index 3a26127758e1..e7c4f58afb72 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -263,6 +263,8 @@ ,,y,.,./ci/pytest.sh pytest cases/06-DataIngestion/05-Others/test_insert_null.py ,,y,.,./ci/pytest.sh pytest cases/06-DataIngestion/05-Others/test_insert_select.py ,,y,.,./ci/pytest.sh pytest cases/06-DataIngestion/05-Others/test_wal_kill.py +,,y,.,./ci/pytest.sh pytest cases/06-DataIngestion/05-Others/test_wal_keep_version_trim.py -N 3 +,,y,.,./ci/pytest.sh pytest cases/06-DataIngestion/05-Others/test_wal_keep_version_high_availability.py -N 3 # 07-DataQuerying ## 01-Operator diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index edca3af989ea..f12f3af8553d 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -221,7 +221,7 @@ def ins_columns_check(self): tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkRows(330) + tdSql.checkRows(332) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkRows(64) diff --git a/tests/system-test/0-others/test_show_disk_usage.py b/tests/system-test/0-others/test_show_disk_usage.py index 1c863d7cdf75..e35233cadca0 100644 --- a/tests/system-test/0-others/test_show_disk_usage.py +++ b/tests/system-test/0-others/test_show_disk_usage.py @@ -144,14 +144,14 @@ def run(self): tdSql.checkData(0,0,disk_occupied) tdSql.query(f"select sum(data1+data2+data3)/sum(raw_data) from information_schema.ins_disk_usage where db_name='{self.dbname}';") #tdSql.checkData(0,0,compress_radio/100) - tdSql.query(f"select sum(wal) from information_schema.ins_disk_usage where db_name='{self.dbname}';") + tdSql.query(f"select sum(wal_size) from information_schema.ins_disk_usage where db_name='{self.dbname}';") tdSql.query(f"select sum(table_meta) from information_schema.ins_disk_usage where db_name='{self.dbname}';") tdSql.query(f"select sum(cache_rdb) from information_schema.ins_disk_usage where db_name='{self.dbname}';") tdSql.execute(f"use {self.other_dbname};") tdSql.query(f"select sum(data1+data2+data3) from information_schema.ins_disk_usage where db_name='{self.other_dbname}';") tdSql.checkData(0,0,0) - tdSql.query(f"select sum(wal) from information_schema.ins_disk_usage where db_name='{self.other_dbname}';") + tdSql.query(f"select sum(wal_size) from information_schema.ins_disk_usage where db_name='{self.other_dbname}';") tdSql.checkData(0,0,0) tdSql.query(f"select sum(cache_rdb) from information_schema.ins_disk_usage where db_name='{self.other_dbname}';") tdSql.checkData(0,0,12) @@ -173,7 +173,7 @@ def run(self): tdSql.checkData(0,0,disk_occupied) - tdSql.query(f"select sum(wal) from information_schema.ins_disk_usage where db_name='{self.other_dbname}' or db_name='{self.dbname}';") + tdSql.query(f"select sum(wal_size) from information_schema.ins_disk_usage where db_name='{self.other_dbname}' or db_name='{self.dbname}';") tdSql.checkRows(1) iwal = tdSql.queryResult[0][0] tdSql.query(f"select sum(table_meta) from information_schema.ins_disk_usage where db_name='{self.other_dbname}' or db_name='{self.dbname}';") diff --git a/tests/system-test/8-stream/ts-5617.py b/tests/system-test/8-stream/ts-5617.py index 05c40ead45b1..cb53856d0869 100755 --- a/tests/system-test/8-stream/ts-5617.py +++ b/tests/system-test/8-stream/ts-5617.py @@ -99,7 +99,7 @@ def run(self): start_time = time.time() tdSql.execute(f'create stream s21 fill_history 1 async into ts5617.st21 tags(tname varchar(20)) subtable(tname) as select last(val), last(quality) from ts5617.stb_2_2_1 partition by tbname tname interval(1800s);') end_time = time.time() - if end_time - start_time > 1: + if end_time - start_time > 5: tdLog.exit("create history stream sync too long") tdSql.query("show streams") diff --git a/utils/test/c/tmq_offset_test.c b/utils/test/c/tmq_offset_test.c index 1060175f3ebf..b178ca16ad90 100644 --- a/utils/test/c/tmq_offset_test.c +++ b/utils/test/c/tmq_offset_test.c @@ -164,6 +164,7 @@ void test_offset(TAOS* pConn){ tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.with.table.name", "false"); + tmq_conf_set(conf, "enable.wal.marker", "true"); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf);