Skip to content

Commit

Permalink
Move RETL schedule config to destination subscription (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
deanhuynh authored Oct 25, 2024
1 parent e6e934f commit 93c85e7
Show file tree
Hide file tree
Showing 8 changed files with 557 additions and 45 deletions.
12 changes: 12 additions & 0 deletions docs/resources/destination_subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,20 @@ resource "segment_destination_subscription" "send_to_webhook" {
### Optional

- `model_id` (String) The unique identifier for the linked ReverseETLModel, if this part of a Reverse ETL connection.
- `reverse_etl_schedule` (Attributes) (Reverse ETL only) The schedule for the subscription being attached to ReverseETL model. (see [below for nested schema](#nestedatt--reverse_etl_schedule))

### Read-Only

- `action_slug` (String) The URL-friendly key for the associated Destination action.
- `id` (String) The unique identifier for the subscription.

<a id="nestedatt--reverse_etl_schedule"></a>
### Nested Schema for `reverse_etl_schedule`

Required:

- `strategy` (String) Strategy supports three modes: PERIODIC, SPECIFIC_DAYS, or MANUAL.

Optional:

- `config` (String) Configures the schedule for the subscription.
7 changes: 5 additions & 2 deletions docs/resources/reverse_etl_model.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,13 @@ resource "segment_reverse_etl_model" "example" {
- `name` (String) A short, human-readable description of the Model.
- `query` (String) The SQL query that will be executed to extract data from the connected Source.
- `query_identifier_column` (String) Indicates the column named in `query` that should be used to uniquely identify the extracted records.
- `schedule_config` (String) Depending on the chosen strategy, configures the schedule for this model.
- `schedule_strategy` (String) Determines the strategy used for triggering syncs, which will be used in conjunction with scheduleConfig.
- `source_id` (String) Indicates which Source to attach this model to.

### Optional

- `schedule_config` (String, Deprecated) Depending on the chosen strategy, configures the schedule for this model.
- `schedule_strategy` (String, Deprecated) Determines the strategy used for triggering syncs, which will be used in conjunction with scheduleConfig.

### Read-Only

- `id` (String) The unique identifier for the model.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/hashicorp/terraform-plugin-framework-validators v0.13.0
github.com/hashicorp/terraform-plugin-go v0.24.0
github.com/hashicorp/terraform-plugin-testing v1.10.0
github.com/segmentio/public-api-sdk-go v0.0.0-20240909200753-311bb8d791a2
github.com/segmentio/public-api-sdk-go v0.0.0-20241025180535-501a23c07559
gotest.tools/gotestsum v1.12.0
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ github.com/russross/blackfriday v1.6.0 h1:KqfZb0pUVN2lYqZUYRddxF4OR8ZMURnJIG5Y3V
github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY=
github.com/segmentio/public-api-sdk-go v0.0.0-20240909200753-311bb8d791a2 h1:vlKTelJ32DPBuiiSx2PJaxN9jJd3OFK2avHU/XR/qB8=
github.com/segmentio/public-api-sdk-go v0.0.0-20240909200753-311bb8d791a2/go.mod h1:yKkoPfcOkkYjiZQj4lRWxji0Qwc6ncNEf7wCfywochY=
github.com/segmentio/public-api-sdk-go v0.0.0-20241017001201-fbbdab459db8 h1:pYJu97HA0FVdy+WCqQbS/baDzXxg2q0Vl+akD6SUBSI=
github.com/segmentio/public-api-sdk-go v0.0.0-20241017001201-fbbdab459db8/go.mod h1:yKkoPfcOkkYjiZQj4lRWxji0Qwc6ncNEf7wCfywochY=
github.com/segmentio/public-api-sdk-go v0.0.0-20241025180535-501a23c07559 h1:6jgXPksz5bEJUMbhp4biSIViye/Os2yfAOx9yy44e1g=
github.com/segmentio/public-api-sdk-go v0.0.0-20241025180535-501a23c07559/go.mod h1:yKkoPfcOkkYjiZQj4lRWxji0Qwc6ncNEf7wCfywochY=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 h1:n661drycOFuPLCN3Uc8sB6B/s6Z4t2xvBgU1htSHuq8=
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
Expand Down
203 changes: 195 additions & 8 deletions internal/provider/destination_subscription_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package provider

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/segmentio/terraform-provider-segment/internal/provider/docs"
"github.com/segmentio/terraform-provider-segment/internal/provider/models"

"github.com/hashicorp/terraform-plugin-framework-jsontypes/jsontypes"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/resource/schema"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/planmodifier"
"github.com/hashicorp/terraform-plugin-framework/resource/schema/stringplanmodifier"
"github.com/hashicorp/terraform-plugin-framework/types/basetypes"

"github.com/segmentio/public-api-sdk-go/api"
)
Expand Down Expand Up @@ -91,12 +94,27 @@ func (r *destinationSubscriptionResource) Schema(_ context.Context, _ resource.S
Description: `The customer settings for action fields. Only settings included in the configuration will be managed by Terraform.`,
CustomType: jsontypes.NormalizedType{},
},
"reverse_etl_schedule": schema.SingleNestedAttribute{
Optional: true,
Description: "(Reverse ETL only) The schedule for the subscription being attached to ReverseETL model.",
Attributes: map[string]schema.Attribute{
"strategy": schema.StringAttribute{
Required: true,
Description: "Strategy supports three modes: PERIODIC, SPECIFIC_DAYS, or MANUAL.",
},
"config": schema.StringAttribute{
Optional: true,
Description: "Configures the schedule for the subscription.",
CustomType: jsontypes.NormalizedType{},
},
},
},
},
}
}

func (r *destinationSubscriptionResource) Create(ctx context.Context, req resource.CreateRequest, resp *resource.CreateResponse) {
var plan models.DestinationSubscriptionState
var plan models.DestinationSubscriptionPlan
diags := req.Plan.Get(ctx, &plan)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
Expand All @@ -110,6 +128,15 @@ func (r *destinationSubscriptionResource) Create(ctx context.Context, req resour
return
}

if !plan.ModelID.IsNull() && !plan.ModelID.IsUnknown() && (plan.ReverseETLSchedule.IsNull() || plan.ReverseETLSchedule.IsUnknown()) {
resp.Diagnostics.AddError(
"Reverse ETL model ID provided without reverse ETL schedule",
"Reverse ETL model ID must be provided with a reverse ETL schedule",
)

return
}

out, body, err := r.client.DestinationsAPI.CreateDestinationSubscription(r.authContext, plan.DestinationID.ValueString()).CreateDestinationSubscriptionAlphaInput(api.CreateDestinationSubscriptionAlphaInput{
Name: plan.Name.ValueString(),
ActionId: plan.ActionID.ValueString(),
Expand All @@ -130,9 +157,39 @@ func (r *destinationSubscriptionResource) Create(ctx context.Context, req resour
return
}

destinationSubscription := out.Data.GetDestinationSubscription()
resp.State.SetAttribute(ctx, path.Root("id"), out.Data.DestinationSubscription.Id)
resp.State.SetAttribute(ctx, path.Root("destination_id"), out.Data.DestinationSubscription.DestinationId)

reverseETLSchedule, diags := getSchedule(ctx, plan.ReverseETLSchedule)
if diags.HasError() {
resp.Diagnostics.Append(diags...)

return
}

updateOut, body, err := r.client.DestinationsAPI.UpdateSubscriptionForDestination(r.authContext, plan.DestinationID.ValueString(), out.Data.DestinationSubscription.Id).UpdateSubscriptionForDestinationAlphaInput(api.UpdateSubscriptionForDestinationAlphaInput{
Input: api.DestinationSubscriptionUpdateInput{
Name: plan.Name.ValueStringPointer(),
Trigger: plan.Trigger.ValueStringPointer(),
Enabled: plan.Enabled.ValueBoolPointer(),
Settings: settings,
ReverseETLModelId: plan.ModelID.ValueStringPointer(),
ReverseETLSchedule: reverseETLSchedule,
},
}).Execute()
if body != nil {
defer body.Body.Close()
}
if err != nil {
resp.Diagnostics.AddError(
"Unable to update Destination subscription",
getError(err, body),
)

return
}

resp.State.SetAttribute(ctx, path.Root("id"), destinationSubscription.Id)
destinationSubscription := updateOut.Data.Subscription

var state models.DestinationSubscriptionState
err = state.Fill(destinationSubscription)
Expand Down Expand Up @@ -203,7 +260,7 @@ func (r *destinationSubscriptionResource) Read(ctx context.Context, req resource
}

func (r *destinationSubscriptionResource) Update(ctx context.Context, req resource.UpdateRequest, resp *resource.UpdateResponse) {
var plan models.DestinationSubscriptionState
var plan models.DestinationSubscriptionPlan
diags := req.Plan.Get(ctx, &plan)
resp.Diagnostics.Append(diags...)
if resp.Diagnostics.HasError() {
Expand All @@ -224,12 +281,30 @@ func (r *destinationSubscriptionResource) Update(ctx context.Context, req resour
return
}

if !plan.ModelID.IsNull() && !plan.ModelID.IsUnknown() && (plan.ReverseETLSchedule.IsNull() || plan.ReverseETLSchedule.IsUnknown()) {
resp.Diagnostics.AddError(
"Reverse ETL model ID provided without reverse ETL schedule",
"Reverse ETL model ID must be provided with a reverse ETL schedule",
)

return
}

reverseETLSchedule, diags := getSchedule(ctx, plan.ReverseETLSchedule)
if diags.HasError() {
resp.Diagnostics.Append(diags...)

return
}

out, body, err := r.client.DestinationsAPI.UpdateSubscriptionForDestination(r.authContext, state.DestinationID.ValueString(), state.ID.ValueString()).UpdateSubscriptionForDestinationAlphaInput(api.UpdateSubscriptionForDestinationAlphaInput{
Input: api.DestinationSubscriptionUpdateInput{
Name: plan.Name.ValueStringPointer(),
Trigger: plan.Trigger.ValueStringPointer(),
Enabled: plan.Enabled.ValueBoolPointer(),
Settings: settings,
Name: plan.Name.ValueStringPointer(),
Trigger: plan.Trigger.ValueStringPointer(),
Enabled: plan.Enabled.ValueBoolPointer(),
Settings: settings,
ReverseETLModelId: plan.ModelID.ValueStringPointer(),
ReverseETLSchedule: reverseETLSchedule,
},
}).Execute()
if body != nil {
Expand Down Expand Up @@ -320,3 +395,115 @@ func (r *destinationSubscriptionResource) Configure(_ context.Context, req resou
r.client = config.client
r.authContext = config.authContext
}

func getSchedule(ctx context.Context, planSchedule basetypes.ObjectValue) (*api.ReverseEtlScheduleDefinition, diag.Diagnostics) {
var reverseETLSchedule *api.ReverseEtlScheduleDefinition
var diags diag.Diagnostics
if !planSchedule.IsNull() && !planSchedule.IsUnknown() {
reverseETLSchedule = &api.ReverseEtlScheduleDefinition{}

wrappedReverseETLModelScheduleStrategy, err := planSchedule.Attributes()["strategy"].ToTerraformValue(ctx)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule strategy",
err.Error(),
)

return nil, diags
}

var reverseETLModelScheduleStrategy string
err = wrappedReverseETLModelScheduleStrategy.As(&reverseETLModelScheduleStrategy)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule strategy",
err.Error(),
)

return nil, diags
}

reverseETLSchedule.Strategy = reverseETLModelScheduleStrategy

wrappedReverseETLModelScheduleConfig, err := planSchedule.Attributes()["config"].ToTerraformValue(ctx)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule config",
err.Error(),
)

return nil, diags
}

if !wrappedReverseETLModelScheduleConfig.IsNull() && wrappedReverseETLModelScheduleConfig.IsKnown() {
if reverseETLSchedule.Strategy == "PERIODIC" {
reverseETLModelScheduleConfig := api.ReverseEtlPeriodicScheduleConfig{}
var config string
err = wrappedReverseETLModelScheduleConfig.As(&config)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule config",
err.Error(),
)

return nil, diags
}

err = json.Unmarshal([]byte(config), &reverseETLModelScheduleConfig)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule config",
err.Error(),
)

return nil, diags
}

reverseETLSchedule.Config = *api.NewNullableConfig(&api.Config{
ReverseEtlPeriodicScheduleConfig: &reverseETLModelScheduleConfig,
})
} else if reverseETLSchedule.Strategy == "SPECIFIC_DAYS" {
reverseETLModelScheduleConfig := api.ReverseEtlSpecificTimeScheduleConfig{}
var config string
err = wrappedReverseETLModelScheduleConfig.As(&config)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule config",
err.Error(),
)

return nil, diags
}

err = json.Unmarshal([]byte(config), &reverseETLModelScheduleConfig)
if err != nil {
diags.AddError(
"Unable to decode reverse ETL schedule config",
err.Error(),
)

return nil, diags
}

reverseETLSchedule.Config = *api.NewNullableConfig(&api.Config{
ReverseEtlSpecificTimeScheduleConfig: &reverseETLModelScheduleConfig,
})
} else if reverseETLSchedule.Strategy == "MANUAL" {
diags.AddError(
"Manual reverse ETL schedule strategy does not require a config",
"Manual reverse ETL schedule strategy does not require a config",
)
reverseETLSchedule.Config = *api.NewNullableConfig(nil)
} else {
diags.AddError(
"Unsupported reverse ETL schedule strategy",
fmt.Sprintf("Strategy %q is not supported", reverseETLSchedule.Strategy),
)

return nil, diags
}
}
}

return reverseETLSchedule, diags
}
Loading

0 comments on commit 93c85e7

Please sign in to comment.