Skip to content

Commit 713f301

Browse files
committed
reafctor qrep_partitioning to smaller components
1 parent 1e25143 commit 713f301

File tree

2 files changed

+293
-217
lines changed

2 files changed

+293
-217
lines changed

flow/connectors/postgres/qrep.go

Lines changed: 61 additions & 217 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@ package connpostgres
22

33
import (
44
"bytes"
5-
"cmp"
65
"context"
7-
"errors"
86
"fmt"
97
"log/slog"
10-
"math"
118
"strconv"
129
"strings"
1310
"text/template"
@@ -68,13 +65,21 @@ func (c *PostgresConnector) GetQRepPartitions(
6865
if err != nil {
6966
return nil, fmt.Errorf("failed to begin transaction: %w", err)
7067
}
68+
// rollback transaction will no-op if transaction is committed
7169
defer shared.RollbackTx(getPartitionsTx, c.logger)
7270

7371
if err := c.setTransactionSnapshot(ctx, getPartitionsTx, config.SnapshotName); err != nil {
7472
return nil, fmt.Errorf("failed to set transaction snapshot: %w", err)
7573
}
7674

77-
return c.getNumRowsPartitions(ctx, getPartitionsTx, config, last)
75+
partitions, err := c.getPartitions(ctx, getPartitionsTx, config, last)
76+
77+
// commit transaction
78+
if err := getPartitionsTx.Commit(ctx); err != nil {
79+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
80+
}
81+
82+
return partitions, err
7883
}
7984

8085
func (c *PostgresConnector) GetDefaultPartitionKeyForTables(
@@ -152,244 +157,83 @@ func (c *PostgresConnector) setTransactionSnapshot(ctx context.Context, tx pgx.T
152157
return nil
153158
}
154159

155-
func (c *PostgresConnector) getNumRowsPartitions(
160+
func (c *PostgresConnector) getPartitions(
156161
ctx context.Context,
157162
tx pgx.Tx,
158163
config *protos.QRepConfig,
159164
last *protos.QRepPartition,
160165
) ([]*protos.QRepPartition, error) {
161-
numPartitions := int64(config.NumPartitionsOverride)
162166
numRowsPerPartition := int64(config.NumRowsPerPartition)
163-
quotedWatermarkColumn := utils.QuoteIdentifier(config.WatermarkColumn)
164-
165-
whereClause := ""
166-
if last != nil && last.Range != nil {
167-
whereClause = fmt.Sprintf(`WHERE %s > $1`, quotedWatermarkColumn)
168-
}
169-
170-
parsedWatermarkTable, err := utils.ParseSchemaTable(config.WatermarkTable)
167+
numPartitions := int64(config.NumPartitionsOverride)
168+
schemaTable, err := utils.ParseSchemaTable(config.WatermarkTable)
171169
if err != nil {
172170
return nil, fmt.Errorf("unable to parse watermark table: %w", err)
173171
}
172+
watermarkTable := schemaTable.String()
173+
watermarkColumn := utils.QuoteIdentifier(config.WatermarkColumn)
174174

175-
if numPartitions == 0 {
176-
// Query to get the total number of rows in the table
177-
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s %s`, parsedWatermarkTable.String(), whereClause)
178-
var row pgx.Row
179-
var minVal any
180-
if last != nil && last.Range != nil {
181-
switch lastRange := last.Range.Range.(type) {
182-
case *protos.PartitionRange_IntRange:
183-
minVal = lastRange.IntRange.End
184-
case *protos.PartitionRange_UintRange:
185-
minVal = lastRange.UintRange.End
186-
case *protos.PartitionRange_TimestampRange:
187-
minVal = lastRange.TimestampRange.End.AsTime()
188-
}
189-
190-
row = tx.QueryRow(ctx, countQuery, minVal)
191-
} else {
192-
row = tx.QueryRow(ctx, countQuery)
193-
}
194-
195-
var totalRows pgtype.Int8
196-
if err := row.Scan(&totalRows); err != nil {
197-
return nil, fmt.Errorf("failed to query for total rows: %w", err)
198-
}
199-
200-
if totalRows.Int64 == 0 {
201-
c.logger.Warn("no records to replicate, returning")
202-
return nil, nil
203-
}
204-
205-
// Calculate the number of partitions
206-
adjustedPartitions := shared.AdjustNumPartitions(totalRows.Int64, numRowsPerPartition)
207-
c.logger.Info("[postgres] partition adjustment details",
208-
slog.Int64("totalRows", totalRows.Int64),
209-
slog.Int64("desiredNumRowsPerPartition", numRowsPerPartition),
210-
slog.Int64("adjustedNumPartitions", adjustedPartitions.AdjustedNumPartitions),
211-
slog.Int64("adjustedNumRowsPerPartition", adjustedPartitions.AdjustedNumRowsPerPartition))
212-
213-
// Query to get partitions using window functions
214-
var rows pgx.Rows
215-
if minVal != nil {
216-
partitionsQuery := fmt.Sprintf(
217-
`SELECT bucket, MIN(%[2]s) AS start, MAX(%[2]s) AS end
218-
FROM (
219-
SELECT NTILE(%[1]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s
220-
FROM %[3]s WHERE %[2]s > $1
221-
) subquery
222-
GROUP BY bucket
223-
ORDER BY start`,
224-
adjustedPartitions.AdjustedNumPartitions,
225-
quotedWatermarkColumn,
226-
parsedWatermarkTable.String(),
227-
)
228-
c.logger.Info("[row_based_next] partitions query", slog.String("query", partitionsQuery))
229-
rows, err = tx.Query(ctx, partitionsQuery, minVal)
230-
} else {
231-
partitionsQuery := fmt.Sprintf(
232-
`SELECT bucket, MIN(%[2]s) AS start, MAX(%[2]s) AS end
233-
FROM (
234-
SELECT NTILE(%[1]d) OVER (ORDER BY %[2]s) AS bucket, %[2]s FROM %[3]s
235-
) subquery
236-
GROUP BY bucket
237-
ORDER BY start`,
238-
adjustedPartitions.AdjustedNumPartitions,
239-
quotedWatermarkColumn,
240-
parsedWatermarkTable.String(),
241-
)
242-
c.logger.Info("[row_based] partitions query", slog.String("query", partitionsQuery))
243-
rows, err = tx.Query(ctx, partitionsQuery)
244-
}
245-
if err != nil {
246-
return nil, shared.LogError(c.logger, fmt.Errorf("failed to query for partitions: %w", err))
247-
}
248-
defer rows.Close()
249-
250-
partitionHelper := utils.NewPartitionHelper(c.logger)
251-
for rows.Next() {
252-
var bucket pgtype.Int8
253-
var start, end any
254-
if err := rows.Scan(&bucket, &start, &end); err != nil {
255-
return nil, fmt.Errorf("failed to scan row: %w", err)
256-
}
257-
258-
if err := partitionHelper.AddPartition(start, end); err != nil {
259-
return nil, fmt.Errorf("failed to add partition: %w", err)
260-
}
261-
}
262-
263-
if err := rows.Err(); err != nil {
264-
return nil, fmt.Errorf("failed to read rows: %w", err)
265-
}
266-
267-
if err := tx.Commit(ctx); err != nil {
268-
return nil, fmt.Errorf("failed to commit transaction: %w", err)
269-
}
270-
271-
return partitionHelper.GetPartitions(), nil
272-
} else {
273-
// Special handling for CTID watermark column when a fixed number of partitions is specified:
274-
// Partitions are created by dividing table blocks uniformly.
275-
// Note: partition boundaries (block ranges) are uniform, but actual row distribution may be skewed
276-
// due to table bloat, deleted tuples, or uneven data distribution across blocks.
277-
if config.WatermarkColumn == ctidColumnName {
278-
return c.getCTIDBlockPartitions(ctx, tx, *parsedWatermarkTable, numPartitions, last)
279-
}
280-
281-
// Default path for non-CTID watermark column when a fixed number of partitions is specified:
282-
// Partitions are created by uniformly splitting the min/max value range.
283-
// Note: partition boundaries are uniform, but actual row distribution may be skewed
284-
// due to non-uniform data distribution, gaps in the value range, or deleted rows.
285-
minmaxQuery := fmt.Sprintf("SELECT MIN(%[2]s),MAX(%[2]s) FROM %[1]s %[3]s",
286-
parsedWatermarkTable.String(), quotedWatermarkColumn, whereClause)
287-
var row pgx.Row
288-
var minVal any
289-
if last != nil && last.Range != nil {
290-
switch lastRange := last.Range.Range.(type) {
291-
case *protos.PartitionRange_IntRange:
292-
minVal = lastRange.IntRange.End
293-
case *protos.PartitionRange_UintRange:
294-
minVal = lastRange.UintRange.End
295-
case *protos.PartitionRange_TimestampRange:
296-
minVal = lastRange.TimestampRange.End.AsTime()
175+
var lastRangeEnd any
176+
if last != nil && last.Range != nil {
177+
switch lastRange := last.Range.Range.(type) {
178+
case *protos.PartitionRange_IntRange:
179+
lastRangeEnd = lastRange.IntRange.End
180+
case *protos.PartitionRange_UintRange:
181+
lastRangeEnd = lastRange.UintRange.End
182+
case *protos.PartitionRange_TimestampRange:
183+
lastRangeEnd = lastRange.TimestampRange.End.AsTime()
184+
case *protos.PartitionRange_TidRange:
185+
lastRangeEnd = pgtype.TID{
186+
BlockNumber: lastRange.TidRange.End.BlockNumber,
187+
OffsetNumber: uint16(lastRange.TidRange.End.OffsetNumber),
188+
Valid: true,
297189
}
298-
299-
row = tx.QueryRow(ctx, minmaxQuery, minVal)
300-
} else {
301-
row = tx.QueryRow(ctx, minmaxQuery)
190+
default:
191+
return nil, fmt.Errorf("unknown range type %T", lastRange)
302192
}
303-
var start, end any
304-
if err := row.Scan(&start, &end); err != nil {
305-
return nil, err
306-
}
307-
308-
partitionHelper := utils.NewPartitionHelper(c.logger)
309-
if err := partitionHelper.AddPartitionsWithRange(start, end, numPartitions); err != nil {
310-
return nil, fmt.Errorf("failed to add partitions: %w", err)
311-
}
312-
return partitionHelper.GetPartitions(), nil
313193
}
314-
}
315194

316-
func (c *PostgresConnector) getCTIDBlockPartitions(
317-
ctx context.Context,
318-
tx pgx.Tx,
319-
parsedWatermarkTable utils.SchemaTable,
320-
numPartitions int64,
321-
last *protos.QRepPartition,
322-
) ([]*protos.QRepPartition, error) {
323-
if numPartitions <= 1 {
324-
return nil, errors.New("expect numPartitions to be greater than 1")
195+
partitionParams := PartitionParams{
196+
tx: tx,
197+
watermarkTable: watermarkTable,
198+
watermarkColumn: watermarkColumn,
199+
numPartitions: numPartitions,
200+
lastRangeEnd: lastRangeEnd,
201+
logger: c.logger,
325202
}
326203

327-
blocksQuery := "SELECT (pg_relation_size(to_regclass($1)) / current_setting('block_size')::int)::bigint"
328-
var totalBlocks pgtype.Int8
329-
if err := tx.QueryRow(ctx, blocksQuery, parsedWatermarkTable.String()).Scan(&totalBlocks); err != nil {
330-
return nil, fmt.Errorf("failed to get relation blocks: %w", err)
331-
}
332-
if !totalBlocks.Valid || totalBlocks.Int64 <= 0 {
333-
return nil, fmt.Errorf("total blocks: %d, valid: %t", totalBlocks.Int64, totalBlocks.Valid)
334-
}
335-
336-
tidCmp := func(a pgtype.TID, b pgtype.TID) int {
337-
if blockCmp := cmp.Compare(a.BlockNumber, b.BlockNumber); blockCmp != 0 {
338-
return blockCmp
339-
}
340-
return cmp.Compare(a.OffsetNumber, b.OffsetNumber)
341-
}
342-
343-
tidInc := func(t pgtype.TID) pgtype.TID {
344-
if t.OffsetNumber < math.MaxUint16 {
345-
return pgtype.TID{BlockNumber: t.BlockNumber, OffsetNumber: t.OffsetNumber + 1, Valid: true}
204+
if config.NumPartitionsOverride <= 0 {
205+
computedNumPartitions, err := ComputeNumPartitions(ctx, partitionParams, numRowsPerPartition)
206+
if err != nil {
207+
return nil, err
346208
}
347-
return pgtype.TID{BlockNumber: t.BlockNumber + 1, OffsetNumber: 0, Valid: true}
209+
partitionParams.numPartitions = computedNumPartitions
348210
}
349211

350-
tidRangeForPartition := func(partitionIndex int64) (pgtype.TID, pgtype.TID, bool) {
351-
blockStart := uint32((partitionIndex * totalBlocks.Int64) / numPartitions)
352-
nextPartitionBlockStart := uint32(((partitionIndex + 1) * totalBlocks.Int64) / numPartitions)
353-
if nextPartitionBlockStart <= blockStart {
354-
return pgtype.TID{}, pgtype.TID{}, false
212+
logLargeTable := func() {
213+
tableSizeQuery := "SELECT pg_relation_size(to_regclass($1))"
214+
var tableSizeBytes pgtype.Int8
215+
if err := tx.QueryRow(ctx, tableSizeQuery, watermarkTable).Scan(&tableSizeBytes); err != nil {
216+
c.logger.Warn("failed to get table size, skipping size check", slog.Any("error", err))
217+
} else if tableSizeBytes.Valid && tableSizeBytes.Int64 > 1024*1024*1024*1024 {
218+
c.logger.Info(fmt.Sprintf("Large table detected: %s (%s). Partitioning query may take "+
219+
"several hours to execute. This is normal for tables over 1 TB.",
220+
watermarkTable, utils.FormatTableSize(tableSizeBytes.Int64)))
355221
}
356-
tidStartInclusive := pgtype.TID{BlockNumber: blockStart, OffsetNumber: 0, Valid: true}
357-
tidEndInclusive := pgtype.TID{BlockNumber: nextPartitionBlockStart - 1, OffsetNumber: math.MaxUint16, Valid: true}
358-
return tidStartInclusive, tidEndInclusive, true
359222
}
360223

361-
var resumeFrom pgtype.TID
362-
if last != nil && last.Range != nil {
363-
if lr, ok := last.Range.Range.(*protos.PartitionRange_TidRange); ok {
364-
resume := pgtype.TID{BlockNumber: lr.TidRange.End.BlockNumber, OffsetNumber: uint16(lr.TidRange.End.OffsetNumber), Valid: true}
365-
resumeFrom = tidInc(resume)
224+
var partitionFunc PartitioningFunc
225+
if config.NumPartitionsOverride > 0 {
226+
if config.WatermarkColumn == ctidColumnName {
227+
partitionFunc = CTIDBlockPartitioningFunc
366228
} else {
367-
c.logger.Warn("Ignoring resume offset because it's not TidRange")
368-
}
369-
}
370-
371-
partitionHelper := utils.NewPartitionHelper(c.logger)
372-
for i := range numPartitions {
373-
start, end, valid := tidRangeForPartition(i)
374-
if !valid {
375-
continue
376-
}
377-
if resumeFrom.Valid {
378-
if tidCmp(end, resumeFrom) < 0 {
379-
continue
380-
}
381-
if tidCmp(start, resumeFrom) < 0 {
382-
start = resumeFrom
383-
}
384-
}
385-
if err := partitionHelper.AddPartition(
386-
pgtype.TID{BlockNumber: start.BlockNumber, OffsetNumber: start.OffsetNumber, Valid: true},
387-
pgtype.TID{BlockNumber: end.BlockNumber, OffsetNumber: end.OffsetNumber, Valid: true},
388-
); err != nil {
389-
return nil, fmt.Errorf("failed to add TID partition: %w", err)
229+
logLargeTable()
230+
partitionFunc = MinMaxRangePartitioningFunc
390231
}
232+
} else {
233+
logLargeTable()
234+
partitionFunc = NTileBucketPartitioningFunc
391235
}
392-
return partitionHelper.GetPartitions(), nil
236+
return partitionFunc(ctx, partitionParams)
393237
}
394238

395239
func (c *PostgresConnector) getMinMaxValues(

0 commit comments

Comments
 (0)