Merge branch 'Spark3.5.0' of github.com:RumbleDB/rumble into Spark3.5.0
Ghislain Fourny committed Oct 28, 2024
2 parents f9fb6d3 + 673578d commit 4a6e6cb
Showing 11 changed files with 114 additions and 67 deletions.
72 changes: 36 additions & 36 deletions pom.xml
@@ -209,31 +209,31 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.12</artifactId>
+ <artifactId>spark-core_2.13</artifactId>
<version>3.5.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.12</artifactId>
+ <artifactId>spark-sql_2.13</artifactId>
<version>3.5.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib_2.12</artifactId>
+ <artifactId>spark-mllib_2.13</artifactId>
<version>3.5.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aws</artifactId>
+ <artifactId>hadoop-aws</artifactId>
<version>3.3.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-avro_2.12</artifactId>
+ <artifactId>spark-avro_2.13</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
@@ -277,37 +277,37 @@
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.5.13</version>
- </dependency>
- <!--<dependency>
- <groupId>edu.vanderbilt.accre</groupId>
- <artifactId>laurelin</artifactId>
- <version>1.0.1</version>
- </dependency>-->
- <dependency>
- <groupId>org.jgrapht</groupId>
- <artifactId>jgrapht-core</artifactId>
- <version>1.4.0</version>
- </dependency>
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- <version>2.10.6</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- <version>2.15.2</version>
- </dependency>
- <dependency>
- <groupId>io.delta</groupId>
- <artifactId>delta-spark_2.12</artifactId>
- <version>3.2.1</version>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
+ <!--<dependency>
+ <groupId>edu.vanderbilt.accre</groupId>
+ <artifactId>laurelin</artifactId>
+ <version>1.0.1</version>
+ </dependency>-->
+ <dependency>
+ <groupId>org.jgrapht</groupId>
+ <artifactId>jgrapht-core</artifactId>
+ <version>1.4.0</version>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.10.6</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ <version>2.15.2</version>
+ </dependency>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-spark_2.13</artifactId>
+ <version>3.2.1</version>
+ </dependency>
+ </dependencies>

<distributionManagement>
<snapshotRepository>
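Note on this hunk: Spark artifact IDs encode the Scala binary version they were built against, so every Spark and Delta dependency moves from the _2.12 to the _2.13 suffix (the hadoop-aws pair appears to differ only in whitespace). Mixing the two binary families on one classpath typically fails at runtime with NoSuchMethodError or NoClassDefFoundError. A small illustrative check, not part of the commit, that the Scala library on the classpath matches the new suffixes:

    import scala.util.Properties;

    // Illustrative sketch: assert that the Scala binary version on the
    // classpath is the 2.13 family expected by the _2.13 artifact suffixes.
    public final class ScalaBinaryVersionCheck {
        public static void main(String[] args) {
            String full = Properties.versionNumberString(); // e.g. "2.13.12"
            String binary = full.substring(0, full.lastIndexOf('.'));
            if (!"2.13".equals(binary)) {
                throw new IllegalStateException(
                    "Expected Scala 2.13 on the classpath, found " + full
                );
            }
            System.out.println("Scala binary version OK: " + binary);
        }
    }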
17 changes: 12 additions & 5 deletions src/main/java/org/rumbledb/items/parsing/ItemParser.java
@@ -46,13 +46,14 @@
import org.rumbledb.types.BuiltinTypesCatalogue;
import org.rumbledb.types.FieldDescriptor;
import org.rumbledb.types.ItemType;
- import scala.collection.mutable.WrappedArray;
+ import scala.collection.immutable.ArraySeq;
+ import scala.collection.Iterator;

import sparksoniq.spark.SparkSessionManager;

import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
- import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
@@ -589,9 +590,15 @@ private static Item convertValueToItem(
}
} else {
@SuppressWarnings("unchecked")
- Object arrayObject = ((WrappedArray<Object>) o).array();
- for (int index = 0; index < Array.getLength(arrayObject); index++) {
- Object value = Array.get(arrayObject, index);
+ Iterator<Object> iterator = null;
+ if (o instanceof scala.collection.mutable.ArraySeq) {
+ iterator = ((scala.collection.mutable.ArraySeq<Object>) o).iterator();
+ } else {
+ iterator = ((ArraySeq<Object>) o).iterator();
+ }
+ while (iterator.hasNext()) {
+ Object value = iterator.next();
+
members.add(convertValueToItem(value, dataType, metadata, memberType));
}
}
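Background for this file: Scala 2.13 replaced scala.collection.mutable.WrappedArray with ArraySeq, and ArraySeq no longer exposes the array() accessor, so the old reflective loop over the unwrapped array gives way to an Iterator that handles both the mutable and the immutable flavour. A self-contained sketch of the same pattern (the wrapper class and main are illustrative; unsafeWrapArray is the standard Scala 2.13 way to wrap a Java array without copying):

    import scala.collection.Iterator;
    import scala.collection.immutable.ArraySeq;
    import scala.collection.immutable.ArraySeq$;

    public final class ArraySeqIterationSketch {

        // Mirrors the patched convertValueToItem logic: accept either ArraySeq
        // flavour and walk it with an Iterator instead of java.lang.reflect.Array.
        @SuppressWarnings("unchecked")
        static void printAll(Object o) {
            Iterator<Object> it;
            if (o instanceof scala.collection.mutable.ArraySeq) {
                it = ((scala.collection.mutable.ArraySeq<Object>) o).iterator();
            } else {
                it = ((ArraySeq<Object>) o).iterator();
            }
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }

        public static void main(String[] args) {
            // unsafeWrapArray wraps the Java array without copying; the
            // companion object is reached from Java via MODULE$.
            ArraySeq<Object> seq =
                ArraySeq$.MODULE$.unsafeWrapArray(new Object[] { 1, 2, 3 });
            printAll(seq);
        }
    }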
src/main/java/org/rumbledb/runtime/flwor/FlworDataFrameUtils.java
@@ -86,7 +86,8 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

- import scala.collection.mutable.WrappedArray;
+ import scala.collection.immutable.ArraySeq;
+ import scala.collection.Iterator;
import sparksoniq.spark.SparkSessionManager;

public class FlworDataFrameUtils {
@@ -807,19 +808,20 @@ private static Object deserializeByteArray(byte[] toDeserialize, Kryo kryo, Input
}

public static void deserializeWrappedParameters(
- WrappedArray<byte[]> wrappedParameters,
+ ArraySeq<byte[]> wrappedParameters,
List<List<Item>> deserializedParams,
Kryo kryo,
Input input
) {
- Object[] serializedParams = (Object[]) wrappedParameters.array();
- for (Object serializedParam : serializedParams) {
- if (serializedParam == null) {
+ Iterator<byte[]> iterator = wrappedParameters.iterator();
+ while (iterator.hasNext()) {
+ byte[] bytes = iterator.next();
+ if (bytes == null) {
deserializedParams.add(Collections.emptyList());
continue;
}
@SuppressWarnings("unchecked")
- List<Item> deserializedParam = (List<Item>) deserializeByteArray((byte[]) serializedParam, kryo, input);
+ List<Item> deserializedParam = (List<Item>) deserializeByteArray((byte[]) bytes, kryo, input);
deserializedParams.add(deserializedParam);
}
}
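deserializeWrappedParameters receives one Kryo-serialized parameter list per slot and inflates each, mapping null slots to empty lists; the patch only swaps the array()-based loop for an iterator. A hedged, self-contained sketch of that round trip using a plain Java array in place of the ArraySeq (class name and sample data are illustrative):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public final class KryoParameterRoundTrip {
        public static void main(String[] args) {
            Kryo kryo = new Kryo();
            kryo.setRegistrationRequired(false); // keep the sketch self-contained

            // Serialize one parameter list to bytes, as the UDF input would be.
            List<String> params = new ArrayList<>(Arrays.asList("a", "b"));
            Output output = new Output(256, -1); // growable buffer
            kryo.writeClassAndObject(output, params);
            byte[] serialized = output.toBytes();

            // Deserialize a sequence of slots; a null slot becomes an empty
            // list, mirroring the convention in deserializeWrappedParameters.
            List<List<String>> deserializedParams = new ArrayList<>();
            for (byte[] slot : new byte[][] { serialized, null }) {
                if (slot == null) {
                    deserializedParams.add(Collections.emptyList());
                    continue;
                }
                @SuppressWarnings("unchecked")
                List<String> param =
                    (List<String>) kryo.readClassAndObject(new Input(slot));
                deserializedParams.add(param);
            }
            System.out.println(deserializedParams); // [[a, b], []]
        }
    }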
GroupClauseArrayMergeAggregateResultsUDF.java
@@ -25,12 +25,14 @@
import org.rumbledb.api.Item;
import org.rumbledb.exceptions.OurBadException;

- import scala.collection.mutable.WrappedArray;
+ import scala.collection.immutable.ArraySeq;
+ import scala.collection.Iterator;


import java.util.ArrayList;
import java.util.List;

- public class GroupClauseArrayMergeAggregateResultsUDF implements UDF1<WrappedArray<Object>, Object[]> {
+ public class GroupClauseArrayMergeAggregateResultsUDF implements UDF1<ArraySeq<Object>, Object[]> {


private static final long serialVersionUID = 1L;
@@ -43,22 +45,23 @@ public GroupClauseArrayMergeAggregateResultsUDF() {
}

@Override
- public Object[] call(WrappedArray<Object> wrappedParameters) {
+ public Object[] call(ArraySeq<Object> wrappedParameters) {
this.nextResult.clear();
this.deserializedParams.clear();
List<Object> result = new ArrayList<Object>();
- Object[] insideArrays = (Object[]) wrappedParameters.array();
- for (Object o : insideArrays) {
+ Iterator<Object> iterator = wrappedParameters.iterator();
+ while (iterator.hasNext()) {
+ Object o = iterator.next();
if (o instanceof Row) {
Row row = (Row) o;
result.add(row);
}
- if (o instanceof WrappedArray) {
- @SuppressWarnings("rawtypes")
- WrappedArray wrappedArray = (WrappedArray) o;
- Object[] insideArrays2 = (Object[]) wrappedArray.array();
- for (Object p : insideArrays2)
- result.add(p);
+ if (o instanceof ArraySeq) {
+ @SuppressWarnings("unchecked")
+ ArraySeq<Object> arraySeq = (ArraySeq<Object>) o;
+ Iterator<Object> iterator2 = arraySeq.iterator();
+ while (iterator2.hasNext())
+ result.add(iterator2.next());
} else {
throw new OurBadException("We cannot process " + o.getClass().getCanonicalName());
}
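This class is a Spark Java UDF1 over an array column, which is why its input type changes from WrappedArray to the Scala 2.13 ArraySeq. A hedged sketch of registering and invoking such a UDF (the names and the local session are illustrative; the parameter is typed as scala.collection.Seq, the common supertype of both ArraySeq flavours):

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.types.DataTypes;
    import scala.collection.Seq;

    public final class ArrayColumnUdfSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")
                    .appName("arrayseq-udf-sketch")
                    .getOrCreate();

            // The array column arrives as a Scala Seq (an ArraySeq at runtime
            // on Scala 2.13); length() works on either flavour.
            spark.udf().register(
                "arity",
                (UDF1<Seq<Object>, Integer>) seq -> seq.length(),
                DataTypes.IntegerType
            );

            spark.sql("SELECT arity(array(1, 2, 3)) AS n").show(); // n = 3
            spark.stop();
        }
    }

Typing the parameter as Seq keeps the sketch indifferent to whether Spark hands it a mutable or an immutable ArraySeq at runtime.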
GroupClauseSerializeAggregateResultsUDF.java
@@ -23,12 +23,12 @@
import org.apache.spark.sql.api.java.UDF1;
import org.rumbledb.api.Item;
import org.rumbledb.runtime.flwor.FlworDataFrameUtils;
- import scala.collection.mutable.WrappedArray;
+ import scala.collection.immutable.ArraySeq;

import java.util.ArrayList;
import java.util.List;

- public class GroupClauseSerializeAggregateResultsUDF implements UDF1<WrappedArray<byte[]>, byte[]> {
+ public class GroupClauseSerializeAggregateResultsUDF implements UDF1<ArraySeq<byte[]>, byte[]> {


private static final long serialVersionUID = 1L;
@@ -43,7 +43,7 @@ public GroupClauseSerializeAggregateResultsUDF() {
}

@Override
- public byte[] call(WrappedArray<byte[]> wrappedParameters) {
+ public byte[] call(ArraySeq<byte[]> wrappedParameters) {
this.nextResult.clear();
this.deserializedParams.clear();
FlworDataFrameUtils.deserializeWrappedParameters(
8 changes: 7 additions & 1 deletion src/test/java/iq/Bugs.java
@@ -21,6 +21,7 @@
package iq;

import iq.base.AnnotationsTestsBase;
+ import scala.Function0;
import scala.util.Properties;

import org.apache.spark.SparkConf;
@@ -51,7 +52,12 @@ public class Bugs extends AnnotationsTestsBase {
public static final String javaVersion =
System.getProperty("java.version");
public static final String scalaVersion =
- Properties.scalaPropOrElse("version.number", "unknown");
+ Properties.scalaPropOrElse("version.number", new Function0<String>() {
+ @Override
+ public String apply() {
+ return "unknown";
+ }
+ });
protected static List<File> _testFiles = new ArrayList<>();
protected final File testFile;

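The Java-visible signature of scalaPropOrElse changed because its default argument is by-name in Scala 2.13, so Java callers must supply a scala.Function0<String> rather than a plain String. Since Scala 2.12 the FunctionN types are functional interfaces, so a Java lambda is an equivalent, shorter form of the anonymous class used in this commit (an illustrative sketch, not from the commit):

    import scala.Function0;
    import scala.util.Properties;

    // Illustrative sketch: Function0 has a single abstract method in
    // Scala 2.12+, so a Java lambda can replace the anonymous class.
    public final class ScalaVersionProbe {
        public static void main(String[] args) {
            Function0<String> fallback = () -> "unknown";
            String scalaVersion =
                Properties.scalaPropOrElse("version.number", fallback);
            System.out.println("Scala library version: " + scalaVersion);
        }
    }

The same substitution applies to the identical hunks in the four test files below.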
8 changes: 7 additions & 1 deletion src/test/java/iq/DeltaUpdateRuntimeTests.java
@@ -38,6 +38,7 @@
import org.rumbledb.api.Item;
import org.rumbledb.api.SequenceOfItems;
import scala.util.Properties;
+ import scala.Function0;
import sparksoniq.spark.SparkSessionManager;
import utils.FileManager;

@@ -58,7 +59,12 @@ public class DeltaUpdateRuntimeTests extends AnnotationsTestsBase {
public static final String javaVersion =
System.getProperty("java.version");
public static final String scalaVersion =
- Properties.scalaPropOrElse("version.number", "unknown");
+ Properties.scalaPropOrElse("version.number", new Function0<String>() {
+ @Override
+ public String apply() {
+ return "unknown";
+ }
+ });

public RumbleRuntimeConfiguration getConfiguration() {
return new RumbleRuntimeConfiguration(
11 changes: 10 additions & 1 deletion src/test/java/iq/RuntimeTests.java
@@ -36,6 +36,9 @@
import scala.util.Properties;
import sparksoniq.spark.SparkSessionManager;
import utils.FileManager;
+ import scala.Function0;
+ import scala.util.Properties;
+

import java.io.File;
import java.util.*;
@@ -51,7 +54,13 @@ public class RuntimeTests extends AnnotationsTestsBase {
public static final String javaVersion =
System.getProperty("java.version");
public static final String scalaVersion =
- Properties.scalaPropOrElse("version.number", "unknown");
+ Properties.scalaPropOrElse("version.number", new Function0<String>() {
+ @Override
+ public String apply() {
+ return "unknown";
+ }
+ });
+
protected static List<File> _testFiles = new ArrayList<>();
protected final File testFile;

9 changes: 8 additions & 1 deletion src/test/java/iq/StaticTypeTests.java
@@ -3,6 +3,7 @@
import org.rumbledb.config.RumbleRuntimeConfiguration;

import iq.base.AnnotationsTestsBase;
+ import scala.Function0;
import scala.util.Properties;

import org.apache.spark.SparkConf;
@@ -37,7 +38,13 @@ public class StaticTypeTests extends AnnotationsTestsBase {
public static final String javaVersion =
System.getProperty("java.version");
public static final String scalaVersion =
- Properties.scalaPropOrElse("version.number", "unknown");
+ Properties.scalaPropOrElse("version.number", new Function0<String>() {
+ @Override
+ public String apply() {
+ return "unknown";
+ }
+ });
+
protected static List<File> _testFiles = new ArrayList<>();
protected final File testFile;

9 changes: 8 additions & 1 deletion src/test/java/iq/UpdatesForRumbleBenchmark.java
@@ -12,6 +12,7 @@
import org.rumbledb.exceptions.ExceptionMetadata;
import org.rumbledb.runtime.functions.input.FileSystemUtil;
import scala.util.Properties;
+ import scala.Function0;
import sparksoniq.spark.SparkSessionManager;

import java.io.BufferedWriter;
@@ -28,7 +29,13 @@ public class UpdatesForRumbleBenchmark {
public static final String javaVersion =
System.getProperty("java.version");
public static final String scalaVersion =
- Properties.scalaPropOrElse("version.number", "unknown");
+ Properties.scalaPropOrElse("version.number", new Function0<String>() {
+ @Override
+ public String apply() {
+ return "unknown";
+ }
+ });
+

public List<FileTuple> benchmarkFiles;

@@ -1,4 +1,4 @@
- (:JIQS: ShouldRun; Output="({ "label" : 0, "name" : "a", "prediction" : [ "3", "4", "2", "6", "5" ] }, { "label" : 1, "name" : "b", "prediction" : [ "3", "4", "6", "5", "1" ] }, { "label" : 2, "name" : "c", "prediction" : [ "4", "2", "6", "5", "1" ] }, { "label" : 3, "name" : "d", "prediction" : [ "3", "2", "6", "5", "1" ] }, { "label" : 4, "name" : "e", "prediction" : [ "3", "4", "2", "6", "1" ] }, { "label" : 5, "name" : "f", "prediction" : [ "3", "4", "2", "5", "1" ] })" :)
+ (:JIQS: ShouldRun; Output="({ "label" : 0, "name" : "a", "prediction" : [ "4", "2", "3", "6", "5" ] }, { "label" : 1, "name" : "b", "prediction" : [ "4", "3", "6", "5", "1" ] }, { "label" : 2, "name" : "c", "prediction" : [ "4", "2", "6", "5", "1" ] }, { "label" : 3, "name" : "d", "prediction" : [ "2", "3", "6", "5", "1" ] }, { "label" : 4, "name" : "e", "prediction" : [ "4", "2", "3", "6", "1" ] }, { "label" : 5, "name" : "f", "prediction" : [ "4", "2", "3", "5", "1" ] })" :)
let $data := annotate(
json-file("../../../../queries/rumbleML/sample-ml-data-flat.json"),
{ "label": "integer", "binaryLabel": "integer", "name": "string", "age": "double", "weight": "double", "booleanCol": "boolean", "nullCol": "null", "stringCol": "string", "stringArrayCol": ["string"], "intArrayCol": ["integer"], "doubleArrayCol": ["double"], "doubleArrayArrayCol": [["double"]] }
