RPCs are executed one at a time, blocking on a mutex #174

Open
chibenwa opened this issue May 17, 2022 · 2 comments

Comments

@chibenwa
Contributor

Summary

apache/james-project#1003

Calling Sender::unbind waits for a mutex to be released if an RPC is currently executing on the channel, and waits for that RPC to complete before submitting a new one.

This leads to blocking behaviour during "massive" unbinds.

Expected Behavior

I would not expect such blocking behaviour in a reactor-* project, and I would not expect a reactive driver to force me to put subscribeOn(boundedElastic()) everywhere.

At the very least, ChannelPool could offer a separate method returning a channel that is not executing an RPC. This would let a smart implementation always hand out channels that can submit an RPC right away, open new channels if needed, or wait without blocking threads when relevant.
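A minimal sketch of that idea, assuming a pool that tracks which of its channels are idle. `IdleChannelPool`, `acquire` and `release` are hypothetical names (not the reactor-rabbitmq `ChannelPool` API), and the generic `C` stands in for a real channel type. The point is that `acquire` never parks a thread: it reuses an idle channel, opens a new one while under a limit, or returns a future completed by the next `release`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical pool that only hands out channels with no RPC in flight.
// C stands in for a channel type (e.g. com.rabbitmq.client.Channel); names are illustrative.
final class IdleChannelPool<C> {
    private final Deque<C> idle = new ArrayDeque<>();                       // channels with no RPC in flight
    private final Deque<CompletableFuture<C>> waiters = new ArrayDeque<>(); // callers waiting for a channel
    private final Supplier<C> openChannel;
    private final int maxChannels;
    private int opened;

    IdleChannelPool(Supplier<C> openChannel, int maxChannels) {
        this.openChannel = openChannel;
        this.maxChannels = maxChannels;
    }

    // Never waits: the monitor below only guards short bookkeeping, never I/O.
    CompletableFuture<C> acquire() {
        synchronized (this) {
            C channel = idle.pollFirst();
            if (channel != null) {
                return CompletableFuture.completedFuture(channel); // reuse an idle channel
            }
            if (opened >= maxChannels) {
                CompletableFuture<C> waiter = new CompletableFuture<>();
                waiters.addLast(waiter); // completed by a later release()
                return waiter;
            }
            opened++; // reserve a slot, then open the channel outside the lock
        }
        try {
            return CompletableFuture.completedFuture(openChannel.get());
        } catch (RuntimeException e) {
            synchronized (this) {
                opened--; // free the reserved slot; queued waiters are served by the next release()
            }
            return CompletableFuture.failedFuture(e);
        }
    }

    // To be called once the RPC running on this channel has completed.
    void release(C channel) {
        CompletableFuture<C> waiter;
        synchronized (this) {
            waiter = waiters.pollFirst();
            if (waiter == null) {
                idle.addLast(channel);
                return;
            }
        }
        waiter.complete(channel); // hand the channel straight to the oldest waiter, outside the lock
    }
}
```

With `maxChannels = 1`, a second `acquire()` returns an incomplete future instead of blocking, and it completes with the first channel as soon as that channel is released.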

Actual Behavior

See this flame graph taken on the Netty event loop:

[Screenshot from 2022-05-17 18-07-46: flame graph]

Context: a CTRL+C in a perf test led to 10,000 IMAP connections being closed and cleaned up at the same time.

Steps to Reproduce

        @Test
        void blockingRPCs(DockerRabbitMQ rabbitMQ) throws Exception {
            BlockHound.install();
            // Default (blocking IO) connection factory: useNio() is not called here
            final ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
            SenderOptions senderOptions = new SenderOptions()
                .connectionFactory(connectionFactory)
                .resourceManagementScheduler(Schedulers.boundedElastic());

            final Sender sender = new Sender(senderOptions);

            sender.declare(QueueSpecification.queue().name("queue")).block();
            sender.declare(ExchangeSpecification.exchange().name("ex")).block();

            // Bind then unbind 100 routing keys concurrently, on the non-blocking parallel scheduler
            Flux.range(0, 100)
                .parallel()
                .flatMap(i -> {
                    return sender.bind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i))
                        .then(sender.unbind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i)));
                })
                .then()
                .subscribeOn(Schedulers.parallel())
                .block();
        }

Fails with

reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.net.SocketOutputStream#socketWrite0
[...]
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.net.SocketOutputStream#socketWrite0
	at java.base/java.net.SocketOutputStream.socketWrite0(SocketOutputStream.java)
	at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
	at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
	at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123)
	at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:197)
	at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:636)
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:134)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:455)
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:434)
	at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc(AMQChannel.java:369)
	at com.rabbitmq.client.impl.AMQChannel.asyncRpc(AMQChannel.java:360)
	at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc(AMQChannel.java:320)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc(AMQChannel.java:155)
	at com.rabbitmq.client.impl.ChannelN.asyncCompletableRpc(ChannelN.java:1580)
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.asyncCompletableRpc(AutorecoveringChannel.java:931)
	at reactor.rabbitmq.ChannelProxy.asyncCompletableRpc(ChannelProxy.java:556)
	at reactor.rabbitmq.Sender.lambda$bind$28(Sender.java:691)
[...]

Your Environment

  • Reactor version(s) used: 2020.0.18
  • Java 11
  • Ubuntu 20.04
@acogoluegnes
Contributor

The underlying Java client uses blocking IO by default. You can switch to NIO with ConnectionFactory#useNio(), and this should make the BlockHound failure above disappear. See the documentation for more information.

The Java client still has to use some locking to serialize RPC calls like unbind as the AMQP 0.9.1 protocol does not use a correlation ID for requests, so there can be only one RPC call at a time on a channel in practice (this is what most client libraries do AFAIK).

You're welcome to investigate further with NIO enabled to see where things could be made more optimized. The one-RPC-call-at-a-time limitation will always be there but there may be ways to mitigate the issue.
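One possible mitigation, sketched under the assumption that an RPC can be started asynchronously and reports completion through a `CompletableFuture`: keep the one-RPC-per-channel rule, but queue later callers on a future chain instead of parking their threads. `RpcSerializer` is a hypothetical name, not an existing reactor-rabbitmq or Java client class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical per-channel serializer: RPCs still run strictly one at a time,
// but a caller arriving while an RPC is in flight gets a future, not a parked thread.
final class RpcSerializer {
    // Completes when the most recently submitted RPC has finished (success or failure).
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> rpc) {
        CompletableFuture<T> result = new CompletableFuture<>();
        CompletableFuture<Void> previous;
        synchronized (this) { // short bookkeeping only, no waiting
            previous = tail;
            tail = result.handle((value, error) -> null); // the next RPC waits for this one, whatever its outcome
        }
        // Start this RPC only once the previous one has completed.
        previous.whenComplete((ignoredValue, ignoredError) -> {
            try {
                rpc.get().whenComplete((value, error) -> {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else {
                        result.complete(value);
                    }
                });
            } catch (Throwable t) {
                result.completeExceptionally(t); // the RPC could not even be started
            }
        });
        return result;
    }
}
```

The second `submit` returns immediately; its RPC is only started from the callback that completes the first one, so no thread ever waits on a monitor for the channel to become free.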

@chibenwa
Contributor Author

chibenwa commented May 23, 2022

> You're welcome to investigate further with NIO enabled to see where things could be made more optimized.

Fair. Modifying the code example to:

        @Test
        void blockingRPCs(DockerRabbitMQ rabbitMQ) throws Exception {
            BlockHound.install();
            final ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
            connectionFactory.useNio();
            SenderOptions senderOptions = new SenderOptions()
                .connectionFactory(connectionFactory)
                .resourceManagementScheduler(Schedulers.boundedElastic());

            final Sender sender = new Sender(senderOptions);

            sender.declare(QueueSpecification.queue().name("queue")).block();
            sender.declare(ExchangeSpecification.exchange().name("ex")).block();

            Flux.range(0, 100)
                .parallel()
                .flatMap(i -> {
                    return sender.bind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i))
                        .then(sender.unbind(BindingSpecification.binding().exchange("ex").queue("queue").routingKey("routing" + i)));
                })
                .then()
                .subscribeOn(Schedulers.parallel())
                .block();
        }

Turns into the expected stacktrace:

reactor.core.Exceptions$ReactiveException: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait

	at reactor.core.Exceptions.propagate(Exceptions.java:392)
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:97)
	at reactor.core.publisher.Mono.block(Mono.java:1707)
	at org.apache.james.backends.rabbitmq.RabbitMQTest$FourConnections.blockingRPCs(RabbitMQTest.java:219)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait
	at java.base/java.lang.Object.wait(Object.java)
	at java.base/java.lang.Object.wait(Object.java:328)
	at com.rabbitmq.client.impl.AMQChannel.doEnqueueRpc(AMQChannel.java:220)
	at com.rabbitmq.client.impl.AMQChannel.enqueueAsyncRpc(AMQChannel.java:212)
	at com.rabbitmq.client.impl.AMQChannel.quiescingAsyncRpc(AMQChannel.java:368)
	at com.rabbitmq.client.impl.AMQChannel.asyncRpc(AMQChannel.java:360)
	at com.rabbitmq.client.impl.AMQChannel.privateAsyncRpc(AMQChannel.java:320)
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingAsyncRpc(AMQChannel.java:155)
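The `Object#wait` frame under `AMQChannel.doEnqueueRpc` appears to be the one-RPC-at-a-time serialization at work. Below is a simplified, hypothetical model of that kind of monitor-based serialization; `SingleRpcChannel`, `beginRpc` and `completeRpc` are illustrative names and this is not the Java client's source. A second caller arriving while an RPC is outstanding parks its thread until the reply arrives, which is exactly what BlockHound flags on a non-blocking thread.

```java
// Simplified, hypothetical model of a channel allowing a single outstanding RPC.
// Illustrates the idea behind the doEnqueueRpc frame; it is NOT the Java client's code.
final class SingleRpcChannel {
    private final Object monitor = new Object();
    private boolean rpcInFlight; // guarded by monitor

    // Called before sending a request frame (e.g. queue.unbind).
    // If another RPC is outstanding, the calling thread is parked in Object#wait.
    void beginRpc() throws InterruptedException {
        synchronized (monitor) {
            while (rpcInFlight) {
                monitor.wait();
            }
            rpcInFlight = true;
        }
    }

    // Called when the reply frame (e.g. queue.unbind-ok) arrives: wakes up parked callers.
    void completeRpc() {
        synchronized (monitor) {
            rpcInFlight = false;
            monitor.notifyAll();
        }
    }
}
```

Switching to NIO removes the blocking socket write, but not this wait: with many concurrent unbinds on one channel, every caller but one ends up parked here.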

> See the documentation for more information.

The documentation does not mention that NIO prevents some blocking calls; it merely states:

> Use the NIO mode if your Java process uses many connections (dozens or hundreds). You should use fewer threads than with the default blocking mode. With the appropriate number of threads set, you shouldn't experience any decrease in performance, especially if the connections are not so busy.

Since I run a single connection, that does not look appealing to me!

I suggest updating the documentation to clearly mention the cost of not using NIO in terms of blocking calls.

> The one-RPC-call-at-a-time limitation will always be there but there may be ways to mitigate the issue.

Yes, I proposed above refining the channel pool API.

Using another dedicated channel pool for bind and unbind, where returning a channel to the pool is piggybacked on RPC completion, could work too.
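A minimal sketch of that piggybacking, assuming the dedicated pool exposes a non-blocking acquire and a release callback, and that bind/unbind can be started asynchronously. `PooledRpc.withChannel` is a hypothetical helper, not an existing reactor-rabbitmq API.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: borrow a channel from a dedicated bind/unbind pool,
// run one RPC on it, and give the channel back only when that RPC completes.
final class PooledRpc {
    private PooledRpc() {}

    static <C, T> CompletableFuture<T> withChannel(
            Supplier<CompletableFuture<C>> acquire,   // non-blocking borrow from the dedicated pool
            Consumer<C> release,                      // returns the channel to that pool
            Function<C, CompletableFuture<T>> rpc) {  // e.g. an asynchronous bind or unbind
        return acquire.get().thenCompose(channel -> {
            CompletableFuture<T> reply;
            try {
                reply = Objects.requireNonNull(rpc.apply(channel), "rpc returned null");
            } catch (RuntimeException e) {
                release.accept(channel); // the RPC never started, so the channel is free again
                return CompletableFuture.<T>failedFuture(e);
            }
            // Release is piggybacked on RPC completion (success or failure), so the pool
            // never hands out a channel that still has an RPC in flight.
            return reply.whenComplete((value, error) -> release.accept(channel));
        });
    }
}
```

As long as those channels are only used through this helper, a borrower never lands on a channel with an outstanding RPC, and therefore never has to wait on the channel mutex.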

By the way, at the very least this limitation needs to be documented.

chibenwa added a commit to chibenwa/james-project that referenced this issue May 23, 2022
As demonstrated in reactor/reactor-rabbitmq#174 (comment)
use of NIO can prevent some blocking calls.