Skip to content

Commit

Permalink
Merge pull request #328 from benjamin-awd/add-force-on-cluster
Browse files Browse the repository at this point in the history
Add force on cluster config option
  • Loading branch information
BentsiLeviav authored Jan 6, 2025
2 parents d2a63b0 + 81def6c commit b9f9191
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 7 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,23 @@ if `cluster` is set in profile, `on_cluster_clause` now will return cluster info
- Distributed materializations
- Models with Replicated engines


By default, tables and incremental materializations with non-replicated engines will not be affected by the `cluster` setting (model would be created on the connected node only).

To force relations to be created on a cluster regardless of their engine or materialization, use the `force_on_cluster` argument:
```sql
{{ config(
engine='Null',
materialized='materialized_view',
force_on_cluster='true'
)
}}
```

table and incremental materializations with non-replicated engine will not be affected by `cluster` setting (model would
be created on the connected node only).


### Compatibility

If a model has been created without a `cluster` setting, dbt-clickhouse will detect the situation and run all DDL/DML
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
@dataclass
class ClickHouseConfig(AdapterConfig):
engine: str = 'MergeTree()'
force_on_cluster: Optional[bool] = False
order_by: Optional[Union[List[str], str]] = 'tuple()'
partition_by: Optional[Union[List[str], str]] = None
sharding_key: Optional[Union[List[str], str]] = 'rand()'
Expand Down
14 changes: 7 additions & 7 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,20 @@ def create_from(
# If the database is set, and the source schema is "defaulted" to the source.name, override the
# schema with the database instead, since that's presumably what's intended for clickhouse
schema = relation_config.schema

cluster = quoting.credentials.cluster or ''
can_on_cluster = None
# We placed a hardcoded const (instead of importing it from dbt-core) in order to decouple the packages
if relation_config.resource_type == NODE_TYPE_SOURCE:
if schema == relation_config.source_name and relation_config.database:
schema = relation_config.database

if cluster and str(relation_config.config.get("force_on_cluster")).lower() == "true":
can_on_cluster = True

else:
cluster = quoting.credentials.cluster if quoting.credentials.cluster else ''
materialized = (
relation_config.config.materialized if relation_config.config.materialized else ''
)
engine = (
relation_config.config.get('engine') if relation_config.config.get('engine') else ''
)
materialized = relation_config.config.get('materialized') or ''
engine = relation_config.config.get('engine') or ''
can_on_cluster = cls.get_on_cluster(cluster, materialized, engine)

return cls.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,105 @@ def test_base(self, project):
assert len(results) == 1

self.assert_total_count_correct(project)


class TestMergeTreeForceClusterMaterialization(BaseSimpleMaterializations):
'''Test MergeTree materialized view is created across a cluster using the
`force_on_cluster` config argument
'''

@pytest.fixture(scope="class")
def models(self):
config_force_on_cluster = """
{{ config(
engine='MergeTree',
materialized='materialized_view',
force_on_cluster='true'
)
}}
"""

return {
"force_on_cluster.sql": config_force_on_cluster + model_base,
"schema.yml": schema_base_yml,
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"schema.yml": base_seeds_schema_yml,
"base.csv": seeds_base_csv,
}

def assert_total_count_correct(self, project):
'''Check if table is created on cluster'''
cluster = project.test_config['cluster']

# check if data is properly distributed/replicated
table_relation = relation_from_name(project.adapter, "force_on_cluster")
# ClickHouse cluster in the docker-compose file
# under tests/integration is configured with 3 nodes
host_count = project.run_sql(
f"select count(host_name) as host_count from system.clusters where cluster='{cluster}'",
fetch="one",
)
assert host_count[0] > 1

table_count = project.run_sql(
f"select count() From clusterAllReplicas('{cluster}', system.tables) "
f"where database='{table_relation.schema}' and name='{table_relation.identifier}'",
fetch="one",
)

assert table_count[0] == 3

mv_count = project.run_sql(
f"select count() From clusterAllReplicas('{cluster}', system.tables) "
f"where database='{table_relation.schema}' and name='{table_relation.identifier}_mv'",
fetch="one",
)

assert mv_count[0] == 3

@pytest.mark.skipif(
os.environ.get('DBT_CH_TEST_CLUSTER', '').strip() == '', reason='Not on a cluster'
)
def test_base(self, project):
# cluster setting must exist
cluster = project.test_config['cluster']
assert cluster

# seed command
results = run_dbt(["seed"])
# seed result length
assert len(results) == 1

# run command
results = run_dbt()
# run result length
assert len(results) == 1

# names exist in result nodes
check_result_nodes_by_name(results, ["force_on_cluster"])

# check relation types
expected = {
"base": "table",
"replicated": "table",
}
check_relation_types(project.adapter, expected)

relation = relation_from_name(project.adapter, "base")
# table rowcount
result = project.run_sql(f"select count(*) as num_rows from {relation}", fetch="one")
assert result[0] == 10

# relations_equal
self.assert_total_count_correct(project)

# run full refresh
results = run_dbt(['--debug', 'run', '--full-refresh'])
# run result length
assert len(results) == 1

self.assert_total_count_correct(project)

0 comments on commit b9f9191

Please sign in to comment.