Merge branch 'main' into stable
Amogh-Bharadwaj committed Apr 12, 2024
2 parents bde2362 + 7a402b6 commit ac156ae
Showing 55 changed files with 910 additions and 617 deletions.
3 changes: 3 additions & 0 deletions flow/activities/flowable.go
@@ -734,6 +734,7 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config *protos.QRepConfig,
runUUID string,
) error {
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return monitoring.UpdateEndTimeForQRepRun(ctx, a.CatalogPool, runUUID)
@@ -757,6 +758,7 @@ func (a *FlowableActivity) ConsolidateQRepPartitions(ctx context.Context, config
}

func (a *FlowableActivity) CleanupQRepFlow(ctx context.Context, config *protos.QRepConfig) error {
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dst, err := connectors.GetQRepConsolidateConnector(ctx, config.DestinationPeer)
if errors.Is(err, connectors.ErrUnsupportedFunctionality) {
return nil
@@ -780,6 +782,7 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, config *protos.Sh
}

func (a *FlowableActivity) DropFlowDestination(ctx context.Context, config *protos.ShutdownRequest) error {
+	ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName)
dstConn, err := connectors.GetCDCSyncConnector(ctx, config.DestinationPeer)
if err != nil {
return fmt.Errorf("failed to get destination connector: %w", err)
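The three additions above follow one pattern: stamp the flow job name into the context at the top of each activity so downstream code (loggers, connectors) can recover it. A minimal sketch of the idea, with an illustrative key type standing in for shared.FlowNameKey:

package main

import (
	"context"
	"fmt"
)

type ctxKey string

// flowNameKey stands in for PeerDB's shared.FlowNameKey.
const flowNameKey ctxKey = "flowName"

func withFlowName(ctx context.Context, name string) context.Context {
	return context.WithValue(ctx, flowNameKey, name)
}

func flowNameFromCtx(ctx context.Context) string {
	if name, ok := ctx.Value(flowNameKey).(string); ok {
		return name
	}
	return "unknown"
}

func main() {
	ctx := withFlowName(context.Background(), "my-mirror")
	fmt.Println(flowNameFromCtx(ctx)) // prints: my-mirror
}

Using an unexported key type keeps context values collision-free, which is why a dedicated key constant beats a raw string key.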
5 changes: 1 addition & 4 deletions flow/cmd/api.go
@@ -39,11 +39,8 @@ type APIServerParams struct {

// setupGRPCGatewayServer sets up the grpc-gateway mux
func setupGRPCGatewayServer(args *APIServerParams) (*http.Server, error) {
-	//nolint:staticcheck
-	conn, err := grpc.DialContext(
-		context.Background(),
+	conn, err := grpc.NewClient(
fmt.Sprintf("0.0.0.0:%d", args.Port),
-		grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
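This hunk swaps the deprecated blocking dial for grpc.NewClient, available in grpc-go v1.63+. Unlike DialContext with grpc.WithBlock(), NewClient returns immediately and connects lazily on the first RPC, which is why both the context argument and the WithBlock option disappear. A sketch of the new shape (the address is illustrative):

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// NewClient validates the target and options but does not dial yet.
	conn, err := grpc.NewClient(
		"0.0.0.0:8112",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("failed to create gRPC client: %v", err)
	}
	defer conn.Close()
	// Hand conn to generated stubs; the TCP connection is made on first use.
}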
37 changes: 24 additions & 13 deletions flow/connectors/bigquery/bigquery.go
@@ -19,10 +19,11 @@ import (

metadataStore "github.com/PeerDB-io/peer-flow/connectors/external_metadata"
"github.com/PeerDB-io/peer-flow/connectors/utils"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/dynamicconf"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
"github.com/PeerDB-io/peer-flow/shared"
@@ -620,6 +621,7 @@ func (c *BigQueryConnector) SetupNormalizedTable(
for _, column := range tableSchema.Columns {
genericColType := column.Type
if genericColType == "numeric" {
-			precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
-			if column.TypeModifier == -1 || precision > 38 || scale > 37 {
-				precision = numeric.PeerDBNumericPrecision
-				scale = numeric.PeerDBNumericScale
-			}
+			precision, scale := numeric.GetNumericTypeForWarehouse(column.TypeModifier, numeric.BigQueryNumericCompatibility{})
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: bigquery.BigNumericFieldType,
@@ -669,10 +666,20 @@
}
}

+	timePartitionEnabled := dynamicconf.PeerDBBigQueryEnableSyncedAtPartitioning(ctx)
+	var timePartitioning *bigquery.TimePartitioning
+	if timePartitionEnabled && syncedAtColName != "" {
+		timePartitioning = &bigquery.TimePartitioning{
+			Type:  bigquery.DayPartitioningType,
+			Field: syncedAtColName,
+		}
+	}

metadata := &bigquery.TableMetadata{
-		Schema:     schema,
-		Name:       datasetTable.table,
-		Clustering: clustering,
+		Schema:           schema,
+		Name:             datasetTable.table,
+		Clustering:       clustering,
+		TimePartitioning: timePartitioning,
}

err = table.Create(ctx, metadata)
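The new hunk gates day-partitioning behind a dynamic config flag and partitions on the synced-at column when one exists. A sketch of the underlying BigQuery client call, with illustrative dataset, table, and column names:

package bqexample

import (
	"context"

	"cloud.google.com/go/bigquery"
)

// createPartitionedTable creates a table partitioned by day on a timestamp
// column, mirroring the TimePartitioning metadata built above.
func createPartitionedTable(ctx context.Context, client *bigquery.Client) error {
	schema := bigquery.Schema{
		{Name: "id", Type: bigquery.IntegerFieldType},
		{Name: "_peerdb_synced_at", Type: bigquery.TimestampFieldType},
	}
	meta := &bigquery.TableMetadata{
		Schema: schema,
		TimePartitioning: &bigquery.TimePartitioning{
			Type:  bigquery.DayPartitioningType, // one partition per day
			Field: "_peerdb_synced_at",          // partition on the synced-at column
		},
	}
	return client.Dataset("my_dataset").Table("my_table").Create(ctx, meta)
}

Partitioning on the synced-at timestamp keeps queries that filter by sync time from scanning the whole table.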
@@ -691,10 +698,14 @@ func (c *BigQueryConnector) SyncFlowCleanup(ctx context.Context, jobName string)
}

dataset := c.client.DatasetInProject(c.projectID, c.datasetID)
-	// deleting PeerDB specific tables
-	err = dataset.Table(c.getRawTableName(jobName)).Delete(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to delete raw table: %w", err)
+	rawTableHandle := dataset.Table(c.getRawTableName(jobName))
+	// check if exists, then delete
+	_, err = rawTableHandle.Metadata(ctx)
+	if err == nil {
+		deleteErr := rawTableHandle.Delete(ctx)
+		if deleteErr != nil {
+			return fmt.Errorf("failed to delete raw table: %w", deleteErr)
+		}
}

return nil
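SyncFlowCleanup now probes the raw table with Metadata() and deletes only on success, making cleanup idempotent when the raw table was never created. The same pattern in isolation (treating any metadata error as absence, as the hunk above does; stricter code could inspect for a googleapi 404):

package bqexample

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// dropTableIfExists deletes the table only if a metadata probe succeeds.
func dropTableIfExists(ctx context.Context, table *bigquery.Table) error {
	if _, err := table.Metadata(ctx); err != nil {
		// Assume "not found" and skip; a missing table is not an error here.
		return nil
	}
	if err := table.Delete(ctx); err != nil {
		return fmt.Errorf("failed to delete table: %w", err)
	}
	return nil
}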
9 changes: 4 additions & 5 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -14,9 +14,9 @@ import (
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
@@ -278,10 +278,9 @@ func DefineAvroSchema(dstTableName string,
func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
-	if avroNumericPrecision > 38 || avroNumericPrecision <= 0 ||
-		avroNumericScale > 38 || avroNumericScale < 0 {
-		avroNumericPrecision = numeric.PeerDBNumericPrecision
-		avroNumericScale = numeric.PeerDBNumericScale
+	bqNumeric := numeric.BigQueryNumericCompatibility{}
+	if !bqNumeric.IsValidPrecisionAndScale(avroNumericPrecision, avroNumericScale) {
+		avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
}

considerRepeated := func(typ string, repeated bool) interface{} {
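GetAvroType's validated precision and scale end up on an Avro decimal logical type. A sketch of the schema fragment those numbers feed, assuming the compatibility helper clamps out-of-range pairs to safe defaults first (the helper names come from the diff; the schema literal below is illustrative):

package avroexample

import "fmt"

// avroDecimalSchema renders the Avro schema fragment for a numeric column:
// a bytes field annotated with the "decimal" logical type.
func avroDecimalSchema(precision, scale int16) string {
	return fmt.Sprintf(
		`{"type":"bytes","logicalType":"decimal","precision":%d,"scale":%d}`,
		precision, scale)
}

Clamping invalid pairs before schema generation (the old inline check capped precision at 38) surfaces bad bounds early instead of failing the BigQuery load job later.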
27 changes: 18 additions & 9 deletions flow/connectors/clickhouse/cdc.go
@@ -19,8 +19,8 @@ import (
)

const (
-	checkIfTableExistsSQL     = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;`
-	mirrorJobsTableIdentifier = "PEERDB_MIRROR_JOBS"
+	checkIfTableExistsSQL = `SELECT exists(SELECT 1 FROM system.tables WHERE database = ? AND name = ?) AS table_exists;`
+	dropTableIfExistsSQL  = `DROP TABLE IF EXISTS %s;`
)

// getRawTableName returns the raw table name for the given table identifier.
Expand All @@ -44,13 +44,6 @@ func (c *ClickhouseConnector) checkIfTableExists(ctx context.Context, databaseNa
return result.Int32 == 1, nil
}

-type MirrorJobRow struct {
-	MirrorJobName    string
-	Offset           int
-	SyncBatchID      int
-	NormalizeBatchID int
-}

func (c *ClickhouseConnector) CreateRawTable(ctx context.Context, req *protos.CreateRawTableInput) (*protos.CreateRawTableOutput, error) {
rawTableName := c.getRawTableName(req.FlowJobName)

@@ -188,3 +181,19 @@ func (c *ClickhouseConnector) ReplayTableSchemaDeltas(ctx context.Context, flowJ

return nil
}

+func (c *ClickhouseConnector) SyncFlowCleanup(ctx context.Context, jobName string) error {
+	err := c.PostgresMetadata.SyncFlowCleanup(ctx, jobName)
+	if err != nil {
+		return fmt.Errorf("[clickhouse drop mirror] unable to clear metadata for sync flow cleanup: %w", err)
+	}
+
+	// delete raw table if exists
+	rawTableIdentifier := c.getRawTableName(jobName)
+	_, err = c.database.ExecContext(ctx, fmt.Sprintf(dropTableIfExistsSQL, rawTableIdentifier))
+	if err != nil {
+		return fmt.Errorf("[clickhouse drop mirror] unable to drop raw table: %w", err)
+	}
+
+	return nil
+}
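The new SyncFlowCleanup clears the catalog metadata, then issues DROP TABLE IF EXISTS so the call stays idempotent. A standalone sketch of the drop step over database/sql (the DSN and table name are illustrative):

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
)

// dropRawTable drops a mirror's raw table; IF EXISTS makes reruns safe.
func dropRawTable(ctx context.Context, db *sql.DB, rawTable string) error {
	_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", rawTable))
	if err != nil {
		return fmt.Errorf("unable to drop raw table: %w", err)
	}
	return nil
}

func main() {
	db, err := sql.Open("clickhouse", "clickhouse://localhost:9000/default")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
	if err := dropRawTable(context.Background(), db, "_peerdb_raw_my_mirror"); err != nil {
		log.Fatal(err)
	}
}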
8 changes: 2 additions & 6 deletions flow/connectors/clickhouse/normalize.go
@@ -8,9 +8,9 @@ import (
"strconv"
"strings"

"github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

@@ -84,11 +84,7 @@ func generateCreateTableSQLForNormalizedTable(

switch colType {
case qvalue.QValueKindNumeric:
-		precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
-		if column.TypeModifier == -1 || precision > 76 || scale > precision {
-			precision = numeric.PeerDBClickhousePrecision
-			scale = numeric.PeerDBClickhouseScale
-		}
+		precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
stmtBuilder.WriteString(fmt.Sprintf("`%s` DECIMAL(%d, %d), ",
colName, precision, scale))
default:
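Both numeric hunks in this commit replace inline typmod math with the shared GetNumericTypeForWarehouse helper. For reference, a sketch of what a Postgres numeric typmod encodes and why -1 must fall back to warehouse defaults (the decoding below matches Postgres's layout; the helper's exact clamping rules live in the datatypes package):

package main

import "fmt"

// parseNumericTypmod decodes atttypmod for NUMERIC(p, s):
// typmod = ((p << 16) | s) + 4, where 4 is Postgres's VARHDRSZ offset.
// A typmod of -1 means an unconstrained NUMERIC, so no (p, s) can be
// recovered and the warehouse default must be used instead.
func parseNumericTypmod(typmod int32) (precision, scale int16) {
	offset := typmod - 4
	return int16((offset >> 16) & 0xFFFF), int16(offset & 0x7FF)
}

func main() {
	// NUMERIC(10, 2) has typmod ((10<<16)|2)+4 = 655366.
	p, s := parseNumericTypmod(655366)
	fmt.Printf("precision=%d scale=%d\n", p, s) // precision=10 scale=2
}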
45 changes: 23 additions & 22 deletions flow/connectors/postgres/cdc.go
@@ -19,8 +19,8 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/connectors/utils/cdc_records"
geo "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/geo"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
@@ -112,6 +112,7 @@ func GetChildToParentRelIDMap(ctx context.Context, conn *pgx.Conn) (map[uint32]u

// PullRecords pulls records from the cdc stream
func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullRecordsRequest) error {
+	logger := logger.LoggerFromCtx(ctx)
conn := p.replConn.PgConn()
records := req.RecordStream
// clientXLogPos is the last checkpoint id, we need to ack that we have processed
@@ -135,25 +136,24 @@
if cdcRecordsStorage.IsEmpty() {
records.SignalAsEmpty()
}
p.logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len()))
logger.Info(fmt.Sprintf("[finished] PullRecords streamed %d records", cdcRecordsStorage.Len()))
err := cdcRecordsStorage.Close()
if err != nil {
-			p.logger.Warn("failed to clean up records storage", slog.Any("error", err))
+			logger.Warn("failed to clean up records storage", slog.Any("error", err))
}
}()

shutdown := utils.HeartbeatRoutine(ctx, func() string {
currRecords := cdcRecordsStorage.Len()
msg := fmt.Sprintf("pulling records, currently have %d records", currRecords)
-		p.logger.Info(msg)
+		logger.Info(msg)
return msg
})
defer shutdown()

standbyMessageTimeout := req.IdleTimeout
nextStandbyMessageDeadline := time.Now().Add(standbyMessageTimeout)

-	logger := logger.LoggerFromCtx(ctx)
addRecordWithKey := func(key model.TableWithPkey, rec model.Record) error {
err := cdcRecordsStorage.Set(logger, key, rec)
if err != nil {
@@ -164,7 +164,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
if cdcRecordsStorage.Len() == 1 {
records.SignalAsNotEmpty()
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
p.logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
}
return nil
}
@@ -183,7 +183,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco

if time.Since(standByLastLogged) > 10*time.Second {
numRowsProcessedMessage := fmt.Sprintf("processed %d rows", cdcRecordsStorage.Len())
p.logger.Info("Sent Standby status message. " + numRowsProcessedMessage)
logger.Info("Sent Standby status message. " + numRowsProcessedMessage)
standByLastLogged = time.Now()
}
}
@@ -195,7 +195,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
}

if waitingForCommit {
-		p.logger.Info(fmt.Sprintf(
+		logger.Info(fmt.Sprintf(
"[%s] commit received, returning currently accumulated records - %d",
p.flowJobName,
cdcRecordsStorage.Len()),
@@ -207,20 +207,20 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
p.logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len()))
logger.Info(fmt.Sprintf("standby deadline reached, have %d records", cdcRecordsStorage.Len()))

if p.commitLock == nil {
-				p.logger.Info(
+				logger.Info(
fmt.Sprintf("no commit lock, returning currently accumulated records - %d",
cdcRecordsStorage.Len()))
return nil
} else {
p.logger.Info(fmt.Sprintf("commit lock, waiting for commit to return records - %d",
logger.Info(fmt.Sprintf("commit lock, waiting for commit to return records - %d",
cdcRecordsStorage.Len()))
waitingForCommit = true
}
} else {
p.logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
p.flowJobName),
)
}
@@ -244,7 +244,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco

if err != nil && p.commitLock == nil {
if pgconn.Timeout(err) {
p.logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
logger.Info(fmt.Sprintf("Stand-by deadline reached, returning currently accumulated records - %d",
cdcRecordsStorage.Len()))
return nil
} else {
@@ -253,7 +253,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
}

if errMsg, ok := rawMsg.(*pgproto3.ErrorResponse); ok {
-			p.logger.Error(fmt.Sprintf("received Postgres WAL error: %+v", errMsg))
+			logger.Error(fmt.Sprintf("received Postgres WAL error: %+v", errMsg))
return fmt.Errorf("received Postgres WAL error: %+v", errMsg)
}

@@ -269,7 +269,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
return fmt.Errorf("ParsePrimaryKeepaliveMessage failed: %w", err)
}

-			p.logger.Debug(
+			logger.Debug(
fmt.Sprintf("Primary Keepalive Message => ServerWALEnd: %s ServerTime: %s ReplyRequested: %t",
pkm.ServerWALEnd, pkm.ServerTime, pkm.ReplyRequested))

@@ -287,7 +287,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
return fmt.Errorf("ParseXLogData failed: %w", err)
}

p.logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
logger.Debug(fmt.Sprintf("XLogData => WALStart %s ServerWALEnd %s ServerTime %s\n",
xld.WALStart, xld.ServerWALEnd, xld.ServerTime))
rec, err := p.processMessage(ctx, records, xld, clientXLogPos)
if err != nil {
@@ -395,7 +395,7 @@ func (p *PostgresCDCSource) PullRecords(ctx context.Context, req *model.PullReco
case *model.RelationRecord:
tableSchemaDelta := r.TableSchemaDelta
if len(tableSchemaDelta.AddedColumns) > 0 {
p.logger.Info(fmt.Sprintf("Detected schema change for table %s, addedColumns: %v",
logger.Info(fmt.Sprintf("Detected schema change for table %s, addedColumns: %v",
tableSchemaDelta.SrcTableName, tableSchemaDelta.AddedColumns))
records.AddSchemaDelta(req.TableNameMapping, tableSchemaDelta)
}
@@ -426,15 +426,16 @@ func (p *PostgresCDCSource) processMessage(
xld pglogrepl.XLogData,
currentClientXlogPos pglogrepl.LSN,
) (model.Record, error) {
+	logger := logger.LoggerFromCtx(ctx)
logicalMsg, err := pglogrepl.Parse(xld.WALData)
if err != nil {
return nil, fmt.Errorf("error parsing logical message: %w", err)
}

switch msg := logicalMsg.(type) {
case *pglogrepl.BeginMessage:
p.logger.Debug(fmt.Sprintf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid))
p.logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage")
logger.Debug(fmt.Sprintf("BeginMessage => FinalLSN: %v, XID: %v", msg.FinalLSN, msg.Xid))
logger.Debug("Locking PullRecords at BeginMessage, awaiting CommitMessage")
p.commitLock = msg
case *pglogrepl.InsertMessage:
return p.processInsertMessage(xld.WALStart, msg)
@@ -444,7 +445,7 @@ func (p *PostgresCDCSource) processMessage(
return p.processDeleteMessage(xld.WALStart, msg)
case *pglogrepl.CommitMessage:
// for a commit message, update the last checkpoint id for the record batch.
p.logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
logger.Debug(fmt.Sprintf("CommitMessage => CommitLSN: %v, TransactionEndLSN: %v",
msg.CommitLSN, msg.TransactionEndLSN))
batch.UpdateLatestCheckpoint(int64(msg.CommitLSN))
p.commitLock = nil
@@ -456,13 +457,13 @@ func (p *PostgresCDCSource) processMessage(
return nil, nil
}

p.logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
logger.Debug(fmt.Sprintf("RelationMessage => RelationID: %d, Namespace: %s, RelationName: %s, Columns: %v",
msg.RelationID, msg.Namespace, msg.RelationName, msg.Columns))

return p.processRelationMessage(ctx, currentClientXlogPos, msg)

case *pglogrepl.TruncateMessage:
p.logger.Warn("TruncateMessage not supported")
logger.Warn("TruncateMessage not supported")
}

return nil, nil
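The recurring edit in this file swaps the struct-scoped p.logger for a logger resolved from the context at the top of each function, so every log line automatically carries whatever the caller attached (such as the flow name stamped in flowable.go). A sketch of the pattern with log/slog; LoggerFromCtx is PeerDB's helper, while the key type and fallback below are assumptions:

package main

import (
	"context"
	"log/slog"
	"os"
)

type loggerKey struct{}

// loggerFromCtx returns the logger stored on the context, or a default one.
func loggerFromCtx(ctx context.Context) *slog.Logger {
	if l, ok := ctx.Value(loggerKey{}).(*slog.Logger); ok {
		return l
	}
	return slog.New(slog.NewTextHandler(os.Stderr, nil))
}

func pullRecords(ctx context.Context) {
	logger := loggerFromCtx(ctx) // resolve once, then plain logger.Info/Warn/Debug
	logger.Info("pulling records")
}

func main() {
	ctx := context.WithValue(context.Background(),
		loggerKey{}, slog.Default().With("flowName", "my-mirror"))
	pullRecords(ctx)
}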
2 changes: 1 addition & 1 deletion flow/connectors/postgres/client.go
@@ -13,9 +13,9 @@ import (
"github.com/lib/pq/oid"

"github.com/PeerDB-io/peer-flow/connectors/utils"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/shared"
)

Expand Down
