From 9f785947730099e89e83a777084447c5f8ca141e Mon Sep 17 00:00:00 2001 From: Tom Schmidt Date: Thu, 19 Dec 2024 10:51:08 -0800 Subject: [PATCH 1/2] add example databricks connector to data lens docs --- docs/source/teams/data_lens.rst | 344 +++++++++++++++++++++++++++++++- 1 file changed, 341 insertions(+), 3 deletions(-) diff --git a/docs/source/teams/data_lens.rst b/docs/source/teams/data_lens.rst index d8f3923eb2..8394c284d9 100644 --- a/docs/source/teams/data_lens.rst +++ b/docs/source/teams/data_lens.rst @@ -355,7 +355,7 @@ Using these inputs, we are expected to return a generator which yields :class:`DataLensSearchResponse ` objects. To start, we'll create some synthetic data to better understand the interaction between Data Lens and our operator. We'll look at a -:ref:`more realistic example ` later on. +:ref:`more realistic example ` later on. .. note:: @@ -573,7 +573,7 @@ There are a couple things to note about the changes we made here. This example is meant to illustrate how users can interact with our operator. For a more realistic view into how inputs can tailor our search experience, see our example - :ref:`integration with BigQuery `. + :ref:`integration with Databricks `. .. _data-lens-preview-vs-import: @@ -594,12 +594,350 @@ ______________________________ This section provides example Data Lens connectors for various popular data sources. +.. _data-lens-databricks: + +Databricks +---------- + +Below is an example of a Data Lens connector for Databricks. This example uses +a schema consistent with the Berkeley DeepDrive dataset format. + +.. code-block:: python + :linenos: + + import json + import time + from typing import Generator + + import fiftyone as fo + from databricks.sdk import WorkspaceClient + from databricks.sdk.service.sql import ( + StatementResponse, StatementState, StatementParameterListItem + ) + from fiftyone import operators as foo + from fiftyone.operators import types + from fiftyone.operators.data_lens import ( + DataLensOperator, DataLensSearchRequest, DataLensSearchResponse + ) + + class DatabricksConnector(DataLensOperator): + """Data Lens operator which retrieves samples from Databricks.""" + + @property + def config(self) -> foo.OperatorConfig: + return foo.OperatorConfig( + name="databricks_connector", + label="Databricks Connector", + unlisted=True, + execute_as_generator=True, + ) + + def resolve_input(self, ctx: foo.ExecutionContext): + inputs = types.Object() + + # Times of day + inputs.bool( + "daytime", + label="Day", + description="Show daytime samples", + default=True, + ) + inputs.bool( + "night", + label="Night", + description="Show night samples", + default=True, + ) + inputs.bool( + "dawn/dusk", + label="Dawn / Dusk", + description="Show dawn/dusk samples", + default=True, + ) + + # Weather + inputs.bool( + "clear", + label="Clear weather", + description="Show samples with clear weather", + default=True, + ) + inputs.bool( + "rainy", + label="Rainy weather", + description="Show samples with rainy weather", + default=True, + ) + + # Detection label + inputs.str( + "detection_label", + label="Detection label", + description="Filter samples by detection label", + ) + + return types.Property(inputs) + + def handle_lens_search_request( + self, + request: DataLensSearchRequest, + ctx: foo.ExecutionContext + ) -> Generator[DataLensSearchResponse, None, None]: + handler = DatabricksHandler() + for response in handler.handle_request(request, ctx): + yield response + + + class DatabricksHandler: + """Handler for interacting with Databricks 
tables.""" + + def __init__(self): + self.client = None + self.warehouse_id = None + + def handle_request( + self, + request: DataLensSearchRequest, + ctx: foo.ExecutionContext + ) -> Generator[DataLensSearchResponse, None, None]: + + # Initialize the client + self._init_client(ctx) + + # Iterate over samples + sample_buffer = [] + for sample in self._iter_data(request): + sample_buffer.append(self._transform_sample(sample)) + + # Yield batches of data as they are available + if len(sample_buffer) == request.batch_size: + yield DataLensSearchResponse( + result_count=len(sample_buffer), + query_result=sample_buffer, + ) + + sample_buffer = [] + + # Yield final batch if it's non-empty + if len(sample_buffer) > 0: + yield DataLensSearchResponse( + result_count=len(sample_buffer), + query_result=sample_buffer, + ) + + # No more samples. + + def _init_client(self, ctx: foo.ExecutionContext): + """Prepare the Databricks client for query execution.""" + + # Initialize the Databricks client using credentials provided via `ctx.secret` + self.client = WorkspaceClient( + host=ctx.secret("DATABRICKS_HOST"), + account_id=ctx.secret("DATABRICKS_ACCOUNT_ID"), + client_id=ctx.secret("DATABRICKS_CLIENT_ID"), + client_secret=ctx.secret("DATABRICKS_CLIENT_SECRET"), + ) + + # Start a SQL warehouse instance to execute our query + self.warehouse_id = self._start_warehouse() + if self.warehouse_id is None: + raise ValueError("No available warehouse") + + def _start_warehouse(self) -> str: + """Start a SQL warehouse and return its ID.""" + + last_warehouse_id = None + + # If any warehouses are already running, use the first available + for warehouse in self.client.warehouses.list(): + last_warehouse_id = warehouse.id + if warehouse.health.status is not None: + return warehouse.id + + # Otherwise, manually start the last available warehouse + if last_warehouse_id is not None: + self.client.warehouses.start(last_warehouse_id) + + return last_warehouse_id + + def _iter_data(self, request: DataLensSearchRequest) -> Generator[dict, None, None]: + """Iterate over sample data retrieved from Databricks.""" + + # Filter samples based on selected times of day + enabled_times_of_day = tuple([ + f'"{tod}"' + for tod in ["daytime", "night", "dawn/dusk"] + if request.search_params.get(tod, False) + ]) + + # Filter samples based on selected weather + enabled_weather = tuple([ + f'"{weather}"' + for weather in ["clear", "rainy"] + if request.search_params.get(weather, False) + ]) + + # Build Databricks query + query = f""" + SELECT * FROM datasets.bdd.det_train samples + WHERE + samples.attributes.timeofday IN ({", ".join(enabled_times_of_day)}) + AND samples.attributes.weather IN ({", ".join(enabled_weather)}) + """ + + query_parameters = [] + + # Filter samples based on detection label if provided + if request.search_params.get("detection_label") not in (None, ""): + query += f""" + AND samples.name IN ( + SELECT DISTINCT(labels.name) + FROM datasets.bdd.det_train_labels labels + WHERE labels.category = :detection_label + ) + """ + + query_parameters.append( + StatementParameterListItem( + "detection_label", + value=request.search_params.get("detection_label") + ) + ) + + # Execute query asynchronously; + # we'll get a statement_id that we can use to poll for results + statement_response = self.client.statement_execution.execute_statement( + query, + self.warehouse_id, + catalog="datasets", + parameters=query_parameters, + row_limit=request.max_results, + wait_timeout="0s" + ) + + # Poll on our statement until it's no longer 
in an active state + while ( + statement_response.status.state in + (StatementState.PENDING, StatementState.RUNNING) + ): + statement_response = self.client.statement_execution.get_statement( + statement_response.statement_id + ) + + time.sleep(2.5) + + # Process the first batch of data + json_result = self._response_to_dicts(statement_response) + + for element in json_result: + yield element + + # Databricks paginates samples using "chunks"; iterate over chunks until next is None + while statement_response.result.next_chunk_index is not None: + statement_response = self.client.statement_execution.get_statement_result_chunk_n( + statement_response.statement_id, + statement_response.result.next_chunk_index + ) + + # Process the next batch of data + json_result = self._response_to_dicts(statement_response) + + for element in json_result: + yield element + + def _transform_sample(self, sample: dict) -> dict: + """Transform a dict of raw Databricks data into a FiftyOne Sample dict.""" + + return fo.Sample( + filepath=f"cloud://bucket/{sample.get('name')}", + detections=self._build_detections(sample), + ).to_dict() + + def _build_detections(self, sample: dict) -> fo.Detections: + # Images are a known, static size + image_width = 1280 + image_height = 720 + + # Extract detection labels and pre-process bounding boxes + labels_list = json.loads(sample["labels"]) + for label_data in labels_list: + if "box2d" in label_data: + label_data["box2d"] = { + k: float(v) + for k, v in label_data["box2d"].items() + } + + return fo.Detections( + detections=[ + fo.Detection( + label=label_data["category"], + # FiftyOne expects bounding boxes to be of the form + # [x, y, width, height] + # where values are normalized to the image's dimensions. + # + # Our source data is of the form + # {x1, y1, x2, y2} + # where values are in absolute pixels. + bounding_box=[ + label_data["box2d"]["x1"] / image_width, + label_data["box2d"]["y1"] / image_height, + (label_data["box2d"]["x2"] - label_data["box2d"]["x1"]) / image_width, + (label_data["box2d"]["y2"] - label_data["box2d"]["y1"]) / image_height + ] + ) + for label_data in labels_list + if "box2d" in label_data + ] + ) + + def _response_to_dicts(self, response: StatementResponse) -> list[dict]: + # Check for response errors before processing + self._check_for_error(response) + + # Extract column names from response + columns = response.manifest.schema.columns + column_names = [column.name for column in columns] + + # Extract data from response + data = response.result.data_array or [] + + # Each element in data is a list of raw column values. + # Remap ([col1, col2, ..., colN], [val1, val2, ..., valN]) tuples + # to {col1: val1, col2: val2, ..., colN: valN} dicts + return [ + { + key: value + for key, value in zip(column_names, datum) + } + for datum in data + ] + + def _check_for_error(self, response: StatementResponse): + if response is None: + raise ValueError("received null response from databricks") + + if response.status is not None: + if response.status.error is not None: + raise ValueError("databricks error: ({0}) {1}".format( + response.status.error.error_code, + response.status.error.message + )) + + if response.status.state in ( + StatementState.CLOSED, + StatementState.FAILED, + StatementState.CANCELED, + ): + raise ValueError( + f"databricks error: response state = {response.status.state}" + ) + .. 

.. _data-lens-bigquery:

Google BigQuery
---------------

-Here's a fully-functional Data Lens connector for BigQuery:
+Below is an example of a Data Lens connector for BigQuery:

.. code-block:: python
    :linenos:

From 91a8e95920e54cfa5ea911b4ef97f53c32c269bb Mon Sep 17 00:00:00 2001
From: Tom Schmidt
Date: Thu, 19 Dec 2024 11:31:14 -0800
Subject: [PATCH 2/2] add example of dynamic operator to data lens docs
---
 docs/source/teams/data_lens.rst | 81 +++++++++++++++++++++++++++++++++
 1 file changed, 81 insertions(+)

diff --git a/docs/source/teams/data_lens.rst b/docs/source/teams/data_lens.rst
index 8394c284d9..34d7fdee1e 100644
--- a/docs/source/teams/data_lens.rst
+++ b/docs/source/teams/data_lens.rst
@@ -1125,3 +1125,84 @@ as powerful as it needs to be. We can leverage internal
libraries and services, hosted solutions, and tooling that meets the specific
needs of our data. We can expose flexible but precise controls to users to
allow them to find exactly the data that's needed.

.. _data-lens-snippet-remap-fields:

Snippet: Dynamic user inputs
----------------------------

As the volume and complexity of your data grows, you may want to expose many
options to Data Lens users, but presenting them all at once can be
overwhelming. In this example, we'll look at how we can use
:ref:`dynamic operators ` to conditionally expose
configuration options to Data Lens users.

.. code-block:: python
    :linenos:

    class MyOperator(DataLensOperator):
        @property
        def config(self) -> foo.OperatorConfig:
            return foo.OperatorConfig(
                name="my_operator",
                label="My operator",
                dynamic=True,
            )

By setting `dynamic=True` in our operator config, our operator will be able to
customize the options shown to a Data Lens user based on the current state.
Let's use this to optionally show an "advanced options" section in our query
parameters.

.. code-block:: python
    :linenos:

    def resolve_input(self, ctx: foo.ExecutionContext):
        inputs = types.Object()

        inputs.str("some_param", label="Parameter value")
        inputs.str("other_param", label="Other value")

        inputs.bool("show_advanced", label="Show advanced options", default=False)

        # Since this is a dynamic operator,
        # we can use `ctx.params` to conditionally show options
        if ctx.params.get("show_advanced") is True:
            # In this example, we'll optionally show configuration which allows a user
            # to remap selected sample fields to another name.
            # This could be used to enable users to import samples into datasets with
            # varying schemas.
            remappable_fields = ("field_a", "field_b")
            for field_name in remappable_fields:
                inputs.str(f"{field_name}_remap", label=f"Remap {field_name} to another name")

        return types.Property(inputs)

Our operator's `resolve_input` method will be called each time `ctx.params`
changes, which allows us to create an experience that is tailored to the Data
Lens user's behavior. In this example, we're optionally displaying advanced
configuration that allows a user to remap sample fields. Applying this
remapping might look something like this:

.. code-block:: python
    :linenos:

    def _remap_sample_fields(self, sample: dict, request: DataLensSearchRequest):
        remappable_fields = ("field_a", "field_b")
        for field_name in remappable_fields:
            remapped_field_name = request.search_params.get(f"{field_name}_remap")
            if remapped_field_name not in (None, ""):
                sample[remapped_field_name] = sample[field_name]
                del sample[field_name]

Of course, dynamic operators can be used for much more than this.
Search
experiences can be broadened or narrowed on the fly, letting users explore
your connected data sources with as much breadth or depth as each query
requires.

As an example, suppose a user is searching for detections of "traffic light"
in an autonomous driving dataset. A dynamic operator can be used to expose
additional search options that are specific to traffic lights, such as being
able to select samples with only red, yellow, or green lights. In this way,
dynamic operators provide a simple mechanism for developing intuitive and
context-sensitive search experiences for Data Lens users.
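
As a rough sketch of how this might look, the operator's `resolve_input`
method could inspect the label the user has entered so far and only surface a
color selector when it applies. This sketch assumes the same imports as the
examples above, reuses the `detection_label` parameter from the Databricks
connector, and introduces a hypothetical `traffic_light_color` parameter:

.. code-block:: python
    :linenos:

    def resolve_input(self, ctx: foo.ExecutionContext):
        inputs = types.Object()

        inputs.str("detection_label", label="Detection label")

        # Because this is a dynamic operator, we can inspect what the user
        # has typed so far and only show options that are relevant to it
        if ctx.params.get("detection_label") == "traffic light":
            color_choices = types.Choices()
            for color in ("red", "yellow", "green"):
                color_choices.add_choice(color, label=color.capitalize())

            # Hypothetical parameter; the search handler must apply it
            inputs.enum(
                "traffic_light_color",
                color_choices.values(),
                view=color_choices,
                label="Traffic light color",
                description="Only include samples containing lights of this color",
            )

        return types.Property(inputs)

The search handler could then read `traffic_light_color` from
`request.search_params` and translate it into an additional filter in its
query, in the same way the other parameters are handled in the examples above.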