Sender does not close / manage resource management channel when declaring objects #183

Open
a701440 opened this issue Dec 15, 2023 · 3 comments

Comments

@a701440

a701440 commented Dec 15, 2023

The Sender class uses a separate resource management channel mono that supplies a channel for declaration operations.
However, unlike the "send" operation, declaration operations do not close the channel or hand it to a close handler.

This encourages supplying the same cached channel for all resource management operations.
However, it seems channels cannot be used by multiple threads concurrently, so that does not work well.
See: rabbitmq/rabbitmq-java-client#1194

    return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(declare);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage(future))
        .flatMap(command -> Mono.just((AMQP.Queue.DeclareOk) command.getMethod()))
        .publishOn(resourceManagementScheduler);

This requests a new channel from the channelMono, then neither closes it nor passes it to a close handler.
In send operations we have:

    .doFinally(st -> channelCloseHandler.accept(st, channel))

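A minimal sketch of the pattern being asked for, using only the JDK rather than Reactor: borrow a channel, run the RPC, and hand the channel to a close handler once the call finishes. `FakeChannel` and `ReleasingRpc` are hypothetical stand-ins, not reactor-rabbitmq types. `whenComplete` plays roughly the role that `doFinally` plays in the send path, although `doFinally` also fires on cancellation, which this sketch does not cover.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for a borrowed resource such as an AMQP channel.
final class FakeChannel {
    private boolean released;

    CompletableFuture<String> rpc(String request) {
        return CompletableFuture.completedFuture("ok:" + request);
    }

    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

// Hypothetical helper; illustrates the idea, not the library's implementation.
final class ReleasingRpc {
    /**
     * Borrows a channel, runs the RPC, and passes the channel to closeHandler
     * once the RPC completes, whether normally or exceptionally.
     */
    static <C, R> CompletableFuture<R> call(
            Supplier<C> channelSupplier,
            Function<C, CompletableFuture<R>> rpc,
            BiConsumer<C, Throwable> closeHandler) {
        C channel = channelSupplier.get();
        CompletableFuture<R> result;
        try {
            result = rpc.apply(channel);
        } catch (RuntimeException e) {
            // Synchronous failures are turned into a failed future (Java 9+).
            result = CompletableFuture.failedFuture(e);
        }
        // error is null when the RPC completed normally.
        return result.whenComplete((value, error) -> closeHandler.accept(channel, error));
    }
}
```

With a close handler that returns the channel to a pool, each declaration would borrow and release its own channel instead of sharing one cached channel across threads.
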
@acogoluegnes
Contributor

The resource management channel is used for the whole life of the Sender instance. You can provide your own channel mono in the SenderOptions. You can also provide a dedicated channel mono through the ResourceManagementOptions parameter of each resource-related method.

If you hit concurrency issues with resource management, you can provide your own scheduler instance with SenderOptions#resourceManagementScheduler.

@a701440
Author

a701440 commented Dec 18, 2023

I understand that. It's just that the patterns used with the primary channelMono and the resource management channelMono are different in the Sender class. The primary channelMono supports ChannelPool and returns channels to the pool through the pool's close handler. The resource management channel mono does not support that, and it would be nice if this were consistent, especially given that channels are not thread-safe.

I actually ended up creating a workaround hack that returns the channels used by the resource management Mono to the channel pool, using the Reactor subscriber context and the doFinally operator (getAutoReturnedChannelMono):

    public Mono<? extends Channel> getAutoReturnedChannelMono() {
        return getChannelMono().transformDeferredContextual(this::autoReturnChannel).contextWrite(this::createAutoReturnContext);
    }

    private Mono<? extends Channel> autoReturnChannel(Mono<? extends Channel> channelMono, ContextView context) {
        AtomicReference<Channel> ref = context.get(AtomicReference.class);
        return channelMono.doOnNext(ref::set).doFinally(signal -> getChannelCloseHandler().accept(signal, ref.get()));
    }

    private Context createAutoReturnContext(Context context) {
        return context.put(AtomicReference.class, new AtomicReference<Channel>());
    }
...

Related to this, the LazyChannelPool class could also use some improvement.
When dispensing channels, it does not check that the channel belongs to the connection currently in use; maybe it should also check that the channel is still open. It would also be nice to have a "loop" when dispensing channels that drains "bad" channels from the pool, so that they can be garbage collected.
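The draining loop suggested above could look roughly like this. It is a sketch under stated assumptions, not LazyChannelPool's code: `PooledChannel`, `StubChannel`, and `DrainingPool` are hypothetical names, and the channel is reduced to the two properties the check needs (open state and owning connection).

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical minimal view of a pooled channel; not the RabbitMQ Channel interface.
interface PooledChannel {
    boolean isOpen();

    Object connection();
}

// Hypothetical test double used to exercise the pool.
final class StubChannel implements PooledChannel {
    private final Object connection;
    private final boolean open;

    StubChannel(Object connection, boolean open) {
        this.connection = connection;
        this.open = open;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    @Override
    public Object connection() {
        return connection;
    }
}

final class DrainingPool<C extends PooledChannel> {
    private final Queue<C> idle = new ConcurrentLinkedQueue<>();

    void release(C channel) {
        idle.offer(channel);
    }

    /**
     * Polls idle channels until one is open and bound to currentConnection.
     * Rejected channels are simply dropped from the queue, so nothing in the
     * pool keeps them reachable. Falls back to the factory when no idle
     * channel is usable.
     */
    C acquire(Object currentConnection, Supplier<C> factory) {
        C candidate;
        while ((candidate = idle.poll()) != null) {
            if (candidate.isOpen() && candidate.connection() == currentConnection) {
                return candidate;
            }
        }
        return factory.get();
    }

    int idleCount() {
        return idle.size();
    }
}
```

The connection is compared by identity here on the assumption that a reconnect produces a new connection object; a real pool might prefer another notion of "same connection".
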

@acogoluegnes
Contributor

Feel free to provide a PR with backward-compatible changes. There may be unfortunate choices in the current implementation but we have to stick with them until the next release that allows breaking changes (minor releases for Reactor projects IIRC).
