Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove pandas / numpy Dependency From metricflow-semantics #1189

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,8 @@
import datetime
import logging
from dataclasses import dataclass
from typing import Optional

import pandas as pd
from dbt_semantic_interfaces.dataclass_serialization import SerializableDataclass
from dbt_semantic_interfaces.type_enums.time_granularity import TimeGranularity

from metricflow_semantics.time.time_granularity import offset_period

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -58,36 +53,6 @@ def empty_time() -> TimeRangeConstraint:
end_time=TimeRangeConstraint.ALL_TIME_BEGIN(),
)

def _adjust_time_constraint_start_by_window(
self,
time_granularity: TimeGranularity,
time_unit_count: int,
) -> TimeRangeConstraint:
"""Moves the start of the time constraint back by <time_unit_count> windows.

if the metric is weekly-active-users (ie window = 1 week) it moves time_constraint.start one week earlier
"""
start_ts = pd.Timestamp(self.start_time)
offset = offset_period(time_granularity) * time_unit_count
adjusted_start = (start_ts - offset).to_pydatetime()
return TimeRangeConstraint(
start_time=adjusted_start,
end_time=self.end_time,
)

def adjust_time_constraint_for_cumulative_metric(
self, granularity: Optional[TimeGranularity], count: int
) -> TimeRangeConstraint:
"""Given a time constraint for the overall query, adjust it to cover the time range for this metric."""
if granularity is not None:
return self._adjust_time_constraint_start_by_window(granularity, count)

# if no window is specified we want to accumulate from the beginning of time
return TimeRangeConstraint(
start_time=TimeRangeConstraint.ALL_TIME_BEGIN(),
end_time=self.end_time,
)

def is_subset_of(self, other: TimeRangeConstraint) -> bool: # noqa: D102
return self.start_time >= other.start_time and self.end_time <= other.end_time

Expand Down
30 changes: 6 additions & 24 deletions metricflow-semantics/metricflow_semantics/query/query_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple, Union

import pandas as pd
from dbt_semantic_interfaces.implementations.filters.where_filter import (
PydanticWhereFilter,
PydanticWhereFilterIntersection,
Expand Down Expand Up @@ -64,12 +63,7 @@
TimeDimensionSpec,
)
from metricflow_semantics.specs.spec_set import group_specs_by_type
from metricflow_semantics.time.time_granularity import (
adjust_to_end_of_period,
adjust_to_start_of_period,
is_period_end,
is_period_start,
)
from metricflow_semantics.time.dateutil_adjuster import DateutilTimePeriodAdjuster

logger = logging.getLogger(__name__)

Expand All @@ -95,6 +89,7 @@ def __init__( # noqa: D107
DunderNamingScheme(),
)
self._where_filter_pattern_factory = where_filter_pattern_factory
self._time_period_adjuster = DateutilTimePeriodAdjuster()

def parse_and_validate_saved_query(
self,
Expand Down Expand Up @@ -191,23 +186,10 @@ def _adjust_time_constraint(

e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
constraint_start = time_constraint.start_time
constraint_end = time_constraint.end_time

start_ts = pd.Timestamp(time_constraint.start_time)
if not is_period_start(metric_time_granularity, start_ts):
constraint_start = adjust_to_start_of_period(metric_time_granularity, start_ts).to_pydatetime()

end_ts = pd.Timestamp(time_constraint.end_time)
if not is_period_end(metric_time_granularity, end_ts):
constraint_end = adjust_to_end_of_period(metric_time_granularity, end_ts).to_pydatetime()

if constraint_start < TimeRangeConstraint.ALL_TIME_BEGIN():
constraint_start = TimeRangeConstraint.ALL_TIME_BEGIN()
if constraint_end > TimeRangeConstraint.ALL_TIME_END():
constraint_end = TimeRangeConstraint.ALL_TIME_END()

return TimeRangeConstraint(start_time=constraint_start, end_time=constraint_end)
return self._time_period_adjuster.expand_time_constraint_to_fill_granularity(
time_constraint=time_constraint,
granularity=metric_time_granularity,
)

def _parse_order_by_names(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,26 @@
from __future__ import annotations

import pathlib
from typing import Dict, Optional

from dbt_semantic_interfaces.implementations.semantic_manifest import PydanticSemanticManifest
from dbt_semantic_interfaces.parsing.dir_to_model import (
SemanticManifestBuildResult,
parse_directory_of_yaml_files_to_semantic_manifest,
)
from dbt_semantic_interfaces.validations.semantic_manifest_validator import SemanticManifestValidator

from metricflow_semantics.test_helpers.semantic_manifest_yamls import SEMANTIC_MANIFEST_YAMLS_PATH_ANCHOR


def load_semantic_manifest(
relative_manifest_path: str,
yaml_file_directory: pathlib.Path,
template_mapping: Optional[Dict[str, str]] = None,
) -> SemanticManifestBuildResult:
) -> PydanticSemanticManifest:
"""Reads the manifest YAMLs from the standard location, applies transformations, runs validations."""
yaml_file_directory = SEMANTIC_MANIFEST_YAMLS_PATH_ANCHOR.directory.joinpath(relative_manifest_path)
build_result = parse_directory_of_yaml_files_to_semantic_manifest(
str(yaml_file_directory), template_mapping=template_mapping
)
validator = SemanticManifestValidator[PydanticSemanticManifest]()
validator.checked_validations(build_result.semantic_manifest)
return build_result


def load_named_manifest(template_mapping: Dict[str, str], manifest_name: str) -> PydanticSemanticManifest: # noqa: D103
try:
build_result = load_semantic_manifest(manifest_name, template_mapping)
build_result = parse_directory_of_yaml_files_to_semantic_manifest(
str(yaml_file_directory), template_mapping=template_mapping
)
validator = SemanticManifestValidator[PydanticSemanticManifest]()
validator.checked_validations(build_result.semantic_manifest)
return build_result.semantic_manifest
except Exception as e:
raise RuntimeError(f"Error while loading semantic manifest: {manifest_name}") from e
raise RuntimeError(f"Error while loading semantic manifest: {yaml_file_directory}") from e
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

AMBIGUOUS_RESOLUTION_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

CONFIG_LINTER_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

CYCLIC_JOIN_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

DW_VALIDATION_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

EXTENDED_DATE_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

JOIN_TYPES_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

MULTI_HOP_JOIN_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

NON_SM_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

PARTITIONED_MULTI_HOP_JOIN_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

SCD_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

SIMPLE_MANIFEST_ANCHOR = DirectoryPathAnchor()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from __future__ import annotations

from metricflow_semantics.test_helpers.config_helpers import DirectoryPathAnchor

SIMPLE_MULTI_HOP_JOIN_MANIFEST_ANCHOR = DirectoryPathAnchor()
114 changes: 114 additions & 0 deletions metricflow-semantics/metricflow_semantics/time/dateutil_adjuster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from __future__ import annotations

import datetime
from typing import Optional

import dateutil.relativedelta
from dateutil.relativedelta import relativedelta
from dbt_semantic_interfaces.enum_extension import assert_values_exhausted
from dbt_semantic_interfaces.type_enums import TimeGranularity
from typing_extensions import override

from metricflow_semantics.filters.time_constraint import TimeRangeConstraint
from metricflow_semantics.time.time_period import TimePeriodAdjuster


class DateutilTimePeriodAdjuster(TimePeriodAdjuster):
"""Implementation of time period adjustments using `dateutil`.

* `relativedelta` will not change weekday if already at the given weekday, even with a Nth parameter.
* `relativedelta` will automatically handle day values exceeding for months with fewer days.
"""

def _relative_delta_for_window(self, time_granularity: TimeGranularity, count: int) -> relativedelta:
"""Relative-delta to cover time windows specified at different grains."""
if time_granularity is TimeGranularity.DAY:
return relativedelta(days=count)
elif time_granularity is TimeGranularity.WEEK:
return relativedelta(weeks=count)
elif time_granularity is TimeGranularity.MONTH:
return relativedelta(months=count)
elif time_granularity is TimeGranularity.QUARTER:
return relativedelta(months=count * 3)
elif time_granularity is TimeGranularity.YEAR:
return relativedelta(years=count)
else:
assert_values_exhausted(time_granularity)

@override
def expand_time_constraint_to_fill_granularity(
self, time_constraint: TimeRangeConstraint, granularity: TimeGranularity
) -> TimeRangeConstraint:
adjusted_start = self.adjust_to_start_of_period(granularity, time_constraint.start_time)
adjusted_end = self.adjust_to_end_of_period(granularity, time_constraint.end_time)

if adjusted_start < TimeRangeConstraint.ALL_TIME_BEGIN():
adjusted_start = TimeRangeConstraint.ALL_TIME_BEGIN()
if adjusted_end > TimeRangeConstraint.ALL_TIME_END():
adjusted_end = TimeRangeConstraint.ALL_TIME_END()

return TimeRangeConstraint(start_time=adjusted_start, end_time=adjusted_end)

@override
def adjust_to_start_of_period(
self, time_granularity: TimeGranularity, date_to_adjust: datetime.datetime
) -> datetime.datetime:
if time_granularity is TimeGranularity.DAY:
return date_to_adjust
elif time_granularity is TimeGranularity.WEEK:
return date_to_adjust + relativedelta(weekday=dateutil.relativedelta.MO(-1))
elif time_granularity is TimeGranularity.MONTH:
return date_to_adjust + relativedelta(day=1)
elif time_granularity is TimeGranularity.QUARTER:
if date_to_adjust.month <= 3:
return date_to_adjust + relativedelta(month=1, day=1)
elif date_to_adjust.month <= 6:
return date_to_adjust + relativedelta(month=4, day=1)
elif date_to_adjust.month <= 9:
return date_to_adjust + relativedelta(month=7, day=1)
else:
return date_to_adjust + relativedelta(month=10, day=1)
elif time_granularity is TimeGranularity.YEAR:
return date_to_adjust + relativedelta(month=1, day=1)
else:
assert_values_exhausted(time_granularity)

@override
def adjust_to_end_of_period(
self, time_granularity: TimeGranularity, date_to_adjust: datetime.datetime
) -> datetime.datetime:
if time_granularity is TimeGranularity.DAY:
return date_to_adjust
elif time_granularity is TimeGranularity.WEEK:
return date_to_adjust + relativedelta(weekday=dateutil.relativedelta.SU(1))
elif time_granularity is TimeGranularity.MONTH:
return date_to_adjust + relativedelta(day=31)
elif time_granularity is TimeGranularity.QUARTER:
if date_to_adjust.month <= 3:
return date_to_adjust + relativedelta(month=3, day=31)
elif date_to_adjust.month <= 6:
return date_to_adjust + relativedelta(month=6, day=31)
elif date_to_adjust.month <= 9:
return date_to_adjust + relativedelta(month=9, day=31)
else:
return date_to_adjust + relativedelta(month=12, day=31)
elif time_granularity is TimeGranularity.YEAR:
return date_to_adjust + relativedelta(month=12, day=31)
else:
assert_values_exhausted(time_granularity)

@override
def expand_time_constraint_for_cumulative_metric(
self, time_constraint: TimeRangeConstraint, granularity: Optional[TimeGranularity], count: int
) -> TimeRangeConstraint:
if granularity is not None:
return TimeRangeConstraint(
start_time=time_constraint.start_time - self._relative_delta_for_window(granularity, count),
end_time=time_constraint.end_time,
)

# if no window is specified we want to accumulate from the beginning of time
return TimeRangeConstraint(
start_time=TimeRangeConstraint.ALL_TIME_BEGIN(),
end_time=time_constraint.end_time,
)
47 changes: 47 additions & 0 deletions metricflow-semantics/metricflow_semantics/time/time_period.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from __future__ import annotations

import datetime
from abc import ABC, abstractmethod
from typing import Optional

from dbt_semantic_interfaces.type_enums import TimeGranularity

from metricflow_semantics.filters.time_constraint import TimeRangeConstraint


class TimePeriodAdjuster(ABC):
"""Interface to simplify switching time-period adjustment-logic from `pandas` to `dateutil`."""

@abstractmethod
def adjust_to_start_of_period(
self, time_granularity: TimeGranularity, date_to_adjust: datetime.datetime
) -> datetime.datetime:
"""Adjust to start of period if not at end already."""
raise NotImplementedError

@abstractmethod
def adjust_to_end_of_period(
self, time_granularity: TimeGranularity, date_to_adjust: datetime.datetime
) -> datetime.datetime:
"""Adjust to end of period if not at end already."""
raise NotImplementedError

@abstractmethod
def expand_time_constraint_to_fill_granularity(
self, time_constraint: TimeRangeConstraint, granularity: TimeGranularity
) -> TimeRangeConstraint:
"""Change the time range so that the ends are at the ends of the appropriate time granularity windows.

e.g. [2020-01-15, 2020-2-15] with MONTH granularity -> [2020-01-01, 2020-02-29]
"""
raise NotImplementedError

@abstractmethod
def expand_time_constraint_for_cumulative_metric(
self, time_constraint: TimeRangeConstraint, granularity: Optional[TimeGranularity], count: int
) -> TimeRangeConstraint:
"""Moves the start of the time constraint back by <time_unit_count> windows for cumulative metrics.

e.g. if the metric is weekly-active-users (window = 1 week) it moves time_constraint.start one week earlier
"""
raise NotImplementedError
1 change: 0 additions & 1 deletion metricflow-semantics/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ classifiers = [

dependencies = [
"dbt-semantic-interfaces>=0.5.1, <0.6.0",
"pandas>=1.5.0, <1.6.0",
"rapidfuzz>=3.0, <4.0",
]

Expand Down
Loading
Loading