-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add more information logs for UI logs table #2220
base: main
Are you sure you want to change the base?
Changes from all commits
9d4a9d2
e880f50
006f900
42b50d2
39ae5ce
3f8bd7e
f648453
a9a0e1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -218,6 +218,7 @@ | |
) (*protos.SetupNormalizedTableBatchOutput, error) { | ||
logger := activity.GetLogger(ctx) | ||
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowName) | ||
a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") | ||
conn, err := connectors.GetByNameAs[connectors.NormalizedTablesConnector](ctx, config.Env, a.CatalogPool, config.PeerName) | ||
if err != nil { | ||
if errors.Is(err, errors.ErrUnsupported) { | ||
|
@@ -246,6 +247,7 @@ | |
}) | ||
defer shutdown() | ||
|
||
a.Alerter.LogFlowInfo(ctx, config.FlowName, "Setting up destination tables") | ||
tableExistsMapping := make(map[string]bool, len(tableNameSchemaMapping)) | ||
for tableIdentifier, tableSchema := range tableNameSchemaMapping { | ||
existing, err := conn.SetupNormalizedTable( | ||
|
@@ -264,15 +266,19 @@ | |
numTablesSetup.Add(1) | ||
if !existing { | ||
logger.Info("created table " + tableIdentifier) | ||
a.Alerter.LogFlowInfo(ctx, config.FlowName, "created table "+tableIdentifier+" in destination") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be too noisy for a large number of tables |
||
} else { | ||
logger.Info("table already exists " + tableIdentifier) | ||
} | ||
|
||
} | ||
|
||
if err := conn.FinishSetupNormalizedTables(ctx, tx); err != nil { | ||
return nil, fmt.Errorf("failed to commit normalized tables tx: %w", err) | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, config.FlowName, "All destination tables have been setup") | ||
|
||
return &protos.SetupNormalizedTableBatchOutput{ | ||
TableExistsMapping: tableExistsMapping, | ||
}, nil | ||
|
@@ -510,6 +516,8 @@ | |
} | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, fmt.Sprintf("obtained partitions for table %s", config.WatermarkTable)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe just log instead of sending an alert? (too noisy) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These are not alerts. This is for our logs table in UI There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My bad, but still do we want to do it for all tables? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah because we have scarcity of user facing info during initial load - common feedback from customers cc @iskakaushik |
||
|
||
return &protos.QRepParitionResult{ | ||
Partitions: partitions, | ||
}, nil | ||
|
@@ -577,6 +585,7 @@ | |
} | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "replicated all rows to destination for table "+config.DestinationTableIdentifier) | ||
return nil | ||
} | ||
|
||
|
@@ -638,6 +647,8 @@ | |
return pullCleanupErr | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects. Any replication slot or publication created by PeerDB has been removed.") | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -657,6 +668,8 @@ | |
return syncFlowCleanupErr | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up destination peer replication objects. Any PeerDB metadata storage has been dropped.") | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -897,6 +910,8 @@ | |
} | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Resync completed for all tables") | ||
|
||
return renameOutput, tx.Commit(ctx) | ||
} | ||
|
||
|
@@ -970,6 +985,9 @@ | |
if err != nil { | ||
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("ensured %d tables exist in publication %s", | ||
len(additionalTableMappings), cfg.PublicationName)) | ||
return err | ||
} | ||
|
||
|
@@ -993,6 +1011,9 @@ | |
if err != nil { | ||
a.Alerter.LogFlowError(ctx, cfg.FlowJobName, err) | ||
} | ||
|
||
a.Alerter.LogFlowInfo(ctx, cfg.FlowJobName, fmt.Sprintf("removed %d tables from publication %s", | ||
len(removedTablesMapping), cfg.PublicationName)) | ||
return err | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -58,7 +58,7 @@ func (a *SnapshotActivity) SetupReplication( | |||
) (*protos.SetupReplicationOutput, error) { | ||||
ctx = context.WithValue(ctx, shared.FlowNameKey, config.FlowJobName) | ||||
logger := activity.GetLogger(ctx) | ||||
|
||||
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Setting up replication slot and publication") | ||||
a.Alerter.LogFlowEvent(ctx, config.FlowJobName, "Started Snapshot Flow Job") | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for alerting, we need this |
||||
|
||||
conn, err := connectors.GetByNameAs[*connpostgres.PostgresConnector](ctx, nil, a.CatalogPool, config.PeerName) | ||||
|
@@ -98,6 +98,8 @@ func (a *SnapshotActivity) SetupReplication( | |||
connector: conn, | ||||
} | ||||
|
||||
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Replication slot and publication setup complete") | ||||
|
||||
return &protos.SetupReplicationOutput{ | ||||
SlotName: slotInfo.SlotName, | ||||
SnapshotName: slotInfo.SnapshotName, | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we have this twice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
think it's meant to say conn.StartupSetupNormalizedTables has created tables, beginning ingestion. Wording needs to be updated