From 9d4a9d20bc257699596b94293ca696d546f24149 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 Nov 2024 00:55:49 +0530 Subject: [PATCH 1/5] setup and snapshot info logs --- flow/activities/flowable.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index cc09bae0d7..bcd5d4f4f9 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -265,15 +265,19 @@ func (a *FlowableActivity) CreateNormalizedTable( numTablesSetup.Add(1) if !existing { logger.Info("created table " + tableIdentifier) + a.Alerter.LogFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination") } else { logger.Info("table already exists " + tableIdentifier) } + } if err := conn.FinishSetupNormalizedTables(ctx, tx); err != nil { return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err) } + a.Alerter.LogFlowInfo(ctx, config.FlowName, "All destination tables have been setup") + return &protos.SetupNormalizedTableBatchOutput{ TableExistsMapping: tableExistsMapping, }, nil @@ -576,6 +580,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, } } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("replicated all rows for table %s", config.DestinationTableIdentifier)) return nil } From e880f507c6023024daa58249ac16833fd2876e5d Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 Nov 2024 01:40:31 +0530 Subject: [PATCH 2/5] minor fix --- flow/activities/flowable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index bcd5d4f4f9..d8dc199142 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -580,7 +580,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, } } - a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("replicated all rows for table %s", config.DestinationTableIdentifier)) + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("replicated all rows to destination for table %s", config.DestinationTableIdentifier)) return nil } From 006f9006e8e2d0e539eb47378a06e9ef42b70f00 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 Nov 2024 02:11:14 +0530 Subject: [PATCH 3/5] cdc log --- flow/activities/flowable.go | 14 ++++++++++++++ flow/activities/flowable_core.go | 7 ++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index d8dc199142..97b6531053 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -513,6 +513,8 @@ func (a *FlowableActivity) GetQRepPartitions(ctx context.Context, } } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("obtained partitions for table %s", config.WatermarkTable)) + return &protos.QRepParitionResult{ Partitions: partitions, }, nil @@ -642,6 +644,8 @@ func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropF return pullCleanupErr } + a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects. Any replication slot or publication created by PeerDB has been removed.") + return nil } @@ -661,6 +665,8 @@ func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos. return syncFlowCleanupErr } + a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up destination peer replication objects. Any PeerDB metadata storage has been dropped.") + return nil } @@ -905,6 +911,8 @@ func (a *FlowableActivity) RenameTables(ctx context.Context, config *protos.Rena } } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables") + return renameOutput, tx.Commit(ctx) } @@ -978,6 +986,9 @@ func (a *FlowableActivity) AddTablesToPublication(ctx context.Context, cfg *prot if err != nil { a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) } + + a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("ensured %d tables exist in publication %s", + len(additionalTableMappings), cfg.PublicationName)) return err } @@ -1001,6 +1012,9 @@ func (a *FlowableActivity) RemoveTablesFromPublication( if err != nil { a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) } + + a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("removed %d tables from publication %s", + len(removedTablesMapping), cfg.PublicationName)) return err } diff --git a/flow/activities/flowable_core.go b/flow/activities/flowable_core.go index db04efea30..64e6494caf 100644 --- a/flow/activities/flowable_core.go +++ b/flow/activities/flowable_core.go @@ -228,6 +228,11 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon return nil, fmt.Errorf("failed to sync schema: %w", err) } + numberOfSchemaChanges := len(recordBatchSync.SchemaDeltas) + if numberOfSchemaChanges > 0 { + a.Alerter.LogFlowInfo(ctx, flowName, fmt.Sprintf("synced %d schema changes from source to destination", numberOfSchemaChanges)) + } + return &model.SyncCompositeResponse{ SyncResponse: &model.SyncResponse{ CurrentSyncBatchID: -1, @@ -321,7 +326,7 @@ func syncCore[TPull connectors.CDCPullConnectorCore, TSync connectors.CDCSyncCon } } - pushedRecordsWithCount := fmt.Sprintf("pushed %d records", numRecords) + pushedRecordsWithCount := fmt.Sprintf("pushed %d records into intermediate storage", numRecords) activity.RecordHeartbeat(ctx, pushedRecordsWithCount) a.Alerter.LogFlowInfo(ctx, flowName, pushedRecordsWithCount) From 39ae5cefb44c5abd8ec51dd5377430800bda9d7f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 Nov 2024 02:16:05 +0530 Subject: [PATCH 4/5] minor --- flow/activities/flowable.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index 97b6531053..b70ccbe480 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -582,7 +582,7 @@ func (a *FlowableActivity) ReplicateQRepPartitions(ctx context.Context, } } - a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("replicated all rows to destination for table %s", config.DestinationTableIdentifier)) + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "replicated all rows to destination for table "+config.DestinationTableIdentifier) return nil } From 3f8bd7e0ec7160ac003b8a38d6b4a3aa59878d0f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 6 Nov 2024 02:21:09 +0530 Subject: [PATCH 5/5] more setup and snapshot --- flow/activities/flowable.go | 2 ++ flow/activities/snapshot_activity.go | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go index b70ccbe480..f6d785e434 100644 --- a/flow/activities/flowable.go +++ b/flow/activities/flowable.go @@ -219,6 +219,7 @@ func (a *FlowableActivity) CreateNormalizedTable( ) (*protos.SetupNormalizedTableBatchOutput, error) { logger := activity.GetLogger(ctx) ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) + a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName) if err != nil { if errors.Is(err, errors.ErrUnsupported) { @@ -247,6 +248,7 @@ func (a *FlowableActivity) CreateNormalizedTable( }) defer shutdown() + a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) for tableIdentifier, tableSchema := range tableNameSchemaMapping { existing, err := conn.SetupNormalizedTable( diff --git a/flow/activities/snapshot_activity.go b/flow/activities/snapshot_activity.go index 01c9e748e6..5993bfe463 100644 --- a/flow/activities/snapshot_activity.go +++ b/flow/activities/snapshot_activity.go @@ -58,7 +58,7 @@ func (a *SnapshotActivity) SetupReplication( ) (*protos.SetupReplicationOutput, error) { ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) logger := activity.GetLogger(ctx) - + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Setting up replication slot and publication") a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job") conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName) @@ -98,6 +98,8 @@ func (a *SnapshotActivity) SetupReplication( connector: conn, } + a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Replication slot and publication setup complete") + return &protos.SetupReplicationOutput{ SlotName: slotInfo.SlotName, SnapshotName: slotInfo.SnapshotName,