Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-50768][CORE] Introduce TaskContext.createResourceUninterruptibly to avoid stream leak by task interruption #49413

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Ngone51
Copy link
Member

@Ngone51 Ngone51 commented Jan 8, 2025

What changes were proposed in this pull request?

This PR fixes the potential stream leak issue by introduing TaskContext.createResourceUninterruptibly.

When a task is using TaskContext.createResourceUninterruptibly to create the resource, the task would be marked as uninterruptible. Thus, any interruption request during the call to TaskContext.createResourceUninterruptibly would be delayed until the creation finishes.

This PR introduces an new lock contention between Task.kill and TaskContext.createResourceUninterruptibly. But I think it is acceptable given that both are not on the hot-path.

(I will submmit a followup to apply TaskContext.createResourceUninterruptibly in the codebase if this PR is approved by the community.)

Why are the changes needed?

We had #48483 tried to fix the potential stream leak issue by task interruption. It mitigates the issue by using

def tryInitializeResource[R <: Closeable, T](createResource: => R)(initialize: R => T): T = {
  val resource = createResource
  try {
    initialize(resource)
  } catch {
    case e: Throwable =>
      resource.close()
      throw e
  }
} 

But this utility function has an issue that resource.close() would leak open resouces if initialize(resource) also created some resources internally, especially when initialize(resource) is interrupted (See the example of InterruptionSensitiveInputStream in the test).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a unit test.

Was this patch authored or co-authored using generative AI tooling?

No.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants