Skip to content

Commit

Permalink
backport: feat: allow data source to configure process response for s…
Browse files Browse the repository at this point in the history
…ubscriptions (#564)
  • Loading branch information
devsergiy committed Aug 22, 2023
1 parent 257c78c commit dc4f96c
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ func (p *Planner) ConfigureSubscription() plan.SubscriptionConfiguration {
client: p.subscriptionClient,
},
Variables: p.variables,
ProcessResponseConfig: resolve.ProcessResponseConfig{
ExtractGraphqlResponse: true,
ExtractFederationEntities: false,
},
}
}

Expand Down
7 changes: 4 additions & 3 deletions v2/pkg/engine/plan/datasource_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ type DataSourcePlanner interface {
}

type SubscriptionConfiguration struct {
Input string
Variables resolve.Variables
DataSource resolve.SubscriptionDataSource
Input string
Variables resolve.Variables
DataSource resolve.SubscriptionDataSource
ProcessResponseConfig resolve.ProcessResponseConfig
}

type FetchConfiguration struct {
Expand Down
1 change: 1 addition & 0 deletions v2/pkg/engine/plan/visitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ func (v *Visitor) configureSubscription(config objectFetchConfiguration) {
subscription := config.planner.ConfigureSubscription()
config.trigger.Variables = subscription.Variables
config.trigger.Source = subscription.DataSource
config.trigger.ProcessResponseConfig = subscription.ProcessResponseConfig
v.resolveInputTemplates(config, &subscription.Input, &config.trigger.Variables)
config.trigger.Input = []byte(subscription.Input)
}
Expand Down
47 changes: 38 additions & 9 deletions v2/pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,33 +133,57 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
buf := r.getBufPair()
defer r.freeBufPair(buf)

responseBuf := r.getBufPair()
defer r.freeBufPair(responseBuf)
if data != nil {
ctx.lastFetchID = initialValueID
}

extractResponse(data, responseBuf, ProcessResponseConfig{ExtractGraphqlResponse: true})
if r.dataLoaderEnabled {
ctx.dataLoader = r.dataloaderFactory.newDataLoader(data)
defer func() {
r.dataloaderFactory.freeDataLoader(ctx.dataLoader)
ctx.dataLoader = nil
}()
}

if data != nil {
ignoreData := false
err = r.resolveNode(ctx, response.Data, data, buf)
if err != nil {
if !errors.Is(err, errNonNullableFieldValueIsNull) {
return
}
ignoreData = true
}

return writeGraphqlResponse(buf, writer, ignoreData)
}

func (r *Resolver) resolveGraphQLSubscriptionResponse(ctx *Context, response *GraphQLResponse, subscriptionData *BufPair, writer io.Writer) (err error) {

buf := r.getBufPair()
defer r.freeBufPair(buf)

if subscriptionData.HasData() {
ctx.lastFetchID = initialValueID
}

if r.dataLoaderEnabled {
ctx.dataLoader = r.dataloaderFactory.newDataLoader(responseBuf.Data.Bytes())
ctx.dataLoader = r.dataloaderFactory.newDataLoader(subscriptionData.Data.Bytes())
defer func() {
r.dataloaderFactory.freeDataLoader(ctx.dataLoader)
ctx.dataLoader = nil
}()
}

ignoreData := false
err = r.resolveNode(ctx, response.Data, responseBuf.Data.Bytes(), buf)
err = r.resolveNode(ctx, response.Data, subscriptionData.Data.Bytes(), buf)
if err != nil {
if !errors.Is(err, errNonNullableFieldValueIsNull) {
return
}
ignoreData = true
}
if responseBuf.Errors.Len() > 0 {
r.MergeBufPairErrors(responseBuf, buf)
if subscriptionData.HasErrors() {
r.MergeBufPairErrors(subscriptionData, buf)
}

return writeGraphqlResponse(buf, writer, ignoreData)
Expand Down Expand Up @@ -196,6 +220,9 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
return err
}

responseBuf := r.getBufPair()
defer r.freeBufPair(responseBuf)

for {
select {
case <-resolverDone:
Expand All @@ -205,7 +232,9 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
if !ok {
return nil
}
err = r.ResolveGraphQLResponse(ctx, subscription.Response, data, writer)
responseBuf.Reset()
extractResponse(data, responseBuf, subscription.Trigger.ProcessResponseConfig)
err = r.resolveGraphQLSubscriptionResponse(ctx, subscription.Response, responseBuf, writer)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions v2/pkg/engine/resolve/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ type GraphQLSubscription struct {
}

type GraphQLSubscriptionTrigger struct {
Input []byte
InputTemplate InputTemplate
Variables Variables
Source SubscriptionDataSource
Input []byte
InputTemplate InputTemplate
Variables Variables
Source SubscriptionDataSource
ProcessResponseConfig ProcessResponseConfig
}

type GraphQLResponse struct {
Expand Down

0 comments on commit dc4f96c

Please sign in to comment.