Skip to content

Commit

Permalink
Feature dtype (#315)
Browse files Browse the repository at this point in the history
* Added skip tests for cloud and version changes

* Removed validation for is_custom to match just pyspark
  • Loading branch information
canimus authored Sep 12, 2024
1 parent 766701d commit b413bd0
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 95 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

Meaning `good` in Aztec ([Nahuatl](https://nahuatl.wired-humanities.org/content/cualli-0)), _pronounced: QUAL-E_

This library provides an intuitive `API` to describe `checks` initially just for `PySpark` dataframes `v3.3.0`. And extended to `pandas`, `snowpark`, `duckdb`, `daft` and more.
This library provides an intuitive `API` to describe data quality `checks` initially just for `PySpark` dataframes `v3.3.0`. And extended to `pandas`, `snowpark`, `duckdb`, `daft` and more.
It is a replacement written in pure `python` of the `pydeequ` framework.

I gave up in _deequ_ as after extensive use, the API is not user-friendly, the Python Callback servers produce additional costs in our compute clusters, and the lack of support to the newest version of PySpark.
Expand Down
110 changes: 24 additions & 86 deletions cuallee/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
import enum
import hashlib
import importlib
Expand All @@ -8,52 +9,11 @@
from datetime import datetime, timedelta, timezone
from types import ModuleType
from typing import Any, Dict, List, Literal, Optional, Protocol, Tuple, Union, Callable
from toolz import compose, valfilter # type: ignore
from toolz import compose, valfilter, first # type: ignore
from toolz.curried import map as map_curried

logger = logging.getLogger("cuallee")
__version__ = "0.13.2"
# Verify Libraries Available
# ==========================
try:
from pandas import DataFrame as pandas_dataframe # type: ignore
except (ModuleNotFoundError, ImportError):
logger.debug("KO: Pandas")

try:
from polars.dataframe.frame import DataFrame as polars_dataframe # type: ignore
except (ModuleNotFoundError, ImportError):
logger.debug("KO: Polars")

try:
from pyspark.sql import DataFrame as pyspark_dataframe
except (ModuleNotFoundError, ImportError):
logger.debug("KO: PySpark")

try:
from pyspark.sql.connect.dataframe import DataFrame as pyspark_connect_dataframe
except (ModuleNotFoundError, ImportError):
logger.debug("KO: PySpark Connect")

try:
from snowflake.snowpark import DataFrame as snowpark_dataframe # type: ignore
except (ModuleNotFoundError, ImportError):
logger.debug("KO: Snowpark")

try:
from duckdb import DuckDBPyConnection as duckdb_dataframe # type: ignore
except (ModuleNotFoundError, ImportError):
logger.debug("KO: DuckDB")

try:
from google.cloud import bigquery
except (ModuleNotFoundError, ImportError):
logger.debug("KO: BigQuery")

try:
from daft import DataFrame as daft_dataframe
except (ModuleNotFoundError, ImportError):
logger.debug("KO: BigQuery")
__version__ = "0.14.0"


class CustomComputeException(Exception):
Expand Down Expand Up @@ -252,6 +212,7 @@ def __init__(
self.rows = -1
self.config: Dict[str, str] = {}
self.table_name = table_name
self.dtype = "cuallee.dataframe"
try:
from .iso.checks import ISO
from .bio.checks import BioChecks
Expand Down Expand Up @@ -1293,49 +1254,26 @@ def validate(self, dataframe: Any):
# Stop execution if the there is no rules in the check
assert not self.empty, "Check is empty. Try adding some rules?"

# When dataframe is PySpark DataFrame API
if "pyspark_dataframe" in globals() and isinstance(
dataframe, pyspark_dataframe
):
self.compute_engine = importlib.import_module("cuallee.pyspark_validation")

elif "pyspark_connect_dataframe" in globals() and isinstance(
dataframe, pyspark_connect_dataframe
):
self.compute_engine = importlib.import_module("cuallee.pyspark_validation")

# When dataframe is Pandas DataFrame API
elif "pandas_dataframe" in globals() and isinstance(
dataframe, pandas_dataframe
):
self.compute_engine = importlib.import_module("cuallee.pandas_validation")

# When dataframe is Snowpark DataFrame API
elif "snowpark_dataframe" in globals() and isinstance(
dataframe, snowpark_dataframe
):
self.compute_engine = importlib.import_module("cuallee.snowpark_validation")

elif "duckdb_dataframe" in globals() and isinstance(
dataframe, duckdb_dataframe
):
self.compute_engine = importlib.import_module("cuallee.duckdb_validation")

elif "bigquery" in globals() and isinstance(dataframe, bigquery.table.Table):
self.compute_engine = importlib.import_module("cuallee.bigquery_validation")

elif "polars_dataframe" in globals() and isinstance(
dataframe, polars_dataframe
):
self.compute_engine = importlib.import_module("cuallee.polars_validation")

elif "daft_dataframe" in globals() and isinstance(dataframe, daft_dataframe):
self.compute_engine = importlib.import_module("cuallee.daft_validation")

else:
raise Exception(
"Cuallee is not ready for this data structure. You can log a Feature Request in Github."
)
self.dtype = first(re.match(r".*'(.*)'", str(type(dataframe))).groups())
match self.dtype:
case self.dtype if "pyspark" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.pyspark_validation")
case self.dtype if "pandas" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.pandas_validation")
case self.dtype if "snowpark" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.snowpark_validation")
case self.dtype if "polars" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.polars_validation")
case self.dtype if "duckdb" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.duckdb_validation")
case self.dtype if "bigquery" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.bigquery_validation")
case self.dtype if "daft" in self.dtype:
self.compute_engine = importlib.import_module("cuallee.daft_validation")
case _:
raise NotImplementedError(f"{self.dtype} is not yet implemented in cuallee")



assert self.compute_engine.validate_data_types(
self.rules, dataframe
Expand Down
2 changes: 1 addition & 1 deletion cuallee/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def publish(check):
os.getenv("CUALLEE_CLOUD_HOST"),
data=compress(check),
headers=CUALLEE_CLOUD_HEADERS,
verify=False,
verify=True,
)
except (ModuleNotFoundError, KeyError, ConnectionError) as error:
logger.debug(f"Unable to send check to cuallee cloud: {str(error)}")
Expand Down
4 changes: 1 addition & 3 deletions cuallee/pyspark_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,7 @@ def _execute(dataframe: DataFrame, key: str):
rule.value, Callable
), "Please provide a Callable/Function for validation"
computed_frame = rule.value(dataframe)
assert isinstance(
computed_frame, DataFrame
), "Custom function does not return a PySpark DataFrame"
assert "pyspark" in str(type(computed_frame)), "Custom function does not return a PySpark DataFrame"
assert (
len(computed_frame.columns) >= 1
), "Custom function should retun at least one column"
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "cuallee"
version = "0.13.2"
version = "0.14.0"

authors = [
{ name="Herminio Vazquez", email="[email protected]"},
{ name="Virginie Grosboillot", email="[email protected]" }
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = cuallee
version = 0.13.2
version = 0.14.0
[options]
packages = find:
include_package_data = True
5 changes: 3 additions & 2 deletions test/unit/cloud/test_publish.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from cuallee.cloud import publish, compress, CUALLEE_CLOUD_HEADERS
from unittest.mock import patch
import os
import pytest


@pytest.mark.skip
def test_publish(spark, check):
os.environ["CUALLEE_CLOUD_HOST"] = "https://localhost:5000/msgpack"
os.environ["CUALLEE_CLOUD_TOKEN"] = "test"
Expand All @@ -18,7 +19,7 @@ def test_publish(spark, check):
verify=False,
)


@pytest.mark.skip
def test_connection(spark, check):
os.environ["CUALLEE_CLOUD_HOST"] = "https://localhost:6000/wrong"
os.environ["CUALLEE_CLOUD_TOKEN"] = "test"
Expand Down

0 comments on commit b413bd0

Please sign in to comment.