Skip to content

Commit aa7ac99

Browse files
authored
Add user-facing logging to identify large tables (#3686)
Add a log line when we identify a 1TB+ table, so that large tables are easier to detect (should this threshold be lower?). Testing: tested the Postgres/MySQL/Mongo queries locally with a smaller threshold to make sure the logging works as expected and reports the correct value.
1 parent ad1c115 commit aa7ac99

File tree

6 files changed

+98
-1
lines changed

6 files changed

+98
-1
lines changed

flow/activities/flowable.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,23 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
514514
}
515515
defer srcClose(ctx)
516516

517+
partitioned := config.WatermarkColumn != ""
518+
if tableSizeEstimatorConn, ok := srcConn.(connectors.TableSizeEstimatorConnector); ok && partitioned {
519+
// expect estimate query execution to be fast, set a short timeout defensively to avoid blocking workflow
520+
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
521+
defer cancel()
522+
if bytes, connErr := tableSizeEstimatorConn.GetTableSizeEstimatedBytes(timeoutCtx, config.WatermarkTable); connErr == nil {
523+
if bytes > 100<<30 { // 100 GiB
524+
msg := fmt.Sprintf("large table detected: %s (%s). Counting/partitioning queries for parallel "+
525+
"snapshotting may take minutes to hours to execute. This is normal for tables over 100 GiB.",
526+
config.WatermarkTable, utils.FormatTableSize(bytes))
527+
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, msg)
528+
}
529+
} else {
530+
logger.Warn("failed to get estimated table size", slog.Any("error", connErr))
531+
}
532+
}
533+
517534
partitions, err := srcConn.GetQRepPartitions(ctx, config, last)
518535
if err != nil {
519536
return nil, a.Alerter.LogFlowWrappedError(ctx, config.FlowJobName, "failed to get partitions from source", err)

flow/connectors/core.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,12 @@ type DatabaseVariantConnector interface {
331331
GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)
332332
}
333333

334+
type TableSizeEstimatorConnector interface {
335+
Connector
336+
337+
GetTableSizeEstimatedBytes(ctx context.Context, tableName string) (int64, error)
338+
}
339+
334340
func LoadPeerType(ctx context.Context, catalogPool shared.CatalogPool, peerName string) (protos.DBType, error) {
335341
row := catalogPool.QueryRow(ctx, "SELECT type FROM peers WHERE name = $1", peerName)
336342
var dbtype protos.DBType
@@ -673,4 +679,8 @@ var (
673679

674680
_ DatabaseVariantConnector = &connpostgres.PostgresConnector{}
675681
_ DatabaseVariantConnector = &connmysql.MySqlConnector{}
682+
683+
_ TableSizeEstimatorConnector = &connpostgres.PostgresConnector{}
684+
_ TableSizeEstimatorConnector = &connmysql.MySqlConnector{}
685+
_ TableSizeEstimatorConnector = &connmongo.MongoConnector{}
676686
)

flow/connectors/mongo/mongo.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,3 +187,15 @@ func (c *MongoConnector) GetLogRetentionHours(ctx context.Context) (float64, err
187187

188188
return float64(serverStatus.OplogTruncation.OplogMinRetentionHours), nil
189189
}
190+
191+
func (c *MongoConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTable string) (int64, error) {
192+
databaseTable, err := utils.ParseSchemaTable(fullyQualifiedTable)
193+
if err != nil {
194+
return 0, err
195+
}
196+
collStats, err := peerdb_mongo.GetCollStats(ctx, c.client, databaseTable.Schema, databaseTable.Table)
197+
if err != nil {
198+
return 0, err
199+
}
200+
return collStats.Size, nil
201+
}

flow/connectors/mysql/mysql.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,3 +515,22 @@ func (c *MySqlConnector) GetDatabaseVariant(ctx context.Context) (protos.Databas
515515

516516
return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
517517
}
518+
519+
func (c *MySqlConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTable string) (int64, error) {
520+
schemaTable, err := utils.ParseSchemaTable(fullyQualifiedTable)
521+
if err != nil {
522+
return 0, err
523+
}
524+
query := fmt.Sprintf(
525+
"SELECT data_length FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'",
526+
mysql.Escape(schemaTable.Schema),
527+
mysql.Escape(schemaTable.Table),
528+
)
529+
530+
rs, err := c.Execute(ctx, query)
531+
if err != nil {
532+
return 0, err
533+
}
534+
defer rs.Close()
535+
return rs.GetInt(0, 0)
536+
}

flow/connectors/postgres/postgres.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1803,3 +1803,15 @@ func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.Data
18031803

18041804
return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
18051805
}
1806+
1807+
func (c *PostgresConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTableName string) (int64, error) {
1808+
tableSizeQuery := "SELECT pg_relation_size(to_regclass($1))"
1809+
var tableSizeBytes pgtype.Int8
1810+
if err := c.conn.QueryRow(ctx, tableSizeQuery, fullyQualifiedTableName).Scan(&tableSizeBytes); err != nil {
1811+
return 0, err
1812+
}
1813+
if !tableSizeBytes.Valid {
1814+
return 0, errors.New("table size is not valid")
1815+
}
1816+
return tableSizeBytes.Int64, nil
1817+
}

flow/pkg/mongo/commands.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ func GetHelloResponse(ctx context.Context, client *mongo.Client) (HelloResponse,
6060
return runCommand[HelloResponse](ctx, client, "hello")
6161
}
6262

63+
// CollStats holds the fields decoded from a collStats command reply.
type CollStats struct {
	// Size is decoded from the "size" field (uncompressed size).
	Size int64 `bson:"size"`
	// StorageSize is decoded from the "storageSize" field (compressed size).
	StorageSize int64 `bson:"storageSize"`
}
69+
70+
func GetCollStats(ctx context.Context, client *mongo.Client, database string, collection string) (CollStats, error) {
71+
return runDatabaseCommand[CollStats](ctx, client, database, bson.D{
72+
{Key: "collStats", Value: collection},
73+
{Key: "scale", Value: 1},
74+
})
75+
}
76+
6377
func runCommand[T any](ctx context.Context, client *mongo.Client, command string) (T, error) {
6478
var result T
6579
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
@@ -70,7 +84,20 @@ func runCommand[T any](ctx context.Context, client *mongo.Client, command string
7084
}
7185

7286
if err := singleResult.Decode(&result); err != nil {
73-
return result, fmt.Errorf("'%s' failed: %v", command, err)
87+
return result, fmt.Errorf("'%s' decoding failed: %v", command, err)
88+
}
89+
return result, nil
90+
}
91+
92+
func runDatabaseCommand[T any](ctx context.Context, client *mongo.Client, database string, commandDoc bson.D) (T, error) {
93+
var result T
94+
singleResult := client.Database(database).RunCommand(ctx, commandDoc)
95+
if singleResult.Err() != nil {
96+
return result, fmt.Errorf("'%s' failed: %v", commandDoc.String(), singleResult.Err())
97+
}
98+
99+
if err := singleResult.Decode(&result); err != nil {
100+
return result, fmt.Errorf("'%s' decoding failed: %v", commandDoc.String(), err)
74101
}
75102
return result, nil
76103
}

0 commit comments

Comments
 (0)