Skip to content

Commit 8581b2f

Browse files
committed
add mongo support too
1 parent 27babd1 commit 8581b2f

File tree

5 files changed

+49
-4
lines changed

5 files changed

+49
-4
lines changed

flow/activities/flowable.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,14 +514,15 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
514514
}
515515
defer srcClose(ctx)
516516

517-
if tableSizeEstimatorConn, ok := srcConn.(connectors.TableSizeEstimatorConnector); ok {
518-
// expect estimate query execution to be fast, so set a short timeout
517+
partitioned := config.WatermarkColumn != ""
518+
if tableSizeEstimatorConn, ok := srcConn.(connectors.TableSizeEstimatorConnector); ok && partitioned {
519+
// expect estimate query execution to be fast, set a short timeout defensively to avoid blocking workflow
519520
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
520521
defer cancel()
521522
if bytes, connErr := tableSizeEstimatorConn.GetTableSizeEstimatedBytes(timeoutCtx, config.WatermarkTable); connErr == nil {
522-
if bytes > 1024*1024*1024*2024 {
523+
if bytes > 1<<40 { // 1 TiB
523524
msg := fmt.Sprintf("Large table detected: %s (%s). Partitioning query may take several hours to execute. "+
524-
"This is normal for tables over 1 TB.", config.WatermarkTable, utils.FormatTableSize(bytes))
525+
"This is normal for tables over 1 TiB.", config.WatermarkTable, utils.FormatTableSize(bytes))
525526
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, msg)
526527
}
527528
}

flow/connectors/core.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,4 +682,5 @@ var (
682682

683683
_ TableSizeEstimatorConnector = &connpostgres.PostgresConnector{}
684684
_ TableSizeEstimatorConnector = &connmysql.MySqlConnector{}
685+
_ TableSizeEstimatorConnector = &connmongo.MongoConnector{}
685686
)

flow/connectors/mongo/mongo.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,15 @@ func (c *MongoConnector) GetLogRetentionHours(ctx context.Context) (float64, err
187187

188188
return float64(serverStatus.OplogTruncation.OplogMinRetentionHours), nil
189189
}
190+
191+
func (c *MongoConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTable string) (int64, error) {
192+
databaseTable, err := utils.ParseSchemaTable(fullyQualifiedTable)
193+
if err != nil {
194+
return 0, err
195+
}
196+
collStats, err := peerdb_mongo.GetCollStats(ctx, c.client, databaseTable.Schema, databaseTable.Table)
197+
if err != nil {
198+
return 0, err
199+
}
200+
return collStats.Size, nil
201+
}

flow/connectors/postgres/qrep.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ type QRepSyncSink interface {
4141
CopyInto(context.Context, *PostgresConnector, pgx.Tx, pgx.Identifier) (int64, error)
4242
}
4343

44+
func (c *PostgresConnector) SupportsTablePartitioning() bool {
45+
return true
46+
}
47+
4448
func (c *PostgresConnector) GetQRepPartitions(
4549
ctx context.Context,
4650
config *protos.QRepConfig,

flow/pkg/mongo/commands.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ func GetHelloResponse(ctx context.Context, client *mongo.Client) (HelloResponse,
6060
return runCommand[HelloResponse](ctx, client, "hello")
6161
}
6262

63+
type CollStats struct {
64+
// uncompressed
65+
Size int64 `bson:"size"`
66+
// compressed
67+
StorageSize int64 `bson:"storageSize"`
68+
}
69+
70+
func GetCollStats(ctx context.Context, client *mongo.Client, database string, collection string) (CollStats, error) {
71+
return runDatabaseCommand[CollStats](ctx, client, database, bson.D{
72+
{Key: "collStats", Value: collection},
73+
{Key: "scale", Value: 1},
74+
})
75+
}
76+
6377
func runCommand[T any](ctx context.Context, client *mongo.Client, command string) (T, error) {
6478
var result T
6579
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
@@ -74,3 +88,16 @@ func runCommand[T any](ctx context.Context, client *mongo.Client, command string
7488
}
7589
return result, nil
7690
}
91+
92+
func runDatabaseCommand[T any](ctx context.Context, client *mongo.Client, database string, commandDoc bson.D) (T, error) {
93+
var result T
94+
singleResult := client.Database(database).RunCommand(ctx, commandDoc)
95+
if singleResult.Err() != nil {
96+
return result, fmt.Errorf("command failed: %v", singleResult.Err())
97+
}
98+
99+
if err := singleResult.Decode(&result); err != nil {
100+
return result, fmt.Errorf("command decoding failed: %v", err)
101+
}
102+
return result, nil
103+
}

0 commit comments

Comments
 (0)