feat: asset upload cancellation #1288

Closed
wants to merge 39 commits into from

trOnk12
Contributor

@trOnk12 trOnk12 commented Dec 29, 2022


PR Submission Checklist for internal contributors

  • The PR Title

    • conforms to the style of semantic commit messages¹ supported in Wire's Github Workflow²
    • contains a reference JIRA issue number like SQPIT-764
    • answers the question: If merged, this PR will: ... ³
  • The PR Description

    • is free of optional paragraphs and you have filled the relevant parts to the best of your ability

What's new in this PR?

Issues

It looks like Ktor does not handle cancellation well for request bodies that write to a ByteChannel. The problem seems to be that Ktor does not cope with CancellationException.

The stack trace shows where the crash comes from:

[image: stack trace]

It looks like OkHttp, which Ktor's engine uses to execute the request, only catches IOException here; any other exception propagates up to the thread's uncaught exception handler:

  @Throws(IOException::class)
  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }
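
The behaviour described above can be reproduced without OkHttp or Ktor. The sketch below is a minimal, standard-library-only illustration, not the actual call path: `writeBody` and `execute` are hypothetical stand-ins for a request body's `writeTo` and for a caller that, like `getResponseWithInterceptorChain`, only catches `IOException`.

```kotlin
import java.io.IOException
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical stand-in for a request body whose write is cancelled midway.
fun writeBody(): Nothing = throw CancellationException("upload cancelled")

// Hypothetical caller with the same shape as the try/catch above:
// only IOException is handled, everything else propagates.
fun execute(body: () -> Unit): String =
    try {
        body()
        "ok"
    } catch (e: IOException) {
        "handled: ${e.message}"
    }

fun runDemo(): String =
    try {
        execute { writeBody() }
    } catch (e: CancellationException) {
        // CancellationException is not an IOException, so execute() let it through.
        "escaped: ${e.message}"
    }

fun main() {
    println(runDemo()) // prints "escaped: upload cancelled"
}
```

In the real app there is no outer `catch` on the OkHttp thread, so the escaped exception ends up in the thread's uncaught exception handler.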

The issue is also described here, probably much better than above, so please refer to it:
awslabs/aws-sdk-kotlin#733

The fix is also taken almost straight from their PR.

The fix requires overriding OkHttpClientEngine in order to inject our own version of RequestBody:

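import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.jvm.javaio.toInputStream
import java.io.IOException
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import okhttp3.MediaType
import okhttp3.RequestBody
import okio.BufferedSink
import okio.source

class ByteChannelRequestBody(
    private val contentLength: Long?,
    private val callContext: CoroutineContext,
    private val block: () -> ByteReadChannel
) : RequestBody(), CoroutineScope {

    override val coroutineContext: CoroutineContext
        get() = callContext + producerJob + Dispatchers.IO

    // Child of the call's job, so cancelling the call also cancels the producer.
    private val producerJob = Job(callContext[Job])

    override fun contentType(): MediaType? = null
    override fun writeTo(sink: BufferedSink) {
        withJob(producerJob) {
            // Skip writing entirely if the call was already cancelled.
            if (producerJob.isActive) {
                block().toInputStream().source().use {
                    sink.writeAll(it)
                }
            }
        }
    }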

    override fun contentLength(): Long = contentLength ?: -1

    /**
     * Completes the given job when the block returns calling either `complete()` when the block runs
     * successfully or `completeExceptionally()` on exception.
     * @return the result of calling [block]
     */
    private inline fun <T> withJob(job: CompletableJob, block: () -> T): T {
        try {
            return block()
        } catch (ex: Exception) {
            println("Exception in withJob: $ex")
            job.completeExceptionally(ex)
            // wrap all exceptions thrown from inside `okhttp3.RequestBody#writeTo(..)` as an IOException
            // see https://github.com/awslabs/aws-sdk-kotlin/issues/733
            throw IOException(ex)
        } finally {
            job.complete()
        }
    }

}
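
The effect of the wrapping in `withJob` can be shown in isolation. This is a simplified sketch that leaves out the job handling and uses only the standard library; `wrapAsIoException` and `executeGuarded` are hypothetical names, and unlike `withJob` the helper rethrows an existing `IOException` unchanged instead of wrapping it again.

```kotlin
import java.io.IOException
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical helper: rethrows IOException unchanged and wraps everything else,
// keeping the original exception as the cause.
inline fun <T> wrapAsIoException(block: () -> T): T =
    try {
        block()
    } catch (e: IOException) {
        throw e
    } catch (e: Exception) {
        throw IOException(e)
    }

// Hypothetical caller that, like the OkHttp code quoted earlier, only handles IOException.
fun executeGuarded(body: () -> Unit): String =
    try {
        wrapAsIoException(body)
        "ok"
    } catch (e: IOException) {
        "handled, cause: ${e.cause?.javaClass?.simpleName}"
    }

fun main() {
    // The cancellation now arrives as an IOException and takes the handled path.
    println(executeGuarded { throw CancellationException("upload cancelled") })
}
```

Because the cancellation is delivered as an `IOException`, it follows the error path the caller already has instead of reaching the thread's exception handler.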

We also had to apply a similar change to our StreamAssetContent.kt, which extends OutgoingContent.WriteChannelContent.
It is also used for uploading the asset, and it could likewise let the CancellationException slip into the thread exception handler and crash our app.

internal class StreamAssetContent internal constructor(
    private val metadata: AssetMetadataRequest,
    private val encryptedDataSize: Long,
    private val fileContentStream: () -> Source,
    callContext: CoroutineContext,
) : OutgoingContent.WriteChannelContent(), CoroutineScope {

    private val producerJob = Job(callContext[Job])

    override val coroutineContext: CoroutineContext = callContext + producerJob

    private val openingData: String by lazy {
        val body = StringBuilder()

        // Part 1
        val strMetadata = "{\"public\": ${metadata.public}, \"retention\": \"${metadata.retentionType.name.lowercase()}\"}"

        body.append("--frontier\r\n")
        body.append("Content-Type: application/json;charset=utf-8\r\n")
        body.append("Content-Length: ")
            .append(strMetadata.length)
            .append("\r\n\r\n")
        body.append(strMetadata)
            .append("\r\n")

        // Part 2
        body.append("--frontier\r\n")
        body.append("Content-Type: application/octet-stream")
            .append("\r\n")
        body.append("Content-Length: ")
            .append(encryptedDataSize)
            .append("\r\n")
        body.append("Content-MD5: ")
            .append(metadata.md5)
            .append("\r\n\r\n")

        body.toString()
    }

    private val closingArray = "\r\n--frontier--\r\n"

    @OptIn(ExperimentalStdlibApi::class)
    override suspend fun writeTo(channel: ByteWriteChannel) {
        try {
            coroutineScope {
                if (!channel.isClosedForWrite && producerJob.isActive) {

                    channel.writeStringUtf8(openingData)
                    val contentBuffer = Buffer()
                    val fileContentStream = fileContentStream()
                    while (fileContentStream.read(contentBuffer, BUFFER_SIZE) != -1L) {
                        contentBuffer.readByteArray().let { content ->
                            channel.writePacket(ByteReadPacket(content))
                        }
                    }
                    channel.writeStringUtf8(closingArray)
                    channel.flush()
                    channel.close()
                }
            }
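        } catch (e: Exception) {
            channel.flush()
            channel.close()
            producerJob.completeExceptionally(e)

            // Rethrow as IOException, keeping the original exception as the cause.
            throw IOException(e.message, e)
        } finally {
            // No-op if the job was already completed exceptionally above.
            producerJob.complete()
        }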
    }
}
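
One detail worth checking in `openingData`: `Content-Length` is a byte count, whereas `String.length` counts UTF-16 code units. For the ASCII-only metadata built here the two are equal, so the code above is correct as it stands. The sketch below shows the same two-part preamble built with an explicit byte count; `buildOpeningData` and its parameters are hypothetical and not part of this PR.

```kotlin
// Hypothetical helper, not part of the PR: builds the same preamble as openingData,
// but derives the metadata Content-Length from the UTF-8 encoded bytes.
fun buildOpeningData(
    isPublic: Boolean,
    retention: String,
    encryptedDataSize: Long,
    md5: String
): String {
    val metadataJson = "{\"public\": $isPublic, \"retention\": \"$retention\"}"
    val metadataByteCount = metadataJson.encodeToByteArray().size

    return buildString {
        // Part 1: JSON metadata
        append("--frontier\r\n")
        append("Content-Type: application/json;charset=utf-8\r\n")
        append("Content-Length: ").append(metadataByteCount).append("\r\n\r\n")
        append(metadataJson).append("\r\n")

        // Part 2: headers for the encrypted asset bytes that follow
        append("--frontier\r\n")
        append("Content-Type: application/octet-stream\r\n")
        append("Content-Length: ").append(encryptedDataSize).append("\r\n")
        append("Content-MD5: ").append(md5).append("\r\n\r\n")
    }
}

fun main() {
    // Placeholder values for illustration only.
    print(buildOpeningData(isPublic = true, retention = "persistent", encryptedDataSize = 1024L, md5 = "abc"))
}
```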

Dependencies (Optional)

If there are some other pull requests related to this one (e.g. new releases of frameworks), specify them here.

Needs releases with:

  • GitHub link to other pull request

Testing

Test Coverage (Optional)

  • I have added automated test to this contribution

How to Test

Briefly describe how this change was tested and if applicable the exact steps taken to verify that it works as expected.

Notes (Optional)

Specify here any other facts that you think are important for this issue.

Attachments (Optional)

Attachments like images, videos, etc. (drag and drop in the text box)


PR Post Submission Checklist for internal contributors (Optional)

  • Wire's Github Workflow has automatically linked the PR to a JIRA issue

PR Post Merge Checklist for internal contributors

  • If any sort of configuration variable was introduced by this PR, it has been added to the relevant documents and the CI jobs have been updated.

References
  1. https://sparkbox.com/foundry/semantic_commit_messages
  2. https://github.com/wireapp/.github#usage
  3. E.g. feat(conversation-list): Sort conversations by most emojis in the title #SQPIT-764.

@github-actions
Contributor

github-actions bot commented Dec 29, 2022

Unit Test Results

3 files (-302), 3 suites (-302), duration 2m 47s ⏱️ (+2m 2s)
1 446 tests (-53): 1 435 passed ✔️ (-20), 11 skipped 💤 (-33), 0 failed (±0)

Results for commit 5ec165f. ± Comparison against base commit 6b1453b.

♻️ This comment has been updated with latest results.

@trOnk12 trOnk12 marked this pull request as draft December 29, 2022 17:01
@codecov-commenter

codecov-commenter commented Dec 30, 2022

Codecov Report

Merging #1288 (d1d43fd) into develop (bec2e66) will decrease coverage by 0.79%.
The diff coverage is 24.47%.

@@              Coverage Diff              @@
##             develop    #1288      +/-   ##
=============================================
- Coverage      56.58%   55.79%   -0.80%     
  Complexity      1010     1010              
=============================================
  Files            709      720      +11     
  Lines          18666    19022     +356     
  Branches        1794     1822      +28     
=============================================
+ Hits           10562    10613      +51     
- Misses          7413     7728     +315     
+ Partials         691      681      -10     

Contributor

@vitorhugods vitorhugods left a comment

Whilst the solution of observing the message visibility and cancelling the scope was pretty sweet and implemented the functionality quite well, the cost of having to fork so many OkHttp classes is far too high. I am not sure this is worth it; I'm inclined to say that it isn't.

Every time we want to upgrade its version, we would need to manually bring changes to our side.

I'm diving deeper into this and trying to find an alternative solution for now.

@trOnk12
Contributor Author

trOnk12 commented Feb 7, 2023

Closing the ticket in favor of the JetBrains PR that solves it 🔥

https://youtrack.jetbrains.com/issue/KTOR-5518

@trOnk12 trOnk12 closed this Feb 7, 2023
@gongracr gongracr deleted the fix/asset_upload_cancel branch June 1, 2023 17:39