Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement @defer "June 2023" response format #6331

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .github/workflows/defer-with-router-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,31 @@ jobs:
DEFER_WITH_ROUTER_TESTS: true
run: |
./gradlew --no-daemon --console plain -p tests :defer:allTests
defer-with-apollo-server-tests:
runs-on: ubuntu-latest
if: github.repository == 'apollographql/apollo-kotlin'
steps:
- name: Checkout project
uses: actions/checkout@692973e3d937129bcbf40652eb9f2f61becf3332 #v4.1.7

- name: Install and run graph
working-directory: tests/defer/apollo-server/
run: |
npm install --legacy-peer-deps
npx patch-package
APOLLO_PORT=4000 npm start &

- name: Setup Java
uses: actions/setup-java@99b8673ff64fbf99d8d325f52d9a5bdedb8483e9 #v4.2.1
with:
distribution: 'temurin'
java-version: 17

- name: Setup Gradle
uses: gradle/actions/setup-gradle@dbbdc275be76ac10734476cc723d82dfe7ec6eda #v3.4.2

- name: Run Apollo Kotlin @defer tests
env:
DEFER_WITH_APOLLO_SERVER_TESTS: true
run: |
./gradlew --no-daemon --console plain -p tests :defer:allTests
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,20 @@ fun BooleanExpression<BTerm>.evaluate(
return evaluate {
when (it) {
is BVariable -> !(variables?.contains(it.name) ?: false)
is BLabel -> hasDeferredFragment(deferredFragmentIdentifiers, croppedPath!!, it.label)
is BLabel -> !isDeferredFragmentPending(deferredFragmentIdentifiers, croppedPath!!, it.label)
is BPossibleTypes -> it.possibleTypes.contains(typename)
}
}
}

private fun hasDeferredFragment(deferredFragmentIdentifiers: Set<DeferredFragmentIdentifier>?, path: List<Any>, label: String?): Boolean {
private fun isDeferredFragmentPending(
deferredFragmentIdentifiers: Set<DeferredFragmentIdentifier>?,
path: List<Any>,
label: String?,
): Boolean {
if (deferredFragmentIdentifiers == null) {
// By default, parse all deferred fragments - this is the case when parsing from the normalized cache.
return true
return false
}
return deferredFragmentIdentifiers.contains(DeferredFragmentIdentifier(path, label))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,28 @@ private typealias MutableJsonMap = MutableMap<String, Any?>
/**
* Utility class for merging GraphQL JSON payloads received in multiple chunks when using the `@defer` directive.
*
* Each call to [merge] will merge the given chunk into the [merged] Map, and will also update the [mergedFragmentIds] Set with the
* Each call to [merge] will merge the given chunk into the [merged] Map, and will also update the [pendingFragmentIds] Set with the
* value of its `path` and `label` field.
*
* The fields in `data` are merged into the node found in [merged] at `path` (for the first call to [merge], the payload is
* copied to [merged] as-is).
* The fields in `data` are merged into the node found in [merged] at the path known by looking at the `id` field (for the first call to
* [merge], the payload is copied to [merged] as-is).
*
* `errors` in incremental items (if present) are merged together in an array and then set to the `errors` field of the [merged] Map,
* at each call to [merge].
* `extensions` in incremental items (if present) are merged together in an array and then set to the `extensions/incremental` field of the
* `errors` in incremental and completed items (if present) are merged together in an array and then set to the `errors` field of the
* [merged] Map, at each call to [merge].
* `extensions` in incremental items (if present) are merged together in an array and then set to the `extensions` field of the [merged]
* Map, at each call to [merge].
*/
@ApolloInternal
@Suppress("UNCHECKED_CAST")
class DeferredJsonMerger {
private val _merged: MutableJsonMap = mutableMapOf()
val merged: JsonMap = _merged

private val _mergedFragmentIds = mutableSetOf<DeferredFragmentIdentifier>()
val mergedFragmentIds: Set<DeferredFragmentIdentifier> = _mergedFragmentIds
/**
* Map of identifiers to their corresponding DeferredFragmentIdentifier, found in `pending`.
*/
private val _pendingFragmentIds = mutableMapOf<String, DeferredFragmentIdentifier>()
val pendingFragmentIds: Set<DeferredFragmentIdentifier> get() = _pendingFragmentIds.values.toSet()

var hasNext: Boolean = true
private set
Expand All @@ -47,61 +51,78 @@ class DeferredJsonMerger {
return merge(payloadMap)
}

@Suppress("UNCHECKED_CAST")
fun merge(payload: JsonMap): JsonMap {
val completed = payload["completed"] as? List<JsonMap>
if (merged.isEmpty()) {
// Initial payload, no merging needed
_merged += payload
// Initial payload, no merging needed (strip some fields that should not appear in the final result)
_merged += payload - "hasNext" - "pending"
handlePending(payload)
handleCompleted(completed)
return merged
}
handlePending(payload)

val incrementalList = payload["incremental"] as? List<JsonMap>
if (incrementalList == null) {
isEmptyPayload = true
} else {
isEmptyPayload = false
val mergedErrors = mutableListOf<JsonMap>()
val mergedExtensions = mutableListOf<JsonMap>()
if (incrementalList != null) {
for (incrementalItem in incrementalList) {
mergeData(incrementalItem)
// Merge errors and extensions (if any) of the incremental list
(incrementalItem["errors"] as? List<JsonMap>)?.let { mergedErrors += it }
(incrementalItem["extensions"] as? JsonMap)?.let { mergedExtensions += it }
}
// Keep only this payload's errors and extensions, if any
if (mergedErrors.isNotEmpty()) {
_merged["errors"] = mergedErrors
} else {
_merged.remove("errors")
}
if (mergedExtensions.isNotEmpty()) {
_merged["extensions"] = mapOf("incremental" to mergedExtensions)
} else {
_merged.remove("extensions")
mergeIncrementalData(incrementalItem)
// Merge errors (if any) of the incremental item
(incrementalItem["errors"] as? List<JsonMap>)?.let { getOrPutMergedErrors() += it }
}
}
isEmptyPayload = completed == null && incrementalList == null

hasNext = payload["hasNext"] as Boolean? ?: false

handleCompleted(completed)

(payload["extensions"] as? JsonMap)?.let { getOrPutExtensions() += it }

return merged
}

@Suppress("UNCHECKED_CAST")
private fun mergeData(incrementalItem: JsonMap) {
val data = incrementalItem["data"] as JsonMap?
val path = incrementalItem["path"] as List<Any>
val mergedData = merged["data"] as JsonMap
private fun getOrPutMergedErrors() = _merged.getOrPut("errors") { mutableListOf<JsonMap>() } as MutableList<JsonMap>

// payloadData can be null if there are errors
if (data != null) {
val nodeToMergeInto = nodeAtPath(mergedData, path) as MutableJsonMap
deepMerge(nodeToMergeInto, data)
private fun getOrPutExtensions() = _merged.getOrPut("extensions") { mutableMapOf<String, Any?>() } as MutableJsonMap

_mergedFragmentIds += DeferredFragmentIdentifier(path = path, label = incrementalItem["label"] as String?)
private fun handlePending(payload: JsonMap) {
val pending = payload["pending"] as? List<JsonMap>
if (pending != null) {
for (pendingItem in pending) {
val id = pendingItem["id"] as String
val path = pendingItem["path"] as List<Any>
val label = pendingItem["label"] as String?
_pendingFragmentIds[id] = DeferredFragmentIdentifier(path = path, label = label)
}
}
}

private fun handleCompleted(completed: List<JsonMap>?) {
if (completed != null) {
for (completedItem in completed) {
// Merge errors (if any) of the completed item
val errors = completedItem["errors"] as? List<JsonMap>
if (errors != null) {
getOrPutMergedErrors() += errors
} else {
// Fragment is no longer pending - only if there were no errors
val id = completedItem["id"] as String
_pendingFragmentIds.remove(id) ?: error("Id '$id' not found in pending results")
}
}
}
}

@Suppress("UNCHECKED_CAST")
private fun mergeIncrementalData(incrementalItem: JsonMap) {
val id = incrementalItem["id"] as String? ?: error("No id found in incremental item")
val data = incrementalItem["data"] as JsonMap? ?: error("No data found in incremental item")
val subPath = incrementalItem["subPath"] as List<Any>? ?: emptyList()
val path = (_pendingFragmentIds[id]?.path ?: error("Id '$id' not found in pending results")) + subPath
val mergedData = merged["data"] as JsonMap
val nodeToMergeInto = nodeAtPath(mergedData, path) as MutableJsonMap
deepMerge(nodeToMergeInto, data)
}

private fun deepMerge(destination: MutableJsonMap, map: JsonMap) {
for ((key, value) in map) {
if (destination.containsKey(key) && destination[key] is MutableMap<*, *>) {
Expand All @@ -116,7 +137,6 @@ class DeferredJsonMerger {
}
}

@Suppress("UNCHECKED_CAST")
private fun jsonToMap(json: BufferedSource): JsonMap = BufferedSourceJsonReader(json).readAny() as JsonMap


Expand All @@ -130,7 +150,6 @@ class DeferredJsonMerger {
node = if (node is List<*>) {
node[key as Int]
} else {
@Suppress("UNCHECKED_CAST")
node as JsonMap
node[key]
}
Expand All @@ -140,7 +159,7 @@ class DeferredJsonMerger {

fun reset() {
_merged.clear()
_mergedFragmentIds.clear()
_pendingFragmentIds.clear()
hasNext = true
isEmptyPayload = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private constructor(
jsonMerger = DeferredJsonMerger()
}
val merged = jsonMerger!!.merge(part)
val deferredFragmentIds = jsonMerger!!.mergedFragmentIds
val deferredFragmentIds = jsonMerger!!.pendingFragmentIds
val isLast = !jsonMerger!!.hasNext

if (jsonMerger!!.isEmptyPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ private class DefaultSubscriptionParser<D : Operation.Data>(private val request:
}

val (payload, mergedFragmentIds) = if (responseMap.isDeferred()) {
deferredJsonMerger.merge(responseMap) to deferredJsonMerger.mergedFragmentIds
deferredJsonMerger.merge(responseMap) to deferredJsonMerger.pendingFragmentIds
} else {
responseMap to null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onSubscription
import kotlinx.coroutines.launch
import okio.use

/**
* A [NetworkTransport] that manages a single instance of a [WebSocketConnection].
Expand Down Expand Up @@ -304,7 +303,7 @@ private constructor(
val responsePayload = response.payload
val requestCustomScalarAdapters = request.executionContext[CustomScalarAdapters]!!
val (payload, mergedFragmentIds) = if (responsePayload.isDeferred()) {
deferredJsonMerger.merge(responsePayload) to deferredJsonMerger.mergedFragmentIds
deferredJsonMerger.merge(responsePayload) to deferredJsonMerger.pendingFragmentIds
} else {
responsePayload to null
}
Expand Down
Loading
Loading