This repository has been archived by the owner on May 27, 2022. It is now read-only.

Infinite loop if any message is logged by the kafka client library while the kafka server is down. #36

Open
xiangyihong opened this issue Nov 20, 2016 · 6 comments


@xiangyihong

xiangyihong commented Nov 20, 2016

For logs originating from the kafka client library, the strategy is to defer these messages and drain them before the next log event.

    @Override
    public void doAppend(E e) {
        ensureDeferredAppends();
        if (e instanceof ILoggingEvent && ((ILoggingEvent) e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
            deferAppend(e);
        } else {
            super.doAppend(e);
        }
    }

    private void deferAppend(E event) {
        queue.add(event);
    }
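The drain half of this mechanic is not shown above. A minimal, self-contained sketch of the defer/drain behavior (toy types and names, not the real KafkaAppender) looks like this:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hedged sketch of the defer/drain mechanic: kafka-client events are queued,
// and any later event first flushes the queued ones before normal handling.
class DeferredAppendSketch {
    static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";
    final Queue<String> queue = new ArrayDeque<>();
    final StringBuilder appended = new StringBuilder();

    void doAppend(String loggerName, String msg) {
        drainDeferred(); // flush previously deferred kafka-client events first
        if (loggerName.startsWith(KAFKA_LOGGER_PREFIX)) {
            queue.add(msg); // defer kafka-client events instead of appending
        } else {
            append(msg);
        }
    }

    void drainDeferred() {
        String deferred;
        while ((deferred = queue.poll()) != null) {
            append(deferred);
        }
    }

    void append(String msg) {
        appended.append(msg).append(';');
    }
}
```

In the real appender, `append` ultimately hands the event to the delivery strategy; the toy version only records the order, which is what matters for the deadlock/loop discussion below.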

However, if the kafka server is down, this may lead to an infinite loop, especially when the log level is low (trace, debug, or even info).

Production code uses the kafka appender to log --> the kafka appender sends the message to the kafka client library --> the client cannot connect to the kafka server and logs messages of its own --> the kafka appender picks up those messages and tries to send them to the kafka client library --> ...

Would there be a fix for this issue? How about we drop logs from the kafka client library entirely? Users would then have to configure another logger for org.apache.kafka.clients.
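The suggested workaround can be expressed directly in logback configuration. A hedged sketch, assuming an existing plain appender named FILE (hypothetical name): with additivity="false", kafka-client logs go only to that appender and never re-enter the kafka appender via the root logger.

```xml
<!-- Sketch only: FILE is an assumed, separately defined appender. -->
<logger name="org.apache.kafka.clients" level="WARN" additivity="false">
    <appender-ref ref="FILE"/>
</logger>
```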

@danielwegener
Owner

danielwegener commented Nov 20, 2016

IIRC the process was like this:

KafkaAppender.doAppend () -> UnsynchronizedAppenderBase.doAppend 
  -> KafkaAppender.append -> deliveryStrategy.send

When kafka really is unavailable, the deliveryStrategy should eventually send the message to the fallback appenders (see https://github.com/danielwegener/logback-kafka-appender#fallback-appender).
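Per the linked README, the fallback appenders are declared as appender-refs inside the KafkaAppender itself. A minimal sketch (STDOUT is an assumed, separately defined appender name):

```xml
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
    <!-- ... encoder, topic, producerConfig ... -->
    <!-- fallback: events the deliveryStrategy cannot deliver to Kafka go here -->
    <appender-ref ref="STDOUT"/>
</appender>
```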

The basic mechanic is: KafkaAppender.doAppend enqueues all kafka-client messages until a non-kafka message comes along to free them. I do not see a real infinite loop (since every message is either sent directly to UnsynchronizedAppenderBase.doAppend or delayed via the queue - but I agree the flow is not super obvious and could be clearer code-wise). The only problem I'd see is that if only kafka-client logs come in, the queue grows indefinitely. I think we should rather replace the queue with a concurrent ring-buffer-like structure with a configurable maximum size that evicts old entries rather than consuming the whole heap.
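The bounded, evicting queue proposed above could be sketched like this (hypothetical class, not part of the library; a real implementation would need to match the appender's exact concurrency requirements):

```java
import java.util.concurrent.ArrayBlockingQueue;

// Sketch of a bounded queue that evicts its oldest entry instead of
// growing without limit, capping memory use when only kafka-client
// events arrive and nothing ever drains the queue.
final class EvictingQueue<E> {
    private final ArrayBlockingQueue<E> delegate;

    EvictingQueue(int maxSize) {
        this.delegate = new ArrayBlockingQueue<>(maxSize);
    }

    synchronized void add(E e) {
        // If full, drop the oldest entry to make room for the new one.
        while (!delegate.offer(e)) {
            delegate.poll();
        }
    }

    E poll() {
        return delegate.poll();
    }

    int size() {
        return delegate.size();
    }
}
```

With maxSize 2, adding "a", "b", "c" evicts "a" and leaves ["b", "c"], so the heap is bounded no matter how long the broker stays down.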

The reasoning behind this stunt is #1 and #11 (logback's bootstrap).

I hope this clarifies a bit how it - at least - is intended to work :)

@xiangyihong
Author

xiangyihong commented Nov 21, 2016

Hi @danielwegener, my reproducible setup is: add the kafkaAppender to the root logger and set that logger's level to trace.

If I shut down the kafka server before starting this Spring Boot application, the application never finishes initializing but prints the following log messages again and again and again (via another file appender).

In my production environment, I set the root logger level to error. This works fine when kafka is not available.

By the way, thank you very much for this project. It really speeds up my development.

2016-11-21 12:00:00.692 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:79)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
	at java.lang.Thread.run(Unknown Source)
2016-11-21 12:00:00.692 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:79)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
	at java.lang.Thread.run(Unknown Source)
2016-11-21 12:00:00.693 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Node -1 disconnected.
2016-11-21 12:00:00.792 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initialize connection to node -1 for sending metadata request
2016-11-21 12:00:00.693 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Node -1 disconnected.
2016-11-21 12:00:00.792 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initiating connection to node -1 at 127.0.0.1:9092.
2016-11-21 12:00:00.792 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initialize connection to node -1 for sending metadata request
2016-11-21 12:00:01.666 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-0][DEBUG] timeout completed: keys processed=0; now=1479700801666; nextExpiration=1479700801665; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:00.792 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initiating connection to node -1 at 127.0.0.1:9092.
2016-11-21 12:00:01.666 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-0][DEBUG] timeout completed: keys processed=0; now=1479700801666; nextExpiration=1479700801665; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:01.674 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-1][DEBUG] timeout completed: keys processed=0; now=1479700801674; nextExpiration=1479700801673; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:01.674 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-1][DEBUG] timeout completed: keys processed=0; now=1479700801674; nextExpiration=1479700801673; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:01.793 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:79)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
	at java.lang.Thread.run(Unknown Source)
2016-11-21 12:00:01.793 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused: no further information
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
	at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:54)
	at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:79)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:274)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
	at java.lang.Thread.run(Unknown Source)
2016-11-21 12:00:01.794 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Node -1 disconnected.
2016-11-21 12:00:01.816 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Give up sending metadata request since no node is available
2016-11-21 12:00:01.794 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Node -1 disconnected.
2016-11-21 12:00:01.916 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initialize connection to node -1 for sending metadata request
2016-11-21 12:00:01.816 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Give up sending metadata request since no node is available
2016-11-21 12:00:01.916 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initiating connection to node -1 at 127.0.0.1:9092.
2016-11-21 12:00:01.916 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initialize connection to node -1 for sending metadata request
2016-11-21 12:00:02.666 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-0][DEBUG] timeout completed: keys processed=0; now=1479700802666; nextExpiration=1479700802666; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:01.916 [HOST][SERVER][PID:11016][THR:kafka-producer-network-thread | producer-2][DEBUG] Initiating connection to node -1 at 127.0.0.1:9092.
2016-11-21 12:00:02.666 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-0][DEBUG] timeout completed: keys processed=0; now=1479700802666; nextExpiration=1479700802666; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:02.674 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-1][DEBUG] timeout completed: keys processed=0; now=1479700802674; nextExpiration=1479700802674; keyCount=0; hasEvents=false; eval=false
2016-11-21 12:00:02.674 [HOST][SERVER][PID:11016][THR:http-nio-20100-ClientPoller-1][DEBUG] timeout completed: keys processed=0; now=1479700802674; nextExpiration=1479700802674; keyCount=0; hasEvents=false; eval=false

@aerskine
Contributor

@xiangyihong would you be so kind as to check against the code in #39? This kind of thing is very difficult to test, even with the embedded integration tests, for example.

@aerskine
Contributor

@danielwegener at the very least, my previous contribution was missing the metrics logger prefix, which is used at trace level.

I was also concerned that the synchronized block in the lazy producer could interleave with the locking in the Kafka Sender. Moving the code further into the append method is my attempt to allay this fear.

To test, I used a version of LogbackIntegrationIT against a local Kafka instance (not the embedded one) at trace level.

@aerskine
Contributor

I've also just noticed @yangqiju's PR :)

@seasuresh

seasuresh commented Sep 29, 2017

@danielwegener @aerskine - I read about this issue and it appears it was resolved. Can you please provide me with the steps to resolve it?
I'm using Spring Boot and the danielwegener kafka appender to publish application logs to Kafka, but the Spring Boot application does not start when the Kafka brokers are down.

Please see my log config

    <root level="INFO">
        <appender-ref ref="kafkaLogAppenderAsync"/>
    </root>
    <appender name="kafkaLogAppenderAsync" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="logAppender"/>
    </appender>
    <appender name="logAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
        <encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>concise-local - %msg</pattern>
                <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            </layout>
        </encoder>
        <topic>app-logs</topic>
        <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy"/>
        <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
        <producerConfig>bootstrap.servers=${kafka.bootstrap.servers}</producerConfig>
        <producerConfig>acks=0</producerConfig>
        <producerConfig>compression.type=gzip</producerConfig>
        <producerConfig>retries=2</producerConfig>
        <producerConfig>request.timeout.ms=10</producerConfig>
        <producerConfig>max.block.ms=800</producerConfig>
        <producerConfig>metadata.fetch.timeout.ms=10</producerConfig>
        <appender-ref ref="CONSOLE"/>
    </appender>
