From 8821cf2a28523ceaee327dbbc80452b288385f74 Mon Sep 17 00:00:00 2001 From: David-C-L Date: Thu, 24 Oct 2024 00:58:26 +0100 Subject: [PATCH 1/4] create function iterators for create and delete delta table funcs --- .../CreateDeltaLakeTableFunctionIterator.java | 92 +++++++++++++++++++ .../DeleteDeltaLakeTableFunctionIterator.java | 55 +++++++++++ .../DeltaLakeConfigurationCatalogue.java | 32 +++++++ .../FunctionDeltaArrayDelete.jq | 3 + .../FunctionDeltaArrayInsert.jq | 3 + .../FunctionDeltaArrayReplace.jq | 3 + .../FunctionDeltaObjectDelete.jq | 3 + .../FunctionDeltaObjectInsert.jq | 3 + .../FunctionDeltaObjectRename.jq | 3 + .../FunctionDeltaObjectReplace.jq | 3 + .../FunctionsCreateDeltaLakeTable.jq | 3 + .../FunctionsDeleteDeltaLakeTable.jq | 4 + 12 files changed, 207 insertions(+) create mode 100644 src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java create mode 100644 src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java create mode 100644 src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq create mode 100644 src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java new file mode 100644 index 000000000..1b4370bab --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java @@ -0,0 +1,92 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.apache.spark.sql.AnalysisException; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.rumbledb.api.Item; +import org.rumbledb.cli.JsoniqQueryExecutor; +import org.rumbledb.context.DynamicContext; +import org.rumbledb.context.RuntimeStaticContext; +import org.rumbledb.exceptions.CannotRetrieveResourceException; +import org.rumbledb.exceptions.ExceptionMetadata; +import org.rumbledb.items.BooleanItem; +import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; +import org.rumbledb.runtime.RuntimeIterator; +import org.rumbledb.runtime.functions.input.FileSystemUtil; +import org.rumbledb.runtime.primary.VariableReferenceIterator; +import org.apache.spark.sql.Row; +import sparksoniq.spark.SparkSessionManager; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; + +import static org.apache.spark.sql.functions.lit; +import static org.apache.spark.sql.functions.monotonically_increasing_id; + +public class CreateDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator { + + public CreateDeltaLakeTableFunctionIterator( + List arguments, + RuntimeStaticContext staticContext + ) { + super(arguments, staticContext); + } + + @Override + public Item materializeFirstItemOrNull(DynamicContext context) { + RuntimeIterator urlIterator = this.children.get(0); + urlIterator.open(context); + String url = urlIterator.next().getStringValue(); + urlIterator.close(); + URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); + if (FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { + throw new CannotRetrieveResourceException("File " + uri + " already exists. Cannot create new delta lake table at this location.", getMetadata()); + } + try { + File directory = new File(uri.getPath()); + if (!directory.exists()) { + boolean mkdirs = directory.mkdirs(); + if (!mkdirs) { + throw new RuntimeException("Failed to create directory " + directory); + } + } + Dataset dataFrame = SparkSessionManager.getInstance() + .getOrCreateSession() + .emptyDataFrame(); + dataFrame = dataFrame.withColumn(SparkSessionManager.mutabilityLevelColumnName, lit(0)); + dataFrame = dataFrame.withColumn(SparkSessionManager.rowIdColumnName, monotonically_increasing_id()); + dataFrame = dataFrame.withColumn(SparkSessionManager.pathInColumnName, lit("")); + dataFrame = dataFrame.withColumn(SparkSessionManager.tableLocationColumnName, lit(uri.toString())); + + StructType schema = new StructType() + .add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.rowIdColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false) + .add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false); + + Row newRow = RowFactory.create( + 0, + 0, + "", + uri.toString() + ); + + Dataset newRowDataFrame = SparkSessionManager.getInstance().getOrCreateSession().createDataFrame(List.of(newRow), schema); + + Dataset combinedDataFrame = dataFrame.union(newRowDataFrame); + + combinedDataFrame.write().format("delta").mode("error").save(uri.toString()); + return new BooleanItem(true); + } catch (RuntimeException e) { + e.printStackTrace(); + return new BooleanItem(false); + } + } +} diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java new file mode 100644 index 000000000..47f95d050 --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java @@ -0,0 +1,55 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.apache.commons.io.FileUtils; +import org.rumbledb.api.Item; +import org.rumbledb.context.DynamicContext; +import org.rumbledb.context.RuntimeStaticContext; +import org.rumbledb.exceptions.CannotRetrieveResourceException; +import org.rumbledb.exceptions.ExceptionMetadata; +import org.rumbledb.items.BooleanItem; +import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; +import org.rumbledb.runtime.RuntimeIterator; +import org.rumbledb.runtime.functions.input.FileSystemUtil; +import org.rumbledb.runtime.primary.VariableReferenceIterator; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; + +public class DeleteDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator { + + public DeleteDeltaLakeTableFunctionIterator( + List arguments, + RuntimeStaticContext staticContext + ) { + super(arguments, staticContext); + } + + @Override + public Item materializeFirstItemOrNull(DynamicContext context) { + RuntimeIterator urlIterator = this.children.get(0); + urlIterator.open(context); + String url = urlIterator.next().getStringValue(); + urlIterator.close(); + URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); + if (!FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { + throw new CannotRetrieveResourceException("File " + uri + " not found.", getMetadata()); + } + +// URI tableURI = FileSystemUtil.resolveURIAgainstWorkingDirectory( +// this.currentAnnotation.getDeltaTablePath(), +// DeltaLakeConfigurationCatalogue.defaultDeltaLakeConfiguration, +// ExceptionMetadata.EMPTY_METADATA +// ); + + try { + File oldTable = new File(uri.getPath()); + FileUtils.deleteDirectory(oldTable); + return new BooleanItem(true); + } catch (IOException e) { + e.printStackTrace(); + return new BooleanItem(false); + } + } +} diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java new file mode 100644 index 000000000..3932a2d2c --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java @@ -0,0 +1,32 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.rumbledb.config.RumbleRuntimeConfiguration; + +public class DeltaLakeConfigurationCatalogue { + static final RumbleRuntimeConfiguration defaultDeltaLakeConfiguration = new RumbleRuntimeConfiguration( + new String[] { + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", + } + ); + + static final RumbleRuntimeConfiguration createDeltaLakeConfiguration = new RumbleRuntimeConfiguration( + new String[] { + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", + } + ); + +} diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq new file mode 100644 index 000000000..8cda6c596 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,13]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return delete json $data.string_array[[2]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq new file mode 100644 index 000000000..e6190a18d --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,9]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return insert json "SUCCESS" into $data.string_array at position 2 \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq new file mode 100644 index 000000000..dc3eab8bf --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,11]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return replace value of json $data.string_array[[2]] with "DOUBLE SUCCESS" \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq new file mode 100644 index 000000000..2272f9b7e --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,1]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return delete json $data.bool \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq new file mode 100644 index 000000000..072bd22d5 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,3]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return insert json "new_ins" : "SUCCESS" into $data \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq new file mode 100644 index 000000000..0c3e0e50e --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,7]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return rename json $data.new_ins as "success" \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq new file mode 100644 index 000000000..e3e29f83d --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,5]; Output="" :) +let $data := delta-file("./tempDeltaTable") +return replace value of json $data.new_ins with "DOUBLE SUCCESS" \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq new file mode 100644 index 000000000..aaf6ee497 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[0,0]; CreateTable; UpdateTable="./src/test/resources/test_files/runtime-delta-updates/simple-updates/tempDeltaTable"; Output="" :) +let $data := parquet-file("../../../queries/sample-json.snappy.parquet") +return $data \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq new file mode 100644 index 000000000..42bd2eca9 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="" :) +create-delta-lake-table("./tempDeltaTable"); +let $data := delta-file("./tempDeltaTable") +return $data \ No newline at end of file From 2dd4a40b26c71a69404294cc8d193310b038e4f7 Mon Sep 17 00:00:00 2001 From: David-C-L Date: Thu, 24 Oct 2024 00:59:19 +0100 Subject: [PATCH 2/4] add functions to built in catalogue --- .../context/BuiltinFunctionCatalogue.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java index 64a93bdf2..71cbb3bd4 100644 --- a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java +++ b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java @@ -37,6 +37,8 @@ import org.rumbledb.runtime.functions.datetime.components.TimezoneFromTimeFunctionIterator; import org.rumbledb.runtime.functions.datetime.components.YearFromDateFunctionIterator; import org.rumbledb.runtime.functions.datetime.components.YearFromDateTimeFunctionIterator; +import org.rumbledb.runtime.functions.delta_lake.CreateDeltaLakeTableFunctionIterator; +import org.rumbledb.runtime.functions.delta_lake.DeleteDeltaLakeTableFunctionIterator; import org.rumbledb.runtime.functions.durations.components.DaysFromDurationFunctionIterator; import org.rumbledb.runtime.functions.durations.components.HoursFromDurationFunctionIterator; import org.rumbledb.runtime.functions.durations.components.ImplicitTimezoneIterator; @@ -2688,6 +2690,28 @@ private static BuiltinFunction createBuiltinFunction( BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL ); + /** + * function that creates a delta lake table at a given path location + */ + static final BuiltinFunction create_delta_lake_table = createBuiltinFunction( + new Name(Name.JN_NS, "jn", "create-delta-lake-table"), + "string", + "boolean", + CreateDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + ); + + /** + * function that deletes a delta lake table at a given path location + */ + static final BuiltinFunction delete_delta_lake_table = createBuiltinFunction( + new Name(Name.JN_NS, "jn", "delete-delta-lake-table"), + "string", + "boolean", + DeleteDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + ); + static { builtinFunctions = new HashMap<>(); @@ -2718,6 +2742,9 @@ private static BuiltinFunction createBuiltinFunction( builtinFunctions.put(avro_file2.getIdentifier(), avro_file2); builtinFunctions.put(parse_json.getIdentifier(), parse_json); + builtinFunctions.put(create_delta_lake_table.getIdentifier(), create_delta_lake_table); + builtinFunctions.put(delete_delta_lake_table.getIdentifier(), delete_delta_lake_table); + builtinFunctions.put(count.getIdentifier(), count); builtinFunctions.put(boolean_function.getIdentifier(), boolean_function); builtinFunctions.put(not_function.getIdentifier(), not_function); From 62e4f145f496bacd11eb535be48805888b7a90b2 Mon Sep 17 00:00:00 2001 From: David-C-L Date: Thu, 24 Oct 2024 00:59:40 +0100 Subject: [PATCH 3/4] create tests for new delta lake funcs --- .../delta-lake-functions/FunctionDeltaArrayDelete.jq | 5 +++-- .../delta-lake-functions/FunctionDeltaArrayInsert.jq | 5 +++-- .../delta-lake-functions/FunctionDeltaArrayReplace.jq | 5 +++-- .../delta-lake-functions/FunctionDeltaObjectDelete.jq | 5 +++-- .../delta-lake-functions/FunctionDeltaObjectInsert.jq | 5 +++-- .../delta-lake-functions/FunctionDeltaObjectRename.jq | 5 +++-- .../delta-lake-functions/FunctionDeltaObjectReplace.jq | 5 +++-- .../delta-lake-functions/FunctionsCreateDeltaLakeTable.jq | 6 +++--- .../delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq | 7 +++---- 9 files changed, 27 insertions(+), 21 deletions(-) diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq index 8cda6c596..fe541dcd8 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,13]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,7]; Output="[ "SUCCESS" ]" :) let $data := delta-file("./tempDeltaTable") -return delete json $data.string_array[[2]] \ No newline at end of file +return delete json $data.new_array[[1]]; +delta-file("./tempDeltaTable").new_array \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq index e6190a18d..e74f7460f 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,9]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,5]; Output="SUCCESS" :) let $data := delta-file("./tempDeltaTable") -return insert json "SUCCESS" into $data.string_array at position 2 \ No newline at end of file +return insert json "SUCCESS" into $data.new_array at position 1; +delta-file("./tempDeltaTable").new_array[[1]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq index dc3eab8bf..12858250d 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,11]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,6]; Output="DOUBLE SUCCESS" :) let $data := delta-file("./tempDeltaTable") -return replace value of json $data.string_array[[2]] with "DOUBLE SUCCESS" \ No newline at end of file +return replace value of json $data.new_array[[1]] with "DOUBLE SUCCESS"; +delta-file("./tempDeltaTable").new_array[[1]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq index 2272f9b7e..177b46c42 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,1]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,4]; Output="null" :) let $data := delta-file("./tempDeltaTable") -return delete json $data.bool \ No newline at end of file +return delete json $data.success; +delta-file("./tempDeltaTable").success \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq index 072bd22d5..19bd6bade 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,3]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,1]; Output="(SUCCESS, [ "SUCCESS" ])" :) let $data := delta-file("./tempDeltaTable") -return insert json "new_ins" : "SUCCESS" into $data \ No newline at end of file +return (insert json "new_ins" : "SUCCESS" into $data, insert json "new_array" : ["SUCCESS"] into $data); +(delta-file("./tempDeltaTable").new_ins, delta-file("./tempDeltaTable").new_array) \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq index 0c3e0e50e..5614b20b5 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,7]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,3]; Output="DOUBLE SUCCESS" :) let $data := delta-file("./tempDeltaTable") -return rename json $data.new_ins as "success" \ No newline at end of file +return rename json $data.new_ins as "success"; +delta-file("./tempDeltaTable").success \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq index e3e29f83d..212df1297 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq @@ -1,3 +1,4 @@ -(:JIQS: ShouldRun; UpdateDim=[0,5]; Output="" :) +(:JIQS: ShouldRun; UpdateDim=[5,2]; Output="DOUBLE SUCCESS" :) let $data := delta-file("./tempDeltaTable") -return replace value of json $data.new_ins with "DOUBLE SUCCESS" \ No newline at end of file +return replace value of json $data.new_ins with "DOUBLE SUCCESS"; +delta-file("./tempDeltaTable").new_ins \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq index aaf6ee497..2003ce8a5 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq @@ -1,3 +1,3 @@ -(:JIQS: ShouldRun; UpdateDim=[0,0]; CreateTable; UpdateTable="./src/test/resources/test_files/runtime-delta-updates/simple-updates/tempDeltaTable"; Output="" :) -let $data := parquet-file("../../../queries/sample-json.snappy.parquet") -return $data \ No newline at end of file +(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="true" :) +let $ret := create-delta-lake-table("./tempDeltaTable") +return $ret \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq index 42bd2eca9..55473e11d 100644 --- a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq @@ -1,4 +1,3 @@ -(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="" :) -create-delta-lake-table("./tempDeltaTable"); -let $data := delta-file("./tempDeltaTable") -return $data \ No newline at end of file +(:JIQS: ShouldRun; UpdateDim=[5,8]; Output="true" :) +let $ret := delete-delta-lake-table("./tempDeltaTable") +return $ret \ No newline at end of file From cd9cf10e0895cb060142234a8d92b8ad6ac1ada7 Mon Sep 17 00:00:00 2001 From: David-C-L Date: Thu, 24 Oct 2024 11:52:21 +0100 Subject: [PATCH 4/4] run mvn spotless:apply --- .../context/BuiltinFunctionCatalogue.java | 20 +++++----- .../CreateDeltaLakeTableFunctionIterator.java | 37 +++++++++---------- .../DeleteDeltaLakeTableFunctionIterator.java | 12 +++--- .../DeltaLakeConfigurationCatalogue.java | 32 ++++++++-------- 4 files changed, 48 insertions(+), 53 deletions(-) diff --git a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java index 71cbb3bd4..afca459f9 100644 --- a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java +++ b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java @@ -2694,22 +2694,22 @@ private static BuiltinFunction createBuiltinFunction( * function that creates a delta lake table at a given path location */ static final BuiltinFunction create_delta_lake_table = createBuiltinFunction( - new Name(Name.JN_NS, "jn", "create-delta-lake-table"), - "string", - "boolean", - CreateDeltaLakeTableFunctionIterator.class, - BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + new Name(Name.JN_NS, "jn", "create-delta-lake-table"), + "string", + "boolean", + CreateDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL ); /** * function that deletes a delta lake table at a given path location */ static final BuiltinFunction delete_delta_lake_table = createBuiltinFunction( - new Name(Name.JN_NS, "jn", "delete-delta-lake-table"), - "string", - "boolean", - DeleteDeltaLakeTableFunctionIterator.class, - BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + new Name(Name.JN_NS, "jn", "delete-delta-lake-table"), + "string", + "boolean", + DeleteDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL ); static { diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java index 1b4370bab..50247351c 100644 --- a/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java @@ -1,30 +1,22 @@ package org.rumbledb.runtime.functions.delta_lake; -import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; import org.rumbledb.api.Item; -import org.rumbledb.cli.JsoniqQueryExecutor; import org.rumbledb.context.DynamicContext; import org.rumbledb.context.RuntimeStaticContext; import org.rumbledb.exceptions.CannotRetrieveResourceException; -import org.rumbledb.exceptions.ExceptionMetadata; import org.rumbledb.items.BooleanItem; import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; import org.rumbledb.runtime.RuntimeIterator; import org.rumbledb.runtime.functions.input.FileSystemUtil; -import org.rumbledb.runtime.primary.VariableReferenceIterator; import org.apache.spark.sql.Row; import sparksoniq.spark.SparkSessionManager; import java.io.File; -import java.io.IOException; import java.net.URI; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; import java.util.List; import static org.apache.spark.sql.functions.lit; @@ -47,7 +39,10 @@ public Item materializeFirstItemOrNull(DynamicContext context) { urlIterator.close(); URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); if (FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { - throw new CannotRetrieveResourceException("File " + uri + " already exists. Cannot create new delta lake table at this location.", getMetadata()); + throw new CannotRetrieveResourceException( + "File " + uri + " already exists. Cannot create new delta lake table at this location.", + getMetadata() + ); } try { File directory = new File(uri.getPath()); @@ -58,27 +53,29 @@ public Item materializeFirstItemOrNull(DynamicContext context) { } } Dataset dataFrame = SparkSessionManager.getInstance() - .getOrCreateSession() - .emptyDataFrame(); + .getOrCreateSession() + .emptyDataFrame(); dataFrame = dataFrame.withColumn(SparkSessionManager.mutabilityLevelColumnName, lit(0)); dataFrame = dataFrame.withColumn(SparkSessionManager.rowIdColumnName, monotonically_increasing_id()); dataFrame = dataFrame.withColumn(SparkSessionManager.pathInColumnName, lit("")); dataFrame = dataFrame.withColumn(SparkSessionManager.tableLocationColumnName, lit(uri.toString())); StructType schema = new StructType() - .add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false) - .add(SparkSessionManager.rowIdColumnName, DataTypes.IntegerType, false) - .add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false) - .add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false); + .add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.rowIdColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false) + .add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false); Row newRow = RowFactory.create( - 0, - 0, - "", - uri.toString() + 0, + 0, + "", + uri.toString() ); - Dataset newRowDataFrame = SparkSessionManager.getInstance().getOrCreateSession().createDataFrame(List.of(newRow), schema); + Dataset newRowDataFrame = SparkSessionManager.getInstance() + .getOrCreateSession() + .createDataFrame(List.of(newRow), schema); Dataset combinedDataFrame = dataFrame.union(newRowDataFrame); diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java index 47f95d050..c62424558 100644 --- a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java @@ -5,12 +5,10 @@ import org.rumbledb.context.DynamicContext; import org.rumbledb.context.RuntimeStaticContext; import org.rumbledb.exceptions.CannotRetrieveResourceException; -import org.rumbledb.exceptions.ExceptionMetadata; import org.rumbledb.items.BooleanItem; import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; import org.rumbledb.runtime.RuntimeIterator; import org.rumbledb.runtime.functions.input.FileSystemUtil; -import org.rumbledb.runtime.primary.VariableReferenceIterator; import java.io.File; import java.io.IOException; @@ -37,11 +35,11 @@ public Item materializeFirstItemOrNull(DynamicContext context) { throw new CannotRetrieveResourceException("File " + uri + " not found.", getMetadata()); } -// URI tableURI = FileSystemUtil.resolveURIAgainstWorkingDirectory( -// this.currentAnnotation.getDeltaTablePath(), -// DeltaLakeConfigurationCatalogue.defaultDeltaLakeConfiguration, -// ExceptionMetadata.EMPTY_METADATA -// ); + // URI tableURI = FileSystemUtil.resolveURIAgainstWorkingDirectory( + // this.currentAnnotation.getDeltaTablePath(), + // DeltaLakeConfigurationCatalogue.defaultDeltaLakeConfiguration, + // ExceptionMetadata.EMPTY_METADATA + // ); try { File oldTable = new File(uri.getPath()); diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java index 3932a2d2c..b06283ebc 100644 --- a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java @@ -5,27 +5,27 @@ public class DeltaLakeConfigurationCatalogue { static final RumbleRuntimeConfiguration defaultDeltaLakeConfiguration = new RumbleRuntimeConfiguration( new String[] { - "--print-iterator-tree", - "yes", - "--output-format", - "delta", - "--show-error-info", - "yes", - "--apply-updates", - "yes", + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", } ); static final RumbleRuntimeConfiguration createDeltaLakeConfiguration = new RumbleRuntimeConfiguration( new String[] { - "--print-iterator-tree", - "yes", - "--output-format", - "delta", - "--show-error-info", - "yes", - "--apply-updates", - "yes", + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", } );