feat: Add support for pubsub for Dead Letter queue (#897)
* changes

* change

* linting

* docs and tests

* ui

* changes

* changes

* change

* changes
asthamohta authored Sep 6, 2024
1 parent 07cff77 commit ae5535a
Showing 25 changed files with 149 additions and 48 deletions.
21 changes: 12 additions & 9 deletions accessors/dataflow/dataflow_accessor_test.go
@@ -39,13 +39,14 @@ func TestMain(m *testing.M) {
 
 func getParameters() map[string]string {
 	return map[string]string{
-		"streamName":                    "my-stream",
-		"instanceId":                    "my-instance",
-		"databaseId":                    "my-dbName",
-		"sessionFilePath":               "gs://session.json",
-		"deadLetterQueueDirectory":      "gs://dlq",
-		"transformationContextFilePath": "gs://transformationContext.json",
-		"gcsPubSubSubscription":         "projects/my-project/subscriptions/my-subscription",
+		"streamName":                    "my-stream",
+		"instanceId":                    "my-instance",
+		"databaseId":                    "my-dbName",
+		"sessionFilePath":               "gs://session.json",
+		"deadLetterQueueDirectory":      "gs://dlq",
+		"transformationContextFilePath": "gs://transformationContext.json",
+		"dlqGcsPubSubSubscription":      "projects/my-project/subscriptions/my-dlq-subscription",
+		"gcsPubSubSubscription":         "projects/my-project/subscriptions/my-subscription",
 	}
 }
@@ -105,6 +106,7 @@ func getExpectedGcloudCmd1() string {
 		"--dataflow-kms-key sample-kms-key --disable-public-ips " +
 		"--enable-streaming-engine " +
 		"--parameters databaseId=my-dbName,deadLetterQueueDirectory=gs://dlq," +
+		"dlqGcsPubSubSubscription=projects/my-project/subscriptions/my-dlq-subscription," +
 		"gcsPubSubSubscription=projects/my-project/subscriptions/my-subscription," +
 		"instanceId=my-instance,sessionFilePath=gs://session.json,streamName=my-stream," +
 		"transformationContextFilePath=gs://transformationContext.json"
@@ -141,8 +143,8 @@ func getTemplateDfRequest2() *dataflowpb.LaunchFlexTemplateRequest {
 }
 
 func getExpectedGcloudCmd2() string {
-	return ""+
-		"gcloud dataflow flex-template run test-job " +
+	return "" +
+		"gcloud dataflow flex-template run test-job " +
 		"--project=test-project --region=us-central1 " +
 		"--template-file-gcs-location=gs://template/Cloud_Datastream_to_Spanner " +
 		"--num-workers 10 --max-workers 50 --service-account-email [email protected] " +
@@ -153,6 +155,7 @@ func getExpectedGcloudCmd2() string {
 		"--worker-zone test-worker-zone --enable-streaming-engine " +
 		"--flexrs-goal FLEXRS_SPEED_OPTIMIZED --staging-location gs://staging-location " +
 		"--parameters databaseId=my-dbName,deadLetterQueueDirectory=gs://dlq," +
+		"dlqGcsPubSubSubscription=projects/my-project/subscriptions/my-dlq-subscription," +
 		"gcsPubSubSubscription=projects/my-project/subscriptions/my-subscription," +
 		"instanceId=my-instance,sessionFilePath=gs://session.json,streamName=my-stream," +
 		"transformationContextFilePath=gs://transformationContext.json"
23 changes: 15 additions & 8 deletions common/constants/constants.go
@@ -90,14 +90,17 @@ const (
 	//Migration types
 	MINIMAL_DOWNTIME_MIGRATION = "minimal_downtime"
 	//Job Resource Types
-	DATAFLOW_RESOURCE       string = "dataflow"
-	PUBSUB_RESOURCE         string = "pubsub"
-	PUBSUB_TOPIC_RESOURCE   string = "pubsub_topic"
-	PUBSUB_SUB_RESOURCE     string = "pubsub_sub"
-	MONITORING_RESOURCE     string = "monitoring"
-	AGG_MONITORING_RESOURCE string = "aggregated_monitoring"
-	DATASTREAM_RESOURCE     string = "datastream"
-	GCS_RESOURCE            string = "gcs"
+	DATAFLOW_RESOURCE         string = "dataflow"
+	PUBSUB_RESOURCE           string = "pubsub"
+	DLQ_PUBSUB_RESOURCE       string = "dlq_pubsub"
+	PUBSUB_TOPIC_RESOURCE     string = "pubsub_topic"
+	DLQ_PUBSUB_TOPIC_RESOURCE string = "dlq_pubsub_topic"
+	PUBSUB_SUB_RESOURCE       string = "pubsub_sub"
+	DLQ_PUBSUB_SUB_RESOURCE   string = "dlq_pubsub_sub"
+	MONITORING_RESOURCE       string = "monitoring"
+	AGG_MONITORING_RESOURCE   string = "aggregated_monitoring"
+	DATASTREAM_RESOURCE       string = "datastream"
+	GCS_RESOURCE              string = "gcs"
 	// Metadata table names
 	SMT_JOB_TABLE      string = "SMT_JOB"
 	SMT_RESOURCE_TABLE string = "SMT_RESOURCE"
@@ -114,4 +117,8 @@ const (
 	FK_SET_DEFAULT string = "SET DEFAULT"
 	FK_SET_NULL    string = "SET NULL"
 	FK_RESTRICT    string = "RESTRICT"
+
+	// GCS PUBSUB MODES
+	REGULAR_GCS string = "data"
+	DLQ_GCS     string = "dlq"
 )
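The two new mode strings above are what the call sites in the files below pass as the new final argument to streaming.CreatePubsubResources and streaming.FetchTargetBucketAndPath. The bodies of those functions are not part of the hunks shown here, so the sketch below is only a plausible reading of what the mode selects — which subdirectory of the Datastream output a Pub/Sub notification watches. The "data"/"dlq" directory layout is an assumption for illustration, not something this commit confirms.

```go
package main

import (
	"fmt"
	"strings"
)

// Local mirrors of the REGULAR_GCS / DLQ_GCS constants added above.
const (
	regularGCS = "data"
	dlqGCS     = "dlq"
)

// notificationPrefix sketches one plausible consumer of the mode: picking the
// GCS path prefix that a Pub/Sub notification is configured on, so that the
// DLQ directory gets its own notification (and hence its own subscription)
// separate from the regular data directory.
func notificationPrefix(destPrefix, mode string) (string, error) {
	destPrefix = strings.TrimSuffix(destPrefix, "/")
	switch mode {
	case regularGCS:
		return destPrefix + "/data/", nil
	case dlqGCS:
		return destPrefix + "/dlq/", nil
	default:
		return "", fmt.Errorf("unknown GCS pubsub mode %q", mode)
	}
}

func main() {
	for _, mode := range []string{regularGCS, dlqGCS} {
		prefix, err := notificationPrefix("stream-output", mode)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s\n", mode, prefix)
	}
}
```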
2 changes: 1 addition & 1 deletion conversion/conversion_from_source.go
@@ -261,7 +261,7 @@ func (sads *DataFromSourceImpl) dataFromDatabase(ctx context.Context, migrationP
 	streamingCfg, _ := streamInfo["streamingCfg"].(streaming.StreamingCfg)
 	// Fetch and store the GCS bucket associated with the datastream
 	dsClient := GetDatastreamClient(ctx)
-	gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig)
+	gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
 	if fetchGcsErr != nil {
 		logger.Log.Info("Could not fetch GCS Bucket, hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n")
 		logger.Log.Debug("Error", zap.Error(fetchGcsErr))
10 changes: 8 additions & 2 deletions conversion/data_from_database.go
@@ -28,6 +28,7 @@ import (
 	datastream_accessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/datastream"
 	spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner"
 	storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage"
+	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
@@ -136,11 +137,16 @@ func (dd *DataFromDatabaseImpl) dataFromDatabaseForDataflowMigration(migrationPr
 				return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
 			}
 			fmt.Printf("Initiating migration for shard: %v\n", p.DataShardId)
-			pubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname)
+			pubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.REGULAR_GCS)
 			if err != nil {
 				return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
 			}
 			streamingCfg.PubsubCfg = *pubsubCfg
+			dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, targetProfile.Conn.Sp.Dbname, constants.DLQ_GCS)
+			if err != nil {
+				return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
+			}
+			streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
 			err = streaming.LaunchStream(ctx, sourceProfile, p.LogicalShards, migrationProjectId, streamingCfg.DatastreamCfg)
 			if err != nil {
 				return common.TaskResult[*profiles.DataShard]{Result: p, Err: err}
@@ -154,7 +160,7 @@
 
 			// Fetch and store the GCS bucket associated with the datastream
 			dsClient := GetDatastreamClient(ctx)
-			gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig)
+			gcsBucket, gcsDestPrefix, fetchGcsErr := streaming.FetchTargetBucketAndPath(ctx, dsClient, migrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, "data")
 			if fetchGcsErr != nil {
 				logger.Log.Info(fmt.Sprintf("Could not fetch GCS Bucket for Shard %s hence Monitoring Dashboard will not contain Metrics for the gcs bucket\n", p.DataShardId))
 				logger.Log.Debug("Error", zap.Error(fetchGcsErr))
4 changes: 3 additions & 1 deletion docs/troubleshoot/minimal.md
@@ -101,7 +101,7 @@ gcloud dataflow flex-template run <jobName> \
   --template-file-gcs-location=gs://dataflow-templates-southamerica-west1/2023-09-12-00_RC00/flex/Cloud_Datastream_to_Spanner \
   --num-workers 1 --max-workers 50 \
   --enable-streaming-engine \
-  --parameters databaseId=<database id>,deadLetterQueueDirectory=<GCS location of the DLQ directory>,gcsPubSubSubscription=<pubsub subscription being used in a gcs notification policy>,instanceId=<spanner-instance-id>,sessionFilePath=<GCS location of the session json>,streamName=<data stream name>,transformationContextFilePath=<path to transformation context json>
+  --parameters databaseId=<database id>,deadLetterQueueDirectory=<GCS location of the DLQ directory>,gcsPubSubSubscription=<pubsub subscription being used in a gcs notification policy>,dlqGcsPubSubSubscription=<pubsub subscription being used in a dlq gcs notification policy>,instanceId=<spanner-instance-id>,sessionFilePath=<GCS location of the session json>,streamName=<data stream name>,transformationContextFilePath=<path to transformation context json>
 ```
@@ -121,6 +121,7 @@ gcloud dataflow flex-template run <jobname> \
   --additional-experiments=use_runner_v2 \
   --parameters gcsPubSubSubscription=<pubsub subscription being used in a gcs notification policy>,streamName=<Datastream name>, \
   instanceId=<Spanner Instance Id>,databaseId=<Spanner Database Id>,sessionFilePath=<GCS path to session file>, \
+  dlqGcsPubSubSubscription=<pubsub subscription being used in a dlq gcs notification policy>, \
   deadLetterQueueDirectory=<GCS path to the DLQ>,runMode=retryDLQ
 ```
@@ -134,4 +135,5 @@
 instanceId
 databaseId
 sessionFilePath
 deadLetterQueueDirectory
+dlqGcsPubSubSubscription
 ```
2 changes: 2 additions & 0 deletions internal/convert.go
@@ -230,6 +230,7 @@ type MonitoringResources struct {
 type ShardResources struct {
 	DatastreamResources DatastreamResources
 	PubsubResources     PubsubResources
+	DlqPubsubResources  PubsubResources
 	DataflowResources   DataflowResources
 	GcsResources        GcsResources
 	MonitoringResources MonitoringResources
@@ -246,6 +247,7 @@ type streamingStats struct {
 	DatastreamResources      DatastreamResources
 	DataflowResources        DataflowResources
 	PubsubResources          PubsubResources
+	DlqPubsubResources       PubsubResources
 	GcsResources             GcsResources
 	MonitoringResources      MonitoringResources
 	ShardToShardResourcesMap map[string]ShardResources
7 changes: 6 additions & 1 deletion sources/mysql/infoschema.go
@@ -395,11 +395,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 	if err != nil {
 		return nil, fmt.Errorf("error reading streaming config: %v", err)
 	}
-	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db)
+	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db, constants.REGULAR_GCS)
 	if err != nil {
 		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
 	}
 	streamingCfg.PubsubCfg = *pubsubCfg
+	dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Mysql.Db, constants.DLQ_GCS)
+	if err != nil {
+		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
+	}
+	streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
 	streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails)
 	if err != nil {
 		err = fmt.Errorf("error starting datastream: %v", err)
8 changes: 7 additions & 1 deletion sources/oracle/infoschema.go
@@ -23,6 +23,7 @@ import (
 
 	sp "cloud.google.com/go/spanner"
 
+	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
@@ -438,11 +439,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 	if err != nil {
 		return nil, fmt.Errorf("error reading streaming config: %v", err)
 	}
-	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User)
+	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User, constants.REGULAR_GCS)
 	if err != nil {
 		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
 	}
 	streamingCfg.PubsubCfg = *pubsubCfg
+	dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.SourceProfile.Conn.Oracle.User, constants.DLQ_GCS)
+	if err != nil {
+		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
+	}
+	streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
 	streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails)
 	if err != nil {
 		err = fmt.Errorf("error starting datastream: %v", err)
8 changes: 7 additions & 1 deletion sources/postgres/infoschema.go
@@ -29,6 +29,7 @@ import (
 	sp "cloud.google.com/go/spanner"
 	_ "github.com/lib/pq" // we will use database/sql package instead of using this package directly
 
+	"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
 	"github.com/GoogleCloudPlatform/spanner-migration-tool/schema"
@@ -75,11 +76,16 @@ func (isi InfoSchemaImpl) StartChangeDataCapture(ctx context.Context, conv *inte
 	if err != nil {
 		return nil, fmt.Errorf("error reading streaming config: %v", err)
 	}
-	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname)
+	pubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname, constants.REGULAR_GCS)
 	if err != nil {
 		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
 	}
 	streamingCfg.PubsubCfg = *pubsubCfg
+	dlqPubsubCfg, err := streaming.CreatePubsubResources(ctx, isi.MigrationProjectId, streamingCfg.DatastreamCfg.DestinationConnectionConfig, isi.TargetProfile.Conn.Sp.Dbname, constants.DLQ_GCS)
+	if err != nil {
+		return nil, fmt.Errorf("error creating pubsub resources: %v", err)
+	}
+	streamingCfg.DlqPubsubCfg = *dlqPubsubCfg
 	streamingCfg, err = streaming.StartDatastream(ctx, isi.MigrationProjectId, streamingCfg, isi.SourceProfile, isi.TargetProfile, schemaDetails)
 	if err != nil {
 		err = fmt.Errorf("error starting datastream: %v", err)
5 changes: 5 additions & 0 deletions streaming/cleanup.go
@@ -89,6 +89,11 @@ func InitiateJobCleanup(ctx context.Context, migrationJobId string, dataShardIds
 	if err != nil {
 		logger.Log.Debug(fmt.Sprintf("Unable to fetch pubsub resources for jobId: %s: %v\n", migrationJobId, err))
 	}
+	dlqPubSubList, err := FetchResources(ctx, migrationJobId, constants.DLQ_PUBSUB_RESOURCE, dataShardIds, spannerProjectId, instance)
+	if err != nil {
+		logger.Log.Debug(fmt.Sprintf("Unable to fetch pubsub resources for jobId: %s: %v\n", migrationJobId, err))
+	}
+	pubsubResourcesList = append(pubsubResourcesList, dlqPubSubList...)
 	//cleanup
 	for _, resources := range pubsubResourcesList {
 		var pubsubResources internal.PubsubResources
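The body of the cleanup loop is elided above. As a rough sketch only (not the tool's actual implementation), deleting one fetched row plausibly means decoding the stored JSON payload back into internal.PubsubResources and dropping the subscription before its topic. The SubscriptionId field and the deletion order are assumptions; only TopicId is demonstrably part of the payload (see the createResourceMutation calls in streaming/store.go below).

```go
package streaming

import (
	"context"
	"encoding/json"
	"fmt"

	"cloud.google.com/go/pubsub"
)

// pubsubResourcesSketch stands in for internal.PubsubResources; TopicId is
// the only field this diff demonstrably stores, SubscriptionId is assumed.
type pubsubResourcesSketch struct {
	TopicId        string
	SubscriptionId string
}

// deletePubsubRow sketches the elided loop body: decode a stored resource
// payload, then delete the subscription before the topic that feeds it.
func deletePubsubRow(ctx context.Context, client *pubsub.Client, payload string) error {
	var res pubsubResourcesSketch
	if err := json.Unmarshal([]byte(payload), &res); err != nil {
		return fmt.Errorf("can't unmarshal pubsub resource payload: %v", err)
	}
	if err := client.Subscription(res.SubscriptionId).Delete(ctx); err != nil {
		return fmt.Errorf("can't delete subscription %s: %v", res.SubscriptionId, err)
	}
	if err := client.Topic(res.TopicId).Delete(ctx); err != nil {
		return fmt.Errorf("can't delete topic %s: %v", res.TopicId, err)
	}
	return nil
}
```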
15 changes: 12 additions & 3 deletions streaming/store.go
@@ -112,7 +112,7 @@ func PersistResources(ctx context.Context, targetProfile profiles.TargetProfile,
 		err = fmt.Errorf("can't create database client: %v", err)
 		return err
 	}
-	err = writeJobResources(ctx, migrationJobId, dataShardId, conv.Audit.StreamingStats.DataflowResources, conv.Audit.StreamingStats.DatastreamResources, conv.Audit.StreamingStats.GcsResources, conv.Audit.StreamingStats.PubsubResources, conv.Audit.StreamingStats.MonitoringResources, time.Now(), client)
+	err = writeJobResources(ctx, migrationJobId, dataShardId, conv.Audit.StreamingStats.DataflowResources, conv.Audit.StreamingStats.DatastreamResources, conv.Audit.StreamingStats.GcsResources, conv.Audit.StreamingStats.PubsubResources, conv.Audit.StreamingStats.DlqPubsubResources, conv.Audit.StreamingStats.MonitoringResources, time.Now(), client)
 	if err != nil {
 		err = fmt.Errorf("can't store generated resources for datashard: %v", err)
 		return err
@@ -156,7 +156,7 @@ func writeJobDetails(ctx context.Context, migrationJobId string, isShardedMigrat
 	return nil
 }
 
-func writeJobResources(ctx context.Context, migrationJobId string, dataShardId string, dataflowResources internal.DataflowResources, datastreamResources internal.DatastreamResources, gcsResources internal.GcsResources, pubsubResources internal.PubsubResources, monitoringResources internal.MonitoringResources, createTimestamp time.Time, client *spanner.Client) error {
+func writeJobResources(ctx context.Context, migrationJobId string, dataShardId string, dataflowResources internal.DataflowResources, datastreamResources internal.DatastreamResources, gcsResources internal.GcsResources, pubsubResources internal.PubsubResources, dlqPubsubResources internal.PubsubResources, monitoringResources internal.MonitoringResources, createTimestamp time.Time, client *spanner.Client) error {
 	datastreamResourcesBytes, err := json.Marshal(datastreamResources)
 	if err != nil {
 		logger.Log.Error(fmt.Sprintf("can't marshal datastream resources for data shard %s: %v\n", dataShardId, err))
@@ -177,6 +177,11 @@ func writeJobResources(ctx context.Context, migrationJobId string, dataShardId s
 		logger.Log.Error(fmt.Sprintf("can't marshal pubsub resources for data shard %s: %v\n", dataShardId, err))
 		return err
 	}
+	dlqPubsubResourcesBytes, err := json.Marshal(dlqPubsubResources)
+	if err != nil {
+		logger.Log.Error(fmt.Sprintf("can't marshal pubsub resources for data shard %s: %v\n", dataShardId, err))
+		return err
+	}
 	monitoringResourcesBytes, err := json.Marshal(monitoringResources)
 	if err != nil {
 		logger.Log.Error(fmt.Sprintf("can't marshal monitoring resources for data shard %s: %v\n", dataShardId, err))
@@ -200,11 +205,15 @@ func writeJobResources(ctx context.Context, migrationJobId string, dataShardId s
 	if errr != nil {
 		return errr
 	}
+	dlqPubsubMutation, errr := createResourceMutation(migrationJobId, dlqPubsubResources.TopicId, constants.DLQ_PUBSUB_RESOURCE, dlqPubsubResources.TopicId, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(dlqPubsubResourcesBytes)})
+	if errr != nil {
+		return errr
+	}
 	monitoringMutation, err := createResourceMutation(migrationJobId, monitoringResources.DashboardName, constants.MONITORING_RESOURCE, monitoringResources.DashboardName, MinimalDowntimeResourceData{DataShardId: dataShardId, ResourcePayload: string(monitoringResourcesBytes)})
 	if err != nil {
 		return err
 	}
-	err = txn.BufferWrite([]*spanner.Mutation{datastreamMutation, dataflowMutation, gcsMutation, pubsubMutation, monitoringMutation})
+	err = txn.BufferWrite([]*spanner.Mutation{datastreamMutation, dataflowMutation, gcsMutation, pubsubMutation, dlqPubsubMutation, monitoringMutation})
 	if err != nil {
 		return err
 	}
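Taken together with the constants added earlier, this persistence pattern is what lets cleanup tell DLQ rows apart from regular ones: each resource struct is JSON-encoded and stored under its own resource-type key, and InitiateJobCleanup fetches DLQ_PUBSUB_RESOURCE rows separately. A self-contained illustration follows; the struct fields beyond TopicId are hypothetical, and resourceRow is a simplification of the real SMT_RESOURCE schema.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for internal.PubsubResources; only TopicId is demonstrably used
// by the mutations above, SubscriptionId is a hypothetical extra field.
type pubsubResources struct {
	TopicId        string
	SubscriptionId string
}

// resourceRow is a simplified picture of what createResourceMutation writes:
// an opaque JSON payload stored next to a resource-type tag.
type resourceRow struct {
	JobId        string
	ResourceType string // e.g. constants.PUBSUB_RESOURCE or constants.DLQ_PUBSUB_RESOURCE
	Payload      string
}

// toRow JSON-encodes a resource struct into the row shape sketched above.
func toRow(jobId, resourceType string, res pubsubResources) (resourceRow, error) {
	b, err := json.Marshal(res)
	if err != nil {
		return resourceRow{}, err
	}
	return resourceRow{JobId: jobId, ResourceType: resourceType, Payload: string(b)}, nil
}

func main() {
	row, err := toRow("job-123", "dlq_pubsub", pubsubResources{
		TopicId:        "smt-dlq-topic",
		SubscriptionId: "smt-dlq-sub",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", row)
}
```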
[Diff truncated: 11 of the 25 changed files are shown above; the remaining 14 are not included.]