Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into stable
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Jul 12, 2023
2 parents 5a8121b + 2033a70 commit e3249c8
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions flow/connectors/postgres/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
config *protos.QRepConfig,
last *protos.QRepPartition,
) ([]*protos.QRepPartition, error) {
numRows := int64(config.NumRowsPerPartition)
numRowsPerPartition := int64(config.NumRowsPerPartition)
quotedWatermarkColumn := fmt.Sprintf("\"%s\"", config.WatermarkColumn)

whereClause := ""
Expand All @@ -67,7 +67,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
// Query to get the total number of rows in the table
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", config.WatermarkTable, whereClause)
var row pgx.Row
var minVal interface{}
var minVal interface{} = nil
if last != nil && last.Range != nil {
switch lastRange := last.Range.Range.(type) {
case *protos.PartitionRange_IntRange:
Expand All @@ -85,11 +85,18 @@ func (c *PostgresConnector) getNumRowsPartitions(
return nil, fmt.Errorf("failed to query for total rows: %w", err)
}

if totalRows == 0 {
log.Warnf("no records to replicate for flow job %s, returning", config.FlowJobName)
return make([]*protos.QRepPartition, 0), nil
}

// Calculate the number of partitions
numPartitions := totalRows / numRows
if totalRows%numRows != 0 {
numPartitions := totalRows / numRowsPerPartition
if totalRows%numRowsPerPartition != 0 {
numPartitions++
}
log.Infof("total rows: %d, num partitions: %d, num rows per partition: %d",
totalRows, numPartitions, numRowsPerPartition)

// Query to get partitions using window functions
var rows pgx.Rows
Expand All @@ -106,6 +113,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn,
config.WatermarkTable,
)
log.Infof("partitions query: %s", partitionsQuery)
rows, err = c.pool.Query(c.ctx, partitionsQuery, minVal)
} else {
partitionsQuery := fmt.Sprintf(
Expand All @@ -118,6 +126,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
quotedWatermarkColumn,
config.WatermarkTable,
)
log.Infof("partitions query: %s", partitionsQuery)
rows, err = c.pool.Query(c.ctx, partitionsQuery)
}
if err != nil {
Expand Down

0 comments on commit e3249c8

Please sign in to comment.