Skip to content

Commit

Permalink
feat: allow data source to configure process response for subscriptio…
Browse files Browse the repository at this point in the history
  • Loading branch information
jensneuse authored and pvormste committed Aug 21, 2023
1 parent 16585c0 commit fe64a6d
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ func (p *Planner) ConfigureSubscription() plan.SubscriptionConfiguration {
client: p.subscriptionClient,
},
Variables: p.variables,
ProcessResponseConfig: resolve.ProcessResponseConfig{
ExtractGraphqlResponse: true,
ExtractFederationEntities: false,
},
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3675,6 +3675,7 @@ func TestGraphQLDataSource(t *testing.T) {
Source: &SubscriptionSource{
NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
},
ProcessResponseConfig: resolve.ProcessResponseConfig{ExtractGraphqlResponse: true},
},
Response: &resolve.GraphQLResponse{
Data: &resolve.Object{
Expand Down Expand Up @@ -3713,6 +3714,7 @@ func TestGraphQLDataSource(t *testing.T) {
Source: &SubscriptionSource{
client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
},
ProcessResponseConfig: resolve.ProcessResponseConfig{ExtractGraphqlResponse: true},
},
Response: &resolve.GraphQLResponse{
Data: &resolve.Object{
Expand Down
8 changes: 5 additions & 3 deletions pkg/engine/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1206,6 +1206,7 @@ func (v *Visitor) configureSubscription(config objectFetchConfiguration) {
subscription := config.planner.ConfigureSubscription()
config.trigger.Variables = subscription.Variables
config.trigger.Source = subscription.DataSource
config.trigger.ProcessResponseConfig = subscription.ProcessResponseConfig
v.resolveInputTemplates(config, &subscription.Input, &config.trigger.Variables)
config.trigger.Input = []byte(subscription.Input)
}
Expand Down Expand Up @@ -1387,9 +1388,10 @@ type DataSourcePlanner interface {
}

type SubscriptionConfiguration struct {
Input string
Variables resolve.Variables
DataSource resolve.SubscriptionDataSource
Input string
Variables resolve.Variables
DataSource resolve.SubscriptionDataSource
ProcessResponseConfig resolve.ProcessResponseConfig
}

type FetchConfiguration struct {
Expand Down
56 changes: 43 additions & 13 deletions pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,33 +497,57 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
buf := r.getBufPair()
defer r.freeBufPair(buf)

responseBuf := r.getBufPair()
defer r.freeBufPair(responseBuf)
if data != nil {
ctx.lastFetchID = initialValueID
}

extractResponse(data, responseBuf, ProcessResponseConfig{ExtractGraphqlResponse: true})
if r.dataLoaderEnabled {
ctx.dataLoader = r.dataloaderFactory.newDataLoader(data)
defer func() {
r.dataloaderFactory.freeDataLoader(ctx.dataLoader)
ctx.dataLoader = nil
}()
}

if data != nil {
ignoreData := false
err = r.resolveNode(ctx, response.Data, data, buf)
if err != nil {
if !errors.Is(err, errNonNullableFieldValueIsNull) {
return
}
ignoreData = true
}

return writeGraphqlResponse(buf, writer, ignoreData)
}

func (r *Resolver) resolveGraphQLSubscriptionResponse(ctx *Context, response *GraphQLResponse, subscriptionData *BufPair, writer io.Writer) (err error) {

buf := r.getBufPair()
defer r.freeBufPair(buf)

if subscriptionData.HasData() {
ctx.lastFetchID = initialValueID
}

if r.dataLoaderEnabled {
ctx.dataLoader = r.dataloaderFactory.newDataLoader(responseBuf.Data.Bytes())
ctx.dataLoader = r.dataloaderFactory.newDataLoader(subscriptionData.Data.Bytes())
defer func() {
r.dataloaderFactory.freeDataLoader(ctx.dataLoader)
ctx.dataLoader = nil
}()
}

ignoreData := false
err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), buf)
err = r.resolveNode(ctx, response.Data, subscriptionData.Data.Bytes(), buf)
if err != nil {
if !errors.Is(err, errNonNullableFieldValueIsNull) {
return
}
ignoreData = true
}
if responseBuf.Errors.Len() > 0 {
r.MergeBufPairErrors(responseBuf, buf)
if subscriptionData.HasErrors() {
r.MergeBufPairErrors(subscriptionData, buf)
}

return writeGraphqlResponse(buf, writer, ignoreData)
Expand Down Expand Up @@ -569,6 +593,9 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
return err
}

responseBuf := r.getBufPair()
defer r.freeBufPair(responseBuf)

for {
select {
case <-resolverDone:
Expand All @@ -578,7 +605,9 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
if !ok {
return nil
}
err = r.ResolveGraphQLResponse(ctx, subscription.Response, data, writer)
responseBuf.Reset()
extractResponse(data, responseBuf, subscription.Trigger.ProcessResponseConfig)
err = r.resolveGraphQLSubscriptionResponse(ctx, subscription.Response, responseBuf, writer)
if err != nil {
return err
}
Expand Down Expand Up @@ -1581,10 +1610,11 @@ type GraphQLSubscription struct {
}

type GraphQLSubscriptionTrigger struct {
Input []byte
InputTemplate InputTemplate
Variables Variables
Source SubscriptionDataSource
Input []byte
InputTemplate InputTemplate
Variables Variables
Source SubscriptionDataSource
ProcessResponseConfig ProcessResponseConfig
}

type FlushWriter interface {
Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/resolve/resolve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4166,7 +4166,8 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) {
setup := func(ctx context.Context, stream SubscriptionDataSource) (*Resolver, *GraphQLSubscription, *TestFlushWriter) {
plan := &GraphQLSubscription{
Trigger: GraphQLSubscriptionTrigger{
Source: stream,
Source: stream,
ProcessResponseConfig: ProcessResponseConfig{ExtractGraphqlResponse: true},
},
Response: &GraphQLResponse{
Data: &Object{
Expand Down

0 comments on commit fe64a6d

Please sign in to comment.