diff --git a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java index 64a93bdf2..afca459f9 100644 --- a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java +++ b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java @@ -37,6 +37,8 @@ import org.rumbledb.runtime.functions.datetime.components.TimezoneFromTimeFunctionIterator; import org.rumbledb.runtime.functions.datetime.components.YearFromDateFunctionIterator; import org.rumbledb.runtime.functions.datetime.components.YearFromDateTimeFunctionIterator; +import org.rumbledb.runtime.functions.delta_lake.CreateDeltaLakeTableFunctionIterator; +import org.rumbledb.runtime.functions.delta_lake.DeleteDeltaLakeTableFunctionIterator; import org.rumbledb.runtime.functions.durations.components.DaysFromDurationFunctionIterator; import org.rumbledb.runtime.functions.durations.components.HoursFromDurationFunctionIterator; import org.rumbledb.runtime.functions.durations.components.ImplicitTimezoneIterator; @@ -2688,6 +2690,28 @@ private static BuiltinFunction createBuiltinFunction( BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL ); + /** + * function that creates a delta lake table at a given path location + */ + static final BuiltinFunction create_delta_lake_table = createBuiltinFunction( + new Name(Name.JN_NS, "jn", "create-delta-lake-table"), + "string", + "boolean", + CreateDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + ); + + /** + * function that deletes a delta lake table at a given path location + */ + static final BuiltinFunction delete_delta_lake_table = createBuiltinFunction( + new Name(Name.JN_NS, "jn", "delete-delta-lake-table"), + "string", + "boolean", + DeleteDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + ); + static { builtinFunctions = new HashMap<>(); @@ -2718,6 +2742,9 @@ private static BuiltinFunction createBuiltinFunction( builtinFunctions.put(avro_file2.getIdentifier(), avro_file2); builtinFunctions.put(parse_json.getIdentifier(), parse_json); + builtinFunctions.put(create_delta_lake_table.getIdentifier(), create_delta_lake_table); + builtinFunctions.put(delete_delta_lake_table.getIdentifier(), delete_delta_lake_table); + builtinFunctions.put(count.getIdentifier(), count); builtinFunctions.put(boolean_function.getIdentifier(), boolean_function); builtinFunctions.put(not_function.getIdentifier(), not_function); diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java new file mode 100644 index 000000000..50247351c --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java @@ -0,0 +1,89 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.rumbledb.api.Item; +import org.rumbledb.context.DynamicContext; +import org.rumbledb.context.RuntimeStaticContext; +import org.rumbledb.exceptions.CannotRetrieveResourceException; +import org.rumbledb.items.BooleanItem; +import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; +import org.rumbledb.runtime.RuntimeIterator; +import org.rumbledb.runtime.functions.input.FileSystemUtil; +import org.apache.spark.sql.Row; +import sparksoniq.spark.SparkSessionManager; + +import java.io.File; +import java.net.URI; +import java.util.List; + +import static org.apache.spark.sql.functions.lit; +import static org.apache.spark.sql.functions.monotonically_increasing_id; + +public class CreateDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator { + + public CreateDeltaLakeTableFunctionIterator( + List arguments, + RuntimeStaticContext staticContext + ) { + super(arguments, staticContext); + } + + @Override + public Item materializeFirstItemOrNull(DynamicContext context) { + RuntimeIterator urlIterator = this.children.get(0); + urlIterator.open(context); + String url = urlIterator.next().getStringValue(); + urlIterator.close(); + URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); + if (FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { + throw new CannotRetrieveResourceException( + "File " + uri + " already exists. Cannot create new delta lake table at this location.", + getMetadata() + ); + } + try { + File directory = new File(uri.getPath()); + if (!directory.exists()) { + boolean mkdirs = directory.mkdirs(); + if (!mkdirs) { + throw new RuntimeException("Failed to create directory " + directory); + } + } + Dataset dataFrame = SparkSessionManager.getInstance() + .getOrCreateSession() + .emptyDataFrame(); + dataFrame = dataFrame.withColumn(SparkSessionManager.mutabilityLevelColumnName, lit(0)); + dataFrame = dataFrame.withColumn(SparkSessionManager.rowIdColumnName, monotonically_increasing_id()); + dataFrame = dataFrame.withColumn(SparkSessionManager.pathInColumnName, lit("")); + dataFrame = dataFrame.withColumn(SparkSessionManager.tableLocationColumnName, lit(uri.toString())); + + StructType schema = new StructType() + .add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.rowIdColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false) + .add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false); + + Row newRow = RowFactory.create( + 0, + 0, + "", + uri.toString() + ); + + Dataset newRowDataFrame = SparkSessionManager.getInstance() + .getOrCreateSession() + .createDataFrame(List.of(newRow), schema); + + Dataset combinedDataFrame = dataFrame.union(newRowDataFrame); + + combinedDataFrame.write().format("delta").mode("error").save(uri.toString()); + return new BooleanItem(true); + } catch (RuntimeException e) { + e.printStackTrace(); + return new BooleanItem(false); + } + } +} diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java new file mode 100644 index 000000000..c62424558 --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java @@ -0,0 +1,53 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.apache.commons.io.FileUtils; +import org.rumbledb.api.Item; +import org.rumbledb.context.DynamicContext; +import org.rumbledb.context.RuntimeStaticContext; +import org.rumbledb.exceptions.CannotRetrieveResourceException; +import org.rumbledb.items.BooleanItem; +import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; +import org.rumbledb.runtime.RuntimeIterator; +import org.rumbledb.runtime.functions.input.FileSystemUtil; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; + +public class DeleteDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator { + + public DeleteDeltaLakeTableFunctionIterator( + List arguments, + RuntimeStaticContext staticContext + ) { + super(arguments, staticContext); + } + + @Override + public Item materializeFirstItemOrNull(DynamicContext context) { + RuntimeIterator urlIterator = this.children.get(0); + urlIterator.open(context); + String url = urlIterator.next().getStringValue(); + urlIterator.close(); + URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); + if (!FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { + throw new CannotRetrieveResourceException("File " + uri + " not found.", getMetadata()); + } + + // URI tableURI = FileSystemUtil.resolveURIAgainstWorkingDirectory( + // this.currentAnnotation.getDeltaTablePath(), + // DeltaLakeConfigurationCatalogue.defaultDeltaLakeConfiguration, + // ExceptionMetadata.EMPTY_METADATA + // ); + + try { + File oldTable = new File(uri.getPath()); + FileUtils.deleteDirectory(oldTable); + return new BooleanItem(true); + } catch (IOException e) { + e.printStackTrace(); + return new BooleanItem(false); + } + } +} diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java new file mode 100644 index 000000000..b06283ebc --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java @@ -0,0 +1,32 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.rumbledb.config.RumbleRuntimeConfiguration; + +public class DeltaLakeConfigurationCatalogue { + static final RumbleRuntimeConfiguration defaultDeltaLakeConfiguration = new RumbleRuntimeConfiguration( + new String[] { + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", + } + ); + + static final RumbleRuntimeConfiguration createDeltaLakeConfiguration = new RumbleRuntimeConfiguration( + new String[] { + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", + } + ); + +} diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq new file mode 100644 index 000000000..fe541dcd8 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,7]; Output="[ "SUCCESS" ]" :) +let $data := delta-file("./tempDeltaTable") +return delete json $data.new_array[[1]]; +delta-file("./tempDeltaTable").new_array \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq new file mode 100644 index 000000000..e74f7460f --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,5]; Output="SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return insert json "SUCCESS" into $data.new_array at position 1; +delta-file("./tempDeltaTable").new_array[[1]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq new file mode 100644 index 000000000..12858250d --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,6]; Output="DOUBLE SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return replace value of json $data.new_array[[1]] with "DOUBLE SUCCESS"; +delta-file("./tempDeltaTable").new_array[[1]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq new file mode 100644 index 000000000..177b46c42 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,4]; Output="null" :) +let $data := delta-file("./tempDeltaTable") +return delete json $data.success; +delta-file("./tempDeltaTable").success \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq new file mode 100644 index 000000000..19bd6bade --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,1]; Output="(SUCCESS, [ "SUCCESS" ])" :) +let $data := delta-file("./tempDeltaTable") +return (insert json "new_ins" : "SUCCESS" into $data, insert json "new_array" : ["SUCCESS"] into $data); +(delta-file("./tempDeltaTable").new_ins, delta-file("./tempDeltaTable").new_array) \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq new file mode 100644 index 000000000..5614b20b5 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,3]; Output="DOUBLE SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return rename json $data.new_ins as "success"; +delta-file("./tempDeltaTable").success \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq new file mode 100644 index 000000000..212df1297 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,2]; Output="DOUBLE SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return replace value of json $data.new_ins with "DOUBLE SUCCESS"; +delta-file("./tempDeltaTable").new_ins \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq new file mode 100644 index 000000000..2003ce8a5 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="true" :) +let $ret := create-delta-lake-table("./tempDeltaTable") +return $ret \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq new file mode 100644 index 000000000..55473e11d --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[5,8]; Output="true" :) +let $ret := delete-delta-lake-table("./tempDeltaTable") +return $ret \ No newline at end of file