diff --git a/.gitignore b/.gitignore index 4a4272142..45bb7f5d1 100644 --- a/.gitignore +++ b/.gitignore @@ -7,7 +7,7 @@ target/ .ipynb_checkpoints/ *.iml /bin/ -*.jar +rumble*.jar *.*~ # for now ignore jsound package diff --git a/README.md b/README.md index 7e4866c15..d82971f2b 100644 --- a/README.md +++ b/README.md @@ -16,4 +16,4 @@ The documentation also contains an introduction specific to RumbleDB and how you [The documentation of the current master (for the adventurous and curious) is available here.](http://sparksoniq.readthedocs.io/en/latest/) -RumbleDB is an effort involving many researchers and ETH Zurich students: code and support by Stefan Irimescu, Ghislain Fourny, Gustavo Alonso, Renato Marroquin, Rodrigo Bruno, Falko Noé, Ioana Stefan, Andrea Rinaldi, Stevan Mihajlovic, Mario Arduini, Can Berker Çıkış, Elwin Stephan, David Dao, Zirun Wang, Ingo Müller, Dan-Ovidiu Graur, Thomas Zhou, Olivier Goerens, Alexandru Meterez, Remo Röthlisberger, Dominik Bruggisser, David Loughlin, David Buzatu. +RumbleDB is an effort involving many researchers and ETH Zurich students: code and support by Stefan Irimescu, Ghislain Fourny, Gustavo Alonso, Renato Marroquin, Rodrigo Bruno, Falko Noé, Ioana Stefan, Andrea Rinaldi, Stevan Mihajlovic, Mario Arduini, Can Berker Çıkış, Elwin Stephan, David Dao, Zirun Wang, Ingo Müller, Dan-Ovidiu Graur, Thomas Zhou, Olivier Goerens, Alexandru Meterez, Remo Röthlisberger, Dominik Bruggisser, David Loughlin, David Buzatu, Marco Schöb. diff --git a/docs/Getting started.md b/docs/Getting started.md index 64e878246..a06278e1c 100644 --- a/docs/Getting started.md +++ b/docs/Getting started.md @@ -78,11 +78,11 @@ If you use Linux, Florian Kellner also kindly contributed an [installation scrip RumbleDB requires an Apache Spark installation on Linux, Mac or Windows. -It is straightforward to directly [download it](https://spark.apache.org/downloads.html), unpack it and put it at a location of your choosing. We recommend to pick Spark 3.2.2. Let us call this location SPARK_HOME (it is a good idea, in fact to also define an environment variable SPARK_HOME pointing to the absolute path of this location). +It is straightforward to directly [download it](https://spark.apache.org/downloads.html), unpack it and put it at a location of your choosing. We recommend to pick Spark 3.4.3. Let us call this location SPARK_HOME (it is a good idea, in fact to also define an environment variable SPARK_HOME pointing to the absolute path of this location). What you need to do then is to add the subdirectory "bin" within the unpacked directory to the PATH variable. On macOS this is done by adding - export SPARK_HOME=/path/to/spark-3.2.2-bin-hadoop3.2 + export SPARK_HOME=/path/to/spark-3.4.3-bin-hadoop3.2 export PATH=$SPARK_HOME/bin:$PATH (with SPARK_HOME appropriately set to match your unzipped Spark directory) to the file .zshrc in your home directory, then making sure to force the change with @@ -111,9 +111,11 @@ Like Spark, RumbleDB is just a download and no installation is required. In order to run RumbleDB, you simply need to download one of the small .jar files from the [download page](https://github.com/RumbleDB/rumble/releases) and put it in a directory of your choice, for example, right besides your data. -If you use Spark 3.2+, use rumbledb-1.22.0-for-spark-3.2.jar. +If you use Spark 3.4+, use rumbledb-1.22.0-for-spark-3.4.jar. -If you use Spark 3.3+, use rumbledb-1.22.0-for-spark-3.3.jar. +If you use Spark 3.5+, use rumbledb-1.22.0-for-spark-3.5.jar. + +If you use Spark 4.0+ (preview), use rumbledb-1.22.0-for-spark-4.0.jar. These jars do not embed Spark, since you chose to set it up separately. They will work with your Spark installation with the spark-submit command. diff --git a/docs/install.md b/docs/install.md index 79454528d..6141f7c47 100644 --- a/docs/install.md +++ b/docs/install.md @@ -7,9 +7,9 @@ We show here how to install RumbleDB from the github repository if you wish to d The following software is required: - [Java SE](http://www.oracle.com/technetwork/java/javase/downloads/index.html) 8 (last tested on OpenJDK 8u251). The version of Java is important, as Spark only works with Java 8 or java 11. -- [Spark](https://spark.apache.org/), version 3.1.2 (for example) +- [Spark](https://spark.apache.org/), version 3.4.4 (for example) - [Ant](https://ant.apache.org/), version 1.11.1 -- [ANTLR](http://www.ant.org/), version 4.8 (supplied in our repository) +- [ANTLR](http://www.ant.org/), version 4.9.3 (supplied in our repository) - [Maven](https://maven.apache.org/) 3.6.0 Important: the ANTLR version varies with the Spark version, because Spark is also shipped with an ANTLR runtime (example: Spark 3.0 and 3.1 is with ANTLR 4.7, Spark 3.2 with ANTLR 4.8). The ANTLR runtime MUST match the ANTLR generator used to generate the RumbleDB jar file. diff --git a/pom.xml b/pom.xml index d468b6ca8..58e7d86b0 100644 --- a/pom.xml +++ b/pom.xml @@ -46,9 +46,10 @@ eclipse 1.8 1.8 - - ${project.basedir}/org.eclipse.jdt.core.prefs - + + -properties + ${project.basedir}/org.eclipse.jdt.core.prefs + @@ -63,6 +64,14 @@ + + org.apache.maven.plugins + maven-resources-plugin + 3.3.1 + + UTF-8 + + org.apache.maven.plugins maven-jar-plugin @@ -150,6 +159,7 @@ org.apache.maven.plugins maven-release-plugin + 3.0.1 true false diff --git a/src/main/java/org/rumbledb/cli/Main.java b/src/main/java/org/rumbledb/cli/Main.java index 436a14dc6..2a0e84e5e 100644 --- a/src/main/java/org/rumbledb/cli/Main.java +++ b/src/main/java/org/rumbledb/cli/Main.java @@ -37,12 +37,12 @@ public class Main { public static void main(String[] args) throws IOException { String javaVersion = System.getProperty("java.version"); - if (!javaVersion.startsWith("1.8") && !javaVersion.startsWith("11.")) { - System.err.println("[Error] RumbleDB requires Java 8 or Java 11."); + if (!javaVersion.startsWith("11.") && !javaVersion.startsWith("17.")) { + System.err.println("[Error] RumbleDB requires Java 11 or 17."); System.err.println("Your Java version: " + System.getProperty("java.version")); - System.err.println("You can download Java 8 or 11 from https://adoptium.net/"); + System.err.println("You can download Java 11 or 17 from https://adoptium.net/"); System.err.println( - "If you do have Java 8 or 11, but the wrong version appears above, then it means you need to set your JAVA_HOME environment variable properly to point to Java 8 or 11." + "If you do have Java 11 or 17, but the wrong version appears above, then it means you need to set your JAVA_HOME environment variable properly to point to Java 11 or 17." ); System.exit(43); } diff --git a/src/main/java/org/rumbledb/compiler/ExecutionModeVisitor.java b/src/main/java/org/rumbledb/compiler/ExecutionModeVisitor.java index 5e8a7ccc2..db847abd4 100644 --- a/src/main/java/org/rumbledb/compiler/ExecutionModeVisitor.java +++ b/src/main/java/org/rumbledb/compiler/ExecutionModeVisitor.java @@ -656,7 +656,11 @@ public StaticContext visitProlog(Prolog prolog, StaticContext argument) { @Override public StaticContext visitProgram(Program program, StaticContext argument) { visitDescendants(program, argument); - ExecutionMode mergedExecutionMode = getHighestExecutionModeFromStatements(exitStatementChildren); + ExecutionMode mergedExecutionMode = program.getStatementsAndOptionalExpr().getHighestExecutionMode(); + for (Statement statement : this.exitStatementChildren) { + ExecutionMode statementExecMode = statement.getHighestExecutionMode(this.visitorConfig); + mergedExecutionMode = getHighestExecutionMode(mergedExecutionMode, statementExecMode); + } program.setHighestExecutionMode(mergedExecutionMode); return argument; } diff --git a/src/main/java/org/rumbledb/compiler/SequentialClassificationVisitor.java b/src/main/java/org/rumbledb/compiler/SequentialClassificationVisitor.java index 812239241..72a47fa8b 100644 --- a/src/main/java/org/rumbledb/compiler/SequentialClassificationVisitor.java +++ b/src/main/java/org/rumbledb/compiler/SequentialClassificationVisitor.java @@ -57,7 +57,7 @@ public class SequentialClassificationVisitor extends AbstractNodeVisitor(); + this.variableBlockLevel = new HashMap<>(); } protected DescendentSequentialProperties defaultAction(Node node, DescendentSequentialProperties argument) { diff --git a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java index 64a93bdf2..afca459f9 100644 --- a/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java +++ b/src/main/java/org/rumbledb/context/BuiltinFunctionCatalogue.java @@ -37,6 +37,8 @@ import org.rumbledb.runtime.functions.datetime.components.TimezoneFromTimeFunctionIterator; import org.rumbledb.runtime.functions.datetime.components.YearFromDateFunctionIterator; import org.rumbledb.runtime.functions.datetime.components.YearFromDateTimeFunctionIterator; +import org.rumbledb.runtime.functions.delta_lake.CreateDeltaLakeTableFunctionIterator; +import org.rumbledb.runtime.functions.delta_lake.DeleteDeltaLakeTableFunctionIterator; import org.rumbledb.runtime.functions.durations.components.DaysFromDurationFunctionIterator; import org.rumbledb.runtime.functions.durations.components.HoursFromDurationFunctionIterator; import org.rumbledb.runtime.functions.durations.components.ImplicitTimezoneIterator; @@ -2688,6 +2690,28 @@ private static BuiltinFunction createBuiltinFunction( BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL ); + /** + * function that creates a delta lake table at a given path location + */ + static final BuiltinFunction create_delta_lake_table = createBuiltinFunction( + new Name(Name.JN_NS, "jn", "create-delta-lake-table"), + "string", + "boolean", + CreateDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + ); + + /** + * function that deletes a delta lake table at a given path location + */ + static final BuiltinFunction delete_delta_lake_table = createBuiltinFunction( + new Name(Name.JN_NS, "jn", "delete-delta-lake-table"), + "string", + "boolean", + DeleteDeltaLakeTableFunctionIterator.class, + BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL + ); + static { builtinFunctions = new HashMap<>(); @@ -2718,6 +2742,9 @@ private static BuiltinFunction createBuiltinFunction( builtinFunctions.put(avro_file2.getIdentifier(), avro_file2); builtinFunctions.put(parse_json.getIdentifier(), parse_json); + builtinFunctions.put(create_delta_lake_table.getIdentifier(), create_delta_lake_table); + builtinFunctions.put(delete_delta_lake_table.getIdentifier(), delete_delta_lake_table); + builtinFunctions.put(count.getIdentifier(), count); builtinFunctions.put(boolean_function.getIdentifier(), boolean_function); builtinFunctions.put(not_function.getIdentifier(), not_function); diff --git a/src/main/java/org/rumbledb/context/GlobalVariables.java b/src/main/java/org/rumbledb/context/GlobalVariables.java index ce93dec24..18754fb13 100644 --- a/src/main/java/org/rumbledb/context/GlobalVariables.java +++ b/src/main/java/org/rumbledb/context/GlobalVariables.java @@ -15,7 +15,7 @@ public class GlobalVariables implements Serializable, KryoSerializable { private Set globalVariables; public GlobalVariables() { - globalVariables = new HashSet<>(); + this.globalVariables = new HashSet<>(); } public void addGlobalVariable(Name globalVariable) { diff --git a/src/main/java/org/rumbledb/context/InScopeVariable.java b/src/main/java/org/rumbledb/context/InScopeVariable.java index 31510fc20..5edbf536f 100644 --- a/src/main/java/org/rumbledb/context/InScopeVariable.java +++ b/src/main/java/org/rumbledb/context/InScopeVariable.java @@ -83,6 +83,6 @@ public void read(Kryo kryo, Input input) { } public boolean isAssignable() { - return isAssignable; + return this.isAssignable; } } diff --git a/src/main/java/org/rumbledb/expressions/module/VariableDeclaration.java b/src/main/java/org/rumbledb/expressions/module/VariableDeclaration.java index 66b973b2a..e7c1f64f0 100644 --- a/src/main/java/org/rumbledb/expressions/module/VariableDeclaration.java +++ b/src/main/java/org/rumbledb/expressions/module/VariableDeclaration.java @@ -166,12 +166,12 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { @Nullable public List getAnnotations() { - return annotations; + return this.annotations; } public boolean isAssignable() { - return isAssignable; + return this.isAssignable; } } diff --git a/src/main/java/org/rumbledb/expressions/primary/InlineFunctionExpression.java b/src/main/java/org/rumbledb/expressions/primary/InlineFunctionExpression.java index 95f05b526..399082c07 100644 --- a/src/main/java/org/rumbledb/expressions/primary/InlineFunctionExpression.java +++ b/src/main/java/org/rumbledb/expressions/primary/InlineFunctionExpression.java @@ -138,7 +138,7 @@ public StatementsAndOptionalExpr getBody() { @Nullable public List getAnnotations() { - return annotations; + return this.annotations; } @Override @@ -252,7 +252,7 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public boolean hasSequentialPropertyAnnotation() { - return hasSequentialPropertyAnnotation; + return this.hasSequentialPropertyAnnotation; } public void setHasExitStatement(boolean hasExitStatement) { diff --git a/src/main/java/org/rumbledb/expressions/scripting/annotations/Annotation.java b/src/main/java/org/rumbledb/expressions/scripting/annotations/Annotation.java index 888a0a073..d56dbf09e 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/annotations/Annotation.java +++ b/src/main/java/org/rumbledb/expressions/scripting/annotations/Annotation.java @@ -20,11 +20,11 @@ public Annotation(Name annotationName, List literals) { } public Name getAnnotationName() { - return annotationName; + return this.annotationName; } public List getLiterals() { - return literals; + return this.literals; } public static boolean checkAssignable( diff --git a/src/main/java/org/rumbledb/expressions/scripting/block/BlockStatement.java b/src/main/java/org/rumbledb/expressions/scripting/block/BlockStatement.java index 075209262..281bc1989 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/block/BlockStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/block/BlockStatement.java @@ -27,7 +27,7 @@ public T accept(AbstractNodeVisitor visitor, T argument) { @Override public List getChildren() { List result = new ArrayList<>(); - blockStatements.forEach(statement -> { + this.blockStatements.forEach(statement -> { if (statement != null) { result.add(statement); } @@ -49,6 +49,6 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public List getBlockStatements() { - return blockStatements; + return this.blockStatements; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/control/SwitchStatement.java b/src/main/java/org/rumbledb/expressions/scripting/control/SwitchStatement.java index a19182d91..f6bc43efc 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/control/SwitchStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/control/SwitchStatement.java @@ -74,14 +74,14 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public Expression getTestCondition() { - return testCondition; + return this.testCondition; } public List getCases() { - return cases; + return this.cases; } public Statement getDefaultStatement() { - return defaultStatement; + return this.defaultStatement; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/declaration/CommaVariableDeclStatement.java b/src/main/java/org/rumbledb/expressions/scripting/declaration/CommaVariableDeclStatement.java index b39f605b6..5eb42d9bc 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/declaration/CommaVariableDeclStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/declaration/CommaVariableDeclStatement.java @@ -23,18 +23,18 @@ public T accept(AbstractNodeVisitor visitor, T argument) { @Override public List getChildren() { - return new ArrayList<>(variables); + return new ArrayList<>(this.variables); } @Override public void serializeToJSONiq(StringBuffer sb, int indent) { indentIt(sb, indent); - for (VariableDeclStatement variableDeclStatement : variables) { + for (VariableDeclStatement variableDeclStatement : this.variables) { variableDeclStatement.serializeToJSONiq(sb, 0); } } public List getVariables() { - return variables; + return this.variables; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/declaration/VariableDeclStatement.java b/src/main/java/org/rumbledb/expressions/scripting/declaration/VariableDeclStatement.java index 5f3d3fb7e..7aa12568d 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/declaration/VariableDeclStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/declaration/VariableDeclStatement.java @@ -103,10 +103,10 @@ public Expression getVariableExpression() { } public List getAnnotations() { - return annotations; + return this.annotations; } public boolean isAssignable() { - return isAssignable; + return this.isAssignable; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/loops/ExitStatement.java b/src/main/java/org/rumbledb/expressions/scripting/loops/ExitStatement.java index abe9f8a28..eec89443d 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/loops/ExitStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/loops/ExitStatement.java @@ -34,7 +34,7 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public Expression getExitExpression() { - return exitExpression; + return this.exitExpression; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/loops/FlowrStatement.java b/src/main/java/org/rumbledb/expressions/scripting/loops/FlowrStatement.java index ab9c208e8..1954f52fa 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/loops/FlowrStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/loops/FlowrStatement.java @@ -48,6 +48,6 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public ReturnStatementClause getReturnStatementClause() { - return returnStatementClause; + return this.returnStatementClause; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/loops/ReturnStatementClause.java b/src/main/java/org/rumbledb/expressions/scripting/loops/ReturnStatementClause.java index 6eb09f720..49c67fe7e 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/loops/ReturnStatementClause.java +++ b/src/main/java/org/rumbledb/expressions/scripting/loops/ReturnStatementClause.java @@ -46,6 +46,6 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public Statement getReturnStatement() { - return returnStatement; + return this.returnStatement; } } diff --git a/src/main/java/org/rumbledb/expressions/scripting/loops/WhileStatement.java b/src/main/java/org/rumbledb/expressions/scripting/loops/WhileStatement.java index dfbe3bc2a..72172b06b 100644 --- a/src/main/java/org/rumbledb/expressions/scripting/loops/WhileStatement.java +++ b/src/main/java/org/rumbledb/expressions/scripting/loops/WhileStatement.java @@ -27,8 +27,8 @@ public T accept(AbstractNodeVisitor visitor, T argument) { @Override public List getChildren() { List result = new ArrayList<>(); - result.add(testCondition); - result.add(statement); + result.add(this.testCondition); + result.add(this.statement); return result; } @@ -43,10 +43,10 @@ public void serializeToJSONiq(StringBuffer sb, int indent) { } public Expression getTestCondition() { - return testCondition; + return this.testCondition; } public Statement getStatement() { - return statement; + return this.statement; } } diff --git a/src/main/java/org/rumbledb/runtime/functions/StaticUserDefinedFunctionCallIterator.java b/src/main/java/org/rumbledb/runtime/functions/StaticUserDefinedFunctionCallIterator.java index 326e01ae7..c3e7af696 100644 --- a/src/main/java/org/rumbledb/runtime/functions/StaticUserDefinedFunctionCallIterator.java +++ b/src/main/java/org/rumbledb/runtime/functions/StaticUserDefinedFunctionCallIterator.java @@ -131,7 +131,7 @@ protected void closeLocal() { public void setNextResult() { this.nextResult = null; - if (!encounteredExitStatement) { + if (!this.encounteredExitStatement) { try { if (this.userDefinedFunctionCallIterator.hasNext()) { this.nextResult = this.userDefinedFunctionCallIterator.next(); diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java new file mode 100644 index 000000000..50247351c --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/CreateDeltaLakeTableFunctionIterator.java @@ -0,0 +1,89 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.rumbledb.api.Item; +import org.rumbledb.context.DynamicContext; +import org.rumbledb.context.RuntimeStaticContext; +import org.rumbledb.exceptions.CannotRetrieveResourceException; +import org.rumbledb.items.BooleanItem; +import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; +import org.rumbledb.runtime.RuntimeIterator; +import org.rumbledb.runtime.functions.input.FileSystemUtil; +import org.apache.spark.sql.Row; +import sparksoniq.spark.SparkSessionManager; + +import java.io.File; +import java.net.URI; +import java.util.List; + +import static org.apache.spark.sql.functions.lit; +import static org.apache.spark.sql.functions.monotonically_increasing_id; + +public class CreateDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator { + + public CreateDeltaLakeTableFunctionIterator( + List arguments, + RuntimeStaticContext staticContext + ) { + super(arguments, staticContext); + } + + @Override + public Item materializeFirstItemOrNull(DynamicContext context) { + RuntimeIterator urlIterator = this.children.get(0); + urlIterator.open(context); + String url = urlIterator.next().getStringValue(); + urlIterator.close(); + URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); + if (FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { + throw new CannotRetrieveResourceException( + "File " + uri + " already exists. Cannot create new delta lake table at this location.", + getMetadata() + ); + } + try { + File directory = new File(uri.getPath()); + if (!directory.exists()) { + boolean mkdirs = directory.mkdirs(); + if (!mkdirs) { + throw new RuntimeException("Failed to create directory " + directory); + } + } + Dataset dataFrame = SparkSessionManager.getInstance() + .getOrCreateSession() + .emptyDataFrame(); + dataFrame = dataFrame.withColumn(SparkSessionManager.mutabilityLevelColumnName, lit(0)); + dataFrame = dataFrame.withColumn(SparkSessionManager.rowIdColumnName, monotonically_increasing_id()); + dataFrame = dataFrame.withColumn(SparkSessionManager.pathInColumnName, lit("")); + dataFrame = dataFrame.withColumn(SparkSessionManager.tableLocationColumnName, lit(uri.toString())); + + StructType schema = new StructType() + .add(SparkSessionManager.mutabilityLevelColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.rowIdColumnName, DataTypes.IntegerType, false) + .add(SparkSessionManager.pathInColumnName, DataTypes.StringType, false) + .add(SparkSessionManager.tableLocationColumnName, DataTypes.StringType, false); + + Row newRow = RowFactory.create( + 0, + 0, + "", + uri.toString() + ); + + Dataset newRowDataFrame = SparkSessionManager.getInstance() + .getOrCreateSession() + .createDataFrame(List.of(newRow), schema); + + Dataset combinedDataFrame = dataFrame.union(newRowDataFrame); + + combinedDataFrame.write().format("delta").mode("error").save(uri.toString()); + return new BooleanItem(true); + } catch (RuntimeException e) { + e.printStackTrace(); + return new BooleanItem(false); + } + } +} diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java new file mode 100644 index 000000000..c62424558 --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeleteDeltaLakeTableFunctionIterator.java @@ -0,0 +1,53 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.apache.commons.io.FileUtils; +import org.rumbledb.api.Item; +import org.rumbledb.context.DynamicContext; +import org.rumbledb.context.RuntimeStaticContext; +import org.rumbledb.exceptions.CannotRetrieveResourceException; +import org.rumbledb.items.BooleanItem; +import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator; +import org.rumbledb.runtime.RuntimeIterator; +import org.rumbledb.runtime.functions.input.FileSystemUtil; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; + +public class DeleteDeltaLakeTableFunctionIterator extends AtMostOneItemLocalRuntimeIterator { + + public DeleteDeltaLakeTableFunctionIterator( + List arguments, + RuntimeStaticContext staticContext + ) { + super(arguments, staticContext); + } + + @Override + public Item materializeFirstItemOrNull(DynamicContext context) { + RuntimeIterator urlIterator = this.children.get(0); + urlIterator.open(context); + String url = urlIterator.next().getStringValue(); + urlIterator.close(); + URI uri = FileSystemUtil.resolveURI(this.staticURI, url, getMetadata()); + if (!FileSystemUtil.exists(uri, context.getRumbleRuntimeConfiguration(), getMetadata())) { + throw new CannotRetrieveResourceException("File " + uri + " not found.", getMetadata()); + } + + // URI tableURI = FileSystemUtil.resolveURIAgainstWorkingDirectory( + // this.currentAnnotation.getDeltaTablePath(), + // DeltaLakeConfigurationCatalogue.defaultDeltaLakeConfiguration, + // ExceptionMetadata.EMPTY_METADATA + // ); + + try { + File oldTable = new File(uri.getPath()); + FileUtils.deleteDirectory(oldTable); + return new BooleanItem(true); + } catch (IOException e) { + e.printStackTrace(); + return new BooleanItem(false); + } + } +} diff --git a/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java new file mode 100644 index 000000000..b06283ebc --- /dev/null +++ b/src/main/java/org/rumbledb/runtime/functions/delta_lake/DeltaLakeConfigurationCatalogue.java @@ -0,0 +1,32 @@ +package org.rumbledb.runtime.functions.delta_lake; + +import org.rumbledb.config.RumbleRuntimeConfiguration; + +public class DeltaLakeConfigurationCatalogue { + static final RumbleRuntimeConfiguration defaultDeltaLakeConfiguration = new RumbleRuntimeConfiguration( + new String[] { + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", + } + ); + + static final RumbleRuntimeConfiguration createDeltaLakeConfiguration = new RumbleRuntimeConfiguration( + new String[] { + "--print-iterator-tree", + "yes", + "--output-format", + "delta", + "--show-error-info", + "yes", + "--apply-updates", + "yes", + } + ); + +} diff --git a/src/main/java/org/rumbledb/runtime/scripting/ProgramIterator.java b/src/main/java/org/rumbledb/runtime/scripting/ProgramIterator.java index 91970ed72..0d7201275 100644 --- a/src/main/java/org/rumbledb/runtime/scripting/ProgramIterator.java +++ b/src/main/java/org/rumbledb/runtime/scripting/ProgramIterator.java @@ -89,7 +89,7 @@ private void setPULFromExitStatement(ExitStatementException exitStatementExcepti @Override protected Item nextLocal() { - if (!encounteredExitStatement) { + if (!this.encounteredExitStatement) { try { return this.statementsAndExprIterator.next(); } catch (ExitStatementException exitStatementException) { @@ -117,7 +117,7 @@ public boolean isUpdating() { @Override public PendingUpdateList getPendingUpdateList(DynamicContext context) { - if (!encounteredExitStatement) { + if (!this.encounteredExitStatement) { return this.statementsAndExprIterator.getPendingUpdateList(context); } return this.pendingUpdateList; diff --git a/src/main/java/org/rumbledb/runtime/scripting/block/StatementsWithExprIterator.java b/src/main/java/org/rumbledb/runtime/scripting/block/StatementsWithExprIterator.java index bf035392d..a80021b76 100644 --- a/src/main/java/org/rumbledb/runtime/scripting/block/StatementsWithExprIterator.java +++ b/src/main/java/org/rumbledb/runtime/scripting/block/StatementsWithExprIterator.java @@ -32,7 +32,7 @@ public StatementsWithExprIterator( this.children.addAll(statements); this.children.add(exprIterator); - for (RuntimeIterator child : children) { + for (RuntimeIterator child : this.children) { if (child.isSequential()) { this.isSequential = child.isSequential(); } diff --git a/src/main/java/org/rumbledb/runtime/scripting/loops/ExitStatementIterator.java b/src/main/java/org/rumbledb/runtime/scripting/loops/ExitStatementIterator.java index ca07d7336..5500dedea 100644 --- a/src/main/java/org/rumbledb/runtime/scripting/loops/ExitStatementIterator.java +++ b/src/main/java/org/rumbledb/runtime/scripting/loops/ExitStatementIterator.java @@ -72,7 +72,9 @@ protected Item nextLocal() { this.result = this.childIterator.materialize(this.currentDynamicContextForLocalExecution); this.pendingUpdateList = new PendingUpdateList(); if (this.childIterator.isUpdating()) { - pendingUpdateList = this.childIterator.getPendingUpdateList(this.currentDynamicContextForLocalExecution); + this.pendingUpdateList = this.childIterator.getPendingUpdateList( + this.currentDynamicContextForLocalExecution + ); } throw new ExitStatementException( this.pendingUpdateList, diff --git a/src/main/java/org/rumbledb/runtime/typing/TreatIterator.java b/src/main/java/org/rumbledb/runtime/typing/TreatIterator.java index 39f972fe1..f3b9d643e 100644 --- a/src/main/java/org/rumbledb/runtime/typing/TreatIterator.java +++ b/src/main/java/org/rumbledb/runtime/typing/TreatIterator.java @@ -240,7 +240,7 @@ public JSoundDataFrame getDataFrame(DynamicContext dynamicContext) { @Override public PendingUpdateList getPendingUpdateList(DynamicContext context) { - return iterator.getPendingUpdateList(context); + return this.iterator.getPendingUpdateList(context); } /** diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq new file mode 100644 index 000000000..fe541dcd8 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayDelete.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,7]; Output="[ "SUCCESS" ]" :) +let $data := delta-file("./tempDeltaTable") +return delete json $data.new_array[[1]]; +delta-file("./tempDeltaTable").new_array \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq new file mode 100644 index 000000000..e74f7460f --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayInsert.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,5]; Output="SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return insert json "SUCCESS" into $data.new_array at position 1; +delta-file("./tempDeltaTable").new_array[[1]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq new file mode 100644 index 000000000..12858250d --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaArrayReplace.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,6]; Output="DOUBLE SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return replace value of json $data.new_array[[1]] with "DOUBLE SUCCESS"; +delta-file("./tempDeltaTable").new_array[[1]] \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq new file mode 100644 index 000000000..177b46c42 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectDelete.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,4]; Output="null" :) +let $data := delta-file("./tempDeltaTable") +return delete json $data.success; +delta-file("./tempDeltaTable").success \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq new file mode 100644 index 000000000..19bd6bade --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectInsert.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,1]; Output="(SUCCESS, [ "SUCCESS" ])" :) +let $data := delta-file("./tempDeltaTable") +return (insert json "new_ins" : "SUCCESS" into $data, insert json "new_array" : ["SUCCESS"] into $data); +(delta-file("./tempDeltaTable").new_ins, delta-file("./tempDeltaTable").new_array) \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq new file mode 100644 index 000000000..5614b20b5 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectRename.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,3]; Output="DOUBLE SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return rename json $data.new_ins as "success"; +delta-file("./tempDeltaTable").success \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq new file mode 100644 index 000000000..212df1297 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionDeltaObjectReplace.jq @@ -0,0 +1,4 @@ +(:JIQS: ShouldRun; UpdateDim=[5,2]; Output="DOUBLE SUCCESS" :) +let $data := delta-file("./tempDeltaTable") +return replace value of json $data.new_ins with "DOUBLE SUCCESS"; +delta-file("./tempDeltaTable").new_ins \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq new file mode 100644 index 000000000..2003ce8a5 --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsCreateDeltaLakeTable.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[5,0]; Output="true" :) +let $ret := create-delta-lake-table("./tempDeltaTable") +return $ret \ No newline at end of file diff --git a/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq new file mode 100644 index 000000000..55473e11d --- /dev/null +++ b/src/test/resources/test_files/runtime-delta-updates/delta-lake-functions/FunctionsDeleteDeltaLakeTable.jq @@ -0,0 +1,3 @@ +(:JIQS: ShouldRun; UpdateDim=[5,8]; Output="true" :) +let $ret := delete-delta-lake-table("./tempDeltaTable") +return $ret \ No newline at end of file