Skip to content

Commit

Permalink
feat: Add transformation option to the field configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Aug 28, 2023
1 parent 6239b61 commit e0170ca
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 35 deletions.
113 changes: 100 additions & 13 deletions pkg/engine/plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"regexp"
"strings"

"github.com/jensneuse/pipeline/pkg/pipe"

"github.com/wundergraph/graphql-go-tools/pkg/ast"
"github.com/wundergraph/graphql-go-tools/pkg/astimport"
"github.com/wundergraph/graphql-go-tools/pkg/astvisitor"
Expand Down Expand Up @@ -122,6 +124,10 @@ type FieldConfiguration struct {
// e.g. {"response":"{\"foo\":\"bar\"}"} will be returned as {"foo":"bar"} when path is "response"
// This way, it is possible to resolve a JSON string as part of the response without extra String encoding of the JSON
UnescapeResponseJson bool
// A pipeline definition for the field
Pipeline *pipe.Pipeline
//Whether to resolve the array asynchronously or not
ResolveAsynchronous bool
}

type ArgumentsConfigurations []ArgumentConfiguration
Expand Down Expand Up @@ -715,20 +721,32 @@ func (v *Visitor) resolveFieldValue(fieldRef, typeRef int, nullable bool, path [
enclosingTypeName := v.Walker.EnclosingTypeDefinition.NameString(v.Definition)
fieldConfig := v.Config.Fields.ForTypeField(enclosingTypeName, fieldName)
unescapeResponseJson := false
resolveAsync := false
var transformation *pipe.Pipeline
if fieldConfig != nil {
unescapeResponseJson = fieldConfig.UnescapeResponseJson
resolveAsync = fieldConfig.ResolveAsynchronous
transformation = fieldConfig.Pipeline
}

switch v.Definition.Types[typeRef].TypeKind {
case ast.TypeKindNonNull:
return v.resolveFieldValue(fieldRef, ofType, false, path)
case ast.TypeKindList:
listItem := v.resolveFieldValue(fieldRef, ofType, true, nil)
return &resolve.Array{
Nullable: nullable,
Path: path,
Item: listItem,
array := &resolve.Array{
Nullable: nullable,
Path: path,
Item: listItem,
ResolveAsynchronous: resolveAsync,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: array,
Pipeline: transformation,
}
}
return array
case ast.TypeKindNamed:
typeName := v.Definition.ResolveTypeNameString(typeRef)
customResolve, ok := v.Config.CustomResolveMap[typeName]
Expand All @@ -747,50 +765,99 @@ func (v *Visitor) resolveFieldValue(fieldRef, typeRef int, nullable bool, path [
fieldExport := v.resolveFieldExport(fieldRef)
switch typeName {
case "String":
return &resolve.String{
value := &resolve.String{
Path: path,
Nullable: nullable,
Export: fieldExport,
UnescapeResponseJson: unescapeResponseJson,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
case "Boolean":
return &resolve.Boolean{
value := &resolve.Boolean{
Path: path,
Nullable: nullable,
Export: fieldExport,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
case "Int":
return &resolve.Integer{
value := &resolve.Integer{
Path: path,
Nullable: nullable,
Export: fieldExport,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
case "Float":
return &resolve.Float{
value := &resolve.Float{
Path: path,
Nullable: nullable,
Export: fieldExport,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
case "BigInt":
return &resolve.BigInt{
value := &resolve.BigInt{
Path: path,
Nullable: nullable,
Export: fieldExport,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
default:
return &resolve.String{
value := &resolve.String{
Path: path,
Nullable: nullable,
Export: fieldExport,
UnescapeResponseJson: unescapeResponseJson,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
}
case ast.NodeKindEnumTypeDefinition:
return &resolve.String{
value := &resolve.String{
Path: path,
Nullable: nullable,
UnescapeResponseJson: unescapeResponseJson,
}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
case ast.NodeKindObjectTypeDefinition, ast.NodeKindInterfaceTypeDefinition, ast.NodeKindUnionTypeDefinition:
object := &resolve.Object{
Nullable: nullable,
Expand All @@ -805,12 +872,32 @@ func (v *Visitor) resolveFieldValue(fieldRef, typeRef int, nullable bool, path [
fields: &object.Fields,
})
})
if transformation != nil {
return &resolve.Transformation{
InnerValue: object,
Pipeline: transformation,
}
}
return object
default:
return &resolve.Null{}
value := &resolve.Null{}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
}
default:
return &resolve.Null{}
value := &resolve.Null{}
if transformation != nil {
return &resolve.Transformation{
InnerValue: value,
Pipeline: transformation,
}
}
return value
}
}

Expand Down
72 changes: 50 additions & 22 deletions pkg/engine/resolve/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/buger/jsonparser"
"github.com/cespare/xxhash/v2"
"github.com/jensneuse/pipeline/pkg/pipe"
"github.com/tidwall/gjson"
errors "golang.org/x/xerrors"

Expand Down Expand Up @@ -82,8 +83,10 @@ type Node interface {
NodeKind() NodeKind
}

type NodeKind int
type FetchKind int
type (
NodeKind int
FetchKind int
)

const (
NodeKindObject NodeKind = iota + 1
Expand All @@ -97,6 +100,7 @@ const (
NodeKindFloat
NodeKindBigInt
NodeKindCustom
NodeKindTransformation

FetchKindSingle FetchKind = iota + 1
FetchKindParallel
Expand Down Expand Up @@ -252,6 +256,7 @@ func (c *Context) addResponseArrayElements(elements []string) {
func (c *Context) removeResponseLastElements(elements []string) {
c.responseElements = c.responseElements[:len(c.responseElements)-len(elements)]
}

func (c *Context) removeResponseArrayLastElements(elements []string) {
c.responseElements = c.responseElements[:len(c.responseElements)-(len(elements)+1)]
}
Expand Down Expand Up @@ -335,17 +340,18 @@ type SubscriptionDataSource interface {
}

type Resolver struct {
ctx context.Context
dataLoaderEnabled bool
resultSetPool sync.Pool
byteSlicesPool sync.Pool
waitGroupPool sync.Pool
bufPairPool sync.Pool
bufPairSlicePool sync.Pool
errChanPool sync.Pool
hash64Pool sync.Pool
dataloaderFactory *dataLoaderFactory
fetcher *Fetcher
ctx context.Context
dataLoaderEnabled bool
resultSetPool sync.Pool
byteSlicesPool sync.Pool
transformationPool sync.Pool
waitGroupPool sync.Pool
bufPairPool sync.Pool
bufPairSlicePool sync.Pool
errChanPool sync.Pool
hash64Pool sync.Pool
dataloaderFactory *dataLoaderFactory
fetcher *Fetcher
}

type inflightFetch struct {
Expand All @@ -372,6 +378,11 @@ func New(ctx context.Context, fetcher *Fetcher, enableDataLoader bool) *Resolver
return &slice
},
},
transformationPool: sync.Pool{
New: func() interface{} {
return fastbuffer.New()
},
},
waitGroupPool: sync.Pool{
New: func() interface{} {
return &sync.WaitGroup{}
Expand Down Expand Up @@ -438,6 +449,8 @@ func (r *Resolver) resolveNode(ctx *Context, node Node, data []byte, bufPair *Bu
return
case *CustomNode:
return r.resolveCustom(ctx, n, data, bufPair)
case *Transformation:
return r.resolveTransformation(ctx, n, data, bufPair)
default:
return
}
Expand All @@ -464,9 +477,7 @@ func extractResponse(responseData []byte, bufPair *BufPair, cfg ProcessResponseC
switch i {
case rootErrorsPathIndex:
_, _ = jsonparser.ArrayEach(bytes, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
var (
message, locations, path, extensions []byte
)
var message, locations, path, extensions []byte
jsonparser.EachKey(value, func(i int, bytes []byte, valueType jsonparser.ValueType, err error) {
switch i {
case errorsMessagePathIndex:
Expand Down Expand Up @@ -495,7 +506,6 @@ func extractResponse(responseData []byte, bufPair *BufPair, cfg ProcessResponseC
}

func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLResponse, data []byte, writer io.Writer) (err error) {

buf := r.getBufPair()
defer r.freeBufPair(buf)

Expand Down Expand Up @@ -565,7 +575,6 @@ func writeAndFlush(writer FlushWriter, msg []byte) error {
}

func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQLSubscription, writer FlushWriter) (err error) {

buf := r.getBufPair()
err = subscription.Trigger.InputTemplate.Render(ctx, nil, buf.Data)
if err != nil {
Expand Down Expand Up @@ -619,7 +628,6 @@ func (r *Resolver) ResolveGraphQLSubscription(ctx *Context, subscription *GraphQ
}

func (r *Resolver) ResolveGraphQLStreamingResponse(ctx *Context, response *GraphQLStreamingResponse, data []byte, writer FlushWriter) (err error) {

if err := r.validateContext(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -692,7 +700,6 @@ Loop:
}

func (r *Resolver) ResolveGraphQLResponsePatch(ctx *Context, patch *GraphQLResponsePatch, data, path, extraPath []byte, writer io.Writer) (err error) {

buf := r.getBufPair()
defer r.freeBufPair(buf)

Expand Down Expand Up @@ -809,7 +816,6 @@ func (r *Resolver) resolveArray(ctx *Context, array *Array, data []byte, arrayBu
}

func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItems *[][]byte, arrayBuf *BufPair) (err error) {

itemBuf := r.getBufPair()
defer r.freeBufPair(itemBuf)

Expand Down Expand Up @@ -856,7 +862,6 @@ func (r *Resolver) resolveArraySynchronous(ctx *Context, array *Array, arrayItem
}

func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayItems *[][]byte, arrayBuf *BufPair) (err error) {

arrayBuf.Data.WriteBytes(lBrack)

bufSlice := r.getBufPairSlice()
Expand Down Expand Up @@ -920,6 +925,19 @@ func (r *Resolver) resolveArrayAsynchronous(ctx *Context, array *Array, arrayIte
return
}

func (r *Resolver) resolveTransformation(ctx *Context, t *Transformation, data []byte, transformBuf *BufPair) error {
buffer := r.transformationPool.Get().(*fastbuffer.FastBuffer)
defer func() {
buffer.Reset()
r.transformationPool.Put(buffer)
}()

if err := t.Pipeline.Run(bytes.NewReader(data), buffer); err != nil {
return err
}
return r.resolveNode(ctx, t.InnerValue, buffer.Bytes(), transformBuf)
}

func (r *Resolver) exportField(ctx *Context, export *FieldExport, value []byte) {
if export == nil {
return
Expand Down Expand Up @@ -1456,6 +1474,7 @@ type Field struct {
SkipVariableName string
IncludeDirectiveDefined bool
IncludeVariableName string
Transformation
}

type Position struct {
Expand Down Expand Up @@ -1619,10 +1638,19 @@ type Stream struct {
PatchIndex int
}

type Transformation struct {
InnerValue Node
Pipeline *pipe.Pipeline
}

func (_ *Array) NodeKind() NodeKind {
return NodeKindArray
}

func (_ *Transformation) NodeKind() NodeKind {
return NodeKindTransformation
}

type GraphQLSubscription struct {
Trigger GraphQLSubscriptionTrigger
Response *GraphQLResponse
Expand Down
Loading

0 comments on commit e0170ca

Please sign in to comment.