Tracing kafka collector #3244

Closed · wants to merge 9 commits
4 changes: 2 additions & 2 deletions docker/examples/docker-compose.yml
@@ -35,8 +35,8 @@ services:
- MYSQL_HOST=mysql
# Uncomment to enable self-tracing
# - SELF_TRACING_ENABLED=true
-# Uncomment to increase heap size
-# - JAVA_OPTS=-Xms128m -Xmx128m -XX:+ExitOnOutOfMemoryError
+# Uncomment to enable debug logging
+# - JAVA_OPTS=-Dlogging.level.zipkin2=DEBUG
ports:
# Port used for the Zipkin UI and HTTP Api
- 9411:9411
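Note the old and new comments configure the same JAVA_OPTS variable, so as written they are alternatives. Anyone wanting the heap settings and debug logging together can combine the flags on one line, e.g.:

# - JAVA_OPTS=-Xms128m -Xmx128m -XX:+ExitOnOutOfMemoryError -Dlogging.level.zipkin2=DEBUG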
zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollector.java
@@ -1,5 +1,5 @@
/*
-* Copyright 2015-2019 The OpenZipkin Authors
+* Copyright 2015-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -21,7 +21,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -65,6 +69,7 @@ public static final class Builder extends CollectorComponent.Builder {
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
String topic = "zipkin";
int streams = 1;
Function<Properties, Consumer<byte[], byte[]>> consumerSupplier = KafkaConsumer::new;

@Override
public Builder storage(StorageComponent storage) {
@@ -116,6 +121,11 @@ public Builder streams(int streams) {
return this;
}

public Builder consumerSupplier(Function<Properties, Consumer<byte[], byte[]>> consumerSupplier) {
this.consumerSupplier = consumerSupplier;
return this;
}

/**
* By default, a consumer will be built from properties derived from builder defaults, as well
* as "auto.offset.reset" -> "earliest". Any properties set here will override the consumer
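The new consumerSupplier hook keeps tracing (or any other decoration) out of the collector itself: whoever builds the collector decides how the Kafka consumer is created. A minimal sketch of plugging in a decorated consumer — builder methods as in the existing KafkaCollector API; decorate(..) is a placeholder for any wrapper, such as KafkaTracing::consumer used later in this PR:

KafkaCollector collector = KafkaCollector.builder()
    .bootstrapServers("kafka:9092")
    .topic("zipkin")
    // decorate(..) is hypothetical; kafkaTracing::consumer is one real choice
    .consumerSupplier(props -> decorate(new KafkaConsumer<>(props)))
    .storage(storage)
    .build();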
zipkin-collector/kafka/src/main/java/zipkin2/collector/kafka/KafkaCollectorWorker.java
@@ -23,10 +23,12 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +50,7 @@ public void onSuccess(Void value) {}
public void onError(Throwable t) {}
};

final Function<Properties, Consumer<byte[], byte[]>> consumerSupplier;
final Properties properties;
final List<String> topics;
final Collector collector;
@@ -58,6 +61,7 @@
final AtomicBoolean running = new AtomicBoolean(true);

KafkaCollectorWorker(KafkaCollector.Builder builder) {
consumerSupplier = builder.consumerSupplier;
properties = builder.properties;
topics = Arrays.asList(builder.topic.split(","));
collector = builder.delegate.build();
@@ -66,7 +70,7 @@

@Override
public void run() {
-try (KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(properties)) {
+try (Consumer<byte[], byte[]> kafkaConsumer = consumerSupplier.apply(properties)) {
kafkaConsumer.subscribe(
topics,
// added for integration tests only, see ITKafkaCollector
@@ -85,7 +89,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
});
LOG.debug("Kafka consumer starting polling loop.");
while (running.get()) {
-final ConsumerRecords<byte[], byte[]> consumerRecords = kafkaConsumer.poll(Duration.of(1000, ChronoUnit.MILLIS));
+final ConsumerRecords<byte[], byte[]> consumerRecords =
+    kafkaConsumer.poll(Duration.of(1000, ChronoUnit.MILLIS));
LOG.debug("Kafka polling returned batch of {} messages.", consumerRecords.count());
for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
final byte[] bytes = record.value();
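A side benefit of coding the worker against the Consumer interface and obtaining it from the supplier: tests can swap in Kafka's in-memory MockConsumer and never touch a broker. A sketch, not part of this change:

// org.apache.kafka.clients.consumer.MockConsumer ships with kafka-clients
MockConsumer<byte[], byte[]> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
builder.consumerSupplier(props -> mock); // the worker polls the mock directly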
11 changes: 11 additions & 0 deletions zipkin-server/pom.xml
@@ -272,6 +272,17 @@
<!-- <version>0.10.3</version>-->
<!-- <optional>true</optional>-->
<!-- </dependency>-->
<dependency>
<groupId>io.zipkin.brave.cassandra</groupId>
<artifactId>brave-instrumentation-cassandra-driver</artifactId>
<version>0.10.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-kafka-clients</artifactId>
<optional>true</optional>
</dependency>

<!-- Test dependencies -->
<!-- to test the experimental grpc endpoint with the square/wire library -->
zipkin-server/src/main/java/zipkin2/server/internal/ConditionalOnSelfTracing.java
@@ -1,5 +1,5 @@
/*
-* Copyright 2015-2019 The OpenZipkin Authors
+* Copyright 2015-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -60,7 +60,7 @@ public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeM
String selfTracingEnabled = context.getEnvironment()
.getProperty("zipkin.self-tracing.enabled");

-if (!Boolean.valueOf(selfTracingEnabled)) {
+if (!Boolean.parseBoolean(selfTracingEnabled)) {
return ConditionOutcome.noMatch("zipkin.self-tracing.enabled isn't true");
}

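The swap is behavior-preserving — both calls return false for null input — but parseBoolean yields the primitive directly instead of boxing a Boolean that the if-condition immediately unboxes:

// Boolean.valueOf(selfTracingEnabled)      -> boxed Boolean, auto-unboxed by the if
// Boolean.parseBoolean(selfTracingEnabled) -> primitive boolean, no boxing
// both evaluate to false when selfTracingEnabled is null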
zipkin-server/src/main/java/zipkin2/server/internal/brave/ZipkinSelfTracingConfiguration.java
@@ -16,6 +16,8 @@
import brave.Tracing;
import brave.context.slf4j.MDCScopeDecorator;
import brave.http.HttpTracing;
import brave.kafka.clients.KafkaTracing;
import brave.messaging.MessagingTracing;
import brave.propagation.B3Propagation;
import brave.propagation.CurrentTraceContext;
import brave.propagation.ThreadLocalSpan;
@@ -108,6 +110,7 @@ public class ZipkinSelfTracingConfiguration {
// Reduce the impact on untraced downstream http services such as Elasticsearch
.propagationFactory(B3Propagation.newFactoryBuilder()
.injectFormat(brave.Span.Kind.CLIENT, B3Propagation.Format.SINGLE)
.injectFormat(brave.Span.Kind.CONSUMER, B3Propagation.Format.SINGLE_NO_PARENT)
.build())
.addSpanHandler(zipkinSpanHandler)
.build();
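For context on the injectFormat choices: the single b3 header is traceId-spanId-sampled-parentSpanId, and SINGLE_NO_PARENT drops the trailing parent segment, which is all a messaging consumer needs and keeps Kafka record headers compact. Illustrative header values:

// b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1-05e3ac9a4f6e3b90  (SINGLE, CLIENT)
// b3: 80f198ee56343ba864fe8b2a57d3eff7-e457b5a2e4d86bd1-1                   (SINGLE_NO_PARENT, CONSUMER)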
@@ -127,6 +130,14 @@
.build();
}

@Bean KafkaTracing kafkaTracing(Tracing tracing) {
final MessagingTracing messagingTracing = MessagingTracing.newBuilder(tracing)
.build();
return KafkaTracing.newBuilder(messagingTracing)
.remoteServiceName("zipkin-kafka")
.build();
}
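This bean is consumed by the Kafka collector configuration later in this diff: wrapping the collector's consumer makes every polled record start or continue a CONSUMER span reported against the remote service name zipkin-kafka. The one-line use looks like:

Consumer<byte[], byte[]> traced = kafkaTracing.consumer(new KafkaConsumer<>(properties));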

@Bean ArmeriaServerConfigurator tracingConfigurator(HttpTracing tracing) {
return server -> server.decorator(BraveService.newDecorator(tracing));
}
zipkin-server/src/main/java/zipkin2/server/internal/kafka/ZipkinKafkaCollectorConfiguration.java
@@ -1,5 +1,5 @@
/*
-* Copyright 2015-2019 The OpenZipkin Authors
+* Copyright 2015-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -13,6 +13,13 @@
*/
package zipkin2.server.internal.kafka;

import brave.kafka.clients.KafkaTracing;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -23,6 +30,7 @@
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.collector.kafka.KafkaCollector;
import zipkin2.server.internal.ConditionalOnSelfTracing;
import zipkin2.storage.StorageComponent;

/**
@@ -33,15 +41,34 @@
@Conditional(ZipkinKafkaCollectorConfiguration.KafkaBootstrapServersSet.class)
@EnableConfigurationProperties(ZipkinKafkaCollectorProperties.class)
public class ZipkinKafkaCollectorConfiguration { // makes simple type name unique for /actuator/conditions
static final String QUALIFIER = "zipkinKafka";

@Bean(initMethod = "start")
KafkaCollector kafka(
ZipkinKafkaCollectorProperties properties,
CollectorSampler sampler,
CollectorMetrics metrics,
StorageComponent storage) {
return properties.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();
@Bean(initMethod = "start") KafkaCollector kafka(
ZipkinKafkaCollectorProperties properties,
CollectorSampler sampler,
CollectorMetrics metrics,
StorageComponent storage,
java.util.function.Consumer<KafkaCollector.Builder> kafkaTracing) {
final KafkaCollector.Builder builder = properties.toBuilder()
.sampler(sampler)
.metrics(metrics)
.storage(storage);
kafkaTracing.accept(builder);
return builder.build();
}

@Bean @Qualifier(QUALIFIER) @ConditionalOnSelfTracing
java.util.function.Consumer<KafkaCollector.Builder> consumerSupplier(
Optional<KafkaTracing> maybeKafkaTracing
) {
return builder ->
builder.consumerSupplier(
maybeKafkaTracing
.<Function<Properties, Consumer<byte[], byte[]>>>
map(kafkaTracing -> props -> kafkaTracing.consumer(new KafkaConsumer<>(props)))
.orElseGet(() -> KafkaConsumer::new));
}
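Long-hand equivalent of the customizer above, purely to make the Optional plumbing easier to read (same types, calls, and imports as the surrounding file; no new API):

return builder -> {
  Function<Properties, Consumer<byte[], byte[]>> supplier;
  if (maybeKafkaTracing.isPresent()) {
    KafkaTracing kafkaTracing = maybeKafkaTracing.get();
    // self-tracing on: wrap the consumer so each poll is traced
    supplier = props -> kafkaTracing.consumer(new KafkaConsumer<>(props));
  } else {
    // self-tracing off: plain consumer, exactly the old behavior
    supplier = KafkaConsumer::new;
  }
  builder.consumerSupplier(supplier);
};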

/**
* This condition passes when {@link ZipkinKafkaCollectorProperties#getBootstrapServers()} is set
* to non-empty.
Expand All @@ -65,7 +92,7 @@ private static boolean isEmpty(String s) {
return s == null || s.isEmpty();
}

-private static boolean notFalse(String s){
+private static boolean notFalse(String s) {
return s == null || !s.equals("false");
}
}
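For reference, activating the traced path at runtime takes the condition above plus self-tracing. With zipkin-server's standard environment variables (mapped by Spring's relaxed property binding):

KAFKA_BOOTSTRAP_SERVERS=kafka:9092   # -> zipkin.collector.kafka.bootstrap-servers
SELF_TRACING_ENABLED=true            # -> zipkin.self-tracing.enabled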
zipkin-server/src/test/java/zipkin2/server/internal/kafka/ZipkinKafkaCollectorConfigurationTest.java
@@ -1,5 +1,5 @@
/*
-* Copyright 2015-2019 The OpenZipkin Authors
+* Copyright 2015-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -13,16 +13,22 @@
*/
package zipkin2.server.internal.kafka;

import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import zipkin2.collector.kafka.KafkaCollector;
import zipkin2.server.internal.InMemoryConfiguration;
import zipkin2.server.internal.brave.ZipkinSelfTracingConfiguration;

import static org.assertj.core.api.Assertions.assertThat;

@@ -65,7 +71,9 @@ public class ZipkinKafkaCollectorConfigurationTest {
context.register(
PropertyPlaceholderAutoConfiguration.class,
ZipkinKafkaCollectorConfiguration.class,
-InMemoryConfiguration.class);
+InMemoryConfiguration.class,
+ZipkinSelfTracingConfiguration.class,
+KafkaTracingCustomization.class);
context.refresh();

assertThat(context.getBean(KafkaCollector.class)).isNotNull();
@@ -84,4 +92,15 @@
thrown.expect(NoSuchBeanDefinitionException.class);
context.getBean(KafkaCollector.class);
}

@Configuration
static class KafkaTracingCustomization {

@Bean @Qualifier("zipkinKafka") public Consumer<KafkaCollector.Builder> one() {
return builderConsumer;
}

Consumer<KafkaCollector.Builder> builderConsumer =
builder -> builder.consumerSupplier(KafkaConsumer::new);
}
}