Thread interruption flag leaks from BoundedElasticThreadPerTaskScheduler thread into user code #3948

Open
petkov opened this issue Dec 4, 2024 · 1 comment

petkov commented Dec 4, 2024

When virtual threads are enabled with the reactor.schedulers.defaultBoundedElasticOnVirtualThreads property, a BoundedElasticThreadPerTaskScheduler thread leaks its interrupt flag into user code. The test below demonstrates this:

    @Test
    void shouldOnErrorResumeLambdaBeExecutedInsideUninterruptedThread() {
        System.setProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");
        Mono.fromCallable(() -> {
                throw new RuntimeException("$ERROR$");
            })
            .subscribeOn(Schedulers.boundedElastic())
            .onErrorResume(e -> {
                Assertions.assertFalse(Thread.interrupted()); // The test fails here
                return Mono.empty();
            })
            .block();
    }

Expected Behavior

The lambda which is called by onErrorResume should be run on a thread that is not interrupted.

Actual Behavior

The lambda which is called by onErrorResume is run on an interrupted Thread.
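
To illustrate what a "leaked" interrupt flag means for a callback, here is a minimal plain-JDK sketch. It does not use Reactor and does not reproduce the scheduler's internals: the self-interrupt is only a stand-in for whatever sets the flag, which this report does not establish, and the class and method names are hypothetical. It also uses a platform thread rather than a virtual one so that it runs on older JDKs.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptFlagDemo {

    // Runs a "task" and then a "callback" on the same worker thread and
    // reports whether the callback observed the interrupt flag.
    static boolean callbackSeesInterrupt(boolean clearBeforeCallback) {
        AtomicBoolean seen = new AtomicBoolean();
        Thread worker = new Thread(() -> {
            // Stand-in (assumption) for internals that interrupt the worker.
            Thread.currentThread().interrupt();
            if (clearBeforeCallback) {
                // Thread.interrupted() returns the flag AND clears it.
                Thread.interrupted();
            }
            // Stand-in for user code such as an onErrorResume lambda.
            // isInterrupted() only reads the flag and leaves it unchanged.
            seen.set(Thread.currentThread().isInterrupted());
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("flag left set:   " + callbackSeesInterrupt(false));
        System.out.println("flag cleared:    " + callbackSeesInterrupt(true));
    }
}
```

Note that the test in this report calls Thread.interrupted(), which clears the flag as a side effect, so a second check inside the same lambda would no longer see it.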

Reactor Core version: 3.6.6

Notes

The test below passes when the exception is thrown inside a map function instead:

    @Test
    void shouldOnErrorResumeLambdaBeExecutedInsideUninterruptedThread() {
        System.setProperty("reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");
        Mono.fromCallable(() -> {
                return "NOT_EMPTY";
            })
            .map(s -> {
                throw new RuntimeException("$ERROR$");
            })
            .subscribeOn(Schedulers.boundedElastic())
            .onErrorResume(e -> {
                Assertions.assertFalse(Thread.interrupted()); // The assertion passes here
                return Mono.empty();
            })
            .block();
    }
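
Until the scheduler is fixed, user code that is sensitive to the flag could defensively clear it before running. The following is a minimal sketch of such a guard using only the JDK; InterruptGuard and withClearedInterrupt are hypothetical names, not part of Reactor, and the approach assumes that discarding a pending interrupt is acceptable at that point in the pipeline.

```java
import java.util.function.Function;

public class InterruptGuard {

    // Hypothetical helper: wraps a function so that any pending interrupt
    // flag on the calling thread is cleared before the delegate runs.
    static <T, R> Function<T, R> withClearedInterrupt(Function<T, R> delegate) {
        return input -> {
            // Thread.interrupted() reads and clears the current thread's flag.
            // The previous value is discarded here, which also drops a
            // legitimate interrupt if one was pending.
            Thread.interrupted();
            return delegate.apply(input);
        };
    }

    public static void main(String[] args) {
        // Simulate a leaked flag on the current thread.
        Thread.currentThread().interrupt();
        Function<String, Boolean> guarded =
                withClearedInterrupt(s -> Thread.currentThread().isInterrupted());
        System.out.println("delegate saw interrupt: " + guarded.apply("x"));
    }
}
```

A wrapper like this could be applied to the lambda passed to onErrorResume. The trade-off is that it hides the symptom rather than the cause, so it is best treated as a temporary measure.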

chemicL added the type/bug (A general bug) label on Dec 10, 2024
chemicL added this to the 3.6.14 milestone on Dec 10, 2024

chemicL (Member) commented Dec 10, 2024

Thank you for the report. It does look like a bug. I'll review the implementation and figure out what to do.
