Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

overwrite filter strategy #1343

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
overwrite filter for BQ
  • Loading branch information
stankiewicz committed May 27, 2022
commit 793ccd6c5bd4410a26216dd6da35549bb561ae59
48 changes: 38 additions & 10 deletions core/adapters/bigquery.ts
Original file line number Diff line number Diff line change
@@ -32,22 +32,34 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
if (table.type === "incremental") {
if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
tasks.add(Task.statement(this.createOrReplace(table)));
} else {
} else if (table.uniqueKey && table.uniqueKey.length > 0 && (table.strategy===null || table.strategy==="merge")){
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.where(table.incrementalQuery || table.query, table.where),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.where(table.incrementalQuery || table.query, table.where)
)
)
);
} else {
if (table.strategy==="insert_overwrite"){
if(table.overwriteFilter){
tasks.add(Task.statement(this.deleteWithStaticFilter(table.target,table.overwriteFilter)));
}else {
tasks.add(Task.statement(this.deleteDynamically(table.target,table.bigquery.partitionBy,
this.where(table.incrementalQuery || table.query, table.where))));
}
}
tasks.add(
Task.statement(
this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.where(table.incrementalQuery || table.query, table.where)
)
)
);
}
@@ -82,15 +94,15 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
}
if (table.bigquery && table.bigquery.partitionBy && table.bigquery.requirePartitionFilter){
options.push(`require_partition_filter=${table.bigquery.requirePartitionFilter}`)
}
}
if(table.bigquery && table.bigquery.additionalOptions){
for(const [optionName, optionValue] of Object.entries(table.bigquery.additionalOptions)){
options.push(`${optionName}=${optionValue}`)
}
}

return `create or replace ${
table.materialized
table.materialized
? "materialized "
: ""
}${this.tableTypeAsSql(
@@ -114,6 +126,22 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
create or replace view ${this.resolveTarget(target)} as ${query}`;
}

private deleteWithStaticFilter(
target: dataform.ITarget,
overwriteFilter: string
) {
return `delete from ${this.resolveTarget(target)} T where ${overwriteFilter}`;
}

private deleteDynamically(
target: dataform.ITarget,
partitionBy: string,
query: string
) {
return `delete from ${this.resolveTarget(target)} T where ${partitionBy} in (select ${partitionBy} from (${query}))`;
}


private mergeInto(
target: dataform.ITarget,
columns: string[],
2 changes: 2 additions & 0 deletions protos/core.proto
Original file line number Diff line number Diff line change
@@ -179,6 +179,8 @@ message Table {
string where = 8 [deprecated = true];
string incremental_query = 26;
repeated string unique_key = 30;
string strategy = 36;
string overwrite_filter = 37;

// Pre/post operations.
repeated string pre_ops = 13;
52 changes: 52 additions & 0 deletions tests/api/api.spec.ts
Original file line number Diff line number Diff line change
@@ -426,6 +426,58 @@ suite("@dataform/api", () => {
);
});

test("bigquery_incremental_insert_overwrite_custom_overwrite_filter", () => {
const graph = dataform.CompiledGraph.create({
projectConfig: { warehouse: "bigquery", defaultDatabase: "deeb" },
tables: [
{
target: {
schema: "schema",
name: "incremental"
},
type: "incremental",
strategy: "insert_overwrite",
overwriteFilter: "1=1",
query: "select 1 as test",
where: "true"
}
]
});
const state = dataform.WarehouseState.create({
tables: [
{
target: {
schema: "schema",
name: "incremental"
},
type: dataform.TableMetadata.Type.TABLE,
fields: [
{
name: "existing_field"
}
]
}
]
});
const executionGraph = new Builder(graph, {}, state).build();
expect(
cleanSql(
executionGraph.actions.filter(
n => targetAsReadableString(n.target) === "schema.incremental"
)[0].tasks[0].statement
)
).equals(
cleanSql(
`delete from \`deeb.schema.incremental\` t
where 1=1 ; insert into \`deeb.schema.incremental\` (\`existing_field\`)
select \`existing_field\` from (
select * from (select 1 as test) as subquery
where true
) as insertions`
)
);
});

test("bigquery_materialized", () => {
const testGraph: dataform.ICompiledGraph = dataform.CompiledGraph.create({
projectConfig: { warehouse: "bigquery", defaultDatabase: "deeb" },
12 changes: 6 additions & 6 deletions tests/cli/cli.spec.ts
Original file line number Diff line number Diff line change
@@ -48,8 +48,8 @@ suite(__filename, () => {
fs.ensureFileSync(filePath);
fs.writeFileSync(
filePath,
`
config { type: "table" }
`config { type: "table", schema: \`\${dataform.projectConfig.vars.testVar1}\` }

select 1 as \${dataform.projectConfig.vars.testVar2}
`
);
@@ -73,11 +73,11 @@ select 1 as \${dataform.projectConfig.vars.testVar2}
type: "table",
target: {
database: "dataform-integration-tests",
schema: "dataform",
schema: "testValue1",
name: "example"
},
canonicalTarget: {
schema: "dataform",
schema: "testValue1",
name: "example",
database: "dataform-integration-tests"
},
@@ -102,7 +102,7 @@ select 1 as \${dataform.projectConfig.vars.testVar2}
targets: [
{
database: "dataform-integration-tests",
schema: "dataform",
schema: "testValue1",
name: "example"
}
]
@@ -133,7 +133,7 @@ select 1 as \${dataform.projectConfig.vars.testVar2}
target: {
database: "dataform-integration-tests",
name: "example",
schema: "dataform"
schema: "testValue1"
},
tasks: [
{