[SPARK-50768][CORE] Introduce TaskContext.createResourceUninterruptibly to avoid stream leak by task interruption #49413
+224
−7
What changes were proposed in this pull request?
This PR fixes the potential stream leak issue by introducing TaskContext.createResourceUninterruptibly.
When a task uses TaskContext.createResourceUninterruptibly to create a resource, the task is marked as uninterruptible. Thus, any interruption request that arrives during the call to TaskContext.createResourceUninterruptibly is delayed until the creation finishes.
This PR introduces a new lock contention between Task.kill and TaskContext.createResourceUninterruptibly. But I think it is acceptable given that neither is on the hot path.
(I will submit a followup to apply TaskContext.createResourceUninterruptibly in the codebase if this PR is approved by the community.)
Why are the changes needed?
#48483 tried to fix the potential stream leak caused by task interruption. It mitigates the issue by using a utility function. But this utility function has a problem: resource.close() would leak open resources if initialize(resource) also created some resources internally, especially when initialize(resource) is interrupted (see the example of InterruptionSensitiveInputStream in the test).
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a unit test.
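For readers who want a picture of the deferral described under "What changes were proposed in this pull request?", here is a minimal sketch in plain Java. It is not the code in this PR: SketchTask, its fields, and the Supplier-based signature are hypothetical stand-ins for Task and TaskContext, chosen only to show how a kill request that arrives during resource creation can be held back by a shared lock and delivered once creation finishes.

```java
import java.io.Closeable;
import java.util.function.Supplier;

/** Hypothetical stand-in for a task; not Spark's actual Task or TaskContext. */
final class SketchTask {
    private final Object lock = new Object();
    private final Thread taskThread;
    private boolean uninterruptible = false;   // guarded by lock
    private boolean pendingInterrupt = false;  // guarded by lock

    SketchTask(Thread taskThread) {
        this.taskThread = taskThread;
    }

    /** Plays the role of Task.kill: usually called from another thread. */
    void kill() {
        synchronized (lock) {
            if (uninterruptible) {
                // A resource is being created: remember the request, deliver it later.
                pendingInterrupt = true;
            } else {
                taskThread.interrupt();
            }
        }
    }

    /** Runs on the task thread; kill requests are deferred while builder runs. */
    <T extends Closeable> T createResourceUninterruptibly(Supplier<T> builder) {
        synchronized (lock) {
            uninterruptible = true;
        }
        try {
            // Assumption: the builder fully constructs the resource, including
            // any nested streams it opens internally.
            return builder.get();
        } finally {
            synchronized (lock) {
                uninterruptible = false;
                if (pendingInterrupt) {
                    // Creation finished (or failed): deliver the deferred request.
                    pendingInterrupt = false;
                    taskThread.interrupt();
                }
            }
        }
    }
}
```

The lock taken in both kill and createResourceUninterruptibly is the contention mentioned in the description. The sketch ignores details a real implementation would need, such as an interrupt that was already pending before creation started.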
Was this patch authored or co-authored using generative AI tooling?
No.