From 9f785947730099e89e83a777084447c5f8ca141e Mon Sep 17 00:00:00 2001
From: Tom Schmidt
Date: Thu, 19 Dec 2024 10:51:08 -0800
Subject: [PATCH] add example databricks connector to data lens docs

---
 docs/source/teams/data_lens.rst | 344 +++++++++++++++++++++++++++++++-
 1 file changed, 341 insertions(+), 3 deletions(-)

diff --git a/docs/source/teams/data_lens.rst b/docs/source/teams/data_lens.rst
index d8f3923eb2..8394c284d9 100644
--- a/docs/source/teams/data_lens.rst
+++ b/docs/source/teams/data_lens.rst
@@ -355,7 +355,7 @@ Using these inputs, we are expected to return a generator
which yields
:class:`DataLensSearchResponse <fiftyone.operators.data_lens.DataLensSearchResponse>`
objects. To start, we'll create some synthetic data to better understand the
interaction between Data Lens and our operator. We'll look at a
-:ref:`more realistic example <data-lens-bigquery>` later on.
+:ref:`more realistic example <data-lens-databricks>` later on.

.. note::

@@ -573,7 +573,7 @@ There are a couple of things to note about the changes we made here.
This example is meant to illustrate how users can interact with our operator.
For a more realistic view into how inputs can tailor our search experience,
see our example
-:ref:`integration with BigQuery <data-lens-bigquery>`.
+:ref:`integration with Databricks <data-lens-databricks>`.

.. _data-lens-preview-vs-import:

@@ -594,12 +594,350 @@ ______________________________

This section provides example Data Lens connectors for various popular data
sources.

.. _data-lens-databricks:

Databricks
----------

Below is an example of a Data Lens connector for Databricks. This example
assumes a schema consistent with the Berkeley DeepDrive (BDD100K) dataset
format.

.. code-block:: python
    :linenos:

    import json
    import time
    from typing import Generator

    import fiftyone as fo
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.sql import (
        ResultData,
        StatementParameterListItem,
        StatementResponse,
        StatementState,
    )
    from fiftyone import operators as foo
    from fiftyone.operators import types
    from fiftyone.operators.data_lens import (
        DataLensOperator, DataLensSearchRequest, DataLensSearchResponse
    )


    class DatabricksConnector(DataLensOperator):
        """Data Lens operator which retrieves samples from Databricks."""

        @property
        def config(self) -> foo.OperatorConfig:
            return foo.OperatorConfig(
                name="databricks_connector",
                label="Databricks Connector",
                unlisted=True,
                execute_as_generator=True,
            )

        def resolve_input(self, ctx: foo.ExecutionContext):
            inputs = types.Object()

            # Times of day
            inputs.bool(
                "daytime",
                label="Day",
                description="Show daytime samples",
                default=True,
            )
            inputs.bool(
                "night",
                label="Night",
                description="Show night samples",
                default=True,
            )
            inputs.bool(
                "dawn/dusk",
                label="Dawn / Dusk",
                description="Show dawn/dusk samples",
                default=True,
            )

            # Weather
            inputs.bool(
                "clear",
                label="Clear weather",
                description="Show samples with clear weather",
                default=True,
            )
            inputs.bool(
                "rainy",
                label="Rainy weather",
                description="Show samples with rainy weather",
                default=True,
            )

            # Detection label
            inputs.str(
                "detection_label",
                label="Detection label",
                description="Filter samples by detection label",
            )

            return types.Property(inputs)

        def handle_lens_search_request(
            self,
            request: DataLensSearchRequest,
            ctx: foo.ExecutionContext
        ) -> Generator[DataLensSearchResponse, None, None]:
            handler = DatabricksHandler()
            for response in handler.handle_request(request, ctx):
                yield response
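
    # NOTE: the handler below reads its Databricks credentials via the
    # following secrets, which are assumed to be configured in your
    # deployment: DATABRICKS_HOST, DATABRICKS_ACCOUNT_ID,
    # DATABRICKS_CLIENT_ID, and DATABRICKS_CLIENT_SECRET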
tables.""" + + def __init__(self): + self.client = None + self.warehouse_id = None + + def handle_request( + self, + request: DataLensSearchRequest, + ctx: foo.ExecutionContext + ) -> Generator[DataLensSearchResponse, None, None]: + + # Initialize the client + self._init_client(ctx) + + # Iterate over samples + sample_buffer = [] + for sample in self._iter_data(request): + sample_buffer.append(self._transform_sample(sample)) + + # Yield batches of data as they are available + if len(sample_buffer) == request.batch_size: + yield DataLensSearchResponse( + result_count=len(sample_buffer), + query_result=sample_buffer, + ) + + sample_buffer = [] + + # Yield final batch if it's non-empty + if len(sample_buffer) > 0: + yield DataLensSearchResponse( + result_count=len(sample_buffer), + query_result=sample_buffer, + ) + + # No more samples. + + def _init_client(self, ctx: foo.ExecutionContext): + """Prepare the Databricks client for query execution.""" + + # Initialize the Databricks client using credentials provided via `ctx.secret` + self.client = WorkspaceClient( + host=ctx.secret("DATABRICKS_HOST"), + account_id=ctx.secret("DATABRICKS_ACCOUNT_ID"), + client_id=ctx.secret("DATABRICKS_CLIENT_ID"), + client_secret=ctx.secret("DATABRICKS_CLIENT_SECRET"), + ) + + # Start a SQL warehouse instance to execute our query + self.warehouse_id = self._start_warehouse() + if self.warehouse_id is None: + raise ValueError("No available warehouse") + + def _start_warehouse(self) -> str: + """Start a SQL warehouse and return its ID.""" + + last_warehouse_id = None + + # If any warehouses are already running, use the first available + for warehouse in self.client.warehouses.list(): + last_warehouse_id = warehouse.id + if warehouse.health.status is not None: + return warehouse.id + + # Otherwise, manually start the last available warehouse + if last_warehouse_id is not None: + self.client.warehouses.start(last_warehouse_id) + + return last_warehouse_id + + def _iter_data(self, request: DataLensSearchRequest) -> Generator[dict, None, None]: + """Iterate over sample data retrieved from Databricks.""" + + # Filter samples based on selected times of day + enabled_times_of_day = tuple([ + f'"{tod}"' + for tod in ["daytime", "night", "dawn/dusk"] + if request.search_params.get(tod, False) + ]) + + # Filter samples based on selected weather + enabled_weather = tuple([ + f'"{weather}"' + for weather in ["clear", "rainy"] + if request.search_params.get(weather, False) + ]) + + # Build Databricks query + query = f""" + SELECT * FROM datasets.bdd.det_train samples + WHERE + samples.attributes.timeofday IN ({", ".join(enabled_times_of_day)}) + AND samples.attributes.weather IN ({", ".join(enabled_weather)}) + """ + + query_parameters = [] + + # Filter samples based on detection label if provided + if request.search_params.get("detection_label") not in (None, ""): + query += f""" + AND samples.name IN ( + SELECT DISTINCT(labels.name) + FROM datasets.bdd.det_train_labels labels + WHERE labels.category = :detection_label + ) + """ + + query_parameters.append( + StatementParameterListItem( + "detection_label", + value=request.search_params.get("detection_label") + ) + ) + + # Execute query asynchronously; + # we'll get a statement_id that we can use to poll for results + statement_response = self.client.statement_execution.execute_statement( + query, + self.warehouse_id, + catalog="datasets", + parameters=query_parameters, + row_limit=request.max_results, + wait_timeout="0s" + ) + + # Poll on our statement until it's no longer 

            # Execute the query asynchronously; we'll get a statement_id
            # that we can use to poll for results
            statement_response = self.client.statement_execution.execute_statement(
                query,
                self.warehouse_id,
                catalog="datasets",
                parameters=query_parameters,
                row_limit=request.max_results,
                wait_timeout="0s",
            )

            # Poll on our statement until it's no longer in an active state
            while statement_response.status.state in (
                StatementState.PENDING,
                StatementState.RUNNING,
            ):
                time.sleep(2.5)

                statement_response = self.client.statement_execution.get_statement(
                    statement_response.statement_id
                )

            # Surface any execution errors before processing results
            self._check_for_error(statement_response)

            # Extract the column names from the statement manifest;
            # these apply to every chunk of the result
            column_names = [
                column.name
                for column in statement_response.manifest.schema.columns
            ]

            # Process the first batch of data
            for element in self._rows_to_dicts(column_names, statement_response.result):
                yield element

            # Databricks paginates samples using "chunks"; iterate over chunks
            # until next_chunk_index is None. Note that
            # `get_statement_result_chunk_n()` returns a `ResultData` object,
            # not a full `StatementResponse`, so we track the chunk directly
            result_data = statement_response.result
            while result_data.next_chunk_index is not None:
                result_data = self.client.statement_execution.get_statement_result_chunk_n(
                    statement_response.statement_id,
                    result_data.next_chunk_index,
                )

                # Process the next batch of data
                for element in self._rows_to_dicts(column_names, result_data):
                    yield element

        def _transform_sample(self, sample: dict) -> dict:
            """Transform a dict of raw Databricks data into a FiftyOne Sample dict."""

            return fo.Sample(
                filepath=f"cloud://bucket/{sample.get('name')}",
                detections=self._build_detections(sample),
            ).to_dict()

        def _build_detections(self, sample: dict) -> fo.Detections:
            # Images are a known, static size
            image_width = 1280
            image_height = 720

            # Extract detection labels and pre-process bounding boxes
            labels_list = json.loads(sample["labels"])
            for label_data in labels_list:
                if "box2d" in label_data:
                    label_data["box2d"] = {
                        k: float(v)
                        for k, v in label_data["box2d"].items()
                    }

            return fo.Detections(
                detections=[
                    fo.Detection(
                        label=label_data["category"],
                        # FiftyOne expects bounding boxes to be of the form
                        # [x, y, width, height]
                        # where values are normalized to the image's dimensions.
                        #
                        # Our source data is of the form
                        # {x1, y1, x2, y2}
                        # where values are in absolute pixels.
                        bounding_box=[
                            label_data["box2d"]["x1"] / image_width,
                            label_data["box2d"]["y1"] / image_height,
                            (label_data["box2d"]["x2"] - label_data["box2d"]["x1"]) / image_width,
                            (label_data["box2d"]["y2"] - label_data["box2d"]["y1"]) / image_height
                        ]
                    )
                    for label_data in labels_list
                    if "box2d" in label_data
                ]
            )

        def _rows_to_dicts(
            self, column_names: list[str], result: ResultData
        ) -> list[dict]:
            # Each element in data_array is a list of raw column values.
            # Remap ([col1, col2, ..., colN], [val1, val2, ..., valN]) tuples
            # to {col1: val1, col2: val2, ..., colN: valN} dicts
            return [
                dict(zip(column_names, datum))
                for datum in (result.data_array or [])
            ]

        def _check_for_error(self, response: StatementResponse):
            if response is None:
                raise ValueError("received null response from databricks")

            if response.status is not None:
                if response.status.error is not None:
                    raise ValueError(
                        f"databricks error: "
                        f"({response.status.error.error_code}) "
                        f"{response.status.error.message}"
                    )

                if response.status.state in (
                    StatementState.CLOSED,
                    StatementState.FAILED,
                    StatementState.CANCELED,
                ):
                    raise ValueError(
                        f"databricks error: response state = {response.status.state}"
                    )
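
To make the coordinate conversion in ``_build_detections()`` concrete, here is
a minimal standalone sketch of the same math using hypothetical pixel values
(the image dimensions match the constants assumed in the example above):

.. code-block:: python

    # Hypothetical BDD-style box in absolute pixel coordinates
    box2d = {"x1": 320.0, "y1": 180.0, "x2": 960.0, "y2": 540.0}
    image_width, image_height = 1280, 720

    # FiftyOne bounding boxes are [x, y, width, height], normalized to [0, 1]
    bounding_box = [
        box2d["x1"] / image_width,                    # x = 0.25
        box2d["y1"] / image_height,                   # y = 0.25
        (box2d["x2"] - box2d["x1"]) / image_width,    # width = 0.50
        (box2d["y2"] - box2d["y1"]) / image_height,   # height = 0.50
    ]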

.. _data-lens-bigquery:

Google BigQuery
---------------

-Here's a fully-functional Data Lens connector for BigQuery:
+Below is an example of a Data Lens connector for BigQuery:

.. code-block:: python
    :linenos: