Skip to content

Commit

Permalink
Merge pull request #1260 from David-C-L/deltalake-scripting-updates
Browse files Browse the repository at this point in the history
Deltalake scripting updates
  • Loading branch information
ghislainfourny authored Sep 20, 2024
2 parents 41053cc + b02e732 commit 7ee88c3
Show file tree
Hide file tree
Showing 13 changed files with 155 additions and 30 deletions.
10 changes: 0 additions & 10 deletions src/main/java/org/rumbledb/api/Rumble.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ public SequenceOfItems runQuery(String query) {
this.configuration
);

if (iterator.isUpdating()) {
PendingUpdateList pul = iterator.getPendingUpdateList(dynamicContext);
pul.applyUpdates(iterator.getMetadata());
}

return new SequenceOfItems(iterator, dynamicContext, this.configuration);
}

Expand All @@ -78,11 +73,6 @@ public SequenceOfItems runQuery(URI location) throws IOException {
this.configuration
);

if (iterator.isUpdating()) {
PendingUpdateList pul = iterator.getPendingUpdateList(dynamicContext);
pul.applyUpdates(iterator.getMetadata());
}

return new SequenceOfItems(iterator, dynamicContext, this.configuration);
}

Expand Down
45 changes: 31 additions & 14 deletions src/main/java/org/rumbledb/api/SequenceOfItems.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.rumbledb.items.ItemFactory;
import org.rumbledb.runtime.RuntimeIterator;

import org.rumbledb.runtime.update.PendingUpdateList;
import sparksoniq.spark.SparkSessionManager;

/**
Expand Down Expand Up @@ -52,7 +53,7 @@ public SequenceOfItems(
* Opens the iterator.
*/
public void open() {
if (!this.isUpdating()) {
if (this.isMaterialisable()) {
this.iterator.open(this.dynamicContext);
}
this.isOpen = true;
Expand All @@ -71,7 +72,7 @@ public boolean isOpen() {
* Closes the iterator.
*/
public void close() {
if (!this.isUpdating()) {
if (this.isOpen) {
this.iterator.close();
}
this.isOpen = false;
Expand All @@ -83,7 +84,7 @@ public void close() {
* @return true if there are more items, false otherwise.
*/
public boolean hasNext() {
if (this.isUpdating()) {
if (!this.isMaterialisable()) {
return false;
}
return this.iterator.hasNext();
Expand All @@ -96,7 +97,7 @@ public boolean hasNext() {
* @return the next item.
*/
public Item next() {
if (this.isUpdating()) {
if (!this.isMaterialisable()) {
return ItemFactory.getInstance().createNullItem();
}
return this.iterator.next();
Expand All @@ -120,14 +121,32 @@ public boolean availableAsDataFrame() {
return this.iterator.isDataFrame();
}

/**
 * Returns whether the underlying iterator is updating.
 * The answer is obtained by delegating to the wrapped iterator's isUpdating().
 *
 * @return true if the iterator reports that it is updating; otherwise false.
 */
public boolean availableAsPUL() {
return this.iterator.isUpdating();
}

/**
 * Tells whether the iterator of this sequence should be evaluated in order to
 * materialise the sequence of items.
 *
 * A sequence that is not available as a PUL is always materialisable. A sequence
 * that is available as a PUL is materialisable only if its iterator is sequential.
 *
 * @return true if materialisable; otherwise false
 */
private boolean isMaterialisable() {
    if (!this.availableAsPUL()) {
        return true;
    }
    // Updating iterator: only a sequential one is evaluated for its items.
    return this.iterator.isSequential();
}

/**
* Returns the sequence of items as an RDD of Items rather than iterating over them locally.
* It is not possible to do so if the iterator is open.
*
* @return an RDD of Items.
*/
public JavaRDD<Item> getAsRDD() {
if (this.isUpdating()) {
if (!this.isMaterialisable()) {
return SparkSessionManager.getInstance().getJavaSparkContext().emptyRDD();
}
if (this.isOpen) {
Expand All @@ -143,7 +162,7 @@ public JavaRDD<Item> getAsRDD() {
* @return a data frame.
*/
public Dataset<Row> getAsDataFrame() {
if (this.isUpdating()) {
if (!this.isMaterialisable()) {
return SparkSessionManager.getInstance().getOrCreateSession().emptyDataFrame();
}
if (this.isOpen) {
Expand All @@ -153,12 +172,11 @@ public Dataset<Row> getAsDataFrame() {
}

/**
* Returns whether the iterator is updating
*
* @return true if updating; otherwise false.
* Applies the PUL available when the iterator is updating.
*/
public boolean isUpdating() {
return this.iterator.isUpdating();
public void applyPUL() {
    // The pending update list is requested from the iterator first; the iterator
    // metadata is looked up afterwards, as the argument handed to applyUpdates.
    this.iterator
        .getPendingUpdateList(this.dynamicContext)
        .applyUpdates(this.iterator.getMetadata());
}

/*
Expand All @@ -168,7 +186,7 @@ public boolean isUpdating() {
*/
public long populateList(List<Item> resultList) {
resultList.clear();
if (this.isUpdating()) {
if (!this.isMaterialisable()) {
return -1;
}
this.iterator.open(this.dynamicContext);
Expand Down Expand Up @@ -205,7 +223,7 @@ public long populateList(List<Item> resultList) {

public long populateListWithWarningOnlyIfCapReached(List<Item> resultList) {
if (this.availableAsRDD()) {
if (this.isUpdating()) {
if (!this.isMaterialisable()) {
return -1;
}
JavaRDD<Item> rdd = this.iterator.getRDD(this.dynamicContext);
Expand All @@ -215,5 +233,4 @@ public long populateListWithWarningOnlyIfCapReached(List<Item> resultList) {
}
}


}
8 changes: 8 additions & 0 deletions src/main/java/org/rumbledb/cli/JsoniqQueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.rumbledb.optimizations.Profiler;
import org.rumbledb.runtime.functions.input.FileSystemUtil;

import org.rumbledb.runtime.update.PendingUpdateList;
import sparksoniq.spark.SparkSessionManager;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -186,6 +187,10 @@ public List<Item> runQuery() throws IOException {
}
}

if (this.configuration.applyUpdates() && sequence.availableAsPUL() && outputPath != null) {
sequence.applyPUL();
}

long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
if (logPath != null) {
Expand Down Expand Up @@ -226,6 +231,9 @@ public long runInteractive(String query, List<Item> resultList) throws IOExcepti
if (!sequence.availableAsRDD()) {
return sequence.populateList(resultList);
}
if (this.configuration.applyUpdates() && sequence.availableAsPUL()) {
sequence.applyPUL();
}
resultList.clear();
JavaRDD<Item> rdd = sequence.getAsRDD();
return SparkSessionManager.collectRDDwithLimitWarningOnly(rdd, resultList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.rumbledb.expressions.scripting.loops.ReturnStatementClause;
import org.rumbledb.expressions.scripting.statement.StatementsAndExpr;
import org.rumbledb.expressions.scripting.statement.StatementsAndOptionalExpr;
import org.rumbledb.expressions.typing.TreatExpression;
import org.rumbledb.expressions.update.AppendExpression;
import org.rumbledb.expressions.update.CopyDeclaration;
import org.rumbledb.expressions.update.DeleteExpression;
Expand Down Expand Up @@ -139,6 +140,18 @@ public ExpressionClassification visitCommaExpression(
return expression.getExpressionClassification();
}

// Region Typing

/**
 * Classifies a treat expression from its main expression.
 *
 * The main expression is visited with the given argument; the classification that
 * comes back is recorded on the treat expression itself and returned to the caller.
 *
 * @param expression the treat expression being visited.
 * @param argument the classification handed down to the visit of the main expression.
 * @return the classification obtained from visiting the main expression.
 */
@Override
public ExpressionClassification visitTreatExpression(
        TreatExpression expression,
        ExpressionClassification argument
) {
    final ExpressionClassification classification = this.visit(
        expression.getMainExpression(),
        argument
    );
    expression.setExpressionClassification(classification);
    return classification;
}


// Endregion

// Region FLWOR

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,7 @@ public RuntimeIterator visitTreatExpression(TreatExpression expression, RuntimeI
RuntimeIterator runtimeIterator = new TreatIterator(
childExpression,
expression.getSequenceType(),
expression.isUpdating(),
expression.errorCodeThatShouldBeThrown(),
expression.getStaticContextForRuntime(this.config, this.visitorConfig)
);
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/rumbledb/config/RumbleRuntimeConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class RumbleRuntimeConfiguration implements Serializable, KryoSerializabl
private boolean nativeExecution;
private boolean functionInlining;
private boolean thirdFeature;
private boolean applyUpdates;

private Map<String, String> shortcutMap;
private Set<String> yesNoShortcuts;
Expand Down Expand Up @@ -422,6 +423,12 @@ public void init() {
this.functionInlining = true;
}

if (this.arguments.containsKey("apply-updates")) {
this.applyUpdates = this.arguments.get("apply-updates").equals("yes");
} else {
this.applyUpdates = false;
}

if (this.arguments.containsKey("optimize-general-comparison-to-value-comparison")) {
this.optimizeGeneralComparisonToValueComparison = this.arguments.get(
"optimize-general-comparison-to-value-comparison"
Expand Down Expand Up @@ -644,6 +651,14 @@ public void setFunctionInlining(boolean b) {
this.functionInlining = b;
}

/**
 * Returns the current value of the apply-updates flag.
 * During init() the flag is set to true only when the "apply-updates" argument
 * equals "yes", and to false when that argument is absent.
 *
 * @return the value of the apply-updates flag.
 */
public boolean applyUpdates() {
return this.applyUpdates;
}

/**
 * Overrides the apply-updates flag with the given value.
 *
 * @param b the new value of the apply-updates flag.
 */
public void setApplyUpdates(boolean b) {
this.applyUpdates = b;
}

public boolean optimizeGeneralComparisonToValueComparison() {
return this.optimizeGeneralComparisonToValueComparison;
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/rumbledb/runtime/scripting/ProgramIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ protected Item nextLocal() {
return null;
}

/**
 * Reports whether this program is sequential by delegating to the wrapped
 * statements-and-expression iterator.
 *
 * @return the value returned by the wrapped iterator's isSequential().
 */
@Override
public boolean isSequential() {
return this.statementsAndExprIterator.isSequential();
}

/**
 * Reports whether this program is updating by delegating to the wrapped
 * statements-and-expression iterator.
 *
 * @return the value returned by the wrapped iterator's isUpdating().
 */
@Override
public boolean isUpdating() {
return this.statementsAndExprIterator.isUpdating();
}

@Override
public PendingUpdateList getPendingUpdateList(DynamicContext context) {
if (!encounteredExitStatement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ public JSoundDataFrame getDataFrame(DynamicContext dynamicContext) {
return exprIterator.getDataFrame(dynamicContext);
}

/**
 * Reports whether this iterator is updating, which is decided solely by the
 * last child iterator.
 *
 * @return the value returned by the last child's isUpdating().
 */
@Override
public boolean isUpdating() {
    final int lastIndex = this.children.size() - 1;
    final RuntimeIterator exprIterator = this.children.get(lastIndex);
    return exprIterator.isUpdating();
}

@Override
public PendingUpdateList getPendingUpdateList(DynamicContext context) {
RuntimeIterator exprIterator = this.children.get(this.children.size() - 1);
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/rumbledb/runtime/typing/TreatIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.rumbledb.runtime.HybridRuntimeIterator;
import org.rumbledb.runtime.RuntimeIterator;
import org.rumbledb.runtime.functions.sequences.general.TreatAsClosure;
import org.rumbledb.runtime.update.PendingUpdateList;
import org.rumbledb.types.ItemType;
import org.rumbledb.types.ItemTypeFactory;
import org.rumbledb.types.SequenceType;
Expand Down Expand Up @@ -51,12 +52,14 @@ public class TreatIterator extends HybridRuntimeIterator {
public TreatIterator(
RuntimeIterator iterator,
SequenceType sequenceType,
boolean isUpdating,
ErrorCode errorCode,
RuntimeStaticContext staticContext
) {
super(Collections.singletonList(iterator), staticContext);
this.iterator = iterator;
this.sequenceType = sequenceType;
this.isUpdating = isUpdating;
this.errorCode = errorCode;
if (!this.sequenceType.isEmptySequence()) {
this.itemType = this.sequenceType.getItemType();
Expand All @@ -73,6 +76,15 @@ public TreatIterator(
}
}

/**
 * Convenience constructor for a treat iterator that is not updating.
 * Delegates to the five-argument constructor, passing false for isUpdating.
 *
 * @param iterator the child iterator.
 * @param sequenceType the sequence type forwarded to the five-argument constructor.
 * @param errorCode the error code forwarded to the five-argument constructor.
 * @param staticContext the runtime static context forwarded to the five-argument constructor.
 */
public TreatIterator(
RuntimeIterator iterator,
SequenceType sequenceType,
ErrorCode errorCode,
RuntimeStaticContext staticContext
) {
this(iterator, sequenceType, false, errorCode, staticContext);
}

@Override
public boolean hasNextLocal() {
return this.hasNext;
Expand Down Expand Up @@ -226,6 +238,11 @@ public JSoundDataFrame getDataFrame(DynamicContext dynamicContext) {
throw errorToThrow("" + dataItemType);
}

/**
 * Returns the pending update list of the child iterator, unchanged.
 *
 * @param context the dynamic context forwarded to the child iterator.
 * @return the pending update list produced by the child iterator.
 */
@Override
public PendingUpdateList getPendingUpdateList(DynamicContext context) {
    final PendingUpdateList childUpdates = this.iterator.getPendingUpdateList(context);
    return childUpdates;
}

/**
* Converts a homogeneous RDD of atomic values to a DataFrame
*
Expand Down
19 changes: 17 additions & 2 deletions src/test/java/iq/DeltaUpdateRuntimeTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public RumbleRuntimeConfiguration getConfiguration() {
"yes",
"--show-error-info",
"yes",
"--apply-updates",
"yes",
"--materialization-cap",
"900000"
}
Expand All @@ -86,7 +88,9 @@ public RumbleRuntimeConfiguration getConfiguration() {
"--output-format",
"delta",
"--show-error-info",
"yes"
"yes",
"--apply-updates",
"yes",
}
);

Expand All @@ -97,7 +101,9 @@ public RumbleRuntimeConfiguration getConfiguration() {
"--output-format",
"delta",
"--show-error-info",
"yes"
"yes",
"--apply-updates",
"yes",
}
);

Expand Down Expand Up @@ -254,6 +260,7 @@ protected void checkExpectedOutput(

/**
 * Produces the output of the given sequence and then applies its possible updates.
 * The output is obtained before applyPossibleUpdates is invoked.
 *
 * @param sequence the sequence of items to evaluate.
 * @return the output obtained from getIteratorOutput for the sequence.
 */
protected String runIterators(SequenceOfItems sequence) {
    final String serializedOutput = getIteratorOutput(sequence);
    applyPossibleUpdates(sequence);
    return serializedOutput;
}

Expand Down Expand Up @@ -304,10 +311,12 @@ && getConfiguration().getResultSizeCap() > 0)

private String getRDDResults(SequenceOfItems sequence) {
JavaRDD<Item> rdd = sequence.getAsRDD();
applyPossibleUpdates(sequence);
JavaRDD<String> output = rdd.map(o -> o.serialize());
List<String> collectedOutput = new ArrayList<String>();
SparkSessionManager.collectRDDwithLimitWarningOnly(output, collectedOutput);


if (collectedOutput.isEmpty()) {
return "";
}
Expand All @@ -329,4 +338,10 @@ private String getRDDResults(SequenceOfItems sequence) {
return result;
}

/**
 * Applies the pending update list of the sequence, but only when the test
 * configuration asks for updates to be applied and the sequence is available as a PUL.
 *
 * @param sequence the sequence whose updates may be applied.
 */
private void applyPossibleUpdates(SequenceOfItems sequence) {
    if (!getConfiguration().applyUpdates()) {
        return;
    }
    if (!sequence.availableAsPUL()) {
        return;
    }
    sequence.applyPUL();
}

}
Loading

0 comments on commit 7ee88c3

Please sign in to comment.