diff --git a/content/docs/guides/datasets/incremental.md b/content/docs/guides/datasets/incremental.md
index 89bff1408..50f1f55b1 100644
--- a/content/docs/guides/datasets/incremental.md
+++ b/content/docs/guides/datasets/incremental.md
@@ -140,6 +140,8 @@ INSERT INTO default_schema.example_incremental (timestamp, action)
 
 If no unique key is specified, then the merge condition (`T.user_id = S.user_id` in this example) is set as `false`, causing rows to always be inserted rather than merged.
 
+
+## A merge example
@@ -203,6 +205,83 @@ WHEN NOT MATCHED THEN
   INSERT (timestamp, user_id, action)
   VALUES (timestamp, user_id, action)
 ```
 
+## An insert overwrite example
+
+If your table doesn't have a unique key and you want to keep your runs idempotent, use the `insert_overwrite` strategy. With insert overwrite, the partitions to overwrite are either detected automatically, or you can specify an overwrite filter explicitly (and even set it dynamically through project variables). The first example below lets the partitions to overwrite be detected automatically.
+
+```sql
+config {
+  type: "incremental",
+  strategy: "insert_overwrite",
+  bigquery: {
+    partitionBy: "DATE(timestamp)"
+  }
+}
+
+SELECT timestamp, action FROM weblogs.user_actions
+${ when(incremental(), `WHERE DATE(timestamp) = "` + dataform.projectConfig.vars.execution_date + `"`) }
+```
+
+### Generated SQL
+
+As above, the SQL generated by this example depends on the warehouse type, but generally follows the same format.
+
+If the dataset doesn't exist yet:
+
+```js
+CREATE OR REPLACE TABLE default_schema.example_incremental PARTITION BY DATE(timestamp) AS
+  SELECT timestamp, action
+  FROM weblogs.user_actions;
+```
+
+When incrementally processing new rows:
+
+```js
+DELETE FROM default_schema.example_incremental
+WHERE DATE(timestamp) IN
+  (
+    SELECT DATE(timestamp) FROM (
+      SELECT timestamp, action
+      FROM weblogs.user_actions
+      WHERE DATE(timestamp) = "2022-06-01")
+  );
+INSERT INTO default_schema.example_incremental (timestamp, action)
+  SELECT timestamp, action
+  FROM weblogs.user_actions
+  WHERE DATE(timestamp) = "2022-06-01";
+```
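+
+In these examples, the day being processed is injected through the `execution_date` project variable (the variable name here is only an illustration). Assuming the project is run through the Dataform CLI, a value could be supplied like this:
+
+```bash
+dataform run --vars=execution_date=2022-06-01
+```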
+
+### Setting an overwrite filter
+
+To simplify the DELETE statement, an overwrite filter can be set. The example below uses a project variable to pass in the specific day that is being processed.
+
+```sql
+config {
+  type: "incremental",
+  strategy: "insert_overwrite",
+  overwriteFilter: `${"DATE(timestamp) = \"" + dataform.projectConfig.vars.execution_date + "\""}`,
+  bigquery: {
+    partitionBy: "DATE(timestamp)"
+  }
+}
+
+SELECT timestamp, action FROM weblogs.user_actions
+${ when(incremental(), `WHERE DATE(timestamp) = "` + dataform.projectConfig.vars.execution_date + `"`) }
+```
+
+### Generated SQL with an overwrite filter
+
+The initial dataset is created in the same way as in the previous example, but the incremental step is faster because the partitions to delete no longer have to be computed from the source query.
+
+```js
+DELETE FROM default_schema.example_incremental
+WHERE DATE(timestamp) = "2022-06-01";
+INSERT INTO default_schema.example_incremental (timestamp, action)
+  SELECT timestamp, action
+  FROM weblogs.user_actions
+  WHERE DATE(timestamp) = "2022-06-01";
+```
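+
+Note that detecting the partitions to overwrite dynamically is currently only supported on BigQuery; on Snowflake, the `insert_overwrite` strategy requires an explicit `overwriteFilter` to be set.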
 
 ## Daily snapshots with incremental datasets
 
 Incremental datasets can be used to create a daily snapshot of mutable external datasets.
diff --git a/core/adapters/bigquery.ts b/core/adapters/bigquery.ts
index 95593cf15..faa5eba11 100644
--- a/core/adapters/bigquery.ts
+++ b/core/adapters/bigquery.ts
@@ -32,22 +32,35 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
     if (table.type === "incremental") {
       if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
         tasks.add(Task.statement(this.createOrReplace(table)));
-      } else {
+      } else if (table.uniqueKey && table.uniqueKey.length > 0 && (!table.strategy || table.strategy === "merge")) {
         tasks.add(
           Task.statement(
-            table.uniqueKey && table.uniqueKey.length > 0
-              ? this.mergeInto(
+            this.mergeInto(
               table.target,
               tableMetadata?.fields.map(f => f.name),
               this.where(table.incrementalQuery || table.query, table.where),
               table.uniqueKey,
               table.bigquery && table.bigquery.updatePartitionFilter
             )
-              : this.insertInto(
-                  table.target,
-                  tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
-                  this.where(table.incrementalQuery || table.query, table.where)
-                )
+          )
+        );
+      } else {
+        if (table.strategy === "insert_overwrite") {
+          if (table.overwriteFilter) {
+            tasks.add(Task.statement(this.deleteWithStaticFilter(table.target, table.overwriteFilter)));
+          } else {
+            tasks.add(
+              Task.statement(
+                this.deleteDynamically(
+                  table.target,
+                  table.bigquery.partitionBy,
+                  this.where(table.incrementalQuery || table.query, table.where)
+                )
+              )
+            );
+          }
+        }
+        tasks.add(
+          Task.statement(
+            this.insertInto(
+              table.target,
+              tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
+              this.where(table.incrementalQuery || table.query, table.where)
+            )
           )
         );
       }
@@ -114,6 +127,22 @@ export class BigQueryAdapter extends Adapter implements IAdapter {
     create or replace view ${this.resolveTarget(target)} as ${query}`;
   }
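+
+  // The delete step of the insert_overwrite strategy: remove either the rows matching
+  // the user-supplied overwriteFilter, or every partition that appears in the output
+  // of the incremental query, before the fresh rows are inserted.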
+  private deleteWithStaticFilter(target: dataform.ITarget, overwriteFilter: string) {
+    return `delete from ${this.resolveTarget(target)} T where ${overwriteFilter}`;
+  }
+
+  private deleteDynamically(target: dataform.ITarget, partitionBy: string, query: string) {
+    return `delete from ${this.resolveTarget(target)} T where ${partitionBy} in (select ${partitionBy} from (${query}))`;
+  }
+
   private mergeInto(
     target: dataform.ITarget,
     columns: string[],
diff --git a/core/adapters/snowflake.ts b/core/adapters/snowflake.ts
index 19f2ee6ce..3790863d8 100644
--- a/core/adapters/snowflake.ts
+++ b/core/adapters/snowflake.ts
@@ -31,27 +31,33 @@ export class SnowflakeAdapter extends Adapter implements IAdapter {
         Task.statement(this.dropIfExists(table.target, this.oppositeTableType(baseTableType)))
       );
     }
-
     if (table.type === "incremental") {
       if (!this.shouldWriteIncrementally(runConfig, tableMetadata)) {
         tasks.add(Task.statement(this.createOrReplace(table)));
-      } else {
+      } else if (table.uniqueKey && table.uniqueKey.length > 0 && (!table.strategy || table.strategy === "merge")) {
         tasks.add(
           Task.statement(
-            table.uniqueKey && table.uniqueKey.length > 0
-              ? this.mergeInto(
+            this.mergeInto(
               table.target,
               tableMetadata.fields.map(f => f.name),
               this.where(table.incrementalQuery || table.query, table.where),
               table.uniqueKey
-              )
-              : this.insertInto(
-                  table.target,
-                  tableMetadata.fields.map(f => f.name),
-                  this.where(table.incrementalQuery || table.query, table.where)
-                )
-          )
-        );
+            )
+          )
+        );
+      } else {
+        if (table.strategy === "insert_overwrite") {
+          if (table.overwriteFilter) {
+            tasks.add(Task.statement(this.deleteWithStaticFilter(table.target, table.overwriteFilter)));
+          } else {
+            throw new Error("insert_overwrite requires setting overwriteFilter.");
+          }
+        }
+        tasks.add(
+          Task.statement(
+            this.insertInto(
+              table.target,
+              tableMetadata.fields.map(f => f.name),
+              this.where(table.incrementalQuery || table.query, table.where)
+            )
+          )
+        );
+      }
     } else {
       tasks.add(Task.statement(this.createOrReplace(table)));
@@ -92,6 +98,21 @@ export class SnowflakeAdapter extends Adapter implements IAdapter {
     }as ${table.query}`;
   }
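+
+  // Mirrors the BigQuery adapter. deleteDynamically is currently unused here, since
+  // insert_overwrite on Snowflake always requires an explicit overwriteFilter.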
"--vars=testVar1=testValue1,testVar2=testValue2" + "--vars=testVar1=dataform,testVar2=testValue2" ]) ); @@ -93,7 +93,7 @@ select 1 as \${dataform.projectConfig.vars.testVar2} defaultDatabase: "dataform-integration-tests", useRunCache: false, vars: { - testVar1: "testValue1", + testVar1: "dataform", testVar2: "testValue2" } }, @@ -118,7 +118,7 @@ select 1 as \${dataform.projectConfig.vars.testVar2} "test_credentials/bigquery.json", "--dry-run", "--json", - "--vars=testVar1=testValue1,testVar2=testValue2" + "--vars=testVar1=dataform,testVar2=testValue2" ]) ); @@ -152,7 +152,7 @@ select 1 as \${dataform.projectConfig.vars.testVar2} useRunCache: false, warehouse: "bigquery", vars: { - testVar1: "testValue1", + testVar1: "dataform", testVar2: "testValue2" } }, diff --git a/tests/integration/bigquery.spec.ts b/tests/integration/bigquery.spec.ts index d1e428507..e1adbcea9 100644 --- a/tests/integration/bigquery.spec.ts +++ b/tests/integration/bigquery.spec.ts @@ -41,7 +41,7 @@ suite("@dataform/integration/bigquery", { parallel: true }, ({ before, after }) const executedGraph = await dfapi.run(dbadapter, executionGraph).result(); const actionMap = keyBy(executedGraph.actions, v => targetAsReadableString(v.target)); - expect(Object.keys(actionMap).length).eql(19); + expect(Object.keys(actionMap).length).eql(20); // Check the status of action execution. const expectedFailedActions = [ @@ -89,18 +89,20 @@ suite("@dataform/integration/bigquery", { parallel: true }, ({ before, after }) for (const runIteration of [ { runConfig: { - actions: ["example_incremental", "example_incremental_merge"], + actions: ["example_incremental", "example_incremental_merge", "example_incremental_insert_overwrite"], includeDependencies: true }, expectedIncrementalRows: 3, - expectedIncrementalMergeRows: 2 + expectedIncrementalMergeRows: 2, + expectedIncrementalInsertOverwriteRows: 4 }, { runConfig: { - actions: ["example_incremental", "example_incremental_merge"] + actions: ["example_incremental", "example_incremental_merge", "example_incremental_insert_overwrite"] }, expectedIncrementalRows: 5, - expectedIncrementalMergeRows: 2 + expectedIncrementalMergeRows: 2, + expectedIncrementalInsertOverwriteRows: 2 } ]) { const executionGraph = await dfapi.build(compiledGraph, runIteration.runConfig, dbadapter); @@ -108,7 +110,7 @@ suite("@dataform/integration/bigquery", { parallel: true }, ({ before, after }) expect(dataform.RunResult.ExecutionStatus[runResult.status]).eql( dataform.RunResult.ExecutionStatus[dataform.RunResult.ExecutionStatus.SUCCESSFUL] ); - const [incrementalRows, incrementalMergeRows] = await Promise.all([ + const [incrementalRows, incrementalMergeRows,incrementalInsertOverwriteRows] = await Promise.all([ getTableRows( { database: "dataform-integration-tests", @@ -126,10 +128,20 @@ suite("@dataform/integration/bigquery", { parallel: true }, ({ before, after }) }, adapter, dbadapter + ), + getTableRows( + { + database: "dataform-integration-tests", + schema: "df_integration_test_eu_incremental_tables", + name: "example_incremental_insert_overwrite" + }, + adapter, + dbadapter ) ]); expect(incrementalRows.length).equals(runIteration.expectedIncrementalRows); expect(incrementalMergeRows.length).equals(runIteration.expectedIncrementalMergeRows); + expect(incrementalInsertOverwriteRows.length).equals(runIteration.expectedIncrementalInsertOverwriteRows); } }); diff --git a/tests/integration/bigquery_project/definitions/example_incremental_insert_overwrite.sqlx 
+WITH example_data AS (
+  SELECT dt AS dt, id_1 AS id_1, val AS val
+  FROM
+    (SELECT DATE("2022-06-01") AS dt, 21 AS id_1, 'not_partitioned' AS val) UNION ALL
+    (SELECT DATE("2022-06-02") AS dt, 22 AS id_1, 'not_partitioned' AS val) UNION ALL
+    (SELECT DATE("2022-06-03") AS dt, 23 AS id_1, 'original' AS val) UNION ALL
+    (SELECT DATE("2022-06-03") AS dt, 24 AS id_1, 'original' AS val) UNION ALL
+    (SELECT DATE("2022-06-03") AS dt, 24 AS id_1, 'original_dup' AS val) UNION ALL
+    (SELECT DATE("2022-06-03") AS dt, 24 AS id_1, 'original_dup' AS val) UNION ALL
+    (SELECT DATE("2022-06-03") AS dt, 25 AS id_1, 'new' AS val) UNION ALL
+    (SELECT DATE("2022-06-03") AS dt, 26 AS id_1, 'new' AS val)
+)
+
+config {
+  type: "incremental",
+  strategy: "insert_overwrite",
+  overwriteFilter: "dt = \"2022-06-03\"",
+  bigquery: {
+    partitionBy: "dt",
+    updatePartitionFilter: "dt > \"2022-06-02\""
+  },
+  hermetic: false
+}
+
+SELECT dt, id_1, val
+FROM example_data
+
+${ when(incremental(), `WHERE id_1 > 24`, `WHERE id_1 > 22 and id_1 < 25`) }
diff --git a/tests/integration/bigquery_project/definitions/example_incremental_merge.sqlx b/tests/integration/bigquery_project/definitions/example_incremental_merge.sqlx
index 24929f9b1..324966d7c 100644
--- a/tests/integration/bigquery_project/definitions/example_incremental_merge.sqlx
+++ b/tests/integration/bigquery_project/definitions/example_incremental_merge.sqlx
@@ -12,6 +12,7 @@ FROM
 config {
   type: "incremental",
   uniqueKey: ["id_1", "id_2"],
+  // strategy: "merge",
   bigquery: {
     partitionBy: "RANGE_BUCKET(ts, GENERATE_ARRAY(0, 6, 2))",
     updatePartitionFilter: "ts > 2"