Skip to content

Commit 27babd1

Browse files
committed
add user facing logging when table is too large
1 parent ad1c115 commit 27babd1

File tree

4 files changed

+55
-0
lines changed

4 files changed

+55
-0
lines changed

flow/activities/flowable.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,19 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context,
514514
}
515515
defer srcClose(ctx)
516516

517+
if tableSizeEstimatorConn, ok := srcConn.(connectors.TableSizeEstimatorConnector); ok {
518+
// expect estimate query execution to be fast, so set a short timeout
519+
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
520+
defer cancel()
521+
if bytes, connErr := tableSizeEstimatorConn.GetTableSizeEstimatedBytes(timeoutCtx, config.WatermarkTable); connErr == nil {
522+
if bytes > 1024*1024*1024*2024 {
523+
msg := fmt.Sprintf("Large table detected: %s (%s). Partitioning query may take several hours to execute. "+
524+
"This is normal for tables over 1 TB.", config.WatermarkTable, utils.FormatTableSize(bytes))
525+
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, msg)
526+
}
527+
}
528+
}
529+
517530
partitions, err := srcConn.GetQRepPartitions(ctx, config, last)
518531
if err != nil {
519532
return nil, a.Alerter.LogFlowWrappedError(ctx, config.FlowJobName, "failed to get partitions from source", err)

flow/connectors/core.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,12 @@ type DatabaseVariantConnector interface {
331331
GetDatabaseVariant(ctx context.Context) (protos.DatabaseVariant, error)
332332
}
333333

334+
// TableSizeEstimatorConnector is a Connector that can additionally report an
// estimated size, in bytes, for a table on the peer it is connected to.
type TableSizeEstimatorConnector interface {
335+
Connector
336+
337+
// GetTableSizeEstimatedBytes returns an estimate of the size of tableName in
// bytes, or a non-nil error if the estimate could not be obtained.
// NOTE(review): tableName appears to be a schema-qualified name (the
// implementations name this parameter "fullyQualifiedTable...") — confirm.
GetTableSizeEstimatedBytes(ctx context.Context, tableName string) (int64, error)
338+
}
339+
334340
func LoadPeerType(ctx context.Context, catalogPool shared.CatalogPool, peerName string) (protos.DBType, error) {
335341
row := catalogPool.QueryRow(ctx, "SELECT type FROM peers WHERE name = $1", peerName)
336342
var dbtype protos.DBType
@@ -673,4 +679,7 @@ var (
673679

674680
_ DatabaseVariantConnector = &connpostgres.PostgresConnector{}
675681
_ DatabaseVariantConnector = &connmysql.MySqlConnector{}
682+
683+
_ TableSizeEstimatorConnector = &connpostgres.PostgresConnector{}
684+
_ TableSizeEstimatorConnector = &connmysql.MySqlConnector{}
676685
)

flow/connectors/mysql/mysql.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,3 +515,23 @@ func (c *MySqlConnector) GetDatabaseVariant(ctx context.Context) (protos.Databas
515515

516516
return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
517517
}
518+
519+
func (c *MySqlConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTable string) (int64, error) {
520+
schemaTable, err := utils.ParseSchemaTable(fullyQualifiedTable)
521+
if err != nil {
522+
return 0, err
523+
}
524+
query := fmt.Sprintf(
525+
"SELECT data_length FROM information_schema.tables WHERE table_schema = '%s' AND table_name = '%s'",
526+
mysql.Escape(schemaTable.Schema),
527+
mysql.Escape(schemaTable.Table),
528+
)
529+
530+
rs, err := c.Execute(ctx, query)
531+
if err != nil {
532+
c.logger.Warn("failed to get estimated table size", slog.Any("error", err))
533+
return 0, err
534+
}
535+
defer rs.Close()
536+
return rs.GetInt(0, 0)
537+
}

flow/connectors/postgres/postgres.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1803,3 +1803,16 @@ func (c *PostgresConnector) GetDatabaseVariant(ctx context.Context) (protos.Data
18031803

18041804
return protos.DatabaseVariant_VARIANT_UNKNOWN, nil
18051805
}
1806+
1807+
func (c *PostgresConnector) GetTableSizeEstimatedBytes(ctx context.Context, fullyQualifiedTableName string) (int64, error) {
1808+
tableSizeQuery := "SELECT pg_relation_size(to_regclass($1))"
1809+
var tableSizeBytes pgtype.Int8
1810+
if err := c.conn.QueryRow(ctx, tableSizeQuery, fullyQualifiedTableName).Scan(&tableSizeBytes); err != nil {
1811+
c.logger.Warn("failed to get estimated table size", slog.Any("error", err))
1812+
return 0, err
1813+
}
1814+
if !tableSizeBytes.Valid {
1815+
return 0, errors.New("table size is not valid")
1816+
}
1817+
return tableSizeBytes.Int64, nil
1818+
}

0 commit comments

Comments
 (0)