This repository has been archived by the owner on Apr 22, 2022. It is now read-only.

Divolte dashing #3

Open · wants to merge 2 commits into master
97 changes: 97 additions & 0 deletions tcp-kafka-dashing/README.md
@@ -0,0 +1,97 @@
Divolte tcp-kafka-dashing example
==================================

This example uses Python to create a Kafka consumer that sends events from Divolte to a Dashing dashboard. To run it, you need:
- The accompanying Javadoc Avro schema installed into your local Maven repository.
- A running HTTP server which serves the static Javadoc HTML files instrumented with the Divolte Collector tag.
- Kafka (including Zookeeper).
- Dashing.
- kafka-python (`pip install kafka-python`).
- Apache Avro for Python (http://avro.apache.org/docs/1.7.6/gettingstartedpython.html#download_install); see the install sketch after this list.
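
Both Python dependencies are plain Python 2 packages. As a sketch, assuming a Python 2 `pip` is on your path and using the `avro` package from PyPI instead of the source install described in the linked guide, you can typically install them with:
```sh
pip install kafka-python
pip install avro
```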

## Building and running

#### Step 1: install and configure Divolte Collector
Download the latest [Divolte Collector](https://github.com/divolte/divolte-collector) release. Use either the .zip or the .tar.gz archive. Extract the archive to a directory of your choice. The installation contains a conf/ directory; in it, create a file called divolte-collector.conf with the following contents:
```hocon
divolte {
  kafka_flusher {
    enabled = true
    threads = 1
  }

  hdfs_flusher {
    enabled = false
  }

  javascript {
    logging = true
    debug = true
  }

  tracking {
    schema_file = /path/to/divolte-examples/avro-schema/src/main/resources/JavadocEventRecord.avsc
    schema_mapping {
      version = 2
      mapping_script_file = "/path/to/divolte-examples/avro-schema/mapping.groovy"
    }
  }
}
```
> *Make sure you correct the paths to the Avro schema and mapping configuration!*

#### Step 2: download, unpack and run Kafka
Setting up Kafka is beyond the scope of this document. It is, however, very simple to get Kafka up and running on your local machine using all default settings. [Download a Kafka release](https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz), unpack it, and run it as follows:
```sh
# In one terminal session
cd kafka_2.10-0.8.1.1/bin
./zookeeper-server-start.sh ../config/zookeeper.properties

# Leave Zookeeper running and in another terminal session, do:
cd kafka_2.10-0.8.1.1/bin
./kafka-server-start.sh ../config/server.properties
```
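
The consumer in this example reads from a Kafka topic named `divolte` (see main.py). With the stock broker settings Kafka auto-creates the topic on first use, but you can also create it up front. A sketch using Kafka's own tooling, assuming the default local Zookeeper on port 2181:
```sh
cd kafka_2.10-0.8.1.1/bin
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic divolte
```
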
#### Step 3: start Divolte Collector
Go into the bin directory of your installation and run:
```sh
cd divolte-collector-0.2/bin
./divolte-collector
```

#### Step 4: host your Javadoc files
Set up an HTTP server that serves the Javadoc files that you generated or downloaded for the examples. If you have Python installed, you can use this:
```sh
cd <your-javadoc-directory>
python -m SimpleHTTPServer
```

#### Step 5: build a Dashing template based on your Avro schema
```sh
cd divolte-examples/tcp-kafka-dashing
python generate_template.py --schema /path/to/divolte-examples/avro-schema/src/main/resources/JavadocEventRecord.avsc
```
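
Besides `template.dashing` (the widget HTML), this writes `properties.dashing`, which records one `field widget-type` pair per line and is read by main.py later on. A hypothetical example, assuming you kept the boolean `pageView` field (always mapped to a graph) and mapped the string `eventType` field to a pie:
```
pageView graph
eventType pie
```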

#### Step 6: generate a new Dashing dashboard and install the required Pie widget
```sh
cd <location-for-your-dashing-dashboard>
dashing new divolte_dashboard_project
cd divolte_dashboard_project
dashing install 6273841
bundle
```

#### Step 7: copy/paste the template into your new dashboard
Edit dashboards/sample.erb and replace the body of the `<ul>` element with the contents of the template.dashing file you generated in step 5.
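
As a sketch of the result, assuming Dashing's stock sample.erb layout and a hypothetical `pageView` graph widget in your generated template, the edited file looks roughly like:
```erb
<div class="gridster">
  <ul>
    <!-- Pasted from template.dashing; this widget is a hypothetical example. -->
    <li data-row="1" data-col="1" data-sizex="2" data-sizey="1">
      <div data-id="pageView" data-view="Graph" data-title="Pageview" style="background-color:#ff9618"></div>
    </li>
  </ul>
</div>
```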

#### Step 8: start everything
```sh
cd <location-for-your-dashing-dashboard>/divolte_dashboard_project
dashing start

# Leave it running and in another terminal session, do:
cd divolte-examples/tcp-kafka-dashing
python main.py --schema /path/to/divolte-examples/avro-schema/src/main/resources/JavadocEventRecord.avsc
```

#### Step 9: enjoy
Go to http://localhost:8000 to view the Javadoc, and http://localhost:3030 to view the dashboard.
85 changes: 85 additions & 0 deletions tcp-kafka-dashing/generate_template.py
@@ -0,0 +1,85 @@
import avro.schema
import avro.io
import argparse

def select_dashing(options):
    # Prompt until the user picks one of the offered widget types;
    # an empty answer selects the default (the first option).
    selected = raw_input('\tChoose between %s [%s]: ' % (", ".join(options), options[0]))
    if selected not in options + ['']:
        return select_dashing(options)
    return selected or options[0]

def generate_mapping(input_avro, output_template, output_properties):
    mapping = {}

    # Read the Avro schema, skipping // comment lines, which the JSON
    # parser does not accept.
    json_schema = ""
    with open(input_avro) as fp:
        for line in fp:
            if not line.startswith("//"):
                json_schema += line

    schema = avro.schema.parse(json_schema)
    for key, value in schema.fields_dict.iteritems():
        type = value.to_json()['type']
        if isinstance(type, basestring):
            type = [type]

        answer = raw_input('Include "%s" in the dashing dashboard? [yes] ' % key)
        if answer == '' or answer == 'yes':
            if 'boolean' in type:
                dashing_type = 'graph'
            elif 'string' in type:
                dashing_type = select_dashing(['pie', 'list'])
            else:
                dashing_type = select_dashing(['graph', 'number', 'meter'])

            mapping[key] = dashing_type

    template = open(output_template, 'w')
    properties = open(output_properties, 'w')

    for key, value in mapping.iteritems():
        print >> properties, key, value

        if value == 'graph':
            dashing_template = """
<li data-row="1" data-col="1" data-sizex="2" data-sizey="1">
  <div data-id="%s" data-view="Graph" data-title="%s" style="background-color:#ff9618"></div>
</li>"""
        elif value == 'list':
            dashing_template = """
<li data-row="1" data-col="1" data-sizex="1" data-sizey="1">
  <div data-id="%s" data-view="List" data-unordered="true" data-title="%s" data-moreinfo=""></div>
</li>"""
        elif value == 'pie':
            dashing_template = """
<li data-row="1" data-col="1" data-sizex="1" data-sizey="1">
  <div data-id="%s" data-view="Pie" data-title="%s"></div>
</li>"""
        elif value == 'number':
            dashing_template = """
<li data-row="1" data-col="1" data-sizex="1" data-sizey="1">
  <div data-id="%s" data-view="Number" data-title="%s" data-moreinfo="" data-prefix=""></div>
</li>"""
        else:
            dashing_template = """
<li data-row="1" data-col="1" data-sizex="1" data-sizey="1">
  <div data-id="%s" data-view="Meter" data-title="%s" data-min="0" data-max="100"></div>
</li>"""

        print >> template, dashing_template % (key, key.capitalize())

    template.close()
    properties.close()

def parse_args():
    parser = argparse.ArgumentParser(description='Generates a Dashing widget template from an Avro schema.')
    parser.add_argument('--schema', '-s', metavar='SCHEMA', type=str, required=True, help='Avro schema of Kafka messages.')
    parser.add_argument('--template', '-t', metavar='TEMPLATE', type=str, required=False, default='template.dashing', help='Where to output the dashing widget-template.')
    parser.add_argument('--properties', '-p', metavar='PROPERTIES', type=str, required=False, default='properties.dashing', help='Where to output the property file mapping the avro fields to dashing widgets.')
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    generate_mapping(args.schema, args.template, args.properties)
99 changes: 99 additions & 0 deletions tcp-kafka-dashing/main.py
@@ -0,0 +1,99 @@
import avro.schema
import avro.io
import argparse
from kafka import KafkaConsumer
from StringIO import StringIO
from utils import RunningAverage, StringRunningAverage
from threading import Thread
import requests
import json
import time
from requests.exceptions import ConnectionError

def construct_averages(input_properties):
    # Build a {field: (widget type, running average)} map from the
    # properties file written by generate_template.py.
    values = {}
    for line in open(input_properties):
        key, dashing_type = line.strip().split()
        if dashing_type in ['pie', 'list']:
            values[key] = (dashing_type, StringRunningAverage(5, 60))
        else:
            values[key] = (dashing_type, RunningAverage(1 if key == 'pageView' else 5, 60))
    return values

def start_consumer(input_avro, metadata_broker_list, values):
    # Read the Avro schema, skipping // comment lines.
    json_schema = ""
    with open(input_avro) as fp:
        for line in fp:
            if not line.startswith("//"):
                json_schema += line

    schema = avro.schema.parse(json_schema)
    dreader = avro.io.DatumReader(schema, schema)

    def parse_message(msg):
        return dreader.read(avro.io.BinaryDecoder(StringIO(msg)))

    consumer = KafkaConsumer("divolte", metadata_broker_list=metadata_broker_list, deserializer_class=lambda msg: parse_message(msg))
    for message in consumer:
        # Convert the millisecond timestamp to seconds.
        timestamp = int(message.value['timestamp'] / 1000)
        for key, value in message.value.iteritems():
            # Turn the event type itself into a countable metric.
            if key == 'eventType':
                key, value = value, 1

            if key in values:
                values[key][1].addValue(timestamp, value)

    consumer.close()

def start_poster(dashing_ip, dashing_auth, values):
    def gen_list(dictionary, x, y, sortByKey=False, sortByValue=False, removeZeros=False):
        keys = dictionary.keys()
        if sortByValue:
            keys.sort(cmp=lambda a, b: cmp(dictionary[a], dictionary[b]), reverse=True)
        if sortByKey:
            keys.sort()
        return [{x: key, y: dictionary[key]} for key in keys if not removeZeros or (dictionary[key] != 0 and key)]

    # Push the running aggregates to the Dashing widgets once per second.
    while True:
        for key, value in values.iteritems():
            try:
                dashing_type, average = value
                if dashing_type == 'list':
                    payload = {'auth_token': dashing_auth, 'items': gen_list(average.getSumAsDict(), "label", "value", sortByValue=True, removeZeros=True)}
                elif dashing_type == 'graph':
                    payload = {'auth_token': dashing_auth, 'points': gen_list(average.getSumAsDict(), 'x', 'y', sortByKey=True)}
                elif dashing_type == 'pie':
                    payload = {'auth_token': dashing_auth, 'value': gen_list(average.getSumAsDict(), 'label', 'value', removeZeros=True)}
                elif dashing_type == 'number':
                    payload = {'auth_token': dashing_auth, 'current': average.getSum(), 'last': average.getSum()}
                else:
                    raise RuntimeError("'%s' is not supported" % dashing_type)

                requests.post("http://%s/widgets/%s" % (dashing_ip, key), data=json.dumps(payload))

            except ConnectionError:
                # Dashing may not be up yet; try again on the next tick.
                pass

        time.sleep(1)

def parse_args():
    parser = argparse.ArgumentParser(description='Runs the consumer.')
    parser.add_argument('--schema', '-s', metavar='SCHEMA', type=str, required=True, help='Avro schema of Kafka messages.')
    parser.add_argument('--properties', '-p', metavar='PROPERTIES', type=str, required=False, default='properties.dashing', help='The mapping of the avro fields to dashing widgets.')
    parser.add_argument('--dashing', '-d', metavar='DASHING_HOST_PORT', type=str, required=False, default='localhost:3030', help='Dashing hostname + port.')
    parser.add_argument('--dashingauth', '-a', metavar='DASHING_AUTH', type=str, required=False, default='YOUR_AUTH_TOKEN', help='Dashing auth token.')
    parser.add_argument('--brokers', '-b', metavar='KAFKA_BROKERS', type=str, nargs="+", help='A list of Kafka brokers (host:port).', default=['localhost:9092'])
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()

    values = construct_averages(args.properties)

    # Post aggregates to Dashing on a background thread while the
    # Kafka consumer runs in the main thread.
    t = Thread(target=start_poster, args=(args.dashing, args.dashingauth, values))
    t.start()

    start_consumer(args.schema, args.brokers, values)
57 changes: 57 additions & 0 deletions tcp-kafka-dashing/test/test_utils.py
@@ -0,0 +1,57 @@
from unittest import TestCase
from utils import RunningAverage, StringRunningAverage
from time import time

class TestUtils(TestCase):

    def setUp(self):
        # Running average over a 60-second window, binned per 5 seconds.
        self.r = RunningAverage(5, 60)

    def test_running_average(self):
        self.r.addValue(time(), 1)
        self.r.addValue(time(), 1)

        assert self.r.getSum() == 2, self.r.getSum()
        assert self.r.getAverage() == 2 / 60.0, self.r.getAverage()

    def test_running_average_bin(self):
        # Align the timestamp to a 5-second bin boundary.
        now = int(time() / 5) * 5
        self.r.addValue(now, 1)
        self.r.addValue(now, 1)
        self.r.addValue(now - 5, 1)

        assert self.r.getSumAsDict().get(now) == 2, (now, self.r.getSumAsDict())
        assert self.r.getAverageAsDict().get(now) == 2 / 5.0, (now, self.r.getAverageAsDict())

    def test_running_average_bool(self):
        self.r.addValue(time(), True)
        self.r.addValue(time(), True)

        assert self.r.getSum() == 2, self.r.getSum()
        assert self.r.getAverage() == 2 / 60.0, self.r.getAverage()

    def test_running_average_too_old(self):
        # Values with timestamps outside the 60-second window are dropped.
        self.r.addValue(0, 1)
        self.r.addValue(0, 1)

        assert self.r.getSum() == 0, self.r.getSum()
        assert self.r.getAverage() == 0, self.r.getAverage()

class TestStringRA(TestCase):

    def setUp(self):
        self.r = StringRunningAverage(5, 60)
        self.r.addValue(time(), "hello")
        self.r.addValue(time(), "world")

    def test_stringrunning_average(self):
        assert self.r.getSum() == 2, self.r.getSum()
        assert self.r.getAverage() == 2 / 60.0, self.r.getAverage()

    def test_stringrunning_average_bin(self):
        self.r.addValue(time(), "world")

        assert self.r.getSumAsDict()["hello"] == 1, self.r.getSumAsDict()
        assert self.r.getSumAsDict()["world"] == 2, self.r.getSumAsDict()
        assert self.r.getAverageAsDict()["hello"] == 1 / 60.0, self.r.getAverageAsDict()
        assert self.r.getAverageAsDict()["world"] == 2 / 60.0, self.r.getAverageAsDict()