feat(devexp): add ability to parse simple storage queries #5776

Closed · wants to merge 24 commits
Changes from 17 commits
Commits (24)
7633006 - logical query can now include a storage (volokluev, Apr 12, 2024)
2689bb2 - working storage query (volokluev, Apr 12, 2024)
78db72a - remove unused imports (volokluev, Apr 12, 2024)
024875f - working test for nested fields (volokluev, Apr 12, 2024)
bb3c8c3 - test composite query doesn't work because we do entity validation in … (volokluev, Apr 15, 2024)
60e1295 - cleanup composite stuff (volokluev, Apr 15, 2024)
c72dd2e - remove print statements (volokluev, Apr 15, 2024)
3c0598a - rename (volokluev, Apr 15, 2024)
5f5c346 - unnecessary comment (volokluev, Apr 15, 2024)
c40becd - fix test (volokluev, Apr 15, 2024)
5cda379 - fix the mypy issues?? (volokluev, Apr 16, 2024)
edc5448 - working end to end storage query for metrics_summaries, still need to… (volokluev, Apr 16, 2024)
1bc0459 - Fix mypy although not too happy with mql parser changes (volokluev, Apr 16, 2024)
e4c59da - Merge branch 'master' of github.com:getsentry/snuba into volo/devexp/… (volokluev, Apr 29, 2024)
64cf7d1 - kind of fix up composite query parsing, still waiting for entity vali… (volokluev, Apr 29, 2024)
9349313 - Merge branch 'master' of github.com:getsentry/snuba into volo/devexp/… (volokluev, Apr 30, 2024)
559e698 - fix mypy again? (volokluev, Apr 30, 2024)
422b133 - remove pdb (volokluev, Apr 30, 2024)
56d1148 - remove unused code (volokluev, Apr 30, 2024)
477348c - wip, composite query test still not working (volokluev, Apr 30, 2024)
975c298 - wip, aliases not working, possible bug (volokluev, May 1, 2024)
81aa0dc - doesn't work still, need to checkout master and see if I can reproduce (volokluev, May 1, 2024)
5120b4e - get the composite test (volokluev, May 1, 2024)
6deda59 - remove erroneous test (volokluev, May 1, 2024)
6 changes: 4 additions & 2 deletions snuba/clickhouse/query_dsl/accessors.py
@@ -10,7 +10,7 @@
get_first_level_and_conditions,
is_in_condition_pattern,
)
from snuba.query.data_source.simple import Entity, SimpleDataSource, Table
from snuba.query.data_source.simple import Entity, SimpleDataSource, Storage, Table
from snuba.query.expressions import Expression
from snuba.query.expressions import FunctionCall as FunctionCallExpr
from snuba.query.expressions import Literal as LiteralExpr
@@ -149,7 +149,9 @@ def get_time_range_expressions(


def get_time_range(
query: Union[ProcessableQuery[Table], ProcessableQuery[Entity]],
query: Union[
ProcessableQuery[Table], ProcessableQuery[Entity], ProcessableQuery[Storage]
],
timestamp_field: str,
) -> Tuple[Optional[datetime], Optional[datetime]]:
"""
3 changes: 2 additions & 1 deletion snuba/datasets/plans/translator/query.py
@@ -4,9 +4,10 @@
TranslationMappers,
)
from snuba.query.logical import Query as LogicalQuery
from snuba.query.logical import StorageQuery


def identity_translate(query: LogicalQuery) -> ClickhouseQuery:
def identity_translate(query: LogicalQuery | StorageQuery) -> ClickhouseQuery:
"""
Utility method to build a Clickhouse Query from a Logical Query
without transforming anything.
32 changes: 25 additions & 7 deletions snuba/pipeline/stages/query_processing.py
@@ -1,19 +1,25 @@
from typing import cast

from snuba.clickhouse.query import Query as ClickhouseQuery
from snuba.datasets.plans.entity_processing import run_entity_processing_executor
from snuba.datasets.plans.entity_validation import run_entity_validators
from snuba.datasets.plans.storage_processing import (
apply_storage_processors,
build_best_plan,
get_query_data_source,
)
from snuba.datasets.plans.translator.query import identity_translate
from snuba.datasets.storages.factory import get_storage
from snuba.pipeline.composite_entity_processing import translate_composite_query
from snuba.pipeline.composite_storage_processing import (
apply_composite_storage_processors,
build_best_plan_for_composite_query,
)
from snuba.pipeline.query_pipeline import QueryPipelineData, QueryPipelineStage
from snuba.query.composite import CompositeQuery
from snuba.query.data_source.simple import Table
from snuba.query.data_source.simple import Entity, Table
from snuba.query.logical import Query as LogicalQuery
from snuba.query.logical import StorageQuery
from snuba.request import Request


@@ -23,17 +29,29 @@ class EntityProcessingStage(
def _process_data(
self, pipe_input: QueryPipelineData[Request]
) -> ClickhouseQuery | CompositeQuery[Table]:
# Execute entity validators on logical query
run_entity_validators(pipe_input.data.query, pipe_input.query_settings)

# Run entity processors on the query and transform logical query into a physical query
if isinstance(pipe_input.data.query, LogicalQuery):
# TODO: support composite queries for storage queries
if isinstance(pipe_input.data.query, StorageQuery):
res = identity_translate(pipe_input.data.query)
storage = get_storage(pipe_input.data.query.get_from_clause().key)
res.set_from_clause(
get_query_data_source(
storage.get_schema().get_data_source(),
allocation_policies=storage.get_allocation_policies(),
final=pipe_input.data.query.get_final(),
sampling_rate=pipe_input.data.query.get_sample(),
storage_key=storage.get_storage_key(),
)
)
return res
elif isinstance(pipe_input.data.query, LogicalQuery):
run_entity_validators(pipe_input.data.query, pipe_input.query_settings)
return run_entity_processing_executor(
pipe_input.data.query, pipe_input.query_settings
)
else:
return translate_composite_query(
pipe_input.data.query, pipe_input.query_settings
cast(CompositeQuery[Entity], pipe_input.data.query),
pipe_input.query_settings,
)


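For readers skimming the hunk above, here is the storage branch pulled out as a standalone sketch. It re-uses only calls that appear in this diff; the pipeline plumbing (QueryPipelineData, Request) and the LogicalQuery/composite branches are deliberately left out, so treat it as an illustration rather than the literal method body.

from snuba.clickhouse.query import Query as ClickhouseQuery
from snuba.datasets.plans.storage_processing import get_query_data_source
from snuba.datasets.plans.translator.query import identity_translate
from snuba.datasets.storages.factory import get_storage
from snuba.query.logical import StorageQuery


def translate_storage_query(query: StorageQuery) -> ClickhouseQuery:
    # Copy the AST as-is into a physical query; no entity translation is needed.
    physical = identity_translate(query)
    # Resolve the storage named by the query's Storage data source and swap in
    # its ClickHouse schema as the physical from-clause.
    storage = get_storage(query.get_from_clause().key)
    physical.set_from_clause(
        get_query_data_source(
            storage.get_schema().get_data_source(),
            allocation_policies=storage.get_allocation_policies(),
            final=query.get_final(),
            sampling_rate=query.get_sample(),
            storage_key=storage.get_storage_key(),
        )
    )
    return physical
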
21 changes: 13 additions & 8 deletions snuba/query/data_source/projects_finder.py
@@ -2,33 +2,38 @@
from snuba.query import ProcessableQuery
from snuba.query.composite import CompositeQuery
from snuba.query.data_source.join import IndividualNode, JoinClause, JoinVisitor
from snuba.query.data_source.simple import Entity
from snuba.query.data_source.simple import SimpleDataSource
from snuba.query.data_source.visitor import DataSourceVisitor


class ProjectsFinder(
DataSourceVisitor[set[int], Entity], JoinVisitor[set[int], Entity]
DataSourceVisitor[set[int], SimpleDataSource],
JoinVisitor[set[int], SimpleDataSource],
):
"""
Traverses a query to find project_id conditions
"""

def _visit_simple_source(self, data_source: Entity) -> set[int]:
def _visit_simple_source(self, data_source: SimpleDataSource) -> set[int]:
return set()

def _visit_join(self, data_source: JoinClause[Entity]) -> set[int]:
def _visit_join(self, data_source: JoinClause[SimpleDataSource]) -> set[int]:
return self.visit_join_clause(data_source)

def _visit_simple_query(self, data_source: ProcessableQuery[Entity]) -> set[int]:
def _visit_simple_query(
self, data_source: ProcessableQuery[SimpleDataSource]
) -> set[int]:
return get_object_ids_in_query_ast(data_source, "project_id") or set()

def _visit_composite_query(self, data_source: CompositeQuery[Entity]) -> set[int]:
def _visit_composite_query(
self, data_source: CompositeQuery[SimpleDataSource]
) -> set[int]:
return self.visit(data_source.get_from_clause())

def visit_individual_node(self, node: IndividualNode[Entity]) -> set[int]:
def visit_individual_node(self, node: IndividualNode[SimpleDataSource]) -> set[int]:
return self.visit(node.data_source)

def visit_join_clause(self, node: JoinClause[Entity]) -> set[int]:
def visit_join_clause(self, node: JoinClause[SimpleDataSource]) -> set[int]:
left = node.left_node.accept(self)
right = node.right_node.accept(self)
return left | right
13 changes: 13 additions & 0 deletions snuba/query/data_source/simple.py
@@ -51,6 +51,19 @@ def human_readable_id(self) -> str:
return f"Entity({self.key.value})"


@dataclass(frozen=True)
class Storage(SimpleDataSource):
"""An datasource that is just a pointer to a storage. Acts as an adapter class to be
able to query storages directly from SnQL"""

key: StorageKey
sample: Optional[float] = None

@property
def human_readable_id(self) -> str:
return f"STORAGE({self.key.value})"


@dataclass(frozen=True)
class Table(SimpleDataSource):
"""
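A minimal construction sketch for the new data source. Only the Storage import is confirmed by this hunk; the StorageKey import path and its string construction are assumptions.

# Sketch only: StorageKey's import path and StorageKey("...") construction are
# assumed, not shown in this diff.
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.data_source.simple import Storage

metrics_summaries = Storage(key=StorageKey("metrics_summaries"), sample=0.1)
assert metrics_summaries.human_readable_id == "STORAGE(metrics_summaries)"
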
44 changes: 30 additions & 14 deletions snuba/query/dsl.py
@@ -1,3 +1,5 @@
from __future__ import annotations

from typing import Optional, Sequence

from snuba.query.expressions import (
@@ -13,6 +15,28 @@
# verbose to build.


class NestedColumn:
"""Usage:

tags = NestedColumn("tags")
assert tags["some_key"] == SubscriptableReference(
"_snuba_tags[some_key]",
Column("_snuba_tags"), None, "tags"),
Literal(None, "some_key")
)
"""

def __init__(self, column_name: str) -> None:
self.column_name = column_name

def __getitem__(self, key: str) -> SubscriptableReference:
return SubscriptableReference(
f"_snuba_{self.column_name}[{key}]",
Column(f"_snuba_{self.column_name}", None, self.column_name),
Literal(None, key),
)


def column(
column_name: str, table_name: str | None = None, alias: str | None = None
) -> Column:
Expand All @@ -23,18 +47,6 @@ def literal(value: OptionalScalarType, alias: str | None = None) -> Literal:
return Literal(alias, value)


def snuba_tags_raw(indexer_mapping: int) -> SubscriptableReference:

Review comment (Member Author): was not used

return SubscriptableReference(
f"_snuba_tags_raw[{indexer_mapping}]",
Column(
"_snuba_tags_raw",
None,
"tags_raw",
),
Literal(None, str(indexer_mapping)),
)


def literals_tuple(alias: Optional[str], literals: Sequence[Literal]) -> FunctionCall:
return FunctionCall(alias, "tuple", tuple(literals))

@@ -91,8 +103,12 @@ def binary_condition(
return FunctionCall(None, function_name, (lhs, rhs))


def equals(lhs: Expression, rhs: Expression) -> FunctionCall:
return binary_condition("equals", lhs, rhs)
def equals(
lhs: Expression | OptionalScalarType, rhs: Expression | OptionalScalarType
) -> FunctionCall:

Review thread on lines +106 to +108:
(Member Author): happy to change this but I found it useful to not have to specify literals
(Member): i like it

left = lhs if isinstance(lhs, Expression) else Literal(None, lhs)
right = rhs if isinstance(rhs, Expression) else Literal(None, rhs)
return binary_condition("equals", left, right)


def and_cond(lhs: FunctionCall, rhs: FunctionCall, *args: FunctionCall) -> FunctionCall:
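Taken together, NestedColumn and the relaxed equals() make conditions noticeably shorter to write. A small sketch based only on the definitions in this diff:

from snuba.query.dsl import NestedColumn, equals
from snuba.query.expressions import Column, Literal, SubscriptableReference

tags = NestedColumn("tags")

# __getitem__ expands to a SubscriptableReference over the aliased column.
assert tags["some_key"] == SubscriptableReference(
    "_snuba_tags[some_key]",
    Column("_snuba_tags", None, "tags"),
    Literal(None, "some_key"),
)

# equals() now wraps bare scalars in Literal(None, ...) automatically, so both
# calls below build the same FunctionCall.
assert equals(tags["environment"], "production") == equals(
    tags["environment"], Literal(None, "production")
)
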
78 changes: 73 additions & 5 deletions snuba/query/logical.py
@@ -2,14 +2,12 @@

from typing import Callable, Iterable, Optional, Sequence

from snuba.query import LimitBy, OrderBy
from snuba.query import ProcessableQuery as AbstractQuery
from snuba.query import SelectedExpression
from snuba.query.data_source.simple import Entity
from snuba.query import LimitBy, OrderBy, ProcessableQuery, SelectedExpression
from snuba.query.data_source.simple import Entity, Storage
from snuba.query.expressions import Expression, ExpressionVisitor


class Query(AbstractQuery[Entity]):
class Query(ProcessableQuery[Entity]):
"""
Represents the logical query during query processing.
This means the query class used between parsing and query translation.
@@ -81,3 +79,73 @@ def _transform_expressions_impl(

def _transform_impl(self, visitor: ExpressionVisitor[Expression]) -> None:
pass


class StorageQuery(ProcessableQuery[Storage]):

Review thread on class StorageQuery:
(Member Author): TODO: I think this type can be generalized
(Member): Why did you decide to parse into a StorageQuery rather than a ClickhouseQuery?
(Member Author): Good question. The main thing is that the Table data source does not resemble the entity in any real way, and it's only by the time we get to the StorageProcessing stage that we can process the ClickHouse query. Ideally what I would do is remove Table as a data source and probably remove ClickhouseQuery later.
(Member): Why does it matter that Table doesn't resemble Entity?

"""
Represents the logical query during query processing.
This means the query class used between parsing and query translation.

TODO: This query is supposed to rely on entities as data source.
This will happen in a following step.
"""

def __init__(
self,
from_clause: Optional[Storage],
selected_columns: Optional[Sequence[SelectedExpression]] = None,
array_join: Optional[Sequence[Expression]] = None,
condition: Optional[Expression] = None,
prewhere: Optional[Expression] = None,
groupby: Optional[Sequence[Expression]] = None,
having: Optional[Expression] = None,
order_by: Optional[Sequence[OrderBy]] = None,
limitby: Optional[LimitBy] = None,
sample: Optional[float] = None,
limit: Optional[int] = None,
offset: int = 0,
totals: bool = False,
granularity: Optional[int] = None,
):
"""
Expects an already parsed query body.
"""
self.__final = False
self.__sample = sample
super().__init__(
from_clause=from_clause,
selected_columns=selected_columns,
array_join=array_join,
condition=condition,
groupby=groupby,
having=having,
order_by=order_by,
limitby=limitby,
limit=limit,
offset=offset,
totals=totals,
granularity=granularity,
)

def get_final(self) -> bool:
return self.__final

def set_final(self, final: bool) -> None:
self.__final = final

def get_sample(self) -> Optional[float]:
return self.__sample

def _eq_functions(self) -> Sequence[str]:
return tuple(super()._eq_functions()) + ("get_final", "get_sample")

def _get_expressions_impl(self) -> Iterable[Expression]:
return []

def _transform_expressions_impl(
self, func: Callable[[Expression], Expression]
) -> None:
pass

def _transform_impl(self, visitor: ExpressionVisitor[Expression]) -> None:
pass
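
A construction sketch for the new query class. The StorageKey import path and the SelectedExpression(name, expression) call shape are assumptions; the remaining names come from this PR.

from snuba.datasets.storages.storage_key import StorageKey
from snuba.query import SelectedExpression
from snuba.query.data_source.simple import Storage
from snuba.query.dsl import NestedColumn, column, equals
from snuba.query.logical import StorageQuery

tags = NestedColumn("tags")

query = StorageQuery(
    from_clause=Storage(key=StorageKey("metrics_summaries")),
    selected_columns=[SelectedExpression("project_id", column("project_id"))],
    condition=equals(tags["environment"], "production"),
    limit=100,
)

# Storage-specific accessors added by this class.
assert query.get_final() is False
assert query.get_sample() is None
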
5 changes: 3 additions & 2 deletions snuba/query/mql/parser.py
@@ -34,6 +34,7 @@
)
from snuba.query.indexer.resolver import resolve_mappings
from snuba.query.logical import Query as LogicalQuery
from snuba.query.logical import StorageQuery
from snuba.query.mql.mql_context import MQLContext
from snuba.query.parser.exceptions import ParsingException
from snuba.query.processors.logical.filter_in_select_optimizer import (
@@ -1039,7 +1040,7 @@


def quantiles_to_quantile(
query: Union[CompositeQuery[QueryEntity], LogicalQuery]
query: Union[CompositeQuery[QueryEntity], LogicalQuery, StorageQuery]
) -> None:
"""
Changes quantiles(0.5)(...) to arrayElement(quantiles(0.5)(...), 1). This is to simplify
@@ -1061,7 +1062,7 @@ def transform(exp: Expression) -> Expression:


CustomProcessors = Sequence[
Callable[[Union[CompositeQuery[QueryEntity], LogicalQuery]], None]
Callable[[Union[CompositeQuery[QueryEntity], LogicalQuery, StorageQuery]], None]
]

MQL_POST_PROCESSORS: CustomProcessors = POST_PROCESSORS + [
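
Because CustomProcessors now ranges over the wider union, any externally supplied post-processor must also accept StorageQuery. A minimal conforming stub follows; the `Entity as QueryEntity` alias is assumed to match the parser's existing imports.

from typing import Union

from snuba.query.composite import CompositeQuery
from snuba.query.data_source.simple import Entity as QueryEntity
from snuba.query.logical import Query as LogicalQuery
from snuba.query.logical import StorageQuery


def noop_post_processor(
    query: Union[CompositeQuery[QueryEntity], LogicalQuery, StorageQuery]
) -> None:
    # Real processors (e.g. quantiles_to_quantile above) mutate `query` in place.
    return None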