Skip to content
This repository has been archived by the owner on Jul 23, 2020. It is now read-only.

spark-highcharts not plotting with kinesis stream #31

Open
plamen-paskov opened this issue Sep 26, 2017 · 15 comments
Open

spark-highcharts not plotting with kinesis stream #31

plamen-paskov opened this issue Sep 26, 2017 · 15 comments

Comments

@plamen-paskov
Copy link

I'm using zeppelin with kinesis stream like this:

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType}


spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/test")

val df = spark
  .readStream
  .format("kinesis")
  .option("streams", "bla")
  .option("endpointUrl", "kinesis.us-west-2.amazonaws.com")
  .option("initialPositionInStream", "earliest")
  .option("format", "csv")
  .schema(
    StructType(
      StructField("device_id", StringType) ::
      StructField("temperature", IntegerType) ::
      Nil
    ))
  .load


val query = highcharts(df.seriesCol("device_id").series("y" -> col("temperature")), z, "append")

in the next zeppelin paragraph i have :

StreamingChart(z)

but the chart is not displayed. When i inspect the browser's console there are some javascript errors. I'm trying to automatically update the chart when new data arrive in the stream. Any suggestions what i'm doing wrong?

Infrastructure:

Screenshot:
https://i.stack.imgur.com/HWIn0.png

@rockie-yang
Copy link
Contributor

Have you includes Highcharts js files in Zeppelin?

You can first test with a normal plot instead of streaming.

@plamen-paskov
Copy link
Author

I forgot to include the highcharts js files. Actually i expected spark-highcharts to include it. Now the js errors are gone but the call to StreamingChart(z) display just "null".

When i try with this sample code it's working just fine:

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._
import sqlContext.implicits._

val Tokyo = Seq(7.0, 6.9, 9.5, 14.5, 18.2, 21.5, 25.2, 26.5, 23.3, 18.3, 13.9, 9.6)
    .map(("Tokyo", _))
val NewYork = Seq(-0.2, 0.8, 5.7, 11.3, 17.0, 22.0, 24.8, 24.1, 20.1, 14.1, 8.6, 2.5)
  .map(("New York", _))
val Berlin = Seq(-0.9, 0.6, 3.5, 8.4, 13.5, 17.0, 18.6, 17.9, 14.3, 9.0, 3.9, 1.0)
  .map(("Berlin", _))
val London = Seq(3.9, 4.2, 5.7, 8.5, 11.9, 15.2, 17.0, 16.6, 14.2, 10.3, 6.6, 4.8)
  .map(("London", _))

val dataFrame = (Tokyo ++ NewYork ++ Berlin ++ London).toDF("city", "temperature")

dataFrame.show()

val chart = highcharts(dataFrame
  .seriesCol("city")
  .series("y" -> col("temperature")))
  chart.plot()

@rockie-yang
Copy link
Contributor

are you using spark-highcharts:0.6.5?

@plamen-paskov
Copy link
Author

Yes

@rockie-yang
Copy link
Contributor

It works on Sample data. I just tested using zeppelin-highcharts container

You can replace StreamingChart(z) with these two lines.

import org.apache.zeppelin.interpreter.InterpreterContext
z.get(InterpreterContext.get().getParagraphId)

It shall print a <div> and <script>

@plamen-paskov
Copy link
Author

i replaced the call to StreamingChart(z) with the lines you gave but it's returning:

import org.apache.zeppelin.interpreter.InterpreterContext
res111: Object = null

@rockie-yang
Copy link
Contributor

The content is write during the execution of streaming. It means nothing has been set if it's empty.

z.get(InterpreterContext.get().getParagraphId) 

Please check if it's using spark-highcharts:0.6.5, one of the previous version has issue.

You can tested with the zeppelin-highcharts container.

@plamen-paskov
Copy link
Author

plamen-paskov commented Oct 2, 2017

Here is what i tested:

paragraph 1:

%angular
<script type="text/javascript">
	$(function () {
	    if (typeof Highcharts == "undefined") {
			$.getScript("http://code.highcharts.com/highcharts.js")
			  .done(function( script, textStatus ) {
			    console.log( "load http://code.highcharts.com/highcharts.js " + textStatus );
			  })
			  .fail(function(jqxhr, settings, exception ) {
			     console.log("load http://code.highcharts.com/highcharts.js " + exception);
			  });
		} else {
		    console.log("highcharts already loaded");
		}
	});
</script>

paragraph 2:

import org.apache.spark.sql.execution.streaming.MemoryStream

implicit val ctx = spark.sqlContext

case class NuclearStockpile(country: String, stockpile: Int, year: Int)

val input = MemoryStream[NuclearStockpile]

spark.conf.set("spark.sql.streaming.checkpointLocation","/tmp/test3")

val USA = Seq(0, 0, 0, 0, 0, 6, 11, 32, 110, 235, 369, 640,
  1005, 1436, 2063, 3057, 4618, 6444, 9822, 15468, 20434, 24126,
  27387, 29459, 31056, 31982, 32040, 31233, 29224, 27342, 26662,
  26956, 27912, 28999, 28965, 27826, 25579, 25722, 24826, 24605,
  24304, 23464, 23708, 24099, 24357, 24237, 24401, 24344, 23586,
  22380, 21004, 17287, 14747, 13076, 12555, 12144, 11009, 10950,
  10871, 10824, 10577, 10527, 10475, 10421, 10358, 10295, 10104).
    zip(1940 to 2006).map(p => NuclearStockpile("USA", p._1, p._2))

val USSR = Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
  5, 25, 50, 120, 150, 200, 426, 660, 869, 1060, 1605, 2471, 3322,
  4238, 5221, 6129, 7089, 8339, 9399, 10538, 11643, 13092, 14478,
  15915, 17385, 19055, 21205, 23044, 25393, 27935, 30062, 32049,
  33952, 35804, 37431, 39197, 45000, 43000, 41000, 39000, 37000,
  35000, 33000, 31000, 29000, 27000, 25000, 24000, 23000, 22000,
  21000, 20000, 19000, 18000, 18000, 17000, 16000).
    zip(1940 to 2006).map(p => NuclearStockpile("USSR/Russia", p._1, p._2))

input.addData(USA.take(30) ++ USSR.take(30))
val structureDataFrame = input.toDF

paragraph 3:

import com.knockdata.spark.highcharts._
import com.knockdata.spark.highcharts.model._

val query = highcharts(
  structureDataFrame.seriesCol("country")
    .series("x" -> "year", "y" -> "stockpile")
    .orderBy(col("year")), z, "append")

paragraph 4:

StreamingChart(z)

paragraph 5 (to update the chart) :

input.addData(USA.drop(30) ++ USSR.drop(30))

the result when i run paragraph 4 is : null

I looked at src/main/scala/com/knockdata/spark/highcharts/CustomSinkProvider.scala and found these lines:

z.put(chartParagraphId, plotData)
println(s"run $chartParagraphId")
z.run(chartParagraphId)

Correct me if i'm wrong but z.put() will set a variable not a paragraph text?

@rockie-yang
Copy link
Contributor

z.put is to set the data to ZeppelinContext variable. It will be use by StreamingChart(z).

I had tested with the same code in docker environment.

Can you share the screenshot for the spark-highcharts version?

@plamen-paskov
Copy link
Author

plamen-paskov commented Oct 3, 2017

sure, here it is: https://i.imgur.com/8fc9Vvx.png
i downloaded it from : http://central.maven.org/maven2/com/knockdata/spark-highcharts/0.6.5/spark-highcharts-0.6.5.jar

Question: do you run awaitTermination() in your tests ? when i call query.awaitTermination i can see that the new incoming data is written to the sink but running the paragraph that will print the angular code to update the chart is stuck in pending state and never finish. also i cannot cancel the paragraph that is writing to the custom sink.
I also found an issue in zeppelin board that describe exactly what i'm expiriencing: https://issues.apache.org/jira/browse/ZEPPELIN-2563
I see an exception in the log file as well:

WARN [2017-10-03 12:03:04,386] ({Thread-21} RemoteInterpreterProcess.java[releaseClient]:106) - exception occurred during releasing thrift client
java.lang.IllegalStateException: Object has already been returned to this pool or is invalid
        at org.apache.commons.pool2.impl.GenericObjectPool.returnObject(GenericObjectPool.java:599)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess.releaseClient(RemoteInterpreterProcess.java:104)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller.progressRemoteZeppelinControlEvent(RemoteInterpreterEventPoller.java:325)
        at org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller.run(RemoteInterpreterEventPoller.java:231)```

@rockie-yang
Copy link
Contributor

No. It does not need awaitTermination.

I have extract the same code. and it works. It will be really hard to know what the problem extract is without seeing your real environment.

I attached the note book here. Which is working for me.

streaming.json.zip

@plamen-paskov
Copy link
Author

plamen-paskov commented Oct 4, 2017

I removed the awaitTermination and it seems it's working fine now.
I found something else related to the streaming charts. What i'm trying to achieve is to have continuously updated streaming chart (import the attached notebook to see how i'm doing it).

streaming_chart_kinesis.json.zip

The problem i encountered with this requirement is that when the sink receive the second batch of data the chart paragraph does not run and there is an exception InterpreterException("Can not run current Paragraph") . You will not see this exception anywhere in the logs. You have to manually attach a onFailure callback to the Future in ZeppelinContextHolder (src/main/scala/com/knockdata/spark/highcharts/ZeppelinContextHolder.scala) . It looks like the first time when you run the paragraph containing highcharts() singleton call the sink is able to successfully run the paragraph but when new data arrive later on the stream the current context paragraph id was changed to be the paragraph id of the last running paragraph (in this case it equals the paragraph id of the first sink call). I inspected the zeppelin code and i didn't find a z.run method overload where you can disable the paragraph id validation. A possible solution to this is to update the ZeppelinContextHolder.scala and replace:

Future(z.run(paragraphId))

with something like

    Future({
      val context = z.getInterpreterContext
      val noteId = context.getNoteId

      val runners = z.getInterpreterContextRunner(noteId, paragraphId, context)
      if (runners.size <= 0) throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size)
      else {
        val i$ = runners.iterator
        while (i$.hasNext) {
          val r: InterpreterContextRunner = i$.next
          r.run()
        }
      }
    })

which will do the job until zeppelin devs handle this somehow. Without this i will not be able to complete my requirement. If you have another solution to the problem i will appreciate to share it with me.

Thanks

@rockie-yang
Copy link
Contributor

Great to hear it works for you.

The method highcharts only need be invoked once. The internal sink will be triggered when there are new data comes. And new chart will be rendered.

@plamen-paskov
Copy link
Author

plamen-paskov commented Oct 5, 2017

Consider the following scenario:

  • Modify ZeppelinContextHolder.scala and replace
Future(z.run(paragraphId))

with

    val f = Future(z.run(paragraphId))

    f.onFailure({
      case err: Throwable => {
        val file = new File("/path/to/file.log")
        val ps = new PrintStream(file)
        err.printStackTrace(ps)
        ps.close()
      }
    })
  • Build and deploy the jar
  • Restart zeppelin
  • Run paragraph 1 (bind to kinesis stream)
  • Run paragraph 2 (highcharts() invocation)
  • Run paragraph 4 (load highcharts.js)
  • Start sending events to kinesis stream
  • Wait some time
  • Observe the generated log file: it will contain exception stack trace with output similar to this
org.apache.zeppelin.interpreter.InterpreterException: Can not run current Paragraph
        at org.apache.zeppelin.spark.ZeppelinContext.run(ZeppelinContext.java:332)
        at org.apache.zeppelin.spark.ZeppelinContext.run(ZeppelinContext.java:321)
        at com.knockdata.spark.highcharts.ZeppelinContextHolder$$anonfun$1.apply$mcV$sp(ZeppelinContextHolder.scala:17)
        at com.knockdata.spark.highcharts.ZeppelinContextHolder$$anonfun$1.apply(ZeppelinContextHolder.scala:17)
        at com.knockdata.spark.highcharts.ZeppelinContextHolder$$anonfun$1.apply(ZeppelinContextHolder.scala:17)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The exception above is caused by the following validation :

if(paragraphId.equals(context.getParagraphId())) {
    throw new InterpreterException("Can not run current Paragraph");
}

I guess it might be caused by the fact that context paragraph id value was changed to match the id of paragraph 3 after the first event was received by zeppelin.
Send the new data via console script and not from zeppelin GUI !!!

One more thing related to streaming data charts: currently what StreamingChart(z) is printing is a call to the jquery highcharts plugin which will cause the chart to "blink" when it's refreshed. To avoid this effect you can add another functionality to print a javascript call to addSeries() which will not reload the whole chart but will just append the new data

@rockie-yang
Copy link
Contributor

The exception indicate that you can not run the current paragraph from the current paragraph.

This is also why we need a separate paragraph just put StreamingChart(z). You can create a separate graph and get id then run it. If you want to run the next paragraph, check the code, which has find the next paragraph id.

That's right, there is a blink when new data comes. The solution you mentioned I had think to add it, why got no time now to work on it. I'm wondering if you have some time create a PR?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants