diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 2dc46ae..113ca55 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -24,4 +24,4 @@ jobs: - name: Tests run: PYTHONPATH=. pytest --cov=data_flow --cov-report term - name: Lint - run: flake8 data_flow/ \ No newline at end of file + run: pflake8 data_flow/ \ No newline at end of file diff --git a/Makefile b/Makefile index 6a65b8b..abd825d 100644 --- a/Makefile +++ b/Makefile @@ -11,4 +11,4 @@ tests:: PYTHONPATH=. venv/bin/pytest --cov=data_flow --cov-report html --cov-report term -rP tests/ -vvv lint:: - venv/bin/flake8 data_flow/ + venv/bin/pflake8 data_flow/ diff --git a/data_flow/__init__.py b/data_flow/__init__.py index 1368361..b98e3ed 100644 --- a/data_flow/__init__.py +++ b/data_flow/__init__.py @@ -1,2 +1 @@ -from data_flow.data_flow import DataFlow -from data_flow.lib.FileType import FileType +from .data_flow import DataFlow diff --git a/data_flow/data_flow.py b/data_flow/data_flow.py index a699d55..f3061bc 100644 --- a/data_flow/data_flow.py +++ b/data_flow/data_flow.py @@ -9,7 +9,6 @@ from data_flow.lib import FileType from data_flow.lib.data_columns import data_get_columns, data_delete_columns, data_rename_columns, data_select_columns from data_flow.lib.data_from import ( - df_from_tmp_filename, from_csv_2_file, from_feather_2_file, from_parquet_2_file, @@ -23,6 +22,9 @@ to_json_from_file, to_hdf_from_file, ) +from data_flow.lib.fireducks import from_fireducks_2_file, to_fireducks_from_file +from data_flow.lib.pandas import from_pandas_2_file +from data_flow.lib.polars import from_polars_2_file, to_polars_from_file from data_flow.lib.tools import generate_temporary_filename, delete_file @@ -45,25 +47,44 @@ def __del__(self): if not self.__in_memory: delete_file(self.__filename) + def from_fireducks(self, df: fd.DataFrame): + if self.__in_memory: + self.__data = df + else: + from_fireducks_2_file(df=df, tmp_filename=self.__filename, 
file_type=self.__file_type) + return self + def to_fireducks(self) -> fd.DataFrame: if self.__in_memory: return self.__data else: - return df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type) + return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type) + + def from_pandas(self, df: pd.DataFrame): + if self.__in_memory: + self.__data = fd.from_pandas(df) + else: + from_pandas_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type) + return self def to_pandas(self) -> pd.DataFrame: if self.__in_memory: return self.__data.to_pandas() else: - return df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas() + return to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas() + + def from_polars(self, df: pl.DataFrame): + if self.__in_memory: + self.__data = fd.from_pandas(df.to_pandas()) + else: + from_polars_2_file(df=df, tmp_filename=self.__filename, file_type=self.__file_type) + return self def to_polars(self) -> pl.DataFrame: if self.__in_memory: return pl.from_pandas(self.__data.to_pandas()) else: - return pl.from_pandas( - df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type).to_pandas() - ) + return to_polars_from_file(tmp_filename=self.__filename, file_type=self.__file_type) def from_csv(self, filename: str): if self.__in_memory: @@ -139,14 +160,14 @@ def head(self): if self.__in_memory: print(self.__data.head()) else: - print(df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type).head()) + print(to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type).head()) return self def stats(self): if self.__in_memory: data = self.__data else: - data = df_from_tmp_filename(tmp_filename=self.__filename, file_type=self.__file_type) + data = to_fireducks_from_file(tmp_filename=self.__filename, file_type=self.__file_type) print("***** Data stats *****") print(f"Columns 
names : {data.columns.to_list()}") diff --git a/data_flow/lib/Operator.py b/data_flow/lib/Operator.py new file mode 100644 index 0000000..7abaa22 --- /dev/null +++ b/data_flow/lib/Operator.py @@ -0,0 +1,10 @@ +from enum import Enum + + +class Operator(Enum): + Eq = "==" + Gt = ">" + Lt = "<" + Gte = ">=" + Lte = "<=" + Ne = "!=" diff --git a/data_flow/lib/__init__.py b/data_flow/lib/__init__.py index 24d726a..b656cc7 100644 --- a/data_flow/lib/__init__.py +++ b/data_flow/lib/__init__.py @@ -1 +1,2 @@ from .FileType import FileType +from .Operator import Operator diff --git a/data_flow/lib/data_columns.py b/data_flow/lib/data_columns.py index ecf2275..dfdfd5a 100644 --- a/data_flow/lib/data_columns.py +++ b/data_flow/lib/data_columns.py @@ -45,13 +45,5 @@ def data_select_columns(tmp_filename: str, file_type: FileType, columns: list) - case FileType.feather: data = fd.read_feather(tmp_filename)[columns] data.to_feather(tmp_filename) - case _: raise ValueError(f"File type not implemented: {file_type} !") - - -# def __slice(dataframe, start_row, end_row, start_col, end_col): -# assert len(dataframe) > end_row and start_row >= 0 -# assert len(dataframe.columns) > end_col and start_col >= 0 -# list_of_indexes = list(dataframe.columns)[start_col:end_col] -# return dataframe.iloc[start_row:end_row][list_of_indexes] diff --git a/data_flow/lib/data_from.py b/data_flow/lib/data_from.py index 9ce90e8..e23ad16 100644 --- a/data_flow/lib/data_from.py +++ b/data_flow/lib/data_from.py @@ -52,13 +52,3 @@ def from_hdf_2_file(filename: str, tmp_filename: str, file_type: FileType) -> No fd.read_hdf(filename).to_feather(tmp_filename) case _: raise ValueError(f"File type not implemented: {file_type} !") - - -def df_from_tmp_filename(tmp_filename: str, file_type: FileType) -> fd.DataFrame: - match file_type: - case FileType.parquet: - return fd.read_parquet(tmp_filename) - case FileType.feather: - return fd.read_feather(tmp_filename) - case _: - raise ValueError(f"File type not 
implemented: {file_type} !") diff --git a/data_flow/lib/fireducks.py b/data_flow/lib/fireducks.py index e69de29..2c5f7e2 100644 --- a/data_flow/lib/fireducks.py +++ b/data_flow/lib/fireducks.py @@ -0,0 +1,23 @@ +import fireducks.pandas as fd + +from data_flow.lib.FileType import FileType + + +def from_fireducks_2_file(df: fd.DataFrame, tmp_filename: str, file_type: FileType) -> None: + match file_type: + case FileType.parquet: + df.to_parquet(tmp_filename) + case FileType.feather: + df.to_feather(tmp_filename) + case _: + raise ValueError(f"File type not implemented: {file_type} !") + + +def to_fireducks_from_file(tmp_filename: str, file_type: FileType) -> fd.DataFrame: + match file_type: + case FileType.parquet: + return fd.read_parquet(tmp_filename) + case FileType.feather: + return fd.read_feather(tmp_filename) + case _: + raise ValueError(f"File type not implemented: {file_type} !") diff --git a/data_flow/lib/pandas.py b/data_flow/lib/pandas.py index e69de29..ddec9d5 100644 --- a/data_flow/lib/pandas.py +++ b/data_flow/lib/pandas.py @@ -0,0 +1,24 @@ +import fireducks.pandas as fd +import pandas as pd + +from data_flow.lib.FileType import FileType + + +def from_pandas_2_file(df: pd.DataFrame, tmp_filename: str, file_type: FileType) -> None: + match file_type: + case FileType.parquet: + fd.from_pandas(df).to_parquet(tmp_filename) + case FileType.feather: + fd.from_pandas(df).to_feather(tmp_filename) + case _: + raise ValueError(f"File type not implemented: {file_type} !") + + +def to_pandas_from_file(tmp_filename: str, file_type: FileType) -> pd.DataFrame: + match file_type: + case FileType.parquet: + return pd.read_parquet(tmp_filename) + case FileType.feather: + return pd.read_feather(tmp_filename) + case _: + raise ValueError(f"File type not implemented: {file_type} !") diff --git a/data_flow/lib/polars.py b/data_flow/lib/polars.py index e69de29..384b9e7 100644 --- a/data_flow/lib/polars.py +++ b/data_flow/lib/polars.py @@ -0,0 +1,12 @@ +import polars as pl + 
+from data_flow.lib import FileType +from data_flow.lib.pandas import to_pandas_from_file, from_pandas_2_file + + +def from_polars_2_file(df: pl.DataFrame, tmp_filename: str, file_type: FileType) -> None: + from_pandas_2_file(df=df.to_pandas(), tmp_filename=tmp_filename, file_type=file_type) + + +def to_polars_from_file(tmp_filename: str, file_type: FileType) -> pl.DataFrame: + return pl.from_pandas(to_pandas_from_file(tmp_filename=tmp_filename, file_type=file_type)) diff --git a/requirements.dev.txt b/requirements.dev.txt index c4e88d3..dde5647 100644 --- a/requirements.dev.txt +++ b/requirements.dev.txt @@ -1,5 +1,5 @@ black flake8 -flake8-pyproject +pyproject-flake8 pytest pytest-cov diff --git a/tests/SequenceTestCase.py b/tests/SequenceTestCase.py index df44173..0fb182a 100644 --- a/tests/SequenceTestCase.py +++ b/tests/SequenceTestCase.py @@ -4,8 +4,8 @@ class SequenceTestCase(BaseTestCase): def _sequence(self, data: DataFlow.DataFrame) -> None: - data.stats() - data.head() + polars = data.to_polars() + self.assertEqual(10, len(data.columns())) data.del_columns( @@ -28,3 +28,8 @@ def _sequence(self, data: DataFlow.DataFrame) -> None: data.select_columns(columns=["_year_"]) self.assertListEqual(["_year_"], data.columns()) + + self.assertPandasEqual( + DataFlow().DataFrame().from_polars(polars).to_pandas(), + DataFlow().DataFrame().from_csv(self.CSV_FILE).to_pandas(), + ) diff --git a/tests/test_data_flow_csv.py b/tests/test_data_flow_csv.py index f8ce669..5e0cf73 100644 --- a/tests/test_data_flow_csv.py +++ b/tests/test_data_flow_csv.py @@ -1,6 +1,7 @@ import unittest -from data_flow import DataFlow, FileType +from data_flow import DataFlow +from data_flow.lib import FileType from tests.SequenceTestCase import SequenceTestCase @@ -10,7 +11,6 @@ def test_memory(self): DataFlow().DataFrame().from_csv("./tests/data/annual-enterprise-survey-2023-financial-year-provisional.csv") ) df.to_csv(self.TEST_CSV_FILE) - self.assertPandasEqual(df.to_pandas(), 
DataFlow().DataFrame().from_csv(self.CSV_FILE).to_pandas()) self._sequence(data=df) diff --git a/tests/test_data_flow_feather.py b/tests/test_data_flow_feather.py index 4a47ddf..dd7d93a 100644 --- a/tests/test_data_flow_feather.py +++ b/tests/test_data_flow_feather.py @@ -1,6 +1,7 @@ import unittest -from data_flow import DataFlow, FileType +from data_flow import DataFlow +from data_flow.lib import FileType from data_flow.lib.tools import delete_file from tests.SequenceTestCase import SequenceTestCase diff --git a/tests/test_data_flow_hdf.py b/tests/test_data_flow_hdf.py index 8bad9ce..968fe1b 100644 --- a/tests/test_data_flow_hdf.py +++ b/tests/test_data_flow_hdf.py @@ -1,6 +1,7 @@ import unittest -from data_flow import DataFlow, FileType +from data_flow import DataFlow +from data_flow.lib import FileType from data_flow.lib.tools import delete_file from tests.SequenceTestCase import SequenceTestCase diff --git a/tests/test_data_flow_json.py b/tests/test_data_flow_json.py index c5dfc71..22b1aa4 100644 --- a/tests/test_data_flow_json.py +++ b/tests/test_data_flow_json.py @@ -1,6 +1,7 @@ import unittest -from data_flow import DataFlow, FileType +from data_flow import DataFlow +from data_flow.lib import FileType from data_flow.lib.tools import delete_file from tests.SequenceTestCase import SequenceTestCase diff --git a/tests/test_data_flow_parquet.py b/tests/test_data_flow_parquet.py index 1b1dcee..aed9d8a 100644 --- a/tests/test_data_flow_parquet.py +++ b/tests/test_data_flow_parquet.py @@ -1,6 +1,7 @@ import unittest -from data_flow import DataFlow, FileType +from data_flow import DataFlow +from data_flow.lib import FileType from data_flow.lib.tools import delete_file from tests.SequenceTestCase import SequenceTestCase