Skip to content

Commit 0044598

Browse files
committed
enh: enh sys scan in vtable query.
1 parent ff6664a commit 0044598

File tree

23 files changed

+451
-135
lines changed

23 files changed

+451
-135
lines changed

include/common/tmsg.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3414,6 +3414,12 @@ typedef struct STagScanOperatorParam {
34143414
tb_uid_t vcUid;
34153415
} STagScanOperatorParam;
34163416

3417+
typedef struct SSysScanOperatorParam {
3418+
bool isVstb;
3419+
tb_uid_t uid;
3420+
int64_t version;
3421+
} SSysScanOperatorParam;
3422+
34173423
typedef struct SVTableScanOperatorParam {
34183424
uint64_t uid;
34193425
SOperatorParam* pTagScanOp;

include/libs/executor/storageapi.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,9 @@ typedef struct SStoreMeta {
302302
int32_t (*metaGetCachedRefDbs)(void* pVnode, tb_uid_t suid, SArray* pList);
303303
int32_t (*metaPutRefDbsToCache)(void* pVnode, tb_uid_t suid, SArray* pList);
304304

305+
int32_t (*metaGetVirtualSupertableVersion)(void* pVnode, tb_uid_t suid, int32_t* version);
306+
int32_t (*metaGetVirtualNormalChildtableVersion)(void* pVnode, tb_uid_t uid, int32_t* version);
307+
305308
void* (*storeGetIndexInfo)(void* pVnode);
306309
void* (*getInvertIndex)(void* pVnode);
307310
// support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]

include/libs/nodes/plannodes.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,7 @@ typedef struct SDynQueryCtrlVtbScan {
264264
uint64_t uid;
265265
int32_t rversion;
266266
SNodeList* pOrgVgIds;
267+
SNodeList* pSysScanVgIds;
267268
SVgroupsInfo* pVgroupList;
268269
} SDynQueryCtrlVtbScan;
269270

@@ -675,6 +676,7 @@ typedef struct SVtbScanDynCtrlBasic {
675676
SEpSet mgmtEpSet;
676677
SNodeList *pScanCols;
677678
SNodeList *pOrgVgIds;
679+
SNodeList *pSysScanVgIds;
678680
} SVtbScanDynCtrlBasic;
679681

680682
typedef struct SDynQueryCtrlPhysiNode {

source/common/src/msg/tmsg.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10996,6 +10996,13 @@ int32_t tSerializeSOperatorParam(SEncoder *pEncoder, SOperatorParam *pOpParam) {
1099610996
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pOpParam->opType));
1099710997
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pOpParam->downstreamIdx));
1099810998
switch (pOpParam->opType) {
10999+
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: {
11000+
SSysScanOperatorParam* pSysScan = (SSysScanOperatorParam *)pOpParam->value;
11001+
TAOS_CHECK_RETURN(tEncodeBool(pEncoder, pSysScan->isVstb));
11002+
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pSysScan->version));
11003+
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pSysScan->uid));
11004+
break;
11005+
}
1099911006
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
1100011007
STagScanOperatorParam *pTagScan = (STagScanOperatorParam *)pOpParam->value;
1100111008
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pTagScan->vcUid));
@@ -11047,6 +11054,17 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam *pOpParam)
1104711054
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pOpParam->opType));
1104811055
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pOpParam->downstreamIdx));
1104911056
switch (pOpParam->opType) {
11057+
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: {
11058+
pOpParam->value = taosMemoryMalloc(sizeof(SSysScanOperatorParam));
11059+
if (NULL == pOpParam->value) {
11060+
TAOS_CHECK_RETURN(terrno);
11061+
}
11062+
SSysScanOperatorParam* pSysScan = pOpParam->value;
11063+
TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pSysScan->isVstb));
11064+
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pSysScan->version));
11065+
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pSysScan->uid));
11066+
break;
11067+
}
1105011068
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
1105111069
pOpParam->value = taosMemoryMalloc(sizeof(STagScanOperatorParam));
1105211070
if (NULL == pOpParam->value) {

source/common/src/systable.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ static const SSysDbTableSchema userVctbColsSchema[] = {
262262
{.name = "col_source", .bytes = TSDB_COL_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
263263
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
264264
{.name = "ref_version", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
265+
{.name = "vstb_version", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
265266
};
266267

267268
static const SSysDbTableSchema userTblDistSchema[] = {

source/dnode/vnode/inc/vnode.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t
155155
int32_t metaGetCachedRefDbs(void *pVnode, tb_uid_t suid, SArray *pList);
156156
int32_t metaPutRefDbsToCache(void *pVnode, tb_uid_t suid, SArray *pList);
157157

158+
int32_t metaGetVirtualSupertableVersion(void *pVnode, tb_uid_t suid, int32_t *version);
159+
int32_t metaGetVirtualNormalChildtableVersion(void *pVnode, tb_uid_t uid, int32_t *version);
160+
158161
// tsdb
159162
typedef struct STsdbReader STsdbReader;
160163

source/dnode/vnode/src/inc/vnodeInt.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -540,8 +540,9 @@ struct SVnode {
540540
SVMonitorObj monitor;
541541
uint32_t applyQueueErrorCount;
542542

543-
// Notification Handles
544-
SStreamNotifyHandleMap* pNotifyHandleMap;
543+
// version for virtual super table
544+
TdThreadRwlock versionLock;
545+
SHashObj* pVtableVersion;
545546
};
546547

547548
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)

source/dnode/vnode/src/meta/metaCache.c

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,39 @@ int32_t metaRefDbsCacheClear(SMeta* pMeta, uint64_t suid) {
995995
return code;
996996
}
997997

998+
int32_t metaGetVirtualSupertableVersion(void *pVnode, tb_uid_t suid, int32_t *version) {
999+
int32_t code = TSDB_CODE_SUCCESS;
1000+
SVnode* vnode = (SVnode*)pVnode;
1001+
1002+
(void)taosThreadRwlockRdlock(&vnode->versionLock);
1003+
int32_t* ver = taosHashGet(vnode->pVtableVersion, &suid, sizeof(tb_uid_t));
1004+
if (ver != NULL) {
1005+
*version = (*ver);
1006+
} else {
1007+
*version = 0;
1008+
}
1009+
(void)taosThreadRwlockUnlock(&vnode->versionLock);
1010+
return code;
1011+
}
1012+
1013+
int32_t metaGetVirtualNormalChildtableVersion(void *pVnode, tb_uid_t uid, int32_t *version) {
1014+
int code = TSDB_CODE_SUCCESS;
1015+
int32_t lino = 0;
1016+
SMetaReader mr = {0};
1017+
metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, META_READER_LOCK);
1018+
code = metaReaderGetTableEntryByUid(&mr, uid);
1019+
TSDB_CHECK_CODE(code, lino, _return);
1020+
1021+
*version = mr.me.colRef.version;
1022+
1023+
_return:
1024+
if (code != TSDB_CODE_SUCCESS) {
1025+
metaError("%s failed since %s, line %d", __func__, tstrerror(code), lino);
1026+
}
1027+
metaReaderClear(&mr);
1028+
return code;
1029+
}
1030+
9981031
int32_t metaGetCachedRefDbs(void* pVnode, tb_uid_t suid, SArray* pList) {
9991032
int32_t code = TSDB_CODE_SUCCESS;
10001033
int32_t line = 0;

source/dnode/vnode/src/meta/metaEntry2.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2540,6 +2540,24 @@ int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) {
25402540
pMeta->changed = true;
25412541
metaDebug("vgId:%d, index:%" PRId64 ", handle meta entry success, type:%d tb:%s uid:%" PRId64, vgId,
25422542
pEntry->version, pEntry->type, pEntry->type > 0 ? pEntry->name : "", pEntry->uid);
2543+
if (type == TSDB_VIRTUAL_CHILD_TABLE) {
2544+
int64_t suid = pEntry->ctbEntry.suid;
2545+
(void)taosThreadRwlockWrlock(&pMeta->pVnode->versionLock);
2546+
int32_t *ver = taosHashGet(pMeta->pVnode->pVtableVersion, &suid, sizeof(tb_uid_t));
2547+
if (ver != NULL) {
2548+
int32_t newVersion = (*ver) + 1;
2549+
metaDebug("vgId:%d, vtable uid:%" PRId64 " version updated to %d" , TD_VID(pMeta->pVnode), suid, newVersion);
2550+
code = taosHashRemove(pMeta->pVnode->pVtableVersion, &suid, sizeof(tb_uid_t));
2551+
if (TSDB_CODE_SUCCESS == code) {
2552+
code = taosHashPut(pMeta->pVnode->pVtableVersion, &suid, sizeof(tb_uid_t), &newVersion, sizeof(newVersion));
2553+
}
2554+
} else {
2555+
int32_t newVersion = 1;
2556+
metaDebug("vgId:%d, vtable uid:%" PRId64 " version set to %c", TD_VID(pMeta->pVnode), suid, newVersion);
2557+
code = taosHashPut(pMeta->pVnode->pVtableVersion, &suid, sizeof(tb_uid_t), &newVersion, sizeof(newVersion));
2558+
}
2559+
(void)taosThreadRwlockUnlock(&pMeta->pVnode->versionLock);
2560+
}
25432561
} else {
25442562
metaErr(vgId, code);
25452563
}

source/dnode/vnode/src/vnd/vnodeApi.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ void initMetadataAPI(SStoreMeta* pMeta) {
115115

116116
pMeta->metaGetCachedRefDbs = metaGetCachedRefDbs;
117117
pMeta->metaPutRefDbsToCache = metaPutRefDbsToCache;
118+
119+
pMeta->metaGetVirtualSupertableVersion = metaGetVirtualSupertableVersion;
120+
pMeta->metaGetVirtualNormalChildtableVersion = metaGetVirtualNormalChildtableVersion;
118121
}
119122

120123
void initTqAPI(SStoreTqReader* pTq) {

0 commit comments

Comments
 (0)