DGD-4258 DGD-4311 Events emission enhancements #11
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This pull request introduces a significant refactor to the event emission logic in the Spark agent, focusing on the introduction of a new
NuEventEmitter
class. The changes aim to improve the filtering and handling of OpenLineage events.Introduction of
NuEventEmitter
:integration/spark/app/src/main/java/io/openlineage/spark/agent/NuEventEmitter.java
: Added theNuEventEmitter
class, which includes methods to filter events based on job type, event type, and job name, and to discard column lineage facets for non-complete events.Refactoring event emission:
integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/RddExecutionContext.java
: Replaced direct calls toeventEmitter.emit
withNuEventEmitter.emit
in thestart
andend
methods forSparkListenerJobStart
andSparkListenerJobEnd
events. [1] [2] [3]integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkApplicationExecutionContext.java
: Updated thestart
andend
methods forSparkListenerApplicationStart
andSparkListenerApplicationEnd
events to useNuEventEmitter.emit
. [1] [2] [3]integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/SparkSQLExecutionContext.java
: Applied similar changes to useNuEventEmitter.emit
for various event methods, includingstart
andend
forSparkListenerSQLExecutionStart
,SparkListenerSQLExecutionEnd
,SparkListenerStageSubmitted
, andSparkListenerStageCompleted
events. [1] [2] [3] [4] [5] [6] [7] [8] [9]Enhancements to
NuFacet
:integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/NuFacet.java
: Implemented a method to fetch the job name (jobNurn
) from the Spark session configuration, adding logging for cases where the job name is not found. [1] [2]Removal of unused fields:
integration/spark/shared/src/main/java/io/openlineage/spark/api/OpenLineageContext.java
: Removed thejobNurn
field, which is now dynamically fetched inNuFacet
.Type of Change
Checklist