Skip to content

Commit

Permalink
Merge pull request #20 from keyko-io/feature/create-topics-starting
Browse files Browse the repository at this point in the history
Create topics when the streaming process start
  • Loading branch information
Enrique Ruiz authored Mar 6, 2020
2 parents 1dc2d57 + e028593 commit b1a1194
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 18 deletions.
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>io.keyko.monitoring</groupId>
<artifactId>web3-event-streamer</artifactId>
<version>0.1.5</version>
<version>0.1.6</version>
<name>Web3 Monitoring Event Streamer</name>
<url>https://github.com/keyko-io/web3-event-streamer</url>
<inceptionYear>2019</inceptionYear>
Expand Down Expand Up @@ -113,6 +113,16 @@
<artifactId>schemas</artifactId>
<version>${keyko.schemas.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
43 changes: 36 additions & 7 deletions src/main/java/io/keyko/monitoring/config/StreamerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,47 @@

import com.typesafe.config.Config;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class StreamerConfig {

private static final String KAFKA_BOOTSTRAP_SERVER = "kafka.bootstrap-server";
private static final String KAFKA_CREATE_TOPICS = "kafka.create-topics";
private static final String SCHEMA_REGISTRY_URL = "schema-registry.url";
private static final String EVENT_TOPIC = "kafka.event-topic";
private static final String VIEW_TOPIC = "kafka.view-topic";
private static final String BLOCK_TOPIC = "kafka.block-topic";
private static final String EVENT_BLOCK_TOPIC = "kafka.event-block-topic";
private static final String FLAT_EVENT_TOPIC = "kafka.flat-event-topic";
private static final String ALERTS_TOPIC = "kafka.alert-topic";
private static final String EVENT_TOPIC = "kafka.topics.event-topic";
private static final String VIEW_TOPIC = "kafka.topics.view-topic";
private static final String BLOCK_TOPIC = "kafka.topics.block-topic";
private static final String EVENT_BLOCK_TOPIC = "kafka.topics.event-block-topic";
private static final String FLAT_EVENT_TOPIC = "kafka.topics.flat-event-topic";
private static final String ALERTS_TOPIC = "kafka.topics.alert-topic";
private static final String ALL_TOPICS = "kafka.topics";

private String kafkaServer;
private Boolean kafkaCreateTopics;
private String schemaRegistryUrl;
private String eventTopic;
private String viewTopic;
private String blockTopic;
private String eventBlockTopic;
private String flatEventTopic;
private String alertsTopic;
private List<String> topicList;


public StreamerConfig(Config config) {

this.setKafkaServer(config.getString(KAFKA_BOOTSTRAP_SERVER));
this.setKafkaCreateTopics(config.getBoolean(KAFKA_CREATE_TOPICS));
this.setSchemaRegistryUrl(config.getString(SCHEMA_REGISTRY_URL));
this.setEventTopic(config.getString(EVENT_TOPIC));
this.setViewTopic(config.getString(VIEW_TOPIC));
this.setBlockTopic(config.getString(BLOCK_TOPIC));
this.setEventBlockTopic(config.getString(EVENT_BLOCK_TOPIC));
this.setFlatEventTopic(config.getString(FLAT_EVENT_TOPIC));
this.setAlertsTopic(config.getString(ALERTS_TOPIC));

this.setAllTopics(config.getConfig(ALL_TOPICS).root().unwrapped().values());
}

public String getKafkaServer() {
Expand All @@ -44,6 +53,14 @@ public void setKafkaServer(String kafkaServer) {
this.kafkaServer = kafkaServer;
}

public Boolean getKafkaCreateTopics() {
return kafkaCreateTopics;
}

public void setKafkaCreateTopics(Boolean kafkaCreateTopics) {
this.kafkaCreateTopics = kafkaCreateTopics;
}

public String getSchemaRegistryUrl() {
return schemaRegistryUrl;
}
Expand Down Expand Up @@ -100,4 +117,16 @@ public void setAlertsTopic(String alertsTopic) {
this.alertsTopic = alertsTopic;
}

public List<String> getAllTopics() {
return topicList;
}

public void setAllTopics(Collection<Object> values) {
List<String> list = new ArrayList<>();
for (Object i : values) {
list.add(i.toString());
}
this.topicList = list;
}

}
27 changes: 27 additions & 0 deletions src/main/java/io/keyko/monitoring/preprocessing/TopicCreation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.keyko.monitoring.preprocessing;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class TopicCreation {

public static void createTopics(List<String> topicNames, String kafkaServer) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer);
AdminClient a = AdminClient.create(props);
a.createTopics(convertToNewTopics(topicNames));
}

public static List<NewTopic> convertToNewTopics(List<String> topicNames) {
List<NewTopic> topics = new ArrayList<NewTopic>();
for (String i : topicNames) {
topics.add(new NewTopic(i, 1, (short) 1));
}
return topics;
}
}
13 changes: 10 additions & 3 deletions src/main/java/io/keyko/monitoring/stream/BaseStreamManager.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.keyko.monitoring.stream;

import io.keyko.monitoring.config.StreamerConfig;

import io.keyko.monitoring.preprocessing.Input;
import io.keyko.monitoring.preprocessing.TopicCreation;
import io.keyko.monitoring.schemas.BlockRecord;
import io.keyko.monitoring.schemas.EventRecord;
import io.keyko.monitoring.schemas.ViewRecord;
Expand All @@ -13,6 +13,7 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.log4j.Logger;

import java.util.Properties;

Expand All @@ -22,6 +23,7 @@ public abstract class BaseStreamManager {
protected StreamsBuilder builder;
private static final Integer DEFAULT_THREADS = 1;
private static final Integer DEFAULT_REPLICATION_FACTOR = 1;
private Logger LOG = Logger.getLogger(BaseStreamManager.class);


protected BaseStreamManager(StreamerConfig streamerConfig) {
Expand Down Expand Up @@ -53,6 +55,7 @@ private Properties getStreamConfiguration() {
}

public void initStream() throws Exception {
if (configuration.getKafkaCreateTopics()) createTopics();

KafkaStreams streams = createStreams();

Expand All @@ -61,7 +64,7 @@ public void initStream() throws Exception {
streams.start();
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

LOG.warn("If your process stop just starting, review that the topics that your process is going to use are already created.");
}

protected void configureSerdes(String schemaRegistryUrl) {
Expand All @@ -83,7 +86,11 @@ private KafkaStreams createStreams() {

}

protected abstract void processStreams( KStream<String, EventRecord> eventStream, KStream<String, ViewRecord> viewStream, KTable<String, BlockRecord> blockTable);
private void createTopics() {
TopicCreation.createTopics(configuration.getAllTopics(), configuration.getKafkaServer());
}

protected abstract void processStreams(KStream<String, EventRecord> eventStream, KStream<String, ViewRecord> viewStream, KTable<String, BlockRecord> blockTable);


}
16 changes: 9 additions & 7 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
kafka {
bootstrap-server = "localhost:9092"
event-topic = "w3m-events"
view-topic = "w3m-views"
block-topic = "w3m-blocks"
event-block-topic = "w3m-event-block"
flat-event-topic = "w3m-flat-events"
alert-topic = "w3m-alerts"
accounts-aggregation-topic = "accounts-created-window_minute"
create-topics = false
topics {
event-topic = "w3m-events"
view-topic = "w3m-views"
block-topic = "w3m-blocks"
event-block-topic = "w3m-event-block"
flat-event-topic = "w3m-flat-events"
alert-topic = "w3m-alerts"
}
}
schema-registry {
url = "http://localhost:18081"
Expand Down
16 changes: 16 additions & 0 deletions src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
log4j.rootLogger=INFO, stdout, file

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n

log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.DatePattern=yyyy-MM-dd
log4j.appender.file.encoding=UTF-8
log4j.appender.file.append=true
log4j.appender.file.file=./celo-stremer.log
log4j.appender.file.threshold=INFO
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=DEBUG

0 comments on commit b1a1194

Please sign in to comment.