diff --git a/weave/trace_server/clickhouse_trace_server_batched.py b/weave/trace_server/clickhouse_trace_server_batched.py index 3ab8b264bef1..3aebed8e4868 100644 --- a/weave/trace_server/clickhouse_trace_server_batched.py +++ b/weave/trace_server/clickhouse_trace_server_batched.py @@ -172,7 +172,8 @@ ENTITY_TOO_LARGE_PAYLOAD = '{"_weave": {"error":""}}' CLICKHOUSE_DEFAULT_QUERY_SETTINGS = { - "max_memory_usage": 16 * 1024 * 1024 * 1024, # 16 GiB + # "max_memory_usage": 16 * 1024 * 1024 * 1024, # 16 GiB + "max_memory_usage": 1 * 1024 * 1024, # 1 MiB } @@ -1695,24 +1696,29 @@ def _query_stream( summary = None parameters = _process_parameters(parameters) - with self.ch_client.query_rows_stream( - query, - parameters=parameters, - column_formats=column_formats, - use_none=True, - settings=settings, - ) as stream: - if isinstance(stream.source, QueryResult): - summary = stream.source.summary - logger.info( - "clickhouse_stream_query", - extra={ - "query": query, - "parameters": parameters, - "summary": summary, - }, - ) - yield from stream + try: + with self.ch_client.query_rows_stream( + query, + parameters=parameters, + column_formats=column_formats, + use_none=True, + settings=settings, + ) as stream: + if isinstance(stream.source, QueryResult): + summary = stream.source.summary + logger.info( + "clickhouse_stream_query", + extra={ + "query": query, + "parameters": parameters, + "summary": summary, + }, + ) + yield from stream + except Exception as e: + # wrap exception with query and parameters + extra = {"query": query, "parameters": parameters} + raise type(e)(f"{str(e)} - Query: {query}, Parameters: {parameters}") from None def _query( self,