Skip to content

Commit

Permalink
Merge pull request #1270 from RumbleDB/delta-lake-functions
Browse files Browse the repository at this point in the history
Delta lake functions
  • Loading branch information
ghislainfourny authored Oct 24, 2024
2 parents eeba107 + d8dac1f commit e151cc8
Show file tree
Hide file tree
Showing 13 changed files with 235 additions and 0 deletions.
27 changes: 27 additions & 0 deletions src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.rumbledb.runtime.functions.datetime.components.TimezoneFromTimeFunctionIterator;
import org.rumbledb.runtime.functions.datetime.components.YearFromDateFunctionIterator;
import org.rumbledb.runtime.functions.datetime.components.YearFromDateTimeFunctionIterator;
import org.rumbledb.runtime.functions.delta_lake.CreateDeltaLakeTableFunctionIterator;
import org.rumbledb.runtime.functions.delta_lake.DeleteDeltaLakeTableFunctionIterator;
import org.rumbledb.runtime.functions.durations.components.DaysFromDurationFunctionIterator;
import org.rumbledb.runtime.functions.durations.components.HoursFromDurationFunctionIterator;
import org.rumbledb.runtime.functions.durations.components.ImplicitTimezoneIterator;
Expand Down Expand Up @@ -2688,6 +2690,28 @@ private static BuiltinFunction createBuiltinFunction(
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);

/**
* function that creates a delta lake table at a given path location
*/
static final BuiltinFunction create_delta_lake_table = createBuiltinFunction(
new Name(Name.JN_NS, "jn", "create-delta-lake-table"),
"string",
"boolean",
CreateDeltaLakeTableFunctionIterator.class,
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);

/**
* function that deletes a delta lake table at a given path location
*/
static final BuiltinFunction delete_delta_lake_table = createBuiltinFunction(
new Name(Name.JN_NS, "jn", "delete-delta-lake-table"),
"string",
"boolean",
DeleteDeltaLakeTableFunctionIterator.class,
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);

static {
builtinFunctions = new HashMap<>();

Expand Down Expand Up @@ -2718,6 +2742,9 @@ private static BuiltinFunction createBuiltinFunction(
builtinFunctions.put(avro_file2.getIdentifier(), avro_file2);
builtinFunctions.put(parse_json.getIdentifier(), parse_json);

builtinFunctions.put(create_delta_lake_table.getIdentifier(), create_delta_lake_table);
builtinFunctions.put(delete_delta_lake_table.getIdentifier(), delete_delta_lake_table);

builtinFunctions.put(count.getIdentifier(), count);
builtinFunctions.put(boolean_function.getIdentifier(), boolean_function);
builtinFunctions.put(not_function.getIdentifier(), not_function);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.rumbledb.runtime.functions.delta_lake;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.rumbledb.api.Item;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.context.RuntimeStaticContext;
import org.rumbledb.exceptions.CannotRetrieveResourceException;
import org.rumbledb.items.BooleanItem;
import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.functions.input.FileSystemUtil;
import org.apache.spark.sql.Row;
import sparksoniq.spark.SparkSessionManager;

import java.io.File;
import java.net.URI;
import java.util.List;

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.monotonically_increasing_id;

public class CreateDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator {

public CreateDeltaLakeTableFunctionIterator(
List<RuntimeIterator> arguments,
RuntimeStaticContext staticContext
) {
super(arguments, staticContext);
}

@Override
public Item materializeFirstItemOrNull(DynamicContext context) {
RuntimeIterator urlIterator = this.children.get(0);
urlIterator.open(context);
String url = urlIterator.next().getStringValue();
urlIterator.close();
URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata());
if (FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) {
throw new CannotRetrieveResourceException(
"File " + uri + " already exists. Cannot create new delta lake table at this location.",
getMetadata()
);
}
try {
File directory = new File(uri.getPath());
if (!directory.exists()) {
boolean mkdirs = directory.mkdirs();
if (!mkdirs) {
throw new RuntimeException("Failed to create directory " + directory);
}
}
Dataset<Row> dataFrame = SparkSessionManager.getInstance()
.getOrCreateSession()
.emptyDataFrame();
dataFrame = dataFrame.withColumn(SparkSessionManager.mutabilityLevelColumnName, lit(0));
dataFrame = dataFrame.withColumn(SparkSessionManager.rowIdColumnName, monotonically_increasing_id());
dataFrame = dataFrame.withColumn(SparkSessionManager.pathInColumnName, lit(""));
dataFrame = dataFrame.withColumn(SparkSessionManager.tableLocationColumnName, lit(uri.toString()));

StructType schema = new StructType()
.add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false)
.add(SparkSessionManager.rowIdColumnName, DataTypes.IntegerType, false)
.add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false)
.add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false);

Row newRow = RowFactory.create(
0,
0,
"",
uri.toString()
);

Dataset<Row> newRowDataFrame = SparkSessionManager.getInstance()
.getOrCreateSession()
.createDataFrame(List.of(newRow), schema);

Dataset<Row> combinedDataFrame = dataFrame.union(newRowDataFrame);

combinedDataFrame.write().format("delta").mode("error").save(uri.toString());
return new BooleanItem(true);
} catch (RuntimeException e) {
e.printStackTrace();
return new BooleanItem(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.rumbledb.runtime.functions.delta_lake;

import org.apache.commons.io.FileUtils;
import org.rumbledb.api.Item;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.context.RuntimeStaticContext;
import org.rumbledb.exceptions.CannotRetrieveResourceException;
import org.rumbledb.items.BooleanItem;
import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.functions.input.FileSystemUtil;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;

public class DeleteDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator {

public DeleteDeltaLakeTableFunctionIterator(
List<RuntimeIterator> arguments,
RuntimeStaticContext staticContext
) {
super(arguments, staticContext);
}

@Override
public Item materializeFirstItemOrNull(DynamicContext context) {
RuntimeIterator urlIterator = this.children.get(0);
urlIterator.open(context);
String url = urlIterator.next().getStringValue();
urlIterator.close();
URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata());
if (!FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) {
throw new CannotRetrieveResourceException("File " + uri + " not found.", getMetadata());
}

// URI tableURI = FileSystemUtil.resolveURIAgainstWorkingDirectory(
// this.currentAnnotation.getDeltaTablePath(),
// DeltaLakeConfigurationCatalogue.defaultDeltaLakeConfiguration,
// ExceptionMetadata.EMPTY_METADATA
// );

try {
File oldTable = new File(uri.getPath());
FileUtils.deleteDirectory(oldTable);
return new BooleanItem(true);
} catch (IOException e) {
e.printStackTrace();
return new BooleanItem(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.rumbledb.runtime.functions.delta_lake;

import org.rumbledb.config.RumbleRuntimeConfiguration;

public class DeltaLakeConfigurationCatalogue {
static final RumbleRuntimeConfiguration defaultDeltaLakeConfiguration = new RumbleRuntimeConfiguration(
new String[] {
"--print-iterator-tree",
"yes",
"--output-format",
"delta",
"--show-error-info",
"yes",
"--apply-updates",
"yes",
}
);

static final RumbleRuntimeConfiguration createDeltaLakeConfiguration = new RumbleRuntimeConfiguration(
new String[] {
"--print-iterator-tree",
"yes",
"--output-format",
"delta",
"--show-error-info",
"yes",
"--apply-updates",
"yes",
}
);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,7]; Output="[ "SUCCESS" ]" :)
let $data := delta-file("./tempDeltaTable")
return delete json $data.new_array[[1]];
delta-file("./tempDeltaTable").new_array
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,5]; Output="SUCCESS" :)
let $data := delta-file("./tempDeltaTable")
return insert json "SUCCESS" into $data.new_array at position 1;
delta-file("./tempDeltaTable").new_array[[1]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,6]; Output="DOUBLE SUCCESS" :)
let $data := delta-file("./tempDeltaTable")
return replace value of json $data.new_array[[1]] with "DOUBLE SUCCESS";
delta-file("./tempDeltaTable").new_array[[1]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,4]; Output="null" :)
let $data := delta-file("./tempDeltaTable")
return delete json $data.success;
delta-file("./tempDeltaTable").success
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,1]; Output="(SUCCESS, [ "SUCCESS" ])" :)
let $data := delta-file("./tempDeltaTable")
return (insert json "new_ins" : "SUCCESS" into $data, insert json "new_array" : ["SUCCESS"] into $data);
(delta-file("./tempDeltaTable").new_ins, delta-file("./tempDeltaTable").new_array)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,3]; Output="DOUBLE SUCCESS" :)
let $data := delta-file("./tempDeltaTable")
return rename json $data.new_ins as "success";
delta-file("./tempDeltaTable").success
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,2]; Output="DOUBLE SUCCESS" :)
let $data := delta-file("./tempDeltaTable")
return replace value of json $data.new_ins with "DOUBLE SUCCESS";
delta-file("./tempDeltaTable").new_ins
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="true" :)
let $ret := create-delta-lake-table("./tempDeltaTable")
return $ret
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(:JIQS: ShouldRun; UpdateDim=[5,8]; Output="true" :)
let $ret := delete-delta-lake-table("./tempDeltaTable")
return $ret

0 comments on commit e151cc8

Please sign in to comment.