Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,23 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
}
defer srcClose(ctx)

partitioned := config.WatermarkColumn != ""
if tableSizeEstimatorConn, ok := srcConn.(connectors.TableSizeEstimatorConnector); ok && partitioned {
// expect estimate query execution to be fast, set a short timeout defensively to avoid blocking workflow
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if bytes, connErr := tableSizeEstimatorConn.GetTableSizeEstimatedBytes(timeoutCtx, config.WatermarkTable); connErr == nil {
if bytes > 100<<30 { // 100 GiB
msg := fmt.Sprintf("large table detected: %s (%s). Counting/partitioning queries for parallel "+
"snapshotting may take minutes to hours to execute. This is normal for tables over 100 GiB.",
config.WatermarkTable, utils.FormatTableSize(bytes))
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, msg)
}
} else {
logger.Warn("failed to get estimated table size", slog.Any("error", connErr))
}
}

partitions, err := srcConn.GetQRepPartitions(ctx, config, last)
if err != nil {
return nil, a.Alerter.LogFlowWrappedError(ctx, config.FlowJobName, "failed to get partitions from source", err)
Expand Down
10 changes: 10 additions & 0 deletions flow/connectors/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,12 @@ type DatabaseVariantConnector interface {
GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)
}

// TableSizeEstimatorConnector is a Connector that can additionally report an
// estimated size, in bytes, for a single table.
type TableSizeEstimatorConnector interface {
Connector

// GetTableSizeEstimatedBytes returns the estimated size of tableName in
// bytes, or an error when the estimate could not be obtained.
GetTableSizeEstimatedBytes(ctx context.Context, tableName string) (int64, error)
}

func LoadPeerType(ctx context.Context, catalogPool shared.CatalogPool, peerName string) (protos.DBType, error) {
row := catalogPool.QueryRow(ctx, "SELECT type FROM peers WHERE name = $1", peerName)
var dbtype protos.DBType
Expand Down Expand Up @@ -673,4 +679,8 @@ var (

_ DatabaseVariantConnector = &connpostgres.PostgresConnector{}
_ DatabaseVariantConnector = &connmysql.MySqlConnector{}

_ TableSizeEstimatorConnector = &connpostgres.PostgresConnector{}
_ TableSizeEstimatorConnector = &connmysql.MySqlConnector{}
_ TableSizeEstimatorConnector = &connmongo.MongoConnector{}
)
12 changes: 12 additions & 0 deletions flow/connectors/mongo/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,15 @@ func (c *MongoConnector) GetLogRetentionHours(ctx context.Context) (float64, err

return float64(serverStatus.OplogTruncation.OplogMinRetentionHours), nil
}

// GetTableSizeEstimatedBytes returns the size of a MongoDB collection as
// reported by the collStats command (the uncompressed "size" field).
//
// fullyQualifiedTable is split with utils.ParseSchemaTable; the schema part is
// used as the database name and the table part as the collection name. Parse
// and command errors are returned unchanged with a size of 0.
func (c *MongoConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTable string) (int64, error) {
	namespace, parseErr := utils.ParseSchemaTable(fullyQualifiedTable)
	if parseErr != nil {
		return 0, parseErr
	}

	stats, statsErr := peerdb_mongo.GetCollStats(ctx, c.client, namespace.Schema, namespace.Table)
	if statsErr != nil {
		return 0, statsErr
	}

	sizeBytes := stats.Size
	return sizeBytes, nil
}
19 changes: 19 additions & 0 deletions flow/connectors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,3 +515,22 @@ func (c *MySqlConnector) GetDatabaseVariant(ctx context.Context) (protos.Databas

return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
}

// GetTableSizeEstimatedBytes returns the data_length value that
// information_schema.tables reports for the given table.
//
// fullyQualifiedTable is split with utils.ParseSchemaTable into schema and
// table parts. Parse and query errors are returned unchanged with a size of 0;
// the final value (or error) comes from reading row 0, column 0 of the result.
func (c *MySqlConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTable string) (int64, error) {
	parsed, parseErr := utils.ParseSchemaTable(fullyQualifiedTable)
	if parseErr != nil {
		return 0, parseErr
	}

	// Schema and table are embedded as string literals, so both are escaped
	// before being interpolated into the statement.
	const sizeQueryFormat = "SELECT data_length FROM information_schema.tables " +
		"WHERE table_schema = '%s' AND table_name = '%s'"
	escapedSchema := mysql.Escape(parsed.Schema)
	escapedTable := mysql.Escape(parsed.Table)
	sizeQuery := fmt.Sprintf(sizeQueryFormat, escapedSchema, escapedTable)

	result, execErr := c.Execute(ctx, sizeQuery)
	if execErr != nil {
		return 0, execErr
	}
	defer result.Close()

	return result.GetInt(0, 0)
}
12 changes: 12 additions & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,3 +1803,15 @@ func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.Data

return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
}

// GetTableSizeEstimatedBytes returns the result of
// pg_relation_size(to_regclass(name)) for the given table name.
//
// The name is passed as a bind parameter. A scan error is returned unchanged;
// a result that is not valid (e.g. SQL NULL, presumably when to_regclass
// cannot resolve the name) is reported as an error instead of a zero size.
func (c *PostgresConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTableName string) (int64, error) {
	const sizeQuery = "SELECT pg_relation_size(to_regclass($1))"

	var size pgtype.Int8
	row := c.conn.QueryRow(ctx, sizeQuery, fullyQualifiedTableName)
	if scanErr := row.Scan(&size); scanErr != nil {
		return 0, scanErr
	}

	if size.Valid {
		return size.Int64, nil
	}
	return 0, errors.New("table size is not valid")
}
29 changes: 28 additions & 1 deletion flow/pkg/mongo/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ func GetHelloResponse(ctx context.Context, client *mongo.Client) (HelloResponse,
return runCommand[HelloResponse](ctx, client, "hello")
}

// CollStats holds the fields decoded from the reply to a MongoDB collStats
// command; only the two size fields below are captured.
type CollStats struct {
// Size is read from the reply's "size" field (uncompressed).
Size int64 `bson:"size"`
// StorageSize is read from the reply's "storageSize" field (compressed).
StorageSize int64 `bson:"storageSize"`
}

// GetCollStats runs the collStats command for collection against the given
// database, with a scale of 1, and decodes the reply into a CollStats.
func GetCollStats(ctx context.Context, client *mongo.Client, database string, collection string) (CollStats, error) {
	collStatsCmd := bson.D{
		{Key: "collStats", Value: collection},
		{Key: "scale", Value: 1},
	}
	return runDatabaseCommand[CollStats](ctx, client, database, collStatsCmd)
}

func runCommand[T any](ctx context.Context, client *mongo.Client, command string) (T, error) {
var result T
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
Expand All @@ -70,7 +84,20 @@ func runCommand[T any](ctx context.Context, client *mongo.Client, command string
}

if err := singleResult.Decode(&result); err != nil {
return result, fmt.Errorf("'%s' failed: %v", command, err)
return result, fmt.Errorf("'%s' decoding failed: %v", command, err)
}
return result, nil
}

// runDatabaseCommand runs commandDoc against the named database and decodes
// the single-document reply into a value of type T.
//
// On failure it returns the zero value of T and an error whose message names
// the command document. The underlying driver error is wrapped with %w so
// callers can still match it with errors.Is / errors.As (for example to detect
// a context deadline or cancellation).
func runDatabaseCommand[T any](ctx context.Context, client *mongo.Client, database string, commandDoc bson.D) (T, error) {
	var zero T

	singleResult := client.Database(database).RunCommand(ctx, commandDoc)
	if err := singleResult.Err(); err != nil {
		return zero, fmt.Errorf("'%s' failed: %w", commandDoc.String(), err)
	}

	var result T
	if err := singleResult.Decode(&result); err != nil {
		// Do not hand back a possibly half-decoded value alongside an error.
		return zero, fmt.Errorf("'%s' decoding failed: %w", commandDoc.String(), err)
	}
	return result, nil
}
Loading