Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] ExpectColumnPairValuesAToBeGreaterThanB Spark Databricks #10559

Open
victorgrcp opened this issue Oct 24, 2024 · 2 comments
Open

[BUG] ExpectColumnPairValuesAToBeGreaterThanB Spark Databricks #10559

victorgrcp opened this issue Oct 24, 2024 · 2 comments

Comments

@victorgrcp
Copy link

Describe the bug
I'm using an Spark Data Source and Spark Dataframes as Data Assets. When I try to validate the ExpectColumnPairValuesAToBeGreaterThanB expectation it raises an error. I'm going to copy a small part of the exception raised:

{
"success": false,
"expectation_config": {
"type": "expect_column_pair_values_a_to_be_greater_than_b",
"kwargs": {
"column_A": "tpep_dropoff_datetime",
"column_B": "tpep_pickup_datetime",
"batch_id": "ds_samples_nyctaxi-da_df_trips"
},
"meta": {
"columns": [
"tpep_pickup_datetime",
"tpep_dropoff_datetime"
]
},
"id": "7310fd00-2153-43e9-8673-e8d7c4688abd"
},
"result": {},
"meta": {},
"exception_info": {
"('column_pair_values.a_greater_than_b.unexpected_count', '452c8f1abbd4f1d85e1503a16beb23ec', 'or_equal=None')": {
"exception_traceback": "Traceback (most recent call last):\n File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/execution_engine/execution_engine.py", line 533, in _process_direct_and_bundled_metric_computation_configurations\n metric_computation_configuration.metric_fn( # type: ignore[misc] # F not callable\n File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.11/site-packages/great_expectations/expectations/metrics/map_metric_provider/map_condition_auxilliary_methods.py", line 625, in _spark_map_condition_unexpected_count_value\n return filtered.count()\n ^^^^^^^^^^^^^^^^\n File "/databricks/spark/python/pyspark/sql/connect/dataframe.py", line 300, in count\n table, _ = self.agg(F._invoke_function("count", F.lit(1)))._to_table()\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/databricks/spark/python/pyspark/sql/connect/dataframe.py", line 1971, in _to_table\n table, schema, self._execution_info = self._session.client.to_table(\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1014, in to_table\n table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1755, in _execute_and_fetch\n for response in self._execute_and_fetch_as_iterator(\n File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1731, in _execute_and_fetch_as_iterator\n
...
"exception_message": "[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "tpep_dropoff_datetime". It's probably because of illegal references like df1.select(df2.col(\"a\")). SQLSTATE: 42704\n\nJVM stacktrace:\norg...}"

To Reproduce

data_source = context.data_sources.add_spark(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=asset_name)
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_def_name)
df = spark.read.table(f"{catalog}.{schema}.{table}") # samples.nyxtaxi.trips

date_expectation = gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
    column_A="tpep_dropoff_datetime",
    column_B="tpep_pickup_datetime",
    or_equal=True 
)

batch = batch_definition.get_batch(batch_parameters={'dataframe': df})
val_results = batch.validate(date_expectation)

Expected behavior
Not an Exception error.

Environment:

  • Operating System: Linux
  • Great Expectations Version: 1.1.3
  • Data Source: Spark DataFrame
  • Cloud environment: Azure Databricks

Additional context
I tried with a Pandas DF and it worked, but I need to use the UnexpectedRowsExpectation expectation for other more complex validations. I replaced the ExpectColumnPairValuesAToBeGreaterThanB for UnexpectedRowsExpectation to workaround this datetime validation.

unexpected = gx.expectations.UnexpectedRowsExpectation( unexpected_rows_query = ( "SELECT * FROM {batch} WHERE tpep_dropoff_datetime < tpep_pickup_datetime" ) )

Seems to work for now, but I wanted to raise this bug.
Thank you :)

@adeola-ak
Copy link
Contributor

@victorgrcp hi there thank you for the detailed report, and I appreciate the workaround you shared. I am glad you were able to unblock yourself. i'm looking into this and i am actually not able to reproduce. what version of spark are you on? I am able to get this expectation to work with both success and failure cases.

data = {
    "ID": [1, 2, 3, 4, None],
    "name": ["Alice", "Bob", "Charlie", "David", None],
    "age_when_joined": [25, 30, 35, 40, 28],
    "age_when_left": [26, 38, 38, 49, 30],
}

df = pd.DataFrame(data)
spark_df = spark.createDataFrame(df)

if isinstance(spark_df, SparkDataFrame):
    print("Spark DataFrame")
else:
    print("Not a Spark DataFrame")

batch_parameters = {"dataframe": spark_df}

data_source_name = "my_data_source"
data_source = context.data_sources.add_spark(name=data_source_name)
data_asset_name = "my_dataframe_data_asset"
data_asset = data_source.add_dataframe_asset(name=data_asset_name)
batch_definition_name = "my_batch_definition"
batch_definition = data_asset.add_batch_definition_whole_dataframe(
    batch_definition_name
)

suite = context.suites.add(
    gx.core.expectation_suite.ExpectationSuite(name="my_expectations")
)

suite.add_expectation(
    gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
        column_A="age_when_left", 
        column_B="age_when_joined", 
        or_equal=True
    )
)

validation_definition = context.validation_definitions.add(
    gx.core.validation_definition.ValidationDefinition(
        name="my_validation_definition",
        data=batch_definition,
        suite=suite,
    )
)

checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="checkpoint",
        validation_definitions=[validation_definition],
        actions=[gx.checkpoint.actions.UpdateDataDocsAction(name="dda")],
        result_format={"result_format": "BASIC", "unexpected_index_column_names": ["ID", "name", "age_when_left", "age_when_joined"]},
    )
)

validation_results = checkpoint.run(batch_parameters=batch_parameters)
print(validation_results)
context.open_data_docs()

result:

Screenshot 2024-10-29 at 9 57 10 AM

@victorgrcp
Copy link
Author

Hi @adeola-ak, I updated to GX 1.2.0 and I'm on Spark version 3.5.0. Still the same problem.

My Context setup:

data_source = context.data_sources.add_spark(name=data_source_name)
data_asset = data_source.add_dataframe_asset(name=asset_name)
batch_definition = data_asset.add_batch_definition_whole_dataframe(batch_def_name)

df = spark.read.table(f"{catalog}.{schema}.{table}") # samples.nyctaxi.trips

expectation_suite = context.suites.add(gx.core.expectation_suite.ExpectationSuite(name=suite_name))
validation_definition = context.validation_definitions.add(gx.core.validation_definition.ValidationDefinition(
                            name=definition_name,
                            data=batch_definition,
                            suite=expectation_suite
                        )
                    )
checkpoint = context.checkpoints.add(
                        gx.Checkpoint(
                            name=f"checkpoint_{catalog}_{schema}_{table}",
                            validation_definitions=[validation_definition],
                            result_format={
                                "result_format": "COMPLETE", 
                                "unexpected_index_column_names":["pickup_zip","tpep_pickup_datetime","fare_amount","trip_distance","tpep_dropoff_datetime","dropoff_zip"],
                                "partial_unexpected_count": 0
                            }
                        )
                    )
validation_results = checkpoint.run(batch_parameters={'dataframe': df})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: In progress
Development

No branches or pull requests

2 participants