Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35400][checkpoint] Release FileMergingSnapshotManager if all tasks finished #24817

Closed
wants to merge 1 commit into from

Conversation

Zakelly
Copy link
Contributor

@Zakelly Zakelly commented May 21, 2024

What is the purpose of the change

Currently, the FileMergingSnapshotManager is created for each job, only when the corresponding job released, the manager is released and disposed. For failover scenario, the tasks quit but job is still there, leading a reuse of FileMergingSnapshotManager, which violates the design of FileMergingSnapshotManager.

Brief change log

  • Make TaskExecutorFileMergingManager record reference from ExecutionAttemptID to FileMergingSnapshotManager, and release the reference when task resources release.

Verifying this change

This change is already covered by modified existing tests, such as TaskExecutorFileMergingManagerTest.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@Zakelly Zakelly requested a review from fredia May 21, 2024 05:20
@flinkbot
Copy link
Collaborator

flinkbot commented May 21, 2024

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Copy link
Contributor

@jectpro7 jectpro7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks the PR, it is looking good to me

Copy link
Contributor

@fredia fredia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Zakelly Thanks for the PR, PTAL at my comments.

@@ -74,8 +79,9 @@ public TaskExecutorFileMergingManager() {
* Initialize file merging snapshot manager for each job according configurations when {@link
* org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
*/
public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForTask(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about renaming it to fileMergingSnapshotManagerForAttempt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer the Task, since this manager will be used to create a TaskStateManager which will be given to a Task. The Task is more in line with the caller’s cognition.

@@ -53,7 +57,8 @@ public class TaskExecutorFileMergingManager {
* manager(executor).
*/
@GuardedBy("lock")
private final Map<JobID, FileMergingSnapshotManager> fileMergingSnapshotManagerByJobId;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if using <JobID, ExecutionAttemptID> directly as the key is enough? to avoid nesting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well I suggest no. We still need to call releaseMergingSnapshotManagerForJob in releaseJobResources as a safety guard, where we use the job id to retrieve a manager.

Copy link
Contributor

@fredia fredia left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, overall LGTM now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants