spark-highcharts not plotting with kinesis stream #31
Comments
Have you included the Highcharts JS files in Zeppelin? You can first test with a normal plot instead of streaming. |
I forgot to include the Highcharts JS files; I expected spark-highcharts to include them. The JS errors are gone now, but the call to StreamingChart(z) displays just "null". This sample code works fine:
import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
import sqlContext.implicits._
val Tokyo = Seq(7.0, 6.9, 9.5, 14.5, 18.2, 21.5, 25.2, 26.5, 23.3, 18.3, 13.9, 9.6)
.map(("Tokyo", _))
val NewYork = Seq(-0.2, 0.8, 5.7, 11.3, 17.0, 22.0, 24.8, 24.1, 20.1, 14.1, 8.6, 2.5)
.map(("New York", _))
val Berlin = Seq(-0.9, 0.6, 3.5, 8.4, 13.5, 17.0, 18.6, 17.9, 14.3, 9.0, 3.9, 1.0)
.map(("Berlin", _))
val London = Seq(3.9, 4.2, 5.7, 8.5, 11.9, 15.2, 17.0, 16.6, 14.2, 10.3, 6.6, 4.8)
.map(("London", _))
val dataFrame = (Tokyo ++ NewYork ++ Berlin ++ London).toDF("city", "temperature")
dataFrame.show()
val chart = highcharts(dataFrame
.seriesCol("city")
.series("y" -> col("temperature")))
chart.plot() |
Are you using spark-highcharts:0.6.5? |
Yes |
It works on the sample data; I just tested using the zeppelin-highcharts container. You can replace StreamingChart(z) with these two lines.
It shall print a |
I replaced the call to StreamingChart(z) with the lines you gave, but it returns:
import org.apache.zeppelin.interpreter.InterpreterContext
res111: Object = null |
The content is written during the execution of the streaming query, so an empty value means nothing has been set yet.
Please check that it is using spark-highcharts:0.6.5; one of the previous versions has an issue. You can test with the zeppelin-highcharts container. |
Here is what I tested:
paragraph 1:
%angular
<script type="text/javascript">
$(function () {
if (typeof Highcharts == "undefined") {
$.getScript("http://code.highcharts.com/highcharts.js")
.done(function( script, textStatus ) {
console.log( "load http://code.highcharts.com/highcharts.js " + textStatus );
})
.fail(function(jqxhr, settings, exception ) {
console.log("load http://code.highcharts.com/highcharts.js " + exception);
});
} else {
console.log("highcharts already loaded");
}
});
</script>
paragraph 2:
import org.apache.spark.sql.execution.streaming.MemoryStream
implicit val ctx = spark.sqlContext
case class NuclearStockpile(country: String, stockpile: Int, year: Int)
val input = MemoryStream[NuclearStockpile]
spark.conf.set("spark.sql.streaming.checkpointLocation","/tmp/test3")
val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))
val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
21000, 20000, 19000, 18000, 18000, 17000, 16000).
zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))
input.addData(USA.take(30) ++ USSR.take(30))
val structureDataFrame = input.toDF
paragraph 3:
import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
val query = highcharts(
structureDataFrame.seriesCol("country")
.series("x" -> "year", "y" -> "stockpile")
.orderBy(col("year")), z, "append")
paragraph 4:
StreamingChart(z)
paragraph 5 (to update the chart):
input.addData(USA.drop(30) ++ USSR.drop(30))
The result when I run paragraph 4 is: null
I looked at src/main/scala/com/knockdata/spark/highcharts/CustomSinkProvider.scala and found these lines:
z.put(chartParagraphId, plotData)
println(s"run $chartParagraphId")
z.run(chartParagraphId)
Correct me if I'm wrong, but z.put() sets a variable, not a paragraph's text? |
I tested the same code in a Docker environment. Can you share a screenshot showing the spark-highcharts version? |
Sure, here it is: https://i.imgur.com/8fc9Vvx.png
Question: do you run awaitTermination() in your tests? When I call query.awaitTermination I can see that new incoming data is written to the sink, but the paragraph that prints the Angular code to update the chart is stuck in the pending state and never finishes. I also cannot cancel the paragraph that is writing to the custom sink.
|
No, it is not needed. I have extracted the same code and it works. It will be really hard to know what exactly the problem is without seeing your real environment. I attached the notebook here, which is working for me. |
I removed the awaitTermination call and it seems to be working fine now.
streaming_chart_kinesis.json.zip
The problem I ran into with this requirement is that when the sink receives the second batch of data, the chart paragraph does not run and an InterpreterException("Can not run current Paragraph") is thrown. You will not see this exception anywhere in the logs; you have to manually attach an onFailure callback to the Future in ZeppelinContextHolder (src/main/scala/com/knockdata/spark/highcharts/ZeppelinContextHolder.scala).
It looks like the first time you run the paragraph containing the highcharts() singleton call, the sink is able to run the chart paragraph successfully, but when new data arrives later on the stream, the current context paragraph id has changed to the id of the last running paragraph (in this case it equals the paragraph id of the first sink call). I inspected the Zeppelin code and did not find a z.run overload that lets you disable the paragraph id validation.
A possible solution is to update ZeppelinContextHolder.scala and replace:
Future(z.run(paragraphId))
with something like
Future({
val context = z.getInterpreterContext
val noteId = context.getNoteId
val runners = z.getInterpreterContextRunner(noteId, paragraphId, context)
if (runners.size <= 0) throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size)
else {
val i$ = runners.iterator
while (i$.hasNext) {
val r: InterpreterContextRunner = i$.next
r.run()
}
}
})
which will do the job until the Zeppelin developers handle this somehow. Without this I will not be able to complete my requirement. If you have another solution to the problem, I would appreciate it if you shared it with me. Thanks |
Great to hear it works for you. The method |
Consider the following scenario:
Replace Future(z.run(paragraphId)) with:
val f = Future(z.run(paragraphId))
f.onFailure({
case err: Throwable => {
val file = new File("/path/to/file.log")
val ps = new PrintStream(file)
err.printStackTrace(ps)
ps.close()
}
})
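For anyone who wants to reproduce the "silent" failure outside Zeppelin, here is a minimal, self-contained sketch. It assumes nothing about the Zeppelin API: runParagraph is a hypothetical stand-in for z.run that only mimics the "current paragraph" check, and the ids "p3" are made up. It uses onComplete rather than onFailure, since onFailure was deprecated in Scala 2.12 and removed in 2.13.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object FutureFailureDemo {
  // Hypothetical stand-in for z.run (not the Zeppelin API): rejects a request
  // to run the paragraph that is currently executing.
  def runParagraph(targetId: String, currentId: String): Unit =
    if (targetId == currentId)
      throw new IllegalStateException("Can not run current Paragraph")

  def main(args: Array[String]): Unit = {
    // The exception is captured inside the Future; nothing is printed or
    // logged unless a callback observes the result.
    val f = Future(runParagraph("p3", "p3"))

    // Report the outcome explicitly.
    f.onComplete {
      case Failure(err) => Console.err.println(s"paragraph run failed: ${err.getMessage}")
      case Success(_)   => println("paragraph run finished")
    }

    // Wait only so this demo does not exit before the callback fires.
    Await.ready(f, 5.seconds)
    Thread.sleep(100)
  }
}
```

Writing to stderr instead of a file is just a simplification for the demo; in Zeppelin you would probably still want a file or the interpreter log.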
The exception above is caused by the following validation:
if(paragraphId.equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
I guess it might be caused by the context paragraph id changing to match the id of paragraph 3 after the first event was received by Zeppelin.
One more thing related to streaming charts: what StreamingChart(z) currently prints is a call to the jQuery Highcharts plugin, which causes the chart to "blink" when it is refreshed. To avoid this effect, you could add functionality that prints a JavaScript call to addSeries(), which will not reload the whole chart but just append the new data. |
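A rough sketch of the incremental-update idea, as an illustration only. It is not how spark-highcharts works today: addPointsScript, containerId and seriesIndex are hypothetical names, and it assumes the chart was already created in a known container by the jQuery plugin. The comment above mentions addSeries(); for appending points to a series that already exists, the Highcharts call is Series.addPoint, so that is what this sketch emits.

```scala
object IncrementalUpdateScript {
  // Builds a JavaScript snippet that appends points to an existing chart
  // instead of re-creating it. All parameters are hypothetical; spark-highcharts
  // does not necessarily expose a container id or series index like this.
  def addPointsScript(containerId: String, seriesIndex: Int, points: Seq[(Int, Int)]): String = {
    val calls = points
      .map { case (x, y) => s"  series.addPoint([$x, $y], false);" } // false: defer redraw
      .mkString("\n")
    s"""<script type="text/javascript">
       |$$(function () {
       |  var chart = $$('#$containerId').highcharts(); // getter form of the jQuery plugin
       |  var series = chart.series[$seriesIndex];
       |$calls
       |  chart.redraw(); // single redraw after all points are queued
       |});
       |</script>""".stripMargin
  }

  def main(args: Array[String]): Unit =
    println(addPointsScript("chart1", 0, Seq((1970, 26662), (1971, 26956))))
}
```

Deferring the redraw until all points of a batch are added keeps the update to one repaint per micro-batch, which should avoid the blink.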
The exception indicates that you cannot run the current paragraph from within the current paragraph. This is also why we need a separate paragraph just put
That's right, there is a blink when new data comes in. I had thought about adding the solution you mentioned, but I have no time to work on it right now. I'm wondering if you have some time to create a PR? |
I'm using Zeppelin with a Kinesis stream like this:
In the next Zeppelin paragraph I have:
StreamingChart(z)
but the chart is not displayed. When I inspect the browser's console there are some JavaScript errors. I'm trying to automatically update the chart when new data arrives in the stream. Any suggestions as to what I'm doing wrong?
Infrastructure:
Screenshot:
https://i.stack.imgur.com/HWIn0.png