Skip to content

Commit

Permalink
Merge pull request #17 from keyko-io/feature/transform-events-in-time…
Browse files Browse the repository at this point in the history
…series

Add event to timeseries transformation
  • Loading branch information
aaitor authored Feb 25, 2020
2 parents 54a589e + 3d93b89 commit 25f6398
Showing 1 changed file with 71 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.keyko.monitoring.schemas.*;
import io.keyko.monitoring.serde.Web3MonitoringSerdes;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
Expand Down Expand Up @@ -45,7 +44,7 @@ public static KStream<String, EventBlockRecord> joinEventWithBlock(KStream<Strin
* @param viewStream Stream with the confirmed views
* @param blockStream Table with the blocks
* @return KStream
* */
*/

public static KStream<String, ViewBlockRecord> joinViewWithBlock(KStream<String, ViewRecord> viewStream, KTable<String, BlockRecord> blockStream) {
return viewStream
Expand All @@ -68,34 +67,40 @@ public static KStream<String, ViewBlockRecord> joinViewWithBlock(KStream<String,

}

public static KStream<String, TimeSeriesRecord> transformToTimeSeries(KStream<String, ViewBlockRecord> stream) {
/**
* Transform the ViewBlock records into a TimeSeriesRecord
*
* @param stream
* @return
*/
public static KStream<String, TimeSeriesRecord> transformToTimeSeries(KStream<String, ViewBlockRecord> stream) {

return stream.mapValues( viewBlock -> {
return stream.mapValues(viewBlock -> {

List output = viewBlock.getDetails().getOutput();
List<Object> output = viewBlock.getDetails().getOutput();

TimeSeriesRecord timeSeries = new TimeSeriesRecord();
timeSeries.setContractName(viewBlock.getDetails().getContractName());
timeSeries.setMethodName (viewBlock.getDetails().getName());
timeSeries.setMethodName(viewBlock.getDetails().getName());
timeSeries.setTimestamp(viewBlock.getDetailsBlock().getTimestamp());
timeSeries.setBlockNumber(viewBlock.getDetailsBlock().getNumber());

for (int i= 0; i< output.size(); i++) {
for (int i = 0; i < output.size(); i++) {

Object object = output.get(i);
NumberParameter numberParameter = object instanceof NumberParameter? (NumberParameter) object: null;
StringParameter stringParameter = object instanceof StringParameter? (StringParameter) object: null;
NumberParameter numberParameter = object instanceof NumberParameter ? (NumberParameter) object : null;
StringParameter stringParameter = object instanceof StringParameter ? (StringParameter) object : null;

TimeSeriesParameter param = new TimeSeriesParameter();

if (stringParameter!= null){
if (stringParameter != null) {
param.setLabel(stringParameter.getName());
param.setValue(stringParameter.getValue());
param.setNumberValue(0l);
}else if (numberParameter!= null){
param.setNumberValue(0L);
} else if (numberParameter != null) {
param.setLabel(numberParameter.getName());
param.setValue(numberParameter.getValue());
param.setNumberValue( numberParameter.getNumberValue());
param.setNumberValue(numberParameter.getNumberValue());
}
switch(i) {
case 0: timeSeries.setParam0(param); break;
Expand All @@ -117,4 +122,57 @@ public static KStream<String, TimeSeriesRecord> transformToTimeSeries(KStream<S

}

/**
* Transform the EventBlock records into a TimeSeriesRecord
*
* @param stream
* @return
*/
public static KStream<String, TimeSeriesRecord> transformEventToTimeSeries(KStream<String, EventBlockRecord> stream) {
return stream.mapValues(eventBlock -> {

List<Object> output = eventBlock.getDetails().getIndexedParameters();
output.addAll(eventBlock.getDetails().getNonIndexedParameters());

TimeSeriesRecord timeSeries = new TimeSeriesRecord();
timeSeries.setContractName(eventBlock.getDetails().getContractName());
timeSeries.setMethodName(eventBlock.getDetails().getName());
timeSeries.setTimestamp(eventBlock.getDetailsBlock().getTimestamp());
timeSeries.setBlockNumber(eventBlock.getDetailsBlock().getNumber());

for (int i = 0; i < output.size(); i++) {

Object object = output.get(i);
NumberParameter numberParameter = object instanceof NumberParameter ? (NumberParameter) object : null;
StringParameter stringParameter = object instanceof StringParameter ? (StringParameter) object : null;

TimeSeriesParameter param = new TimeSeriesParameter();

if (stringParameter != null) {
param.setLabel(stringParameter.getName());
param.setValue(stringParameter.getValue());
param.setNumberValue(0L);
} else if (numberParameter != null) {
param.setLabel(numberParameter.getName());
param.setValue(numberParameter.getValue());
param.setNumberValue(numberParameter.getNumberValue());
}
switch(i) {
case 0: timeSeries.setParam0(param); break;
case 1: timeSeries.setParam1(param); break;
case 2: timeSeries.setParam2(param); break;
case 3: timeSeries.setParam3(param); break;
case 4: timeSeries.setParam4(param); break;
case 5: timeSeries.setParam5(param); break;
case 6: timeSeries.setParam6(param); break;
case 7: timeSeries.setParam7(param); break;
case 8: timeSeries.setParam8(param); break;
case 9: timeSeries.setParam9(param); break;
default: break;
}
}
return timeSeries;

});
}
}

0 comments on commit 25f6398

Please sign in to comment.