This package connects Apache Spark™ to OpenTelemetry Tracing.
This allows reporting traces from any Spark or PySpark job to an OpenTelemetry Collector, or directly to any supported backend.
ℹ️ This project is in early development. It can be used for a proof of concept, but the instrumentation is not yet comprehensive.
Here's what a simple Spark job managed by Airflow might look like if both Airflow and Spark report traces to Grafana Tempo.
The recommended way to use Spot relies on OpenTelemetry Autoconfigure to obtain the OpenTelemetry configuration. You pass the spot-complete jar to `spark-submit` to make Spot available to your job, and configure `spark.extraListeners` to enable it.
The jar filename embeds the two-digit Spark version and the Scala version:

```
spot-complete-3.3_2.12-1.0.0.jar
              ↑↑↑ ↑↑↑↑ ↑↑↑↑↑
               A    B     C
```

- A: Spark version ∈ { 3.3, 3.4, 3.5 }
- B: Scala version ∈ { 2.12, 2.13 }
- C: Spot library version
The Spark and Scala versions must match those of your Spark deployment.
```diff
SPARK_VERSION=3.5
SCALA_VERSION=2.12
spark-submit \
+   --jars com.xebia.data.spot.spot-complete-${SPARK_VERSION}_${SCALA_VERSION}-x.y.z.jar \
+   --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
    com.example.MySparkJob
```
To use context propagation, provide the necessary headers as SparkConf values. The default configuration uses `traceparent`:
```diff
SPARK_VERSION=3.5
SCALA_VERSION=2.12
spark-submit \
    --jars com.xebia.data.spot.spot-complete-${SPARK_VERSION}_${SCALA_VERSION}-x.y.z.jar \
    --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
+   --conf spark.com.xebia.data.spot.traceparent=00-1234abcd5678abcd1234abcd5678abcd-1234abcd5678abcd-01 \
    com.example.MySparkJob
```
All SparkConf values that start with `spark.com.xebia.data.spot.` are made available to the `ContextPropagator`. If you use a propagator other than the default, prefix its required keys accordingly.
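If you launch Spark programmatically (for example from a PySpark driver) instead of via spark-submit flags, the same keys can be set on the session builder. A minimal sketch, assuming the spot-complete jar is already on the classpath; the app name and traceparent value are placeholders:

```python
from pyspark.sql import SparkSession

# Minimal sketch: enable the listener and hand it an upstream trace context.
# In practice the traceparent value comes from the system that launched the job.
spark = (
    SparkSession.builder
    .appName("MySparkJob")
    .config("spark.extraListeners", "com.xebia.data.spot.TelemetrySparkListener")
    .config("spark.com.xebia.data.spot.traceparent",
            "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
    .getOrCreate()
)
```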
See also Airflow Context Propagation below.
Instrumenting for telemetry is useless until you publish the recorded data somewhere. This might be the native metrics suite of your chosen cloud provider, or a free or commercial third party system such as Prometheus + Tempo + Grafana. You can have your instrumented Spark jobs publish directly to the backend, or run the traffic via OpenTelemetry Collector. Choosing the backend and routing architecture is outside the scope of this document.
If you're running Spark on Kubernetes, you should install and configure the OpenTelemetry Operator. In any other deployment, you should publish the appropriate environment variables for autoconfiguration.
The automatic configuration is controlled by a set of environment variables or JVM system properties, documented in the OpenTelemetry autoconfigure documentation.
Use any mechanism of choice, such as shell exports:
```shell
export OTEL_TRACES_EXPORTER=zipkin
export OTEL_EXPORTER_ZIPKIN_ENDPOINT=http://localhost:9411/api/v2/spans
```
Note: if you use the Kubernetes Operator, these environment variables are controlled there.
In addition to the standard mechanisms, JVM system properties can also be passed to Spot via the spark-submit command:
```diff
SPARK_VERSION=3.5
SCALA_VERSION=2.12
spark-submit \
    --jars com.xebia.data.spot.spot-complete-${SPARK_VERSION}_${SCALA_VERSION}-x.y.z.jar \
    --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
+   --conf spark.otel.traces.exporter=zipkin \
+   --conf spark.otel.exporter.zipkin.endpoint=http://localhost:9411/api/v2/spans \
    com.example.MySparkJob
```
All options starting with `spark.otel.` are exposed this way, minus the `spark.` prefix. Note: existing values are overwritten.
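For illustration, the implied key mapping looks like this (a sketch, not the library's actual code; the function name is hypothetical):

```python
# Sketch of the implied key mapping; the real work happens inside the JVM
# listener, which copies these values into JVM system properties.
def otel_properties(spark_conf: dict) -> dict:
    return {
        key[len("spark."):]: value           # spark.otel.x.y -> otel.x.y
        for key, value in spark_conf.items()
        if key.startswith("spark.otel.")
    }

# {"spark.otel.traces.exporter": "zipkin"} -> {"otel.traces.exporter": "zipkin"}
print(otel_properties({"spark.otel.traces.exporter": "zipkin"}))
```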
If the OpenTelemetry Autoconfigure mechanism doesn't meet your requirements, you can provide your own OpenTelemetry instance programmatically. This requires a few steps:
- Write a class that implements `com.xebia.data.spot.OpenTelemetrySdkProvider`:

  ```scala
  package com.example

  import com.xebia.data.spot.OpenTelemetrySdkProvider
  import io.opentelemetry.api.OpenTelemetry
  import io.opentelemetry.sdk.OpenTelemetrySdk

  class MyCustomProvider extends OpenTelemetrySdkProvider {
    override def get(config: Map[String, String]): OpenTelemetry =
      OpenTelemetrySdk.builder()
        // customize SDK construction
        .build()
  }
  ```
- Make the compiled class available to your Spark environment.
- Add `spark.com.xebia.data.spot.sdkProvider` to your Spark config, referencing your implementation:

  ```diff
  SPARK_VERSION=3.5
  SCALA_VERSION=2.12  # This will be 2.12 or 2.13, whichever matches your Spark deployment.
  spark-submit \
      --jars com.xebia.data.spot.spot-complete-${SPARK_VERSION}_${SCALA_VERSION}-x.y.z.jar \
      --conf spark.extraListeners=com.xebia.data.spot.TelemetrySparkListener \
  +   --conf spark.com.xebia.data.spot.sdkProvider=com.example.MyCustomProvider \
      com.example.MySparkJob
  ```
Apache Airflow is instrumented with OpenTelemetry (see its documentation). In a deployment where both Airflow and Spark report telemetry, the traces generated by Spark can be linked to the traces coming out of Airflow. This requires a custom Operator:
```python
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.utils.context import Context as AirflowContext  # not to be confused with OpenTelemetry Context


def compose_traceparent(context: AirflowContext) -> str:
    """Obtains the Trace ID and Span ID of the ongoing Task Instance.

    This function can only be called during task execution. The ongoing OpenTelemetry Context is not included in the
    Airflow Context, but Airflow uses a deterministic function to generate the required IDs out of the DagRun and
    TaskInstance. This function uses the same and outputs a string in the format of a W3C ``traceparent`` header.

    :param context: Airflow task execution context.
    :return: Traceparent header value.
    """
    from airflow.traces import NO_TRACE_ID
    from airflow.traces.utils import gen_trace_id, gen_span_id

    version = "00"
    trace_id = gen_trace_id(context['dag_run'])
    span_id = gen_span_id(context['task_instance'])
    flags = "00" if trace_id == NO_TRACE_ID else "01"
    return f"{version}-{trace_id}-{span_id}-{flags}"


class SpotSparkSubmitOperator(SparkSubmitOperator):
    def execute(self, context: AirflowContext):
        self.conf["spark.com.xebia.data.spot.traceparent"] = compose_traceparent(context)
        super().execute(context)
```
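Using the custom operator in a DAG might then look like this (a sketch; the DAG id, schedule, and application path are placeholders):

```python
from datetime import datetime

from airflow import DAG

with DAG(dag_id="spark_with_tracing", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    run_job = SpotSparkSubmitOperator(
        task_id="run_spark_job",
        application="/opt/jobs/my-spark-job.jar",  # placeholder path
        java_class="com.example.MySparkJob",
        conf={"spark.extraListeners": "com.xebia.data.spot.TelemetrySparkListener"},
    )
```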
This is implemented in the examples/spark-opentelemetry example of our Whirl project.
Why not use Spark's built-in DropWizard metrics? Because that's something that already exists, and this is something I wanted to build. If the DropWizard metrics in Spark meet your needs, you should consider using those.
Why not rely on the Spark JobHistory server? In part for the same reason: because that's something that already exists, and this is something I wanted to build. There's obviously more of a difference here: the purpose of telemetry (whether DropWizard or OpenTelemetry) is to enable monitoring and alerting, whereas the JobHistory server is used reactively.
If the OpenTelemetry SDK cannot be obtained during startup, we allow the listener (and the enclosing Spark job) to crash.
Trade-off: the enclosing Spark job could run just fine without the telemetry listener; swallowing initialization errors would keep us out of the way of the instrumented business process.
Rationale: if you instrument the job, you expect to see your telemetry. Fail-fast behaviour ensures no telemetry is silently lost.
These are things that are out of scope for the moment:
- Downstream propagation of trace context. It may be useful in some environments to forward the trace context to downstream systems such as data stores.