From c331b85961874eb138a2005ba663c9b46dd79eca Mon Sep 17 00:00:00 2001 From: Volkan Altan Date: Wed, 8 Nov 2017 11:21:59 +0300 Subject: [PATCH] add new sample, avro data convert to json as a new topic --- divolte-kafka-streams/.gitignore | 25 ++++ divolte-kafka-streams/README.md | 11 ++ divolte-kafka-streams/pom.xml | 133 ++++++++++++++++++ .../com/divolte/kafka/streams/AvroUtil.java | 39 +++++ .../kafka/streams/StreamsDivolteApp.java | 40 ++++++ .../src/main/resources/MyEventRecord.avsc | 41 ++++++ .../src/main/resources/log4j.properties | 5 + .../src/main/resources/mapping.groovy | 45 ++++++ 8 files changed, 339 insertions(+) create mode 100755 divolte-kafka-streams/.gitignore create mode 100755 divolte-kafka-streams/README.md create mode 100755 divolte-kafka-streams/pom.xml create mode 100755 divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java create mode 100755 divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java create mode 100755 divolte-kafka-streams/src/main/resources/MyEventRecord.avsc create mode 100755 divolte-kafka-streams/src/main/resources/log4j.properties create mode 100755 divolte-kafka-streams/src/main/resources/mapping.groovy diff --git a/divolte-kafka-streams/.gitignore b/divolte-kafka-streams/.gitignore new file mode 100755 index 0000000..e7f58e1 --- /dev/null +++ b/divolte-kafka-streams/.gitignore @@ -0,0 +1,25 @@ +# Log file +*.log + +# BlueJ files +*.ctxt + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear +*.zip +*.tar.gz +*.rar + +target/ + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* + +#IDE +idea/ +*.iml diff --git a/divolte-kafka-streams/README.md b/divolte-kafka-streams/README.md new file mode 100755 index 0000000..4fc9329 --- /dev/null +++ b/divolte-kafka-streams/README.md @@ -0,0 +1,11 @@ +### Avro Data convert to Json as a new topic + +``` +mvn clean install +mvn clean package +``` + +#### Run Consumer +``` +java -jar target/divolte-kafka-streams-1.0-SNAPSHOT-jar-with-dependencies.jar +``` diff --git a/divolte-kafka-streams/pom.xml b/divolte-kafka-streams/pom.xml new file mode 100755 index 0000000..e714ab6 --- /dev/null +++ b/divolte-kafka-streams/pom.xml @@ -0,0 +1,133 @@ + + + 4.0.0 + + com.divolte.kafka.streams + divolte-kafka-streams + 1.0-SNAPSHOT + + + + + confluent + http://packages.confluent.io/maven/ + + + + + 1.8 + 0.11.0.0-cp1 + 2.11 + ${kafka.scala.version}.8 + 3.3.0 + 2.2.6 + 0.13.0 + 1.8.2 + 0.9.2 + 9.2.12.v20150709 + 2.8.8 + 2.25 + UTF-8 + + + + + com.twitter + bijection-avro_2.10 + 0.9.2 + + + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + io.confluent + kafka-avro-serializer + ${confluent.version} + + + io.confluent + kafka-schema-registry-client + ${confluent.version} + + + + + org.apache.kafka + kafka-streams + 0.11.0.0 + + + + + org.slf4j + slf4j-api + 1.7.25 + + + + + org.slf4j + slf4j-log4j12 + 1.7.25 + + + + + junit + junit + 4.12 + test + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.6.1 + + 1.8 + 1.8 + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + 2.5.2 + + + jar-with-dependencies + + + + true + com.divolte.kafka.streams.StreamsDivolteApp + + + + + + assemble-all + package + + single + + + + + + + + + \ No newline at end of file diff --git a/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java new file mode 100755 index 0000000..cbd3fa2 --- /dev/null +++ b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java @@ -0,0 +1,39 @@ +package com.divolte.kafka.streams; + +import java.io.IOException; +import java.io.EOFException; +import java.io.InputStream; +import java.io.*; + +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.*; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.Schema; +import com.twitter.bijection.Injection; +import com.twitter.bijection.avro.GenericAvroCodecs; + +public class AvroUtil { + public static String transform(byte[] value) { + String returnVal = ""; + try { + Schema.Parser parser = new Schema.Parser(); + Schema schema = parser.parse(new File("src/main/resources/MyEventRecord.avsc")); + GenericRecord avroRecord = new GenericData.Record(schema); + returnVal = avroRecord.toString(); + + Injection recordInjection = GenericAvroCodecs.toBinary(schema); + GenericRecord record = recordInjection.invert(value).get(); + + returnVal = record.toString(); + } catch (Exception e) { + String ex = e.toString(); + } + + return returnVal; + } + +} diff --git a/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java new file mode 100755 index 0000000..77a7fbc --- /dev/null +++ b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java @@ -0,0 +1,40 @@ +package com.divolte.kafka.streams; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import java.util.Properties; +import org.apache.kafka.common.serialization.Serde; + +public class StreamsDivolteApp { + + public static void main(String[] args) { + + Properties config = new Properties(); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-divolte-app"); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.102:9092"); + config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + + KStreamBuilder builder = new KStreamBuilder(); + final Serde stringSerde = Serdes.String(); + final Serde byteArraySerde = Serdes.ByteArray(); + + builder.stream(stringSerde, byteArraySerde, "divolte-data") + .mapValues(value -> AvroUtil.transform(value)) + .to("divolte-json"); + + KafkaStreams streams = new KafkaStreams(builder, config); + streams.cleanUp(); // only do this in dev - not in prod + streams.start(); + + // print the topology + System.out.println(streams.toString()); + + // shutdown hook to correctly close the streams application + Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); + } +} diff --git a/divolte-kafka-streams/src/main/resources/MyEventRecord.avsc b/divolte-kafka-streams/src/main/resources/MyEventRecord.avsc new file mode 100755 index 0000000..1d16802 --- /dev/null +++ b/divolte-kafka-streams/src/main/resources/MyEventRecord.avsc @@ -0,0 +1,41 @@ +{ + "namespace": "io.divolte.record", + "type": "record", + "name": "MyEventRecord", + "fields": [ + { "name": "userId", "type": ["null", "int"], "default": null }, + { "name": "q", "type": ["null", "string"], "default": null }, + { "name": "page", "type": ["null", "string"], "default": null }, + { "name": "n", "type": ["null", "int"], "default": null }, + { "name": "cookieCustom", "type": ["null", "string"], "default": null }, + { "name": "detectedDuplicate", "type": ["null", "boolean"], "default": null }, + { "name": "detectedCorruption", "type": ["null", "boolean"], "default": null }, + { "name": "firstInSession", "type": ["null", "boolean"], "default": null }, + { "name": "timestamp", "type": ["null", "long"], "default": null }, + { "name": "clientTimestamp", "type": ["null", "long"], "default": null }, + { "name": "remoteHost", "type": "string", "default": null }, + { "name": "referer", "type": ["null", "string"], "default": null }, + { "name": "location", "type": ["null", "string"], "default": null }, + { "name": "viewportPixelWidth", "type": ["null", "int"], "default": null }, + { "name": "viewportPixelHeight", "type": ["null", "int"], "default": null }, + { "name": "screenPixelWidth", "type": ["null", "int"], "default": null }, + { "name": "screenPixelHeight", "type": ["null", "int"], "default": null }, + { "name": "devicePixelRatio", "type": ["null", "int"], "default": null }, + { "name": "partyId", "type": ["null", "string"], "default": null }, + { "name": "sessionId", "type": ["null", "string"], "default": null }, + { "name": "pageViewId", "type": ["null", "string"], "default": null }, + { "name": "eventType", "type": "string", "default": "unknown" }, + { "name": "eventId", "type": "string", "default": "unknown" }, + { "name": "localPath", "type": ["null", "string"], "default": null }, + { "name": "userAgentString", "type": ["null", "string"], "default": null }, + { "name": "userAgentName", "type": ["null", "string"], "default": null }, + { "name": "userAgentFamily", "type": ["null", "string"], "default": null }, + { "name": "userAgentVendor", "type": ["null", "string"], "default": null }, + { "name": "userAgentType", "type": ["null", "string"], "default": null }, + { "name": "userAgentVersion", "type": ["null", "string"], "default": null }, + { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null }, + { "name": "userAgentOsFamily", "type": ["null", "string"], "default": null }, + { "name": "userAgentOsVersion", "type": ["null", "string"], "default": null }, + { "name": "userAgentOsVendor", "type": ["null", "string"], "default": null } + ] +} \ No newline at end of file diff --git a/divolte-kafka-streams/src/main/resources/log4j.properties b/divolte-kafka-streams/src/main/resources/log4j.properties new file mode 100755 index 0000000..d511cbd --- /dev/null +++ b/divolte-kafka-streams/src/main/resources/log4j.properties @@ -0,0 +1,5 @@ +log4j.rootLogger=INFO, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n \ No newline at end of file diff --git a/divolte-kafka-streams/src/main/resources/mapping.groovy b/divolte-kafka-streams/src/main/resources/mapping.groovy new file mode 100755 index 0000000..c24abba --- /dev/null +++ b/divolte-kafka-streams/src/main/resources/mapping.groovy @@ -0,0 +1,45 @@ +mapping { + map {parse eventParameters().value('userId') to int32 } onto 'userId' + + map eventType() onto 'eventType' + map firstInSession() onto 'firstInSession' + map timestamp() onto 'timestamp' + map remoteHost() onto 'remoteHost' + map duplicate() onto 'detectedDuplicate' + map corrupt() onto 'detectedCorruption' + map clientTimestamp() onto 'clientTimestamp' + map eventId() onto 'eventId' + map cookie('cookieCustom') onto 'cookieCustom' + + map referer() onto 'referer' + map location() onto 'location' + map viewportPixelWidth() onto 'viewportPixelWidth' + map viewportPixelHeight() onto 'viewportPixelHeight' + map screenPixelWidth() onto 'screenPixelWidth' + map screenPixelHeight() onto 'screenPixelHeight' + map devicePixelRatio() onto 'devicePixelRatio' + map partyId() onto 'partyId' + map sessionId() onto 'sessionId' + map pageViewId() onto 'pageViewId' + + map userAgentString() onto 'userAgentString' + def ua = userAgent() + map ua.name() onto 'userAgentName' + map ua.family() onto 'userAgentFamily' + map ua.vendor() onto 'userAgentVendor' + map ua.type() onto 'userAgentType' + map ua.version() onto 'userAgentVersion' + map ua.deviceCategory() onto 'userAgentDeviceCategory' + map ua.osFamily() onto 'userAgentOsFamily' + map ua.osVersion() onto 'userAgentOsVersion' + map ua.osVendor() onto 'userAgentOsVendor' + + def locationUri = parse location() to uri + def localUri = parse locationUri.rawFragment() to uri + map localUri.path() onto 'localPath' + + def localQuery = localUri.query() + map localQuery.value('q') onto 'q' + map localQuery.value('page') onto 'page' + map { parse localQuery.value('n') to int32 } onto 'n' +} \ No newline at end of file