diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala index cb413a914da..98cc2e96a1c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/resource/SchemaPropagationResource.scala @@ -40,7 +40,14 @@ class SchemaPropagationResource { val responseContent = schemaPropagationResult.map(e => (e._1.operator, e._2.map(s => s.map(o => o.getAttributesScala))) ) - SchemaPropagationResponse(0, responseContent, null) + // remove internal incremental computation columns + val responseContentCleaned = responseContent.map(kv => { + val schemaWithoutInternalAttrs = kv._2.map(portSchema => { + portSchema.map(attrs => attrs.filter(attr => !attr.getName.startsWith("__internal"))) + }) + (kv._1, schemaWithoutInternalAttrs) + }) + SchemaPropagationResponse(0, responseContentCleaned, null) } catch { case e: Throwable => e.printStackTrace() diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/ProgressiveUtils.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/ProgressiveUtils.scala index 67a42bde311..befac8aa088 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/ProgressiveUtils.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/ProgressiveUtils.scala @@ -20,11 +20,19 @@ object ProgressiveUtils { Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, true).add(tuple).build } + def addInsertionFlag(fields: Array[Object], outputSchema: Schema): Tuple = { + Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, true).addSequentially(fields).build + } + def addRetractionFlag(tuple: Tuple, outputSchema: Schema): Tuple = { assert(!tuple.getSchema.containsAttribute(insertRetractFlagAttr.getName)) Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, false).add(tuple).build } + def addRetractionFlag(fields: Array[Object], outputSchema: Schema): Tuple = { + Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, false).addSequentially(fields).build + } + def isInsertion(tuple: Tuple): Boolean = { if (tuple.getSchema.containsAttribute(insertRetractFlagAttr.getName)) { tuple.getField[Boolean](insertRetractFlagAttr.getName) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/metadata/OperatorMetadataGenerator.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/metadata/OperatorMetadataGenerator.scala index c06528c2e3d..b61461a9947 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/metadata/OperatorMetadataGenerator.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/metadata/OperatorMetadataGenerator.scala @@ -32,7 +32,8 @@ case class OperatorInfo( dynamicInputPorts: Boolean = false, dynamicOutputPorts: Boolean = false, supportReconfiguration: Boolean = false, - allowPortCustomization: Boolean = false + allowPortCustomization: Boolean = false, + supportRetractableInput: Boolean = false ) case class OperatorMetadata( diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorDescriptor.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorDescriptor.scala index 3ee112de7b8..0f882db7bf3 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorDescriptor.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/OperatorDescriptor.scala @@ -17,7 +17,7 @@ import edu.uci.ics.texera.workflow.operators.difference.DifferenceOpDesc import edu.uci.ics.texera.workflow.operators.distinct.DistinctOpDesc import edu.uci.ics.texera.workflow.operators.download.BulkDownloaderOpDesc import edu.uci.ics.texera.workflow.operators.filter.SpecializedFilterOpDesc -import edu.uci.ics.texera.workflow.operators.hashJoin.HashJoinOpDesc +import edu.uci.ics.texera.workflow.operators.hashJoin.{HashJoinOpDesc, IncrementalJoinOpDesc} import edu.uci.ics.texera.workflow.operators.intersect.IntersectOpDesc import edu.uci.ics.texera.workflow.operators.intervalJoin.IntervalJoinOpDesc import edu.uci.ics.texera.workflow.operators.keywordSearch.KeywordSearchOpDesc @@ -135,6 +135,7 @@ trait StateTransferFunc new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"), new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"), new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"), + new Type(value = classOf[IncrementalJoinOpDesc[String]], name = "IncrementalJoin"), new Type(value = classOf[DistinctOpDesc], name = "Distinct"), new Type(value = classOf[IntersectOpDesc], name = "Intersect"), new Type(value = classOf[SymmetricDifferenceOpDesc], name = "SymmetricDifference"), diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/FinalAggregateOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/FinalAggregateOpExec.scala index 769a0de2b49..e5d5d0e7850 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/FinalAggregateOpExec.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/FinalAggregateOpExec.scala @@ -3,6 +3,7 @@ package edu.uci.ics.texera.workflow.common.operators.aggregate import edu.uci.ics.amber.engine.architecture.worker.PauseManager import edu.uci.ics.amber.engine.common.InputExhausted import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient +import edu.uci.ics.texera.workflow.common.ProgressiveUtils.{addInsertionFlag, addRetractionFlag} import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor import edu.uci.ics.texera.workflow.common.operators.aggregate.PartialAggregateOpExec.internalAggObjKey import edu.uci.ics.texera.workflow.common.tuple.Tuple @@ -17,11 +18,19 @@ class FinalAggregateOpExec( ) extends OperatorExecutor { var groupByKeyAttributes: Array[Attribute] = _ - var schema: Schema = _ + var outputSchema: Schema = operatorSchemaInfo.outputSchemas(0) // each value in partialObjectsPerKey has the same length as aggFuncs // partialObjectsPerKey(key)[i] corresponds to aggFuncs[i] - var partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]() + private var partialObjectsPerKey = Map[List[Object], List[Object]]() + + // for incremental computation + // the time interval that aggregate operator emits incremental update to downstream + val UPDATE_INTERVAL_MS = 1000 + // the timestamp of the last incremental update + private var lastUpdatedTime: Long = 0 + // the aggregation state at the last output, used to compute diff with the next output + private var previousAggResults = Map[List[Object], List[Object]]() override def open(): Unit = {} override def close(): Unit = {} @@ -37,37 +46,79 @@ class FinalAggregateOpExec( } tuple match { case Left(t) => - val key = - if (groupByKeys == null || groupByKeys.isEmpty) List() - else groupByKeys.map(k => t.getField[Object](k)) - - val partialObjects = - aggFuncs.indices.map(i => t.getField[Object](internalAggObjKey(i))).toList - if (!partialObjectsPerKey.contains(key)) { - partialObjectsPerKey.put(key, partialObjects) - } else { - val updatedPartialObjects = aggFuncs.indices - .map(i => { - val aggFunc = aggFuncs(i) - val partial1 = partialObjectsPerKey(key)(i) - val partial2 = partialObjects(i) - aggFunc.merge(partial1, partial2) - }) - .toList - partialObjectsPerKey.put(key, updatedPartialObjects) - } - Iterator() + insertToFinalAggState(t) + if (shouldEmitOutput()) emitDiffAndUpdateState() else Iterator() case Right(_) => - partialObjectsPerKey.iterator.map(pair => { - val finalAggValues = aggFuncs.indices.map(i => aggFuncs(i).finalAgg(pair._2(i))) + emitDiffAndUpdateState() + } + } + + private def shouldEmitOutput(): Boolean = { + System.currentTimeMillis - lastUpdatedTime > UPDATE_INTERVAL_MS + } + + private def emitDiffAndUpdateState(): Iterator[Tuple] = { + val resultIterator = calculateDiff() + // reset last updated time and previous output results + lastUpdatedTime = System.currentTimeMillis + // saves the current aggregation state, + // note that partialObjectsPerKey is an immutable map variable + // subsequent updates will change the map pointed by var, but not change the old map + previousAggResults = partialObjectsPerKey + resultIterator + } - val tupleBuilder = Tuple.newBuilder(operatorSchemaInfo.outputSchemas(0)) - // add group by keys and final agg values - tupleBuilder.addSequentially((pair._1 ++ finalAggValues).toArray) + private def calculateDiff(): Iterator[Tuple] = { + // find differences between the previous and the current aggregation state + val retractions = new mutable.ArrayBuffer[Tuple]() + val insertions = new mutable.ArrayBuffer[Tuple]() - tupleBuilder.build() + partialObjectsPerKey.keySet.foreach(k => { + if (!previousAggResults.contains(k)) { + // this key doesn't exist in the previous state, emit as an insertion tuple + val newFields = finalAggregate(k, partialObjectsPerKey(k)) + insertions.append(addInsertionFlag(newFields, outputSchema)) + } else if (previousAggResults(k) != partialObjectsPerKey(k)) { + // this key already exists in the state and its value has changed + // first retract the previously emitted value, then emit an insertion of the new value + val prevFields = finalAggregate(k, previousAggResults(k)) + retractions.append(addRetractionFlag(prevFields, outputSchema)) + val newFields = finalAggregate(k, partialObjectsPerKey(k)) + insertions.append(addInsertionFlag(newFields, outputSchema)) + } + }) + + val results = retractions ++ insertions + + results.iterator + } + + // apply partial aggregation's incremental update to the final aggregation state + private def insertToFinalAggState(t: Tuple): Unit = { + val key = + if (groupByKeys == null || groupByKeys.isEmpty) List() + else groupByKeys.map(k => t.getField[Object](k)) + + val partialObjects = + aggFuncs.indices.map(i => t.getField[Object](internalAggObjKey(i))).toList + if (!partialObjectsPerKey.contains(key)) { + partialObjectsPerKey += (key -> partialObjects) + } else { + val updatedPartialObjects = aggFuncs.indices + .map(i => { + val aggFunc = aggFuncs(i) + val partial1 = partialObjectsPerKey(key)(i) + val partial2 = partialObjects(i) + aggFunc.merge(partial1, partial2) }) + .toList + partialObjectsPerKey += (key -> updatedPartialObjects) } } + private def finalAggregate(key: List[Object], value: List[Object]): Array[Object] = { + val finalAggValues = aggFuncs.indices.map(i => aggFuncs(i).finalAgg(value(i))) + (key ++ finalAggValues).toArray + } + } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/PartialAggregateOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/PartialAggregateOpExec.scala index d7dfbff2f47..5fd15317d45 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/PartialAggregateOpExec.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/aggregate/PartialAggregateOpExec.scala @@ -39,6 +39,12 @@ class PartialAggregateOpExec( var partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]() + // for incremental computation + // the time interval that partial aggregate operator emits incremental update to final aggregate + val UPDATE_INTERVAL_MS = 500 + // the timestamp of the last incremental update + private var lastUpdatedTime: Long = 0 + override def open(): Unit = {} override def close(): Unit = {} @@ -53,27 +59,47 @@ class PartialAggregateOpExec( } tuple match { case Left(t) => - val key = - if (groupByKeys == null || groupByKeys.isEmpty) List() - else groupByKeys.map(k => t.getField[Object](k)) - - if (!partialObjectsPerKey.contains(key)) - partialObjectsPerKey.put(key, aggFuncs.map(aggFunc => aggFunc.init())) - - val partialObjects = - partialObjectsPerKey.getOrElseUpdate(key, aggFuncs.map(aggFunc => aggFunc.init())) - val updatedPartialObjects = aggFuncs.zip(partialObjects).map { - case (aggFunc, partial) => - aggFunc.iterate(partial, t) - } - partialObjectsPerKey.put(key, updatedPartialObjects) - Iterator() + insertToPartialAggState(t) + if (shouldEmitOutput()) emitOutputAndResetState() else Iterator() case Right(_) => - partialObjectsPerKey.iterator.map(pair => { - val tupleFields = pair._1 ++ pair._2 - Tuple.newBuilder(schema).addSequentially(tupleFields.toArray).build() - }) + emitOutputAndResetState() } } + private def shouldEmitOutput(): Boolean = { + System.currentTimeMillis - lastUpdatedTime > UPDATE_INTERVAL_MS + } + + private def emitOutputAndResetState(): scala.Iterator[Tuple] = { + lastUpdatedTime = System.currentTimeMillis + val resultIterator = getPartialOutputs() + this.partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]() + resultIterator + } + + private def getPartialOutputs(): scala.Iterator[Tuple] = { + partialObjectsPerKey.iterator.map(pair => { + val tupleFields = pair._1 ++ pair._2 + Tuple.newBuilder(schema).addSequentially(tupleFields.toArray).build() + }) + } + + private def insertToPartialAggState(t: Tuple): Unit = { + val key = + if (groupByKeys == null || groupByKeys.isEmpty) List() + else groupByKeys.map(k => t.getField[Object](k)) + + if (!partialObjectsPerKey.contains(key)) + partialObjectsPerKey.put(key, aggFuncs.map(aggFunc => aggFunc.init())) + + val partialObjects = + partialObjectsPerKey.getOrElseUpdate(key, aggFuncs.map(aggFunc => aggFunc.init())) + val updatedPartialObjects = aggFuncs.zip(partialObjects).map { + case (aggFunc, partial) => + aggFunc.iterate(partial, t) + } + partialObjectsPerKey.put(key, updatedPartialObjects) + + } + } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/consolidate/ConsolidateOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/consolidate/ConsolidateOpDesc.scala new file mode 100644 index 00000000000..08f079ae926 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/consolidate/ConsolidateOpDesc.scala @@ -0,0 +1,38 @@ +package edu.uci.ics.texera.workflow.common.operators.consolidate + +import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecConfig +import edu.uci.ics.texera.workflow.common.ProgressiveUtils +import edu.uci.ics.texera.workflow.common.metadata.{ + InputPort, + OperatorGroupConstants, + OperatorInfo, + OutputPort +} +import edu.uci.ics.texera.workflow.common.operators.OperatorDescriptor +import edu.uci.ics.texera.workflow.common.tuple.schema.{OperatorSchemaInfo, Schema} + +import scala.collection.JavaConverters.asScalaBuffer +import scala.collection.immutable.List + +class ConsolidateOpDesc extends OperatorDescriptor { + override def operatorInfo: OperatorInfo = { + OperatorInfo( + "Consolidate", + "Consolidate retractable inputs, collect all of them and output append-only data", + OperatorGroupConstants.UTILITY_GROUP, + List(InputPort("")), + List(OutputPort("")), + supportRetractableInput = true + ) + } + + override def getOutputSchema(schemas: Array[Schema]): Schema = { + val newAttrs = asScalaBuffer(schemas(0).getAttributes) + .filter(attr => attr != ProgressiveUtils.insertRetractFlagAttr) + Schema.newBuilder().add(newAttrs.toArray: _*).build() + } + + override def operatorExecutor(operatorSchemaInfo: OperatorSchemaInfo): OpExecConfig = { + OpExecConfig.manyToOneLayer(operatorIdentifier, _ => new ConsolidateOpExec(operatorSchemaInfo)) + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/consolidate/ConsolidateOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/consolidate/ConsolidateOpExec.scala new file mode 100644 index 00000000000..4674ee8958b --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/operators/consolidate/ConsolidateOpExec.scala @@ -0,0 +1,43 @@ +package edu.uci.ics.texera.workflow.common.operators.consolidate + +import edu.uci.ics.amber.engine.architecture.worker.PauseManager +import edu.uci.ics.amber.engine.common.InputExhausted +import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient +import edu.uci.ics.texera.workflow.common.ProgressiveUtils +import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor +import edu.uci.ics.texera.workflow.common.tuple.Tuple +import edu.uci.ics.texera.workflow.common.tuple.schema.OperatorSchemaInfo + +import scala.collection.mutable.ArrayBuffer + +class ConsolidateOpExec(operatorSchemaInfo: OperatorSchemaInfo) extends OperatorExecutor { + + private val results = new ArrayBuffer[Tuple]() + + override def processTexeraTuple( + tuple: Either[Tuple, InputExhausted], + input: Int, + pauseManager: PauseManager, + asyncRPCClient: AsyncRPCClient + ): Iterator[Tuple] = { + + tuple match { + case Left(t) => + val (isInsertion, tupleValue) = + ProgressiveUtils.getTupleFlagAndValue(t, operatorSchemaInfo) + if (isInsertion) { + results += tupleValue + } else { + results -= tupleValue + } + Iterator() + case Right(_) => + results.iterator + } + + } + + override def open(): Unit = {} + + override def close(): Unit = {} +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/tuple/Tuple.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/tuple/Tuple.java index 47886be6881..b10b62c4bda 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/tuple/Tuple.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/tuple/Tuple.java @@ -272,9 +272,9 @@ public BuilderV2 add(String attributeName, AttributeType attributeType, Object f */ public BuilderV2 addSequentially(Object[] fields) { checkNotNull(fields); - checkSchemaMatchesFields(schema.getAttributes(), Lists.newArrayList(fields)); + int startIndex = this.fieldNameMap.size(); for (int i = 0; i < fields.length; i++) { - this.add(schema.getAttributes().get(i), fields[i]); + this.add(schema.getAttributes().get(startIndex + i), fields[i]); } return this; } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/ProgressiveRetractionEnforcer.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/ProgressiveRetractionEnforcer.scala new file mode 100644 index 00000000000..6103f228e0d --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/ProgressiveRetractionEnforcer.scala @@ -0,0 +1,45 @@ +package edu.uci.ics.texera.workflow.common.workflow + +import edu.uci.ics.texera.workflow.common.{ProgressiveUtils, WorkflowContext} +import edu.uci.ics.texera.workflow.common.operators.consolidate.ConsolidateOpDesc + +import scala.collection.mutable.ArrayBuffer + +object ProgressiveRetractionEnforcer { + + def enforceDelta(logicalPlan: LogicalPlan, context: WorkflowContext): LogicalPlan = { + // first find the edges that we need to add the consolidate operator + val edgesToAddConsolidateOp = new ArrayBuffer[OperatorLink]() + logicalPlan.outputSchemaMap.foreach(kv => { + val op = kv._1 + val outSchemas = kv._2 + logicalPlan + .getDownstreamEdges(op.operator) + .zip(outSchemas) + .foreach(out => { + val outEdge = out._1 + val outSchema = out._2 + if (outSchema.containsAttribute(ProgressiveUtils.insertRetractFlagAttr.getName)) { + val downstreamOp = logicalPlan.getOperator(outEdge.destination.operatorID) + if (!downstreamOp.operatorInfo.supportRetractableInput) { + edgesToAddConsolidateOp.append(outEdge) + } + } + }) + }) + + var resultPlan = logicalPlan + edgesToAddConsolidateOp.foreach(edge => { + val newOp = new ConsolidateOpDesc() + newOp.setContext(context) + resultPlan = resultPlan.removeEdge(edge) + resultPlan = resultPlan + .addOperator(newOp) + .addEdge(edge.origin.operatorID, newOp.operatorID, edge.origin.portOrdinal, 0) + .addEdge(newOp.operatorID, edge.destination.operatorID, 0, edge.destination.portOrdinal) + }) + + resultPlan + } + +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala index 412bb16b20f..0ce0f790add 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/common/workflow/WorkflowCompiler.scala @@ -84,12 +84,16 @@ class WorkflowCompiler(val logicalPlan: LogicalPlan, val context: WorkflowContex opResultStorage: OpResultStorage, lastCompletedJob: Option[LogicalPlan] = Option.empty ): Workflow = { + // perform rewrite to reuse cache of previous runs val cacheReuses = new WorkflowCacheChecker(lastCompletedJob, logicalPlan).getValidCacheReuse() val opsToReuseCache = cacheReuses.intersect(logicalPlan.opsToReuseCache.toSet) - val rewrittenLogicalPlan = + var rewrittenLogicalPlan = WorkflowCacheRewriter.transform(logicalPlan, opResultStorage, opsToReuseCache) rewrittenLogicalPlan.operatorMap.values.foreach(initOperator) + // perform rewrite to enforce progressive computation constraints + rewrittenLogicalPlan = ProgressiveRetractionEnforcer.enforceDelta(rewrittenLogicalPlan, context) + // assign sink storage to the logical plan after cache rewrite // as it will be converted to the actual physical plan assignSinkStorage(rewrittenLogicalPlan, opResultStorage, opsToReuseCache) diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/aggregate/SpecializedAggregateOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/aggregate/SpecializedAggregateOpDesc.scala index 20a67601ce6..cbd94e4e905 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/aggregate/SpecializedAggregateOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/aggregate/SpecializedAggregateOpDesc.scala @@ -2,6 +2,7 @@ package edu.uci.ics.texera.workflow.operators.aggregate import com.fasterxml.jackson.annotation.{JsonProperty, JsonPropertyDescription} import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle +import edu.uci.ics.texera.workflow.common.ProgressiveUtils import edu.uci.ics.texera.workflow.common.metadata.annotations.AutofillAttributeNameList import edu.uci.ics.texera.workflow.common.metadata.{ InputPort, @@ -71,6 +72,7 @@ class SpecializedAggregateOpDesc extends AggregateOpDesc { } Schema .newBuilder() + .add(ProgressiveUtils.insertRetractFlagAttr) .add(getGroupByKeysSchema(schemas).getAttributes) .add(aggregations.map(agg => agg.getAggregationAttribute(schemas(0))).asJava) .build() diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/dictionary/DictionaryMatcherOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/dictionary/DictionaryMatcherOpDesc.scala index b6938f03d4a..4bc7a6b1d03 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/dictionary/DictionaryMatcherOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/dictionary/DictionaryMatcherOpDesc.scala @@ -44,7 +44,8 @@ class DictionaryMatcherOpDesc extends MapOpDesc { OperatorGroupConstants.SEARCH_GROUP, inputPorts = List(InputPort()), outputPorts = List(OutputPort()), - supportReconfiguration = true + supportReconfiguration = true, + supportRetractableInput = true ) override def getOutputSchema(schemas: Array[Schema]): Schema = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java index 1ff1fa7daca..6dcf9a63314 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/filter/SpecializedFilterOpDesc.java @@ -39,6 +39,6 @@ public OperatorInfo operatorInfo() { OperatorGroupConstants.SEARCH_GROUP(), asScalaBuffer(singletonList(new InputPort("", false))).toList(), asScalaBuffer(singletonList(new OutputPort(""))).toList(), - false, false, true, false); + false, false, true, false, true); } } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/hashJoin/IncrementalJoinOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/hashJoin/IncrementalJoinOpDesc.scala new file mode 100644 index 00000000000..f308c3e2b1b --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/hashJoin/IncrementalJoinOpDesc.scala @@ -0,0 +1,89 @@ +package edu.uci.ics.texera.workflow.operators.hashJoin + +import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty, JsonPropertyDescription} +import com.kjetland.jackson.jsonSchema.annotations.{JsonSchemaInject, JsonSchemaTitle} +import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecConfig +import edu.uci.ics.texera.workflow.common.metadata.annotations.{ + AutofillAttributeName, + AutofillAttributeNameOnPort1 +} +import edu.uci.ics.texera.workflow.common.metadata.{ + InputPort, + OperatorGroupConstants, + OperatorInfo, + OutputPort +} +import edu.uci.ics.texera.workflow.common.operators.OperatorDescriptor +import edu.uci.ics.texera.workflow.common.tuple.schema.{OperatorSchemaInfo, Schema} + +@JsonSchemaInject(json = """ +{ + "attributeTypeRules": { + "leftAttributeName": { + "const": { + "$data": "rightAttributeName" + } + } + } +} +""") +class IncrementalJoinOpDesc[K] extends OperatorDescriptor { + + @JsonProperty(required = true) + @JsonSchemaTitle("Left Input Attribute") + @JsonPropertyDescription("attribute to be joined on the Left Input") + @AutofillAttributeName + var leftAttributeName: String = _ + + @JsonProperty(required = true) + @JsonSchemaTitle("Right Input Attribute") + @JsonPropertyDescription("attribute to be joined on the Right Input") + @AutofillAttributeNameOnPort1 + var rightAttributeName: String = _ + + // incremental inner join can reuse some logic from hash join + @JsonIgnore + lazy val hashJoinOpDesc: HashJoinOpDesc[K] = { + val op = new HashJoinOpDesc[K] + op.buildAttributeName = leftAttributeName + op.probeAttributeName = rightAttributeName + op.joinType = JoinType.INNER + op + } + + override def operatorExecutor(operatorSchemaInfo: OperatorSchemaInfo) = { + hashJoinOpDesc.setContext(this.context) + val hashJoinOpExec = hashJoinOpDesc.operatorExecutor(operatorSchemaInfo) + + OpExecConfig + .oneToOneLayer( + operatorIdentifier, + _ => + new IncrementalJoinOpExec[K]( + leftAttributeName, + rightAttributeName, + operatorSchemaInfo + ) + ) + .copy( + inputPorts = operatorInfo.inputPorts, + outputPorts = operatorInfo.outputPorts, + partitionRequirement = hashJoinOpExec.partitionRequirement, + derivePartition = hashJoinOpExec.derivePartition + ) + } + + override def operatorInfo: OperatorInfo = + OperatorInfo( + "Progressive Inner Join", + "join two inputs", + OperatorGroupConstants.JOIN_GROUP, + inputPorts = List(InputPort("left"), InputPort("right")), + outputPorts = List(OutputPort()) + ) + + // remove the probe attribute in the output + override def getOutputSchema(schemas: Array[Schema]): Schema = { + hashJoinOpDesc.getOutputSchema(schemas) + } +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/hashJoin/IncrementalJoinOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/hashJoin/IncrementalJoinOpExec.scala new file mode 100644 index 00000000000..ee38078d3c3 --- /dev/null +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/hashJoin/IncrementalJoinOpExec.scala @@ -0,0 +1,124 @@ +package edu.uci.ics.texera.workflow.operators.hashJoin + +import edu.uci.ics.amber.engine.architecture.worker.PauseManager +import edu.uci.ics.amber.engine.common.InputExhausted +import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient +import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor +import edu.uci.ics.texera.workflow.common.tuple.Tuple +import edu.uci.ics.texera.workflow.common.tuple.Tuple.BuilderV2 +import edu.uci.ics.texera.workflow.common.tuple.schema.{Attribute, OperatorSchemaInfo, Schema} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +class IncrementalJoinOpExec[K]( + val buildAttributeName: String, + val probeAttributeName: String, + val operatorSchemaInfo: OperatorSchemaInfo +) extends OperatorExecutor { + + val leftSchema: Schema = operatorSchemaInfo.inputSchemas(0) + val rightSchema: Schema = operatorSchemaInfo.inputSchemas(1) + + val leftTuples = new mutable.HashMap[K, (ArrayBuffer[Tuple], Boolean)]() + val rightTuples = new mutable.HashMap[K, (ArrayBuffer[Tuple], Boolean)]() + + override def processTexeraTuple( + tuple: Either[Tuple, InputExhausted], + input: Int, + pauseManager: PauseManager, + asyncRPCClient: AsyncRPCClient + ): Iterator[Tuple] = { + tuple match { + case Left(tuple) => + if (input == 0) { + // left input, find on the right + val key = tuple.getField(buildAttributeName).asInstanceOf[K] + val (matchedTuples, _) = + rightTuples.getOrElse(key, (new ArrayBuffer[Tuple](), false)) + val returnIter = matchedTuples + .map(right => { + join(tuple, right) + }) + .iterator + building(tuple, input) + returnIter + } else { + // right input, find on the left + val key = tuple.getField(probeAttributeName).asInstanceOf[K] + val (matchedTuples, _) = + leftTuples.getOrElse(key, (new ArrayBuffer[Tuple](), false)) + val returnIter = matchedTuples + .map(left => { + join(left, tuple) + }) + .iterator + building(tuple, input) + returnIter + } + case Right(_) => + Iterator() + } + } + + private def join(left: Tuple, right: Tuple): Tuple = { + val builder = Tuple + .newBuilder(operatorSchemaInfo.outputSchemas(0)) + .add(left) + + fillNonJoinFields( + builder, + rightSchema, + right.getFields.toArray(), + resolveDuplicateName = true + ) + + builder.build() + } + + def fillNonJoinFields( + builder: BuilderV2, + schema: Schema, + fields: Array[Object], + resolveDuplicateName: Boolean = false + ): Unit = { + schema.getAttributesScala.filter(attribute => attribute.getName != probeAttributeName) map { + (attribute: Attribute) => + { + val field = fields.apply(schema.getIndex(attribute.getName)) + if (resolveDuplicateName) { + val attributeName = attribute.getName + builder.add( + new Attribute( + if (leftSchema.getAttributeNames.contains(attributeName)) + attributeName + "#@1" + else attributeName, + attribute.getType + ), + field + ) + } else { + builder.add(attribute, field) + } + } + } + } + + private def building(tuple: Tuple, input: Int): Unit = { + val key = tuple.getField(buildAttributeName).asInstanceOf[K] + if (input == 0) { + val (storedTuples, _) = + leftTuples.getOrElseUpdate(key, (new ArrayBuffer[Tuple](), false)) + storedTuples += tuple + } else { + val (storedTuples, _) = + rightTuples.getOrElseUpdate(key, (new ArrayBuffer[Tuple](), false)) + storedTuples += tuple + } + } + + override def open(): Unit = {} + + override def close(): Unit = {} + +} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/keywordSearch/KeywordSearchOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/keywordSearch/KeywordSearchOpDesc.scala index 618648e8dbb..fc5bbea134a 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/keywordSearch/KeywordSearchOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/keywordSearch/KeywordSearchOpDesc.scala @@ -37,6 +37,7 @@ class KeywordSearchOpDesc extends FilterOpDesc { operatorGroupName = OperatorGroupConstants.SEARCH_GROUP, inputPorts = List(InputPort()), outputPorts = List(OutputPort()), - supportReconfiguration = true + supportReconfiguration = true, + supportRetractableInput = true ) } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/projection/ProjectionOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/projection/ProjectionOpDesc.scala index fd43c9c4761..1cf837e79d7 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/projection/ProjectionOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/projection/ProjectionOpDesc.scala @@ -55,7 +55,8 @@ class ProjectionOpDesc extends MapOpDesc { OperatorGroupConstants.UTILITY_GROUP, inputPorts = List(InputPort()), outputPorts = List(OutputPort()), - supportReconfiguration = false + supportReconfiguration = false, + supportRetractableInput = true ) } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/regex/RegexOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/regex/RegexOpDesc.scala index 2538747258d..21423cbad2d 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/regex/RegexOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/regex/RegexOpDesc.scala @@ -40,6 +40,7 @@ class RegexOpDesc extends FilterOpDesc { operatorGroupName = OperatorGroupConstants.SEARCH_GROUP, inputPorts = List(InputPort()), outputPorts = List(OutputPort()), - supportReconfiguration = true + supportReconfiguration = true, + supportRetractableInput = true ) } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sentiment/SentimentAnalysisOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sentiment/SentimentAnalysisOpDesc.scala index f9906c4740b..396374d177f 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sentiment/SentimentAnalysisOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sentiment/SentimentAnalysisOpDesc.scala @@ -53,7 +53,8 @@ class SentimentAnalysisOpDesc extends MapOpDesc { OperatorGroupConstants.ANALYTICS_GROUP, List(InputPort("")), List(OutputPort("")), - supportReconfiguration = true + supportReconfiguration = true, + supportRetractableInput = true ) override def getOutputSchema(schemas: Array[Schema]): Schema = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java index 8f52cf6f70b..4845aea20be 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/sink/managed/ProgressiveSinkOpDesc.java @@ -58,7 +58,7 @@ public OperatorInfo operatorInfo() { "View the edu.uci.ics.texera.workflow results", OperatorGroupConstants.UTILITY_GROUP(), asScalaBuffer(singletonList(new InputPort("", false))).toList(), - List.empty(), false, false, false, false); + List.empty(), false, false, false, false, true); } @Override diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java index a78e40252de..432691699cc 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/typecasting/TypeCastingOpDesc.java @@ -45,7 +45,7 @@ public OperatorInfo operatorInfo() { OperatorGroupConstants.UTILITY_GROUP(), asScalaBuffer(singletonList(new InputPort("", false))).toList(), asScalaBuffer(singletonList(new OutputPort(""))).toList(), - false, false, false, false); + false, false, false, false, true); } @Override diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java index c0249a8b91a..f8bb7552ba8 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/udf/python/source/PythonUDFSourceOpDescV2.java @@ -73,6 +73,7 @@ public OperatorInfo operatorInfo() { false, false, true, + false, false ); } diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/union/UnionOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/union/UnionOpDesc.scala index 043f1231f0e..8365ee2a3d9 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/union/UnionOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/union/UnionOpDesc.scala @@ -23,7 +23,8 @@ class UnionOpDesc extends OperatorDescriptor { "Unions the output rows from multiple input operators", OperatorGroupConstants.UTILITY_GROUP, inputPorts = List(InputPort(allowMultiInputs = true)), - outputPorts = List(OutputPort()) + outputPorts = List(OutputPort()), + supportRetractableInput = true ) override def getOutputSchema(schemas: Array[Schema]): Schema = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/unneststring/UnnestStringOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/unneststring/UnnestStringOpDesc.scala index 0cc3f58d7d0..8543342ad4e 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/unneststring/UnnestStringOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/unneststring/UnnestStringOpDesc.scala @@ -34,7 +34,8 @@ class UnnestStringOpDesc extends FlatMapOpDesc { "Unnest the string values in the column separated by a delimiter to multiple values", operatorGroupName = OperatorGroupConstants.UTILITY_GROUP, inputPorts = List(InputPort()), - outputPorts = List(OutputPort()) + outputPorts = List(OutputPort()), + supportRetractableInput = true ) override def operatorExecutor(operatorSchemaInfo: OperatorSchemaInfo) = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpDesc.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpDesc.scala index 4b15e44a53d..0a69b9113b0 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpDesc.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpDesc.scala @@ -2,7 +2,6 @@ package edu.uci.ics.texera.workflow.operators.visualization.lineChart import com.fasterxml.jackson.annotation.{JsonIgnore, JsonProperty, JsonPropertyDescription} import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecConfig -import edu.uci.ics.amber.engine.common.virtualidentity.util.makeLayer import edu.uci.ics.texera.workflow.common.metadata.annotations.{ AutofillAttributeName, AutofillAttributeNameList @@ -13,12 +12,7 @@ import edu.uci.ics.texera.workflow.common.metadata.{ OperatorInfo, OutputPort } -import edu.uci.ics.texera.workflow.common.tuple.schema.{ - Attribute, - AttributeType, - OperatorSchemaInfo, - Schema -} +import edu.uci.ics.texera.workflow.common.tuple.schema.{OperatorSchemaInfo, Schema} import edu.uci.ics.texera.workflow.operators.aggregate.{ AggregationFunction, AggregationOperation, @@ -44,22 +38,14 @@ class LineChartOpDesc extends VisualizationOperator { @JsonProperty(value = "chart style", required = true, defaultValue = VisualizationConstants.LINE) var lineChartEnum: LineChartEnum = _ - @JsonIgnore - private var groupBySchema: Schema = _ - @JsonIgnore - private var finalAggValueSchema: Schema = _ - override def chartType: String = lineChartEnum.getChartStyle def noDataCol: Boolean = dataColumns == null || dataColumns.isEmpty def resultAttributeNames: List[String] = if (noDataCol) List("count") else dataColumns - override def operatorExecutorMultiLayer(operatorSchemaInfo: OperatorSchemaInfo) = { - if (nameColumn == null || nameColumn == "") { - throw new RuntimeException("line chart: name column is null or empty") - } - + @JsonIgnore + lazy val aggOperator: SpecializedAggregateOpDesc = { val aggOperator = new SpecializedAggregateOpDesc() aggOperator.context = this.context aggOperator.operatorID = this.operatorID @@ -81,21 +67,20 @@ class LineChartOpDesc extends VisualizationOperator { aggOperator.aggregations = aggOperations aggOperator.groupByKeys = List(nameColumn) } + aggOperator + } - val aggPlan = aggOperator.aggregateOperatorExecutor( + override def operatorExecutorMultiLayer(operatorSchemaInfo: OperatorSchemaInfo) = { + if (nameColumn == null || nameColumn == "") { + throw new RuntimeException("line chart: name column is null or empty") + } + + aggOperator.aggregateOperatorExecutor( OperatorSchemaInfo( operatorSchemaInfo.inputSchemas, Array(aggOperator.getOutputSchema(operatorSchemaInfo.inputSchemas)) ) ) - - val lineChartOpExec = OpExecConfig.oneToOneLayer( - makeLayer(operatorIdentifier, "visualize"), - _ => new LineChartOpExec(this, operatorSchemaInfo) - ) - - val finalAggOp = aggPlan.sinkOperators.head - aggPlan.addOperator(lineChartOpExec).addEdge(finalAggOp, lineChartOpExec.id) } override def operatorInfo: OperatorInfo = @@ -108,46 +93,7 @@ class LineChartOpDesc extends VisualizationOperator { ) override def getOutputSchema(schemas: Array[Schema]): Schema = { - Schema - .newBuilder() - .add(getGroupByKeysSchema(schemas).getAttributes) - .add(getFinalAggValueSchema.getAttributes) - .build() - } - - private def getGroupByKeysSchema(schemas: Array[Schema]): Schema = { - val groupByKeys = List(this.nameColumn) - Schema - .newBuilder() - .add(groupByKeys.map(key => schemas(0).getAttribute(key)).toArray: _*) - .build() - } - - private def getFinalAggValueSchema: Schema = { - if (noDataCol) { - Schema - .newBuilder() - .add(resultAttributeNames.head, AttributeType.INTEGER) - .build() - } else { - Schema - .newBuilder() - .add(resultAttributeNames.map(key => new Attribute(key, AttributeType.DOUBLE)).toArray: _*) - .build() - } - } - - def groupByFunc(): Schema => Schema = { schema => - { - // Since this is a partially evaluated tuple, there is no actual schema for this - // available anywhere. Constructing it once for re-use - if (groupBySchema == null) { - val schemaBuilder = Schema.newBuilder() - schemaBuilder.add(schema.getAttribute(nameColumn)) - groupBySchema = schemaBuilder.build - } - groupBySchema - } + aggOperator.getOutputSchema(schemas) } override def operatorExecutor(operatorSchemaInfo: OperatorSchemaInfo): OpExecConfig = { diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpExec.scala b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpExec.scala deleted file mode 100644 index 6af7ecc9fc1..00000000000 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/lineChart/LineChartOpExec.scala +++ /dev/null @@ -1,24 +0,0 @@ -package edu.uci.ics.texera.workflow.operators.visualization.lineChart - -import edu.uci.ics.texera.workflow.common.operators.map.MapOpExec -import edu.uci.ics.texera.workflow.common.tuple.Tuple -import edu.uci.ics.texera.workflow.common.tuple.schema.OperatorSchemaInfo - -class LineChartOpExec( - opDesc: LineChartOpDesc, - operatorSchemaInfo: OperatorSchemaInfo -) extends MapOpExec { - - setMapFunc(this.processTuple) - - def processTuple(t: Tuple): Tuple = { - val builder = Tuple.newBuilder(operatorSchemaInfo.outputSchemas(0)) - val inputSchema = t.getSchema - builder.add(inputSchema.getAttribute(opDesc.nameColumn), t.getField(opDesc.nameColumn)) - for (i <- opDesc.resultAttributeNames.indices) { - val dataName = opDesc.resultAttributeNames.apply(i) - builder.add(inputSchema.getAttribute(dataName), t.getField(dataName)) - } - builder.build - } -} diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/scatterplot/ScatterplotOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/scatterplot/ScatterplotOpDesc.java index 07d91675f8d..dd08072fca1 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/scatterplot/ScatterplotOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/scatterplot/ScatterplotOpDesc.java @@ -99,7 +99,7 @@ public OperatorInfo operatorInfo() { OperatorGroupConstants.VISUALIZATION_GROUP(), asScalaBuffer(singletonList(new InputPort("", false))).toList(), asScalaBuffer(singletonList(new OutputPort(""))).toList(), - false, false, false, false); + false, false, false, false, false); } @Override diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/wordCloud/WordCloudOpDesc.java b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/wordCloud/WordCloudOpDesc.java index cfa987a77d9..335ee29231c 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/wordCloud/WordCloudOpDesc.java +++ b/core/amber/src/main/scala/edu/uci/ics/texera/workflow/operators/visualization/wordCloud/WordCloudOpDesc.java @@ -97,7 +97,7 @@ public OperatorInfo operatorInfo() { OperatorGroupConstants.VISUALIZATION_GROUP(), asScalaBuffer(singletonList(new InputPort("", false))).toList(), asScalaBuffer(singletonList(new OutputPort(""))).toList(), - false, false, false, false); + false, false, false, false, false); } @Override