Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delta lake functions #1268

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.rumbledb.runtime.functions.datetime.components.TimezoneFromTimeFunctionIterator;
import org.rumbledb.runtime.functions.datetime.components.YearFromDateFunctionIterator;
import org.rumbledb.runtime.functions.datetime.components.YearFromDateTimeFunctionIterator;
import org.rumbledb.runtime.functions.delta_lake.CreateDeltaLakeTableFunctionIterator;
import org.rumbledb.runtime.functions.delta_lake.DeleteDeltaLakeTableFunctionIterator;
import org.rumbledb.runtime.functions.durations.components.DaysFromDurationFunctionIterator;
import org.rumbledb.runtime.functions.durations.components.HoursFromDurationFunctionIterator;
import org.rumbledb.runtime.functions.durations.components.ImplicitTimezoneIterator;
Expand Down Expand Up @@ -2688,6 +2690,28 @@ private static BuiltinFunction createBuiltinFunction(
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);

/**
 * Built-in function jn:create-delta-lake-table($location as string) as boolean.
 * Creates a new, empty Delta Lake table at the given path location. Returns true
 * on success and false on runtime failure; the iterator raises an error if
 * something already exists at the location (see CreateDeltaLakeTableFunctionIterator).
 */
static final BuiltinFunction create_delta_lake_table = createBuiltinFunction(
new Name(Name.JN_NS, "jn", "create-delta-lake-table"),
"string",
"boolean",
CreateDeltaLakeTableFunctionIterator.class,
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);

/**
 * Built-in function jn:delete-delta-lake-table($location as string) as boolean.
 * Deletes the Delta Lake table directory at the given path location. Returns true
 * on success and false on runtime failure; the iterator raises an error if
 * nothing exists at the location (see DeleteDeltaLakeTableFunctionIterator).
 */
static final BuiltinFunction delete_delta_lake_table = createBuiltinFunction(
new Name(Name.JN_NS, "jn", "delete-delta-lake-table"),
"string",
"boolean",
DeleteDeltaLakeTableFunctionIterator.class,
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);

static {
builtinFunctions = new HashMap<>();

Expand Down Expand Up @@ -2718,6 +2742,9 @@ private static BuiltinFunction createBuiltinFunction(
builtinFunctions.put(avro_file2.getIdentifier(), avro_file2);
builtinFunctions.put(parse_json.getIdentifier(), parse_json);

builtinFunctions.put(create_delta_lake_table.getIdentifier(), create_delta_lake_table);
builtinFunctions.put(delete_delta_lake_table.getIdentifier(), delete_delta_lake_table);

builtinFunctions.put(count.getIdentifier(), count);
builtinFunctions.put(boolean_function.getIdentifier(), boolean_function);
builtinFunctions.put(not_function.getIdentifier(), not_function);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package org.rumbledb.runtime.functions.delta_lake;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.rumbledb.api.Item;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.context.RuntimeStaticContext;
import org.rumbledb.exceptions.CannotRetrieveResourceException;
import org.rumbledb.items.BooleanItem;
import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.functions.input.FileSystemUtil;
import org.apache.spark.sql.Row;
import sparksoniq.spark.SparkSessionManager;

import java.io.File;
import java.net.URI;
import java.util.List;

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.monotonically_increasing_id;

/**
 * Runtime iterator for jn:create-delta-lake-table($location as string) as boolean.
 *
 * Creates a new Delta Lake table at the given path location, seeded with a single
 * bookkeeping row (mutability level, row id, input path, table location).
 * Returns true on success and false if the table creation fails at runtime.
 * Throws CannotRetrieveResourceException if something already exists at the location.
 */
public class CreateDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator {

    public CreateDeltaLakeTableFunctionIterator(
            List<RuntimeIterator> arguments,
            RuntimeStaticContext staticContext
    ) {
        super(arguments, staticContext);
    }

    @Override
    public Item materializeFirstItemOrNull(DynamicContext context) {
        // Single argument: the target location of the new table.
        RuntimeIterator urlIterator = this.children.get(0);
        urlIterator.open(context);
        String url = urlIterator.next().getStringValue();
        urlIterator.close();

        URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata());
        if (FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) {
            throw new CannotRetrieveResourceException(
                    "File " + uri + " already exists. Cannot create new delta lake table at this location.",
                    getMetadata()
            );
        }

        try {
            // NOTE(review): java.io.File only works for local paths; creating a table
            // on HDFS/S3 would need the Hadoop FileSystem API — confirm intended scope.
            File directory = new File(uri.getPath());
            if (!directory.exists() && !directory.mkdirs()) {
                throw new RuntimeException("Failed to create directory " + directory);
            }

            Dataset<Row> dataFrame = SparkSessionManager.getInstance()
                .getOrCreateSession()
                .emptyDataFrame();
            dataFrame = dataFrame.withColumn(SparkSessionManager.mutabilityLevelColumnName, lit(0));
            dataFrame = dataFrame.withColumn(SparkSessionManager.rowIdColumnName, monotonically_increasing_id());
            dataFrame = dataFrame.withColumn(SparkSessionManager.pathInColumnName, lit(""));
            dataFrame = dataFrame.withColumn(SparkSessionManager.tableLocationColumnName, lit(uri.toString()));

            // monotonically_increasing_id() yields a LongType column above, so the
            // explicit schema of the seed row must declare the row id as LongType
            // as well (and the Row value must be a Long); an IntegerType declaration
            // would make the union below mismatch column types (int vs. bigint).
            StructType schema = new StructType()
                .add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false)
                .add(SparkSessionManager.rowIdColumnName, DataTypes.LongType, false)
                .add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false)
                .add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false);

            Row newRow = RowFactory.create(
                0,
                0L,
                "",
                uri.toString()
            );

            Dataset<Row> newRowDataFrame = SparkSessionManager.getInstance()
                .getOrCreateSession()
                .createDataFrame(List.of(newRow), schema);

            Dataset<Row> combinedDataFrame = dataFrame.union(newRowDataFrame);

            // mode("error") makes the write fail if a Delta table already exists here.
            combinedDataFrame.write().format("delta").mode("error").save(uri.toString());
            return new BooleanItem(true);
        } catch (RuntimeException e) {
            // Best-effort contract: report failure as false rather than propagating.
            // NOTE(review): stack trace should ideally go through the project's
            // logging facility instead of printStackTrace.
            e.printStackTrace();
            return new BooleanItem(false);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.rumbledb.runtime.functions.delta_lake;

import org.apache.commons.io.FileUtils;
import org.rumbledb.api.Item;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.context.RuntimeStaticContext;
import org.rumbledb.exceptions.CannotRetrieveResourceException;
import org.rumbledb.items.BooleanItem;
import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.functions.input.FileSystemUtil;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;

/**
 * Runtime iterator for jn:delete-delta-lake-table($location as string) as boolean.
 *
 * Deletes the Delta Lake table directory at the given path location. Returns true
 * on success and false if the deletion fails at runtime. Throws
 * CannotRetrieveResourceException if nothing exists at the location.
 */
public class DeleteDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator {

    public DeleteDeltaLakeTableFunctionIterator(
            List<RuntimeIterator> arguments,
            RuntimeStaticContext staticContext
    ) {
        super(arguments, staticContext);
    }

    @Override
    public Item materializeFirstItemOrNull(DynamicContext context) {
        // Single argument: the location of the table to delete.
        RuntimeIterator urlIterator = this.children.get(0);
        urlIterator.open(context);
        String url = urlIterator.next().getStringValue();
        urlIterator.close();

        URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata());
        if (!FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) {
            throw new CannotRetrieveResourceException("File " + uri + " not found.", getMetadata());
        }

        try {
            // NOTE(review): java.io.File only covers local paths; deleting a table
            // on HDFS/S3 would need the Hadoop FileSystem API — confirm intended scope.
            File oldTable = new File(uri.getPath());
            FileUtils.deleteDirectory(oldTable);
            return new BooleanItem(true);
        } catch (IOException e) {
            // Best-effort contract: report failure as false rather than propagating.
            e.printStackTrace();
            return new BooleanItem(false);
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.rumbledb.runtime.functions.delta_lake;

import org.rumbledb.config.RumbleRuntimeConfiguration;

/**
 * Catalogue of RumbleDB runtime configurations used by the Delta Lake functions.
 * The two configurations are currently identical; they are kept as separate
 * instances (and built through a shared factory to avoid duplicating the flag
 * list) so they can diverge independently later.
 */
public class DeltaLakeConfigurationCatalogue {

    /** Builds a fresh configuration with the common Delta Lake flags. */
    private static RumbleRuntimeConfiguration newDeltaLakeConfiguration() {
        return new RumbleRuntimeConfiguration(
            new String[] {
                "--print-iterator-tree",
                "yes",
                "--output-format",
                "delta",
                "--show-error-info",
                "yes",
                "--apply-updates",
                "yes",
            }
        );
    }

    // Configuration used when operating on existing Delta Lake tables.
    static final RumbleRuntimeConfiguration defaultDeltaLakeConfiguration = newDeltaLakeConfiguration();

    // Configuration used when creating new Delta Lake tables.
    static final RumbleRuntimeConfiguration createDeltaLakeConfiguration = newDeltaLakeConfiguration();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,7]; Output="[ "SUCCESS" ]" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,7]): delete the first
   member of new_array, then check the remaining array content. :)
let $data := delta-file("./tempDeltaTable")
return delete json $data.new_array[[1]];
delta-file("./tempDeltaTable").new_array
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,5]; Output="SUCCESS" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,5]): insert "SUCCESS"
   at position 1 of new_array, then read it back. :)
let $data := delta-file("./tempDeltaTable")
return insert json "SUCCESS" into $data.new_array at position 1;
delta-file("./tempDeltaTable").new_array[[1]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,6]; Output="DOUBLE SUCCESS" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,6]): replace the first
   member of new_array, then read it back. :)
let $data := delta-file("./tempDeltaTable")
return replace value of json $data.new_array[[1]] with "DOUBLE SUCCESS";
delta-file("./tempDeltaTable").new_array[[1]]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,4]; Output="null" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,4]): delete the
   "success" field; a subsequent lookup is expected to yield null. :)
let $data := delta-file("./tempDeltaTable")
return delete json $data.success;
delta-file("./tempDeltaTable").success
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,1]; Output="(SUCCESS, [ "SUCCESS" ])" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,1]): insert the
   new_ins and new_array fields, then read both back. :)
let $data := delta-file("./tempDeltaTable")
return (insert json "new_ins" : "SUCCESS" into $data, insert json "new_array" : ["SUCCESS"] into $data);
(delta-file("./tempDeltaTable").new_ins, delta-file("./tempDeltaTable").new_array)
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,3]; Output="DOUBLE SUCCESS" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,3]): rename new_ins
   to success, then read the value via the new name. :)
let $data := delta-file("./tempDeltaTable")
return rename json $data.new_ins as "success";
delta-file("./tempDeltaTable").success
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
(:JIQS: ShouldRun; UpdateDim=[5,2]; Output="DOUBLE SUCCESS" :)
(: Part of the tempDeltaTable update sequence (UpdateDim [5,2]): replace the value
   of new_ins, then read it back. :)
let $data := delta-file("./tempDeltaTable")
return replace value of json $data.new_ins with "DOUBLE SUCCESS";
delta-file("./tempDeltaTable").new_ins
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="true" :)
(: First step of the tempDeltaTable sequence (UpdateDim [5,0]): create the table;
   expected to return true. :)
let $ret := create-delta-lake-table("./tempDeltaTable")
return $ret
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(:JIQS: ShouldRun; UpdateDim=[5,8]; Output="true" :)
(: Final cleanup step of the tempDeltaTable sequence (UpdateDim [5,8]): delete the
   table; expected to return true. :)
let $ret := delete-delta-lake-table("./tempDeltaTable")
return $ret
Loading