diff --git a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java index 5e9dcf27ad3..ad2ec83f1f6 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java +++ b/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java @@ -15,6 +15,8 @@ import io.vertx.core.eventbus.*; import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.logging.Logger; +import io.vertx.core.impl.logging.LoggerFactory; import io.vertx.core.impl.utils.ConcurrentCyclicSequence; import io.vertx.core.spi.metrics.EventBusMetrics; import io.vertx.core.spi.metrics.MetricsProvider; @@ -22,10 +24,15 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; @@ -40,6 +47,7 @@ public class EventBusImpl implements EventBusInternal, MetricsProvider { private static final AtomicReferenceFieldUpdater OUTBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "outboundInterceptors"); private static final AtomicReferenceFieldUpdater INBOUND_INTERCEPTORS_UPDATER = AtomicReferenceFieldUpdater.newUpdater(EventBusImpl.class, Handler[].class, "inboundInterceptors"); + static final Logger logger = LoggerFactory.getLogger(EventBusImpl.class); private volatile Handler[] outboundInterceptors = new Handler[0]; private volatile Handler[] inboundInterceptors = new Handler[0]; private final AtomicLong replySequence = new AtomicLong(0); @@ -353,17 +361,46 @@ protected boolean isMessageLocal(MessageImpl msg) { protected ReplyException deliverMessageLocally(MessageImpl msg) { ConcurrentCyclicSequence handlers = handlerMap.get(msg.address()); boolean messageLocal = isMessageLocal(msg); + boolean findingHandlerFailed = true; if (handlers != null) { if (msg.isSend()) { //Choose one - HandlerHolder holder = nextHandler(handlers, messageLocal); + HandlerHolder holder = nextHandler(handlers, messageLocal, null); if (metrics != null) { metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, holder != null ? 1 : 0); } - if (holder != null) { - holder.handler.receive(msg.copyBeforeReceive()); - } else { - // RACY issue !!!!! + /* + In case the handler isn't able to enqueue the operation, we will try until we have exhausted all the handlers + before failing hard. + */ + Set blacklistedHandlers = null; + while(true) { + if (holder != null) { + try { + holder.handler.receive(msg.copyBeforeReceive()); + findingHandlerFailed = false; + } catch (RejectedExecutionException e) { + if(blacklistedHandlers == null) { + blacklistedHandlers = new HashSet<>(); + } + blacklistedHandlers.add(holder); + holder = nextHandler(handlers, messageLocal, blacklistedHandlers); + if(holder != null) { + if(logger.isDebugEnabled()) { + logger.debug(String.format("Failed to enqueue message onto handler during send, will try another handler. Address: %s", msg.address()), e); + } + continue; + } + else { + if(logger.isDebugEnabled()) { + logger.debug(String.format("Failed to enqueue message onto handler during send, no other handler found. Address: %s", msg.address()), e); + } + } + } + } else { + // RACY issue !!!!! + } + break; } } else { // Publish @@ -372,21 +409,43 @@ protected ReplyException deliverMessageLocally(MessageImpl msg) { } for (HandlerHolder holder: handlers) { if (messageLocal || !holder.isLocalOnly()) { - holder.handler.receive(msg.copyBeforeReceive()); + try { + holder.handler.receive(msg.copyBeforeReceive()); + findingHandlerFailed = false; + } catch (RejectedExecutionException e) { + if(logger.isDebugEnabled()) { + logger.debug(String.format("Failed to enqueue message onto handler during publish. Address: %s", msg.address()), e); + } + } } } } - return null; - } else { + } + if (findingHandlerFailed) { if (metrics != null) { metrics.messageReceived(msg.address(), !msg.isSend(), messageLocal, 0); } return new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address); } + return null; } - protected HandlerHolder nextHandler(ConcurrentCyclicSequence handlers, boolean messageLocal) { - return handlers.next(); + protected HandlerHolder nextHandler(ConcurrentCyclicSequence handlers, boolean messageLocal, Collection blacklistedHandlers) { + return nextHandlerMessageLocal(handlers, blacklistedHandlers); + } + + protected static HandlerHolder nextHandlerMessageLocal(ConcurrentCyclicSequence handlers, Collection blacklistedHandlers) { + if(blacklistedHandlers == null) { + return handlers.next(); + } + final Iterator iterator = handlers.iterator(); + while (iterator.hasNext()) { + final HandlerHolder handlerHolder = iterator.next(); + if(!blacklistedHandlers.contains(handlerHolder)) { + return handlerHolder; + } + } + return null; } protected void checkStarted() { diff --git a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java index 0a4ce71d87f..55c271037e9 100644 --- a/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java +++ b/src/main/java/io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.java @@ -44,6 +44,7 @@ import io.vertx.core.spi.cluster.RegistrationInfo; import io.vertx.core.spi.metrics.VertxMetrics; +import java.util.Collection; import java.util.Iterator; import java.util.Objects; import java.util.UUID; @@ -242,17 +243,19 @@ protected boolean isMessageLocal(MessageImpl msg) { } @Override - protected HandlerHolder nextHandler(ConcurrentCyclicSequence handlers, boolean messageLocal) { + protected HandlerHolder nextHandler(ConcurrentCyclicSequence handlers, boolean messageLocal, Collection blacklistedHandlers) { HandlerHolder handlerHolder = null; if (messageLocal) { - handlerHolder = handlers.next(); + handlerHolder = nextHandlerMessageLocal(handlers, blacklistedHandlers); } else { Iterator iterator = handlers.iterator(false); while (iterator.hasNext()) { HandlerHolder next = iterator.next(); if (next.isReplyHandler() || !next.isLocalOnly()) { - handlerHolder = next; - break; + if(blacklistedHandlers == null || !blacklistedHandlers.contains(next)) { + handlerHolder = next; + break; + } } } } diff --git a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java index e4e95ddb8ba..75dab13ca34 100644 --- a/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/ClusteredEventBusTest.java @@ -12,7 +12,9 @@ package io.vertx.core.eventbus; import io.vertx.core.*; +import io.vertx.core.eventbus.impl.HandlerHolder; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.impl.utils.ConcurrentCyclicSequence; import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableImplObject; import io.vertx.core.shareddata.AsyncMapTest.SomeClusterSerializableObject; import io.vertx.core.shareddata.AsyncMapTest.SomeSerializableObject; @@ -25,10 +27,9 @@ import org.junit.Test; import java.io.InvalidClassException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -706,4 +707,77 @@ public void testMultiHeaders() { await(); } + + @Test + public void testNextHandlerForNonLocalMessageEmptyBlacklist() throws Throwable { + testNextHandlerInternal(0, 10, false); + } + + @Test + public void testNextHandlerForNonLocalMessageNullBlacklist() throws Throwable { + testNextHandlerInternal(-1, 10, false); + } + + @Test + public void testNextHandlerForNonLocalMessageHalfBlacklisted() throws Throwable { + testNextHandlerInternal(5, 10, false); + } + + @Test + public void testNextHandlerForNonLocalMessageAllBlacklisted() throws Throwable { + testNextHandlerInternal(10, 10, false); + } + + @Test + public void testNextHandlerForLocalMessageEmptyBlacklist() throws Throwable { + testNextHandlerInternal(0, 10, true); + } + + @Test + public void testNextHandlerForLocalMessageNullBlacklist() throws Throwable { + testNextHandlerInternal(-1, 10, true); + } + + @Test + public void testNextHandlerForLocalMessageHalfBlacklisted() throws Throwable { + testNextHandlerInternal(5, 10, true); + } + + @Test + public void testNextHandlerForLocalMessageAllBlacklisted() throws Throwable { + testNextHandlerInternal(10, 10, true); + } + + private void testNextHandlerInternal(int numberOfEntriesToBlacklist, int totalNumberOfEntries, boolean localMessage) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { + int expectedIndex = numberOfEntriesToBlacklist >= 0 ? (numberOfEntriesToBlacklist >= totalNumberOfEntries ? -1 : numberOfEntriesToBlacklist) : 0; + startNodes(1); + waitFor(1); + final EventBus eventBus = vertices[0].eventBus(); + List handlerHolders = new ArrayList<>(); + for(int i = 0; i < totalNumberOfEntries; i++) { + final int handlerIndex = i; + handlerHolders.add(new HandlerHolder(null, false, false, null) { + @Override + public String toString() { + return super.toString() + " Index: " + handlerIndex; + } + + @Override + public boolean equals(Object o) { + return this == o; + } + + @Override + public int hashCode() { + return handlerIndex; + } + }); + } + List blacklist = numberOfEntriesToBlacklist >= 0 ? handlerHolders.stream().limit(numberOfEntriesToBlacklist).collect(Collectors.toList()) : null; + final ConcurrentCyclicSequence concurrentCyclicSequence = new ConcurrentCyclicSequence<>(handlerHolders.toArray(new HandlerHolder[0])); + final Method methodNextHandler = eventBus.getClass().getDeclaredMethod("nextHandler", new Class[]{ConcurrentCyclicSequence.class, Boolean.TYPE, Collection.class}); + methodNextHandler.setAccessible(true); + final HandlerHolder selectedHandleHolder = (HandlerHolder) methodNextHandler.invoke(eventBus, concurrentCyclicSequence, localMessage, blacklist); + assertSame(expectedIndex >= 0 ? handlerHolders.get(expectedIndex) : null, selectedHandleHolder); + } } diff --git a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java index 12377d6d277..7b6d31b8228 100644 --- a/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java +++ b/src/test/java/io/vertx/core/eventbus/LocalEventBusTest.java @@ -12,6 +12,7 @@ package io.vertx.core.eventbus; import io.vertx.core.*; +import io.vertx.core.eventbus.impl.EventBusImpl; import io.vertx.core.eventbus.impl.EventBusInternal; import io.vertx.core.eventbus.impl.MessageConsumerImpl; import io.vertx.core.impl.ConcurrentHashSet; @@ -21,9 +22,11 @@ import io.vertx.test.core.TestUtils; import org.junit.Test; +import java.lang.reflect.*; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -176,6 +179,132 @@ public void testRegisterLocal2() { await(); } + @Test + public void testHandlerWithContextRejectingEnqueueingWorkSendSucceedsAtFirstAttempt() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(true, true, false, false); + } + + @Test + public void testHandlerWithContextRejectingEnqueueingWorkSendSucceedsAfterHalfHandlerAttempts() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(true, false, false, false); + } + + @Test + public void testHandlerWithContextRejectingEnqueueingWorkSendSucceedsAtLastAttempt() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(true, false, true, false); + } + + @Test + public void testHandlerWithContextRejectingEnqueueingWorkSendAllAttemptsFails() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(false, false, false, false); + } + + @Test + public void testHandlerWithContextRejectingEnqueueingWorkPublishSucceedsForAllHandlerAttempts() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(true, true, false, true); + } + + @Test + public void testHandlerWithContextRejectingEnqueueingWorkPublishSucceedsForHalfHandlerAttempts() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(true, false, false, true); + } + + @Test + public void testHandlerWithContextRejectingEnqueueingWorkPublishSucceedsForSingleHandlerAttempts() throws Throwable { + testHandlerWithContextRejectingEnqueueingWorkInternal(true, false, true, true); + } + + private void testHandlerWithContextRejectingEnqueueingWorkInternal(boolean shouldSucceed, boolean succeedOnFirstAttempt, boolean succeedOnLastAttempt, boolean usePublish) throws Throwable { + final ContextInternal realContextInternal = (ContextInternal) this.vertx.getOrCreateContext(); + ClassLoader classLoader = realContextInternal.classLoader(); + + final String str = TestUtils.randomUnicodeString(100); + + final List> handlersSetup = new ArrayList<>(); + final AtomicInteger executorCalled = new AtomicInteger(); + final int handlerCount = 10; + final int executorFailUntilCalls = + shouldSucceed + ? ( + succeedOnFirstAttempt + ? 0 + : ( + succeedOnLastAttempt + ? handlerCount - 1 + : handlerCount / 2)) + : handlerCount; + final AtomicInteger handlerCalled = new AtomicInteger(); + + final int expectedPublishHandlerCalls = handlerCount - executorFailUntilCalls; + + final CountDownLatch receivedMessageLatch = new CountDownLatch(expectedPublishHandlerCalls); + + for(int i=0; i < handlerCount; i++) { + + ContextInternal contextInternal = (ContextInternal) Proxy.newProxyInstance(classLoader, new Class[]{ContextInternal.class}, new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + if(method.getName().equals("executor")) { + final int executeCallCount = executorCalled.getAndIncrement(); + if(executeCallCount < executorFailUntilCalls) { + throw new RejectedExecutionException(); + } + } + return method.invoke(realContextInternal, args); + } + }); + + final Constructor messageConsumerImplConstructor = MessageConsumerImpl.class.getDeclaredConstructor(Vertx.class, ContextInternal.class, EventBusImpl.class, String.class, Boolean.TYPE); + messageConsumerImplConstructor.setAccessible(true); + MessageConsumerImpl messageConsumerImpl = (MessageConsumerImpl) messageConsumerImplConstructor.newInstance(this.vertx, contextInternal, (EventBusImpl) this.vertx.eventBus(), ADDRESS1, true); + + final Promise handlerSetup = Promise.promise(); + handlersSetup.add(handlerSetup.future()); + + messageConsumerImpl.handler((Message msg) -> { + handlerCalled.incrementAndGet(); + assertEquals(str, msg.body()); + if(!usePublish) { + msg.reply("foo"); + } + receivedMessageLatch.countDown(); + }).completionHandler(ar -> { + assertTrue(ar.succeeded()); + handlerSetup.complete(); + }); + } + Future.all(handlersSetup).onComplete(result -> { + assertTrue(result.succeeded()); + if(usePublish) { + eb.publish(ADDRESS1, str); + } + else { + if (shouldSucceed) { + eb.request(ADDRESS1, str).onComplete(responseResult -> { + assertTrue(responseResult.succeeded()); + assertEquals("Assuming that executor should have failed a certain number of times", executorFailUntilCalls, executorCalled.get() - 1); + assertEquals(1, handlerCalled.get()); + testComplete(); + }); + } else { + eb.request(ADDRESS1, str).onComplete(responseResult -> { + assertFalse(responseResult.succeeded()); + assertEquals(ReplyException.class, responseResult.cause().getClass()); + assertEquals("Assuming that executor should have failed a certain number of times", handlerCount, executorCalled.get()); + assertEquals(0, handlerCalled.get()); + testComplete(); + }); + } + } + }); + if(usePublish) { + receivedMessageLatch.await(2, TimeUnit.MINUTES); + assertEquals(expectedPublishHandlerCalls, handlerCalled.get()); + testComplete(); + } + await(); + } + @Test public void testRegisterWithCompletionHandler() { String str = TestUtils.randomUnicodeString(100);