Feature engine ok (#325)
* Pandas working with ok method implementation

* Added check summary with ok call on Check

* Pre-commit validations

* Revision change for black

* Added ruff on the pre-commit file

* Added configuration for ruff

* Added configuration for ruff

* Added configuration for ruff

* Added configuration for ruff

* Added configuration for ruff

* Proper ruff config

* Added proper config for ruff in pre-commit

* Added ruff

* Added ruff

* Added ruff

* Added ruff

* Added specific rules ruff

* Working version of polars ok check method

* Added proper repo config in pre-commit

* Remove unused import induced

* All checks formatted

* Added new README.md information for Assertions
canimus authored Sep 28, 2024
1 parent c0e9e27 commit a8f0164
Showing 88 changed files with 407 additions and 180 deletions.
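
At a glance: this commit gives every supported engine a boolean `ok` entry point that collapses a validation summary into a single pass/fail result. A minimal sketch of the behavior, adapted from the README example added below (the failing case and its column `Y` are illustrative additions, not part of the diff):

```python
import pandas as pd
from cuallee import Check

df = pd.DataFrame({"X": [1, 2, 3]})

# ok() runs validate() under the hood and reduces the summary to one boolean
assert Check().is_complete("X").ok(df)

# Illustrative failing case: a null in the checked column yields False
df_nulls = pd.DataFrame({"Y": [1.0, None, 3.0]})
assert not Check().is_complete("Y").ok(df_nulls)
```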
2 changes: 1 addition & 1 deletion .coveragerc
@@ -1,3 +1,3 @@
 [run]
 source=./cuallee
-omit = cuallee/bigquery_validation.py
+omit = cuallee/bigquery_validation.py
2 changes: 1 addition & 1 deletion .github/dependabot.yml
@@ -5,7 +5,7 @@

 version: 2
 updates:
-  - package-ecosystem: "pip"
+  - package-ecosystem: "pip"
     directory: "/"
     schedule:
       interval: "weekly"
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -1,4 +1,4 @@
-name: ci
+name: ci
 on:
   push:
     branches:
17 changes: 17 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,17 @@
+repos:
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v2.1.0
+    hooks:
+      - id: end-of-file-fixer
+      - id: trailing-whitespace
+  - repo: https://github.com/psf/black
+    rev: 24.8.0
+    hooks:
+      - id: black
+  - repo: https://github.com/astral-sh/ruff-pre-commit
+    # Ruff version.
+    rev: v0.6.8
+    hooks:
+      - id: ruff
+        args: ["-v", "--exclude", "^test/"]
+        files: "^cuallee/.*\\.py$"
2 changes: 1 addition & 1 deletion MANIFEST.in
@@ -1 +1 @@
-include cuallee/bio/amino_acids.csv
+include cuallee/bio/amino_acids.csv
4 changes: 2 additions & 2 deletions Makefile
@@ -3,7 +3,7 @@ black: # Format code
 	@black test

 clean: # Remove workspace files
-	@find . -name "__pycache__" -exec rm -rf {} +
+	@find . -name "__pycache__" -exec rm -rf {} +
 	@rm -rf ./.pytest_cache
 	@rm -rf ./htmlcov
 	@rm -rf dist/
@@ -53,4 +53,4 @@ testers: # Generate all test functions on folder/
 	@python -c "print('To Test: 🏃')"

 loadenv:
-	@echo "for i in $`cat .env$`; do export $$i; done"
+	@echo "for i in $`cat .env$`; do export $$i; done"
12 changes: 11 additions & 1 deletion README.md
@@ -160,9 +160,19 @@ check.validate(df).show(truncate=False)
 +---+-------------------+------------------+-------+-------------------------+------------+--------------------+----+----------+---------+--------------+------+

 ```
+### Assertions
+`[2024-09-28]` __New feature!__ Return a simple `true|false` as a unified result for your `check`:
+```python
+import pandas as pd
+from cuallee import Check
+df = pd.DataFrame({"X": [1, 2, 3]})
+# The .ok(dataframe) method of a check calls validate and then verifies that all rules PASS
+assert Check().is_complete("X").ok(df)
+```
+

 ### Controls
-`[2023-12-28]`__New feature!__ to simplify the entire validation of a dataframe in a particular dimension.
+Simplify the entire validation of a dataframe in a particular dimension.
 ```python
 import pandas as pd
 from cuallee import Control
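
The commit log also lists a working polars `ok` method, which the README snippet above does not demonstrate; a minimal sketch, assuming polars is installed:

```python
import polars as pl
from cuallee import Check

# Engine dispatch keys off the dataframe's type, so the same API applies
df = pl.DataFrame({"X": [1, 2, 3]})
assert Check().is_complete("X").ok(df)
```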
16 changes: 9 additions & 7 deletions conftest.py
@@ -1,5 +1,4 @@
 import os
-import warnings
 import pytest
 from cuallee import Check, CheckLevel
 from pyspark.sql import SparkSession
@@ -86,21 +85,24 @@ def db() -> duckdb.DuckDBPyConnection:
 @pytest.fixture(scope="session")
 def bq_client():

-    if Path('temp/key.json').exists()==True:
-        credentials = os.getenv('GOOGLE_APPLICATION_CREDENTIALS')
+    if Path("temp/key.json").exists() == True:
+        credentials = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
     else:
         from google.oauth2 import service_account
         import os
         import json

         with open("key.json", "w") as writer:
             json.dump(json.loads(os.getenv("GOOGLE_KEY")), writer)

         credentials = service_account.Credentials.from_service_account_file("key.json")

     try:
-        client = bigquery.Client(project="cuallee-bigquery-386709", credentials=credentials)
+        client = bigquery.Client(
+            project="cuallee-bigquery-386709", credentials=credentials
+        )
         return client
     except:
         pass
-    #finally:
-    #client.stop()
+    # finally:
+    # client.stop()
47 changes: 35 additions & 12 deletions cuallee/__init__.py
@@ -13,7 +13,7 @@
 from toolz.curried import map as map_curried

 logger = logging.getLogger("cuallee")
-__version__ = "0.14.1"
+__version__ = "0.15.0"


 class CustomComputeException(Exception):
@@ -177,6 +177,9 @@ def validate_data_types(self, rules: List[Rule], dataframe: Any) -> bool:
     def summary(self, check: Any, dataframe: Any) -> Any:
         """Computes all predicates and expressions for check summary"""

+    def ok(self, check: Any, dataframe: Any) -> Any:
+        """Return True or False after validation of a dataframe"""
+

 class Check:
     def __init__(
@@ -1243,7 +1246,7 @@ def is_custom(
         )
         return self

-    def validate(self, dataframe: Any):
+    def validate(self, dataframe: Any, ok: bool = False):
         """
         Compute all rules in this check for specific data frame
@@ -1257,29 +1260,49 @@ def validate(self, dataframe: Any):
         self.dtype = first(re.match(r".*'(.*)'", str(type(dataframe))).groups())
         match self.dtype:
             case self.dtype if "pyspark" in self.dtype:
-                self.compute_engine = importlib.import_module("cuallee.pyspark_validation")
+                self.compute_engine = importlib.import_module(
+                    "cuallee.pyspark_validation"
+                )
             case self.dtype if "pandas" in self.dtype:
-                self.compute_engine = importlib.import_module("cuallee.pandas_validation")
+                self.compute_engine = importlib.import_module(
+                    "cuallee.pandas_validation"
+                )
             case self.dtype if "snowpark" in self.dtype:
-                self.compute_engine = importlib.import_module("cuallee.snowpark_validation")
+                self.compute_engine = importlib.import_module(
+                    "cuallee.snowpark_validation"
+                )
             case self.dtype if "polars" in self.dtype:
-                self.compute_engine = importlib.import_module("cuallee.polars_validation")
+                self.compute_engine = importlib.import_module(
+                    "cuallee.polars_validation"
+                )
             case self.dtype if "duckdb" in self.dtype:
-                self.compute_engine = importlib.import_module("cuallee.duckdb_validation")
+                self.compute_engine = importlib.import_module(
+                    "cuallee.duckdb_validation"
+                )
             case self.dtype if "bigquery" in self.dtype:
-                self.compute_engine = importlib.import_module("cuallee.bigquery_validation")
+                self.compute_engine = importlib.import_module(
+                    "cuallee.bigquery_validation"
+                )
             case self.dtype if "daft" in self.dtype:
                 self.compute_engine = importlib.import_module("cuallee.daft_validation")
             case _:
-                raise NotImplementedError(f"{self.dtype} is not yet implemented in cuallee")
-
+                raise NotImplementedError(
+                    f"{self.dtype} is not yet implemented in cuallee"
+                )

         assert self.compute_engine.validate_data_types(
             self.rules, dataframe
         ), "Invalid data types between rules and dataframe"

-        return self.compute_engine.summary(self, dataframe)
+        if ok:
+            result = self.compute_engine.ok(self, dataframe)
+        else:
+            result = self.compute_engine.summary(self, dataframe)
+        return result
+
+    def ok(self, dataframe: Any) -> bool:
+        """True when all checks passed"""
+        return self.validate(dataframe, ok=True)


 class Control:
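
As the hunk above shows, `Check.ok(dataframe)` is sugar for `validate(dataframe, ok=True)`: both paths dispatch to the selected engine's `ok()` and return a boolean. A quick sketch of the equivalence:

```python
import pandas as pd
from cuallee import Check

df = pd.DataFrame({"X": [1, 2, 3]})
check = Check().is_complete("X")

# Same engine call either way; only the entry point differs
assert check.ok(df)
assert check.validate(df, ok=True)
```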
24 changes: 22 additions & 2 deletions cuallee/duckdb_validation.py
@@ -8,8 +8,9 @@
 from toolz import first  # type: ignore
 from string import Template
 import re
+import textwrap

-from cuallee import Check, Rule
+from cuallee import Check, Rule, CheckStatus


 class Compute:
@@ -158,7 +159,7 @@ def is_daily(self, rule: Rule) -> str:
         template = Template(
             """
             distinct(select LIST_VALUE(count(B.$id),SUM(CAST(B.$id IS NULL AS INTEGER))::INTEGER) as r from (
-            select distinct(unnest(range(min($id)::TIMESTAMP, cast(date_add(max($id), INTERVAL 1 DAY) as TIMESTAMP), INTERVAL 1 DAY))) as w,
+            select distinct(unnest(range(min($id)::TIMESTAMP, cast(date_add(max($id), INTERVAL 1 DAY) as TIMESTAMP), INTERVAL 1 DAY))) as w,
             extract(dow from w) as y from '$table'
             ) A LEFT JOIN '$table' B ON A.w = B.$id where A.y in $value)
             """.strip()
@@ -212,6 +213,20 @@ def compute(check: Check):


 def summary(check: Check, connection: dk.DuckDBPyConnection) -> list:
+    if isinstance(connection, dk.DuckDBPyRelation):
+        raise NotImplementedError(
+            textwrap.dedent(
+                """
+                Invalid DuckDB object, please pass a DuckDBPyConnection instead, and register your relation like:
+                conn = duckdb.connect()
+                check = Check(table_name="demo_table")
+                duckdb_relation_object = conn.sql("FROM range(10)")
+                conn.register(view_name=check.table_name, python_object=duckdb_relation_object)
+                check.is_complete("range").validate(conn)
+                """
+            )
+        )
+
     unified_columns = ",\n\t".join(
         [
             operator.methodcaller(rule.method, rule)(Compute(check.table_name))
@@ -291,3 +306,8 @@ def _evaluate_status(pass_rate, pass_threshold):
         for index, (hash_key, rule) in enumerate(check._rule.items(), 1)
     ]
     return pd.DataFrame(computation_basis).reset_index(drop=True)
+
+
+def ok(check: Check, connection: dk.DuckDBPyConnection) -> bool:
+    """True when all rules in the check pass validation"""
+    return summary(check, connection).status.str.match(CheckStatus.PASS.value).all()
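
Putting the DuckDB path together: a usage sketch that mirrors the registration example embedded in the error message above (the table name demo_table is illustrative):

```python
import duckdb
from cuallee import Check

conn = duckdb.connect()
check = Check(table_name="demo_table")

# Relations are not accepted directly; register one as a view on the connection
relation = conn.sql("FROM range(10)")
conn.register(view_name=check.table_name, python_object=relation)

# ok() receives the connection and resolves the table via check.table_name
assert check.is_complete("range").ok(conn)
```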
7 changes: 6 additions & 1 deletion cuallee/pandas_validation.py
@@ -1,5 +1,5 @@
 from typing import Dict, Union, List
-from cuallee import Check, Rule
+from cuallee import Check, Rule, CheckStatus
 import pandas as pd  # type: ignore
 import operator
 import numpy as np
@@ -355,3 +355,8 @@ def _evaluate_status(pass_rate, pass_threshold):
         for index, (hash_key, rule) in enumerate(check._rule.items(), 1)
     ]
     return pd.DataFrame(computation_basis)
+
+
+def ok(check: Check, dataframe: pd.DataFrame) -> bool:
+    """True when all rules in the check pass validation"""
+    return summary(check, dataframe).status.str.match(CheckStatus.PASS.value).all()
17 changes: 15 additions & 2 deletions cuallee/polars_validation.py
@@ -5,8 +5,9 @@
 import numpy as np
 import polars as pl  # type: ignore
 from toolz import compose, first  # type: ignore
-
-from cuallee import Check, Rule
+from toolz.curried import map as map_curried
+from cuallee import Check, Rule, CheckStatus
+from functools import partial


 class Compute:
@@ -528,3 +529,15 @@ def _evaluate_status(pass_rate, pass_threshold):
     ]
     pl.Config.set_tbl_cols(12)
     return pl.DataFrame(computation_basis)
+
+def ok(check: Check, dataframe: pl.DataFrame) -> bool:
+    """True when all rules in the check pass validation"""
+
+    _all_pass = compose(
+        all,
+        map_curried(partial(operator.eq, CheckStatus.PASS.value)),
+        operator.methodcaller("to_list"),
+        operator.methodcaller("to_series"),
+        operator.methodcaller("select", "status"),
+    )
+    return _all_pass(summary(check, dataframe))
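
For readers unfamiliar with `toolz.compose`, the pipeline in `_all_pass` applies right to left: select the status column, convert it to a series, then a list, compare each entry to PASS, and reduce with `all`. A rough unrolled equivalent, assuming the same `summary` frame:

```python
import polars as pl
from cuallee import CheckStatus
from cuallee.polars_validation import summary

def ok_unrolled(check, dataframe: pl.DataFrame) -> bool:
    # select("status") -> to_series() -> to_list() -> compare each -> all()
    statuses = summary(check, dataframe).select("status").to_series().to_list()
    return all(status == CheckStatus.PASS.value for status in statuses)
```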
25 changes: 20 additions & 5 deletions cuallee/pyspark_validation.py
@@ -1,17 +1,17 @@
 import enum
 import operator
 from dataclasses import dataclass
-from functools import reduce
+from functools import reduce, partial
 from typing import Any, Callable, Dict, List, Tuple, Type, Union

 import pyspark.sql.functions as F
 import pyspark.sql.types as T
 from pyspark.sql import Window as W
 from pyspark.sql import Column, DataFrame, Row
-from toolz import first, valfilter, last  # type: ignore
-
+from toolz import first, valfilter, last, compose
+from toolz.curried import map as map_curried
 import cuallee.utils as cuallee_utils
-from cuallee import Check, ComputeEngine, Rule, CustomComputeException
+from cuallee import Check, ComputeEngine, Rule, CustomComputeException, CheckStatus

 import os

@@ -598,7 +598,9 @@ def _execute(dataframe: DataFrame, key: str):
             rule.value, Callable
         ), "Please provide a Callable/Function for validation"
         computed_frame = rule.value(dataframe)
-        assert "pyspark" in str(type(computed_frame)), "Custom function does not return a PySpark DataFrame"
+        assert "pyspark" in str(
+            type(computed_frame)
+        ), "Custom function does not return a PySpark DataFrame"
         assert (
             len(computed_frame.columns) >= 1
         ), "Custom function should return at least one column"
@@ -857,3 +859,16 @@ def _value(x):
     )

     return result
+
+
+def ok(check: Check, dataframe: DataFrame) -> bool:
+    """True when all rules in the check pass validation"""
+
+    _all_pass = compose(
+        all,
+        map_curried(partial(operator.eq, CheckStatus.PASS.value)),
+        map_curried(operator.attrgetter("status")),
+        operator.methodcaller("collect"),
+        operator.methodcaller("select", "status"),
+    )
+    return _all_pass(summary(check, dataframe))
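
Finally, a usage sketch for the Spark engine, assuming a local session; note that `_all_pass` collects only the single status column to the driver:

```python
from pyspark.sql import SparkSession
from cuallee import Check

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,)], ["X"])

assert Check().is_complete("X").ok(df)
```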
Some files were not shown: 75 of the 88 changed files are omitted from this view.