add example databricks connector to data lens docs
tom-vx51 committed Dec 19, 2024
1 parent 4bcc45d commit 9f78594
Showing 1 changed file with 341 additions and 3 deletions.

docs/source/teams/data_lens.rst
@@ -355,7 +355,7 @@
Using these inputs, we are expected to return a generator which yields
:class:`DataLensSearchResponse <fiftyone.operators.data_lens.models.DataLensSearchResponse>`
objects. To start, we'll create some synthetic data to better understand the
interaction between Data Lens and our operator. We'll look at a
-:ref:`more realistic example <data-lens-bigquery>` later on.
+:ref:`more realistic example <data-lens-databricks>` later on.
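
For instance, the basic shape of such a generator might look like the
following (a minimal sketch; the ``SyntheticConnector`` name and the synthetic
sample dicts are illustrative only):

.. code-block:: python

    from typing import Generator

    from fiftyone import operators as foo
    from fiftyone.operators.data_lens import (
        DataLensOperator, DataLensSearchRequest, DataLensSearchResponse
    )

    class SyntheticConnector(DataLensOperator):
        # ... config omitted; see the full examples below ...

        def handle_lens_search_request(
            self,
            request: DataLensSearchRequest,
            ctx: foo.ExecutionContext,
        ) -> Generator[DataLensSearchResponse, None, None]:
            # Yield a single batch of synthetic sample dicts
            batch = [
                {"filepath": f"/path/to/image-{i}.jpg"}
                for i in range(request.batch_size)
            ]
            yield DataLensSearchResponse(
                result_count=len(batch),
                query_result=batch,
            )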

.. note::

@@ -573,7 +573,7 @@
There are a couple things to note about the changes we made here.
This example is meant to illustrate how users can interact with our
operator. For a more realistic view into how inputs can tailor our search
experience, see our example
-:ref:`integration with BigQuery <data-lens-bigquery>`.
+:ref:`integration with Databricks <data-lens-databricks>`.

.. _data-lens-preview-vs-import:

@@ -594,12 +594,350 @@
______________________________
This section provides example Data Lens connectors for various popular data
sources.

.. _data-lens-databricks:

Databricks
----------

Below is an example of a Data Lens connector for Databricks. This example
assumes a table schema consistent with the Berkeley DeepDrive (BDD) dataset
format.
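
.. note::

    The connector below uses the ``databricks-sdk`` package and reads its
    credentials via ``ctx.secret()``, so the ``DATABRICKS_HOST``,
    ``DATABRICKS_ACCOUNT_ID``, ``DATABRICKS_CLIENT_ID``, and
    ``DATABRICKS_CLIENT_SECRET`` secrets must be configured in your
    deployment.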

.. code-block:: python
    :linenos:

    import json
    import time
    from typing import Generator

    import fiftyone as fo
    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.sql import (
        StatementResponse, StatementState, StatementParameterListItem
    )
    from fiftyone import operators as foo
    from fiftyone.operators import types
    from fiftyone.operators.data_lens import (
        DataLensOperator, DataLensSearchRequest, DataLensSearchResponse
    )

    class DatabricksConnector(DataLensOperator):
        """Data Lens operator which retrieves samples from Databricks."""

        @property
        def config(self) -> foo.OperatorConfig:
            return foo.OperatorConfig(
                name="databricks_connector",
                label="Databricks Connector",
                unlisted=True,
                execute_as_generator=True,
            )

        def resolve_input(self, ctx: foo.ExecutionContext):
            inputs = types.Object()

            # Times of day
            inputs.bool(
                "daytime",
                label="Day",
                description="Show daytime samples",
                default=True,
            )
            inputs.bool(
                "night",
                label="Night",
                description="Show night samples",
                default=True,
            )
            inputs.bool(
                "dawn/dusk",
                label="Dawn / Dusk",
                description="Show dawn/dusk samples",
                default=True,
            )

            # Weather
            inputs.bool(
                "clear",
                label="Clear weather",
                description="Show samples with clear weather",
                default=True,
            )
            inputs.bool(
                "rainy",
                label="Rainy weather",
                description="Show samples with rainy weather",
                default=True,
            )

            # Detection label
            inputs.str(
                "detection_label",
                label="Detection label",
                description="Filter samples by detection label",
            )

            return types.Property(inputs)

        def handle_lens_search_request(
            self,
            request: DataLensSearchRequest,
            ctx: foo.ExecutionContext
        ) -> Generator[DataLensSearchResponse, None, None]:
            handler = DatabricksHandler()
            for response in handler.handle_request(request, ctx):
                yield response

    class DatabricksHandler:
        """Handler for interacting with Databricks tables."""

        def __init__(self):
            self.client = None
            self.warehouse_id = None

        def handle_request(
            self,
            request: DataLensSearchRequest,
            ctx: foo.ExecutionContext
        ) -> Generator[DataLensSearchResponse, None, None]:
            # Initialize the client
            self._init_client(ctx)

            # Iterate over samples
            sample_buffer = []

            for sample in self._iter_data(request):
                sample_buffer.append(self._transform_sample(sample))

                # Yield batches of data as they are available
                if len(sample_buffer) == request.batch_size:
                    yield DataLensSearchResponse(
                        result_count=len(sample_buffer),
                        query_result=sample_buffer,
                    )
                    sample_buffer = []

            # Yield final batch if it's non-empty
            if len(sample_buffer) > 0:
                yield DataLensSearchResponse(
                    result_count=len(sample_buffer),
                    query_result=sample_buffer,
                )

            # No more samples.

        def _init_client(self, ctx: foo.ExecutionContext):
            """Prepare the Databricks client for query execution."""
            # Initialize the Databricks client using credentials provided
            # via `ctx.secret`
            self.client = WorkspaceClient(
                host=ctx.secret("DATABRICKS_HOST"),
                account_id=ctx.secret("DATABRICKS_ACCOUNT_ID"),
                client_id=ctx.secret("DATABRICKS_CLIENT_ID"),
                client_secret=ctx.secret("DATABRICKS_CLIENT_SECRET"),
            )

            # Start a SQL warehouse instance to execute our query
            self.warehouse_id = self._start_warehouse()
            if self.warehouse_id is None:
                raise ValueError("No available warehouse")

        def _start_warehouse(self) -> str:
            """Start a SQL warehouse and return its ID."""
            last_warehouse_id = None

            # If any warehouses are already running, use the first available
            for warehouse in self.client.warehouses.list():
                last_warehouse_id = warehouse.id
                if warehouse.health.status is not None:
                    return warehouse.id

            # Otherwise, manually start the last available warehouse
            if last_warehouse_id is not None:
                self.client.warehouses.start(last_warehouse_id)

            return last_warehouse_id
        def _iter_data(
            self, request: DataLensSearchRequest
        ) -> Generator[dict, None, None]:
            """Iterate over sample data retrieved from Databricks."""
            # Filter samples based on selected times of day
            enabled_times_of_day = tuple([
                f'"{tod}"'
                for tod in ["daytime", "night", "dawn/dusk"]
                if request.search_params.get(tod, False)
            ])

            # Filter samples based on selected weather
            enabled_weather = tuple([
                f'"{weather}"'
                for weather in ["clear", "rainy"]
                if request.search_params.get(weather, False)
            ])

            # Build Databricks query
            query = f"""
                SELECT * FROM datasets.bdd.det_train samples
                WHERE
                    samples.attributes.timeofday IN ({", ".join(enabled_times_of_day)})
                    AND samples.attributes.weather IN ({", ".join(enabled_weather)})
            """
            query_parameters = []

            # Filter samples based on detection label if provided
            if request.search_params.get("detection_label") not in (None, ""):
                query += """
                    AND samples.name IN (
                        SELECT DISTINCT(labels.name)
                        FROM datasets.bdd.det_train_labels labels
                        WHERE labels.category = :detection_label
                    )
                """
                query_parameters.append(
                    StatementParameterListItem(
                        "detection_label",
                        value=request.search_params.get("detection_label")
                    )
                )

            # Execute query asynchronously;
            # we'll get a statement_id that we can use to poll for results
            statement_response = self.client.statement_execution.execute_statement(
                query,
                self.warehouse_id,
                catalog="datasets",
                parameters=query_parameters,
                row_limit=request.max_results,
                wait_timeout="0s"
            )

            # Poll on our statement until it's no longer in an active state
            while (
                statement_response.status.state in
                (StatementState.PENDING, StatementState.RUNNING)
            ):
                statement_response = self.client.statement_execution.get_statement(
                    statement_response.statement_id
                )
                time.sleep(2.5)

            # Process the first batch of data
            json_result = self._response_to_dicts(statement_response)
            for element in json_result:
                yield element

            # Databricks paginates samples using "chunks";
            # iterate over chunks until the next chunk index is None
            while statement_response.result.next_chunk_index is not None:
                statement_response = self.client.statement_execution.get_statement_result_chunk_n(
                    statement_response.statement_id,
                    statement_response.result.next_chunk_index
                )

                # Process the next batch of data
                json_result = self._response_to_dicts(statement_response)
                for element in json_result:
                    yield element
        def _transform_sample(self, sample: dict) -> dict:
            """Transform a dict of raw Databricks data into a FiftyOne Sample dict."""
            return fo.Sample(
                filepath=f"cloud://bucket/{sample.get('name')}",
                detections=self._build_detections(sample),
            ).to_dict()

        def _build_detections(self, sample: dict) -> fo.Detections:
            # Images are a known, static size
            image_width = 1280
            image_height = 720

            # Extract detection labels and pre-process bounding boxes
            labels_list = json.loads(sample["labels"])
            for label_data in labels_list:
                if "box2d" in label_data:
                    label_data["box2d"] = {
                        k: float(v)
                        for k, v in label_data["box2d"].items()
                    }

            return fo.Detections(
                detections=[
                    fo.Detection(
                        label=label_data["category"],
                        # FiftyOne expects bounding boxes to be of the form
                        #   [x, y, width, height]
                        # where values are normalized to the image's dimensions.
                        #
                        # Our source data is of the form
                        #   {x1, y1, x2, y2}
                        # where values are in absolute pixels.
                        bounding_box=[
                            label_data["box2d"]["x1"] / image_width,
                            label_data["box2d"]["y1"] / image_height,
                            (label_data["box2d"]["x2"] - label_data["box2d"]["x1"]) / image_width,
                            (label_data["box2d"]["y2"] - label_data["box2d"]["y1"]) / image_height
                        ]
                    )
                    for label_data in labels_list
                    if "box2d" in label_data
                ]
            )
        def _response_to_dicts(self, response: StatementResponse) -> list[dict]:
            # Check for response errors before processing
            self._check_for_error(response)

            # Extract column names from response
            columns = response.manifest.schema.columns
            column_names = [column.name for column in columns]

            # Extract data from response
            data = response.result.data_array or []

            # Each element in data is a list of raw column values.
            # Remap ([col1, col2, ..., colN], [val1, val2, ..., valN]) tuples
            # to {col1: val1, col2: val2, ..., colN: valN} dicts
            return [
                {
                    key: value
                    for key, value in zip(column_names, datum)
                }
                for datum in data
            ]

        def _check_for_error(self, response: StatementResponse):
            if response is None:
                raise ValueError("received null response from databricks")

            if response.status is not None:
                if response.status.error is not None:
                    raise ValueError("databricks error: ({0}) {1}".format(
                        response.status.error.error_code,
                        response.status.error.message
                    ))

                if response.status.state in (
                    StatementState.CLOSED,
                    StatementState.FAILED,
                    StatementState.CANCELED,
                ):
                    raise ValueError(
                        f"databricks error: response state = {response.status.state}"
                    )
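
To make this connector available to Data Lens, register it as an operator in
your plugin's ``__init__.py``. A minimal sketch, assuming the connector class
above lives in a ``databricks_connector`` module within the plugin (the module
name is hypothetical):

.. code-block:: python

    from .databricks_connector import DatabricksConnector

    def register(p):
        # Expose the operator to FiftyOne when the plugin loads
        p.register(DatabricksConnector)
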
.. _data-lens-bigquery:

Google BigQuery
---------------

-Here's a fully-functional Data Lens connector for BigQuery:
+Below is an example of a Data Lens connector for BigQuery:

.. code-block:: python
    :linenos:

    # ...
