From 4094accb928bca1e84b73715635fef77b9c9af22 Mon Sep 17 00:00:00 2001 From: Jens Neuse Date: Thu, 27 Jul 2023 16:28:50 +0200 Subject: [PATCH] feat: allow data source to configure process response for subscriptions (#564) --- .../graphql_datasource/graphql_datasource.go | 4 ++ .../graphql_datasource_test.go | 2 + pkg/engine/plan/plan.go | 8 ++- pkg/engine/resolve/resolve.go | 56 ++++++++++++++----- pkg/engine/resolve/resolve_test.go | 3 +- 5 files changed, 56 insertions(+), 17 deletions(-) diff --git a/pkg/engine/datasource/graphql_datasource/graphql_datasource.go b/pkg/engine/datasource/graphql_datasource/graphql_datasource.go index e6a0f1e64..3004d90b8 100644 --- a/pkg/engine/datasource/graphql_datasource/graphql_datasource.go +++ b/pkg/engine/datasource/graphql_datasource/graphql_datasource.go @@ -344,6 +344,10 @@ func (p *Planner) ConfigureSubscription() plan.SubscriptionConfiguration { client: p.subscriptionClient, }, Variables: p.variables, + ProcessResponseConfig: resolve.ProcessResponseConfig{ + ExtractGraphqlResponse: true, + ExtractFederationEntities: false, + }, } } diff --git a/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go b/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go index 713ca538e..0625dbad1 100644 --- a/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go +++ b/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go @@ -3670,6 +3670,7 @@ func TestGraphQLDataSource(t *testing.T) { Source: &SubscriptionSource{ NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx), }, + ProcessResponseConfig: resolve.ProcessResponseConfig{ExtractGraphqlResponse: true}, }, Response: &resolve.GraphQLResponse{ Data: &resolve.Object{ @@ -3708,6 +3709,7 @@ func TestGraphQLDataSource(t *testing.T) { Source: &SubscriptionSource{ client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx), }, + ProcessResponseConfig: resolve.ProcessResponseConfig{ExtractGraphqlResponse: true}, }, Response: &resolve.GraphQLResponse{ Data: &resolve.Object{ diff --git a/pkg/engine/plan/plan.go b/pkg/engine/plan/plan.go index c74ec8e90..d9f756a29 100644 --- a/pkg/engine/plan/plan.go +++ b/pkg/engine/plan/plan.go @@ -1205,6 +1205,7 @@ func (v *Visitor) configureSubscription(config objectFetchConfiguration) { subscription := config.planner.ConfigureSubscription() config.trigger.Variables = subscription.Variables config.trigger.Source = subscription.DataSource + config.trigger.ProcessResponseConfig = subscription.ProcessResponseConfig v.resolveInputTemplates(config, &subscription.Input, &config.trigger.Variables) config.trigger.Input = []byte(subscription.Input) } @@ -1386,9 +1387,10 @@ type DataSourcePlanner interface { } type SubscriptionConfiguration struct { - Input string - Variables resolve.Variables - DataSource resolve.SubscriptionDataSource + Input string + Variables resolve.Variables + DataSource resolve.SubscriptionDataSource + ProcessResponseConfig resolve.ProcessResponseConfig } type FetchConfiguration struct { diff --git a/pkg/engine/resolve/resolve.go b/pkg/engine/resolve/resolve.go index 754fe1735..8b8e74ca7 100644 --- a/pkg/engine/resolve/resolve.go +++ b/pkg/engine/resolve/resolve.go @@ -496,17 +496,41 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons buf := r.getBufPair() defer r.freeBufPair(buf) - responseBuf := r.getBufPair() - defer r.freeBufPair(responseBuf) + if data != nil { + ctx.lastFetchID = initialValueID + } - extractResponse(data, responseBuf, ProcessResponseConfig{ExtractGraphqlResponse: true}) + if r.dataLoaderEnabled { + ctx.dataLoader = r.dataloaderFactory.newDataLoader(data) + defer func() { + r.dataloaderFactory.freeDataLoader(ctx.dataLoader) + ctx.dataLoader = nil + }() + } - if data != nil { + ignoreData := false + err = r.resolveNode(ctx, response.Data, data, buf) + if err != nil { + if !errors.Is(err, errNonNullableFieldValueIsNull) { + return + } + ignoreData = true + } + + return writeGraphqlResponse(buf, writer, ignoreData) +} + +func (r *Resolver) resolveGraphQLSubscriptionResponse(ctx *Context, response *GraphQLResponse, subscriptionData *BufPair, writer io.Writer) (err error) { + + buf := r.getBufPair() + defer r.freeBufPair(buf) + + if subscriptionData.HasData() { ctx.lastFetchID = initialValueID } if r.dataLoaderEnabled { - ctx.dataLoader = r.dataloaderFactory.newDataLoader(responseBuf.Data.Bytes()) + ctx.dataLoader = r.dataloaderFactory.newDataLoader(subscriptionData.Data.Bytes()) defer func() { r.dataloaderFactory.freeDataLoader(ctx.dataLoader) ctx.dataLoader = nil @@ -514,15 +538,15 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons } ignoreData := false - err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), buf) + err = r.resolveNode(ctx, response.Data, subscriptionData.Data.Bytes(), buf) if err != nil { if !errors.Is(err, errNonNullableFieldValueIsNull) { return } ignoreData = true } - if responseBuf.Errors.Len() > 0 { - r.MergeBufPairErrors(responseBuf, buf) + if subscriptionData.HasErrors() { + r.MergeBufPairErrors(subscriptionData, buf) } return writeGraphqlResponse(buf, writer, ignoreData) @@ -568,6 +592,9 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ return err } + responseBuf := r.getBufPair() + defer r.freeBufPair(responseBuf) + for { select { case <-resolverDone: @@ -577,7 +604,9 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ if !ok { return nil } - err = r.ResolveGraphQLResponse(ctx, subscription.Response, data, writer) + responseBuf.Reset() + extractResponse(data, responseBuf, subscription.Trigger.ProcessResponseConfig) + err = r.resolveGraphQLSubscriptionResponse(ctx, subscription.Response, responseBuf, writer) if err != nil { return err } @@ -1564,10 +1593,11 @@ type GraphQLSubscription struct { } type GraphQLSubscriptionTrigger struct { - Input []byte - InputTemplate InputTemplate - Variables Variables - Source SubscriptionDataSource + Input []byte + InputTemplate InputTemplate + Variables Variables + Source SubscriptionDataSource + ProcessResponseConfig ProcessResponseConfig } type FlushWriter interface { diff --git a/pkg/engine/resolve/resolve_test.go b/pkg/engine/resolve/resolve_test.go index 67e824dfb..856612f02 100644 --- a/pkg/engine/resolve/resolve_test.go +++ b/pkg/engine/resolve/resolve_test.go @@ -4131,7 +4131,8 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) { setup := func(ctx context.Context, stream SubscriptionDataSource) (*Resolver, *GraphQLSubscription, *TestFlushWriter) { plan := &GraphQLSubscription{ Trigger: GraphQLSubscriptionTrigger{ - Source: stream, + Source: stream, + ProcessResponseConfig: ProcessResponseConfig{ExtractGraphqlResponse: true}, }, Response: &GraphQLResponse{ Data: &Object{