fix: asset upload cancellation #1251

Closed
wants to merge 14 commits into from

@trOnk12 trOnk12 commented Dec 19, 2022


PR Submission Checklist for internal contributors

  • The PR Title

    • conforms to the style of semantic commits messages¹ supported in Wire's Github Workflow²
    • contains a reference JIRA issue number like SQPIT-764
    • answers the question: If merged, this PR will: ... ³
  • The PR Description

    • is free of optional paragraphs and you have filled the relevant parts to the best of your ability

What's new in this PR?

Issues

Ktor does not seem to handle cancellation well for request bodies that write to a ByteChannel. The problem seems to be that Ktor does not cope with CancellationException.

The stack trace shows where the crash is coming from:

image

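It looks like, when executing the request, the underlying OkHttp call (the code below is OkHttp's getResponseWithInterceptorChain) only catches IOException; any other exception propagates up to the thread's uncaught exception handler: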

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }
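The effect of that catch block can be reproduced without OkHttp. The following is a minimal sketch, not the actual engine code: `runCall` and `wrapAsIo` are hypothetical names, and `java.util.concurrent.CancellationException` stands in for the coroutine one (on the JVM, kotlinx.coroutines aliases its CancellationException to that class).

```kotlin
import java.io.IOException
import java.util.concurrent.CancellationException

// Hypothetical stand-in for a call that only handles IOException,
// mirroring the catch block in getResponseWithInterceptorChain().
fun runCall(writeBody: () -> Unit): String =
    try {
        writeBody()
        "ok"
    } catch (e: IOException) {
        "handled: ${e.message}"
    }

// Hypothetical helper: rethrow anything that is not an IOException as one.
fun wrapAsIo(block: () -> Unit) {
    try {
        block()
    } catch (e: IOException) {
        throw e
    } catch (e: Exception) {
        throw IOException("wrapped ${e.javaClass.simpleName}", e)
    }
}

fun main() {
    // Unwrapped: the CancellationException bypasses the IOException handler.
    val escaped = runCatching { runCall { throw CancellationException("cancelled") } }
    println(escaped.exceptionOrNull() is CancellationException) // true

    // Wrapped: the same failure now reaches the IOException handler.
    println(runCall { wrapAsIo { throw CancellationException("cancelled") } })
    // handled: wrapped CancellationException
}
```

In the real engine nothing plays the role of `runCatching`, so the escaped exception ends up in the thread's uncaught exception handler.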

The issue is also described here, probably much better than above, so please refer to it:
awslabs/aws-sdk-kotlin#733

The fix is also taken almost straight from their PR.

In addition, we override OkHttpClientEngine in order to inject our own version of RequestBody:


class ByteChannelRequestBody(
    private val contentLength: Long?,
    private val callContext: CoroutineContext,
    private val block: () -> ByteReadChannel
) : RequestBody(), CoroutineScope {

    override val coroutineContext: CoroutineContext
        get() = callContext + producerJob + Dispatchers.IO

    private val producerJob = Job(callContext[Job])

    override fun contentType(): MediaType? = null
    override fun writeTo(sink: BufferedSink) {
        // Any exception thrown while writing (including CancellationException)
        // is rethrown as an IOException by withJob.
        withJob(producerJob) {
            if (producerJob.isActive) {
                block().toInputStream().source().use {
                    sink.writeAll(it)
                }
            }
        }
    }

    override fun contentLength(): Long = contentLength ?: -1

    /**
     * Completes the given job when the block returns calling either `complete()` when the block runs
     * successfully or `completeExceptionally()` on exception.
     * @return the result of calling [block]
     */
    private inline fun <T> withJob(job: CompletableJob, block: () -> T): T {
        try {
            return block()
        } catch (ex: Exception) {
            println("Exception in withJob: $ex")
            job.completeExceptionally(ex)
            // wrap all exceptions thrown from inside `okhttp3.RequestBody#writeTo(..)` as an IOException
            // see https://github.com/awslabs/aws-sdk-kotlin/issues/733
            throw IOException(ex)
        } finally {
            job.complete()
        }
    }

}
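How `withJob` leaves the producer job can be checked in isolation. This is a sketch that assumes kotlinx.coroutines is on the classpath; it copies the helper without the logging and uses standalone `Job()` instances (no parent) purely for illustration.

```kotlin
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.Job
import java.io.IOException
import java.util.concurrent.CancellationException

// Same shape as the helper above, minus the println.
inline fun <T> withJob(job: CompletableJob, block: () -> T): T {
    try {
        return block()
    } catch (ex: Exception) {
        job.completeExceptionally(ex)
        throw IOException(ex)
    } finally {
        // Has no effect if the job was already completed exceptionally.
        job.complete()
    }
}

fun main() {
    val ok = Job()
    withJob(ok) { "written" }
    println("${ok.isCompleted} ${ok.isCancelled}") // true false

    val failed = Job()
    val result = runCatching { withJob(failed) { throw CancellationException("cancelled") } }
    println(result.exceptionOrNull() is IOException) // true
    println("${failed.isCompleted} ${failed.isCancelled}") // true true
}
```

The caller only ever sees an IOException, while the job still records whether the write succeeded or failed.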

We also had to apply a similar change to our StreamAssetContent, which extends OutgoingContent.WriteChannelContent.
It is used for uploading the asset and could likewise let the CancellationException slip into the thread's uncaught exception handler and crash our app.

internal class StreamAssetContent internal constructor(
    private val metadata: AssetMetadataRequest,
    private val encryptedDataSize: Long,
    private val fileContentStream: () -> Source,
    callContext: CoroutineContext,
) : OutgoingContent.WriteChannelContent(), CoroutineScope {

    private val producerJob = Job(callContext[Job])

    override val coroutineContext: CoroutineContext = callContext + producerJob

    private val openingData: String by lazy {
        val body = StringBuilder()

        // Part 1
        val strMetadata = "{\"public\": ${metadata.public}, \"retention\": \"${metadata.retentionType.name.lowercase()}\"}"

        body.append("--frontier\r\n")
        body.append("Content-Type: application/json;charset=utf-8\r\n")
        body.append("Content-Length: ")
            .append(strMetadata.length)
            .append("\r\n\r\n")
        body.append(strMetadata)
            .append("\r\n")

        // Part 2
        body.append("--frontier\r\n")
        body.append("Content-Type: application/octet-stream")
            .append("\r\n")
        body.append("Content-Length: ")
            .append(encryptedDataSize)
            .append("\r\n")
        body.append("Content-MD5: ")
            .append(metadata.md5)
            .append("\r\n\r\n")

        body.toString()
    }

    private val closingArray = "\r\n--frontier--\r\n"

    @OptIn(ExperimentalStdlibApi::class)
    override suspend fun writeTo(channel: ByteWriteChannel) {
        try {
            coroutineScope {
                if (!channel.isClosedForWrite && producerJob.isActive) {

                    channel.writeStringUtf8(openingData)
                    val contentBuffer = Buffer()
                    val fileContentStream = fileContentStream()
                    while (fileContentStream.read(contentBuffer, BUFFER_SIZE) != -1L) {
                        contentBuffer.readByteArray().let { content ->
                            channel.writePacket(ByteReadPacket(content))
                        }
                    }
                    channel.writeStringUtf8(closingArray)
                    channel.flush()
                    channel.close()
                }
            }
        } catch (e: Exception) {
            channel.flush()
            channel.close()
            producerJob.completeExceptionally(e)

            throw IOException(e.message)
        } finally {
            producerJob.complete()
        }
    }
}
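The multipart framing that `openingData` and `closingArray` produce is easier to see in a standalone form. The sketch below is a simplification under stated assumptions: `Metadata` is a hypothetical stand-in for AssetMetadataRequest, the sample values are made up, and the metadata length is counted in bytes rather than characters, which differs from the code above only if the JSON ever contains non-ASCII text.

```kotlin
// Hypothetical, simplified stand-in for AssetMetadataRequest.
data class Metadata(val public: Boolean, val retention: String, val md5: String)

const val BOUNDARY = "frontier"
val CLOSING = "\r\n--$BOUNDARY--\r\n"

fun openingData(metadata: Metadata, encryptedDataSize: Long): String {
    val json = "{\"public\": ${metadata.public}, \"retention\": \"${metadata.retention}\"}"
    return buildString {
        // Part 1: JSON metadata
        append("--$BOUNDARY\r\n")
        append("Content-Type: application/json;charset=utf-8\r\n")
        append("Content-Length: ${json.encodeToByteArray().size}\r\n\r\n")
        append(json).append("\r\n")
        // Part 2: headers of the binary part; the payload itself is streamed afterwards
        append("--$BOUNDARY\r\n")
        append("Content-Type: application/octet-stream\r\n")
        append("Content-Length: $encryptedDataSize\r\n")
        append("Content-MD5: ${metadata.md5}\r\n\r\n")
    }
}

fun main() {
    val head = openingData(Metadata(public = true, retention = "persistent", md5 = "abc=="), 1024L)
    // The encrypted file content would be written between head and CLOSING.
    print(head + "<1024 encrypted bytes>" + CLOSING)
}
```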

Dependencies (Optional)

If there are some other pull requests related to this one (e.g. new releases of frameworks), specify them here.

Needs releases with:

  • GitHub link to other pull request

Testing

Test Coverage (Optional)

  • I have added automated test to this contribution

How to Test

Briefly describe how this change was tested and if applicable the exact steps taken to verify that it works as expected.

Notes (Optional)

Specify here any other facts that you think are important for this issue.

Attachments (Optional)

Attachments like images, videos, etc. (drag and drop in the text box)


PR Post Submission Checklist for internal contributors (Optional)

  • Wire's Github Workflow has automatically linked the PR to a JIRA issue

PR Post Merge Checklist for internal contributors

  • If any sort of configuration variable was introduced by this PR, it has been added to the relevant documents and the CI jobs have been updated.

References
  1. https://sparkbox.com/foundry/semantic_commit_messages
  2. https://github.com/wireapp/.github#usage
  3. E.g. feat(conversation-list): Sort conversations by most emojis in the title #SQPIT-764.

@trOnk12 trOnk12 added the "breaking change" (This PR has changes that will most-definitely break apps using Kalium) and "DO NOT MERGE" labels Dec 19, 2022
@trOnk12 trOnk12 changed the title from "Fix/cancellation asset upload" to "fix: asset upload cancellation" Dec 20, 2022
@trOnk12 trOnk12 removed the "breaking change" and "DO NOT MERGE" labels Dec 20, 2022

github-actions bot commented Dec 20, 2022

Unit Test Results

0 files   -    296  0 suites   - 296   0s ⏱️ -41s
0 tests  - 1 436  0 ✔️  - 1 393  0 💤  - 43  0 ±0 

Results for commit c618f15. ± Comparison against base commit 9b7b5f8.

♻️ This comment has been updated with latest results.

@gongracr gongracr left a comment

First of all, great job with all the issue investigation. It is really strange that these cancellation errors are not handled by Ktor in an easier and more graceful manner 🤯

However, my understanding was that we didn't want to allow users to cancel an ongoing upload?
If we take this path, we will need to maintain all the recently added overriding classes, and there will be more room for edge cases if, for example, Ktor's behavior regarding callbacks changes in the near future. Therefore I'd also advocate for keeping it simpler and not allowing this kind of user interaction while uploading.

But if we still decide to allow it, I'd feel more comfortable adding some more unit tests to this PR, showing that if the call to the asset repository gets cancelled halfway through, we do not forward the error to the upper layer.

trOnk12 commented Dec 27, 2022

Closing the PR because of the product decision not to allow cancelling asset uploads.

@trOnk12 trOnk12 closed this Dec 27, 2022
@trOnk12 trOnk12 reopened this Dec 27, 2022
@trOnk12 trOnk12 changed the base branch from develop to fix/asset_cancellation December 27, 2022 10:02
@trOnk12 trOnk12 closed this Jan 3, 2023
@gongracr gongracr deleted the fix/cancellation_asset_upload branch January 4, 2023 09:19