Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance incremental computation support in Texera #2165

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,14 @@ class SchemaPropagationResource {
val responseContent = schemaPropagationResult.map(e =>
(e._1.operator, e._2.map(s => s.map(o => o.getAttributesScala)))
)
SchemaPropagationResponse(0, responseContent, null)
// remove internal incremental computation columns
val responseContentCleaned = responseContent.map(kv => {
val schemaWithoutInternalAttrs = kv._2.map(portSchema => {
portSchema.map(attrs => attrs.filter(attr => !attr.getName.startsWith("__internal")))
})
(kv._1, schemaWithoutInternalAttrs)
})
SchemaPropagationResponse(0, responseContentCleaned, null)
} catch {
case e: Throwable =>
e.printStackTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@ object ProgressiveUtils {
Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, true).add(tuple).build
}

def addInsertionFlag(fields: Array[Object], outputSchema: Schema): Tuple = {
Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, true).addSequentially(fields).build
}

def addRetractionFlag(tuple: Tuple, outputSchema: Schema): Tuple = {
assert(!tuple.getSchema.containsAttribute(insertRetractFlagAttr.getName))
Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, false).add(tuple).build
}

def addRetractionFlag(fields: Array[Object], outputSchema: Schema): Tuple = {
Tuple.newBuilder(outputSchema).add(insertRetractFlagAttr, false).addSequentially(fields).build
}

def isInsertion(tuple: Tuple): Boolean = {
if (tuple.getSchema.containsAttribute(insertRetractFlagAttr.getName)) {
tuple.getField[Boolean](insertRetractFlagAttr.getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ case class OperatorInfo(
dynamicInputPorts: Boolean = false,
dynamicOutputPorts: Boolean = false,
supportReconfiguration: Boolean = false,
allowPortCustomization: Boolean = false
allowPortCustomization: Boolean = false,
supportRetractableInput: Boolean = false
)

case class OperatorMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import edu.uci.ics.texera.workflow.operators.difference.DifferenceOpDesc
import edu.uci.ics.texera.workflow.operators.distinct.DistinctOpDesc
import edu.uci.ics.texera.workflow.operators.download.BulkDownloaderOpDesc
import edu.uci.ics.texera.workflow.operators.filter.SpecializedFilterOpDesc
import edu.uci.ics.texera.workflow.operators.hashJoin.HashJoinOpDesc
import edu.uci.ics.texera.workflow.operators.hashJoin.{HashJoinOpDesc, IncrementalJoinOpDesc}
import edu.uci.ics.texera.workflow.operators.intersect.IntersectOpDesc
import edu.uci.ics.texera.workflow.operators.intervalJoin.IntervalJoinOpDesc
import edu.uci.ics.texera.workflow.operators.keywordSearch.KeywordSearchOpDesc
Expand Down Expand Up @@ -135,6 +135,7 @@ trait StateTransferFunc
new Type(value = classOf[RandomKSamplingOpDesc], name = "RandomKSampling"),
new Type(value = classOf[ReservoirSamplingOpDesc], name = "ReservoirSampling"),
new Type(value = classOf[HashJoinOpDesc[String]], name = "HashJoin"),
new Type(value = classOf[IncrementalJoinOpDesc[String]], name = "IncrementalJoin"),
new Type(value = classOf[DistinctOpDesc], name = "Distinct"),
new Type(value = classOf[IntersectOpDesc], name = "Intersect"),
new Type(value = classOf[SymmetricDifferenceOpDesc], name = "SymmetricDifference"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package edu.uci.ics.texera.workflow.common.operators.aggregate
import edu.uci.ics.amber.engine.architecture.worker.PauseManager
import edu.uci.ics.amber.engine.common.InputExhausted
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient
import edu.uci.ics.texera.workflow.common.ProgressiveUtils.{addInsertionFlag, addRetractionFlag}
import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor
import edu.uci.ics.texera.workflow.common.operators.aggregate.PartialAggregateOpExec.internalAggObjKey
import edu.uci.ics.texera.workflow.common.tuple.Tuple
Expand All @@ -17,11 +18,19 @@ class FinalAggregateOpExec(
) extends OperatorExecutor {

var groupByKeyAttributes: Array[Attribute] = _
var schema: Schema = _
var outputSchema: Schema = operatorSchemaInfo.outputSchemas(0)

// each value in partialObjectsPerKey has the same length as aggFuncs
// partialObjectsPerKey(key)[i] corresponds to aggFuncs[i]
var partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]()
private var partialObjectsPerKey = Map[List[Object], List[Object]]()

// for incremental computation
// the time interval that aggregate operator emits incremental update to downstream
val UPDATE_INTERVAL_MS = 1000
// the timestamp of the last incremental update
private var lastUpdatedTime: Long = 0
// the aggregation state at the last output, used to compute diff with the next output
private var previousAggResults = Map[List[Object], List[Object]]()

override def open(): Unit = {}
override def close(): Unit = {}
Expand All @@ -37,37 +46,79 @@ class FinalAggregateOpExec(
}
tuple match {
case Left(t) =>
val key =
if (groupByKeys == null || groupByKeys.isEmpty) List()
else groupByKeys.map(k => t.getField[Object](k))

val partialObjects =
aggFuncs.indices.map(i => t.getField[Object](internalAggObjKey(i))).toList
if (!partialObjectsPerKey.contains(key)) {
partialObjectsPerKey.put(key, partialObjects)
} else {
val updatedPartialObjects = aggFuncs.indices
.map(i => {
val aggFunc = aggFuncs(i)
val partial1 = partialObjectsPerKey(key)(i)
val partial2 = partialObjects(i)
aggFunc.merge(partial1, partial2)
})
.toList
partialObjectsPerKey.put(key, updatedPartialObjects)
}
Iterator()
insertToFinalAggState(t)
if (shouldEmitOutput()) emitDiffAndUpdateState() else Iterator()
case Right(_) =>
partialObjectsPerKey.iterator.map(pair => {
val finalAggValues = aggFuncs.indices.map(i => aggFuncs(i).finalAgg(pair._2(i)))
emitDiffAndUpdateState()
}
}

private def shouldEmitOutput(): Boolean = {
System.currentTimeMillis - lastUpdatedTime > UPDATE_INTERVAL_MS
}

private def emitDiffAndUpdateState(): Iterator[Tuple] = {
val resultIterator = calculateDiff()
// reset last updated time and previous output results
lastUpdatedTime = System.currentTimeMillis
// saves the current aggregation state,
// note that partialObjectsPerKey is an immutable map variable
// subsequent updates will change the map pointed by var, but not change the old map
previousAggResults = partialObjectsPerKey
resultIterator
}

val tupleBuilder = Tuple.newBuilder(operatorSchemaInfo.outputSchemas(0))
// add group by keys and final agg values
tupleBuilder.addSequentially((pair._1 ++ finalAggValues).toArray)
private def calculateDiff(): Iterator[Tuple] = {
// find differences between the previous and the current aggregation state
val retractions = new mutable.ArrayBuffer[Tuple]()
val insertions = new mutable.ArrayBuffer[Tuple]()

tupleBuilder.build()
partialObjectsPerKey.keySet.foreach(k => {
if (!previousAggResults.contains(k)) {
// this key doesn't exist in the previous state, emit as an insertion tuple
val newFields = finalAggregate(k, partialObjectsPerKey(k))
insertions.append(addInsertionFlag(newFields, outputSchema))
} else if (previousAggResults(k) != partialObjectsPerKey(k)) {
// this key already exists in the state and its value has changed
// first retract the previously emitted value, then emit an insertion of the new value
val prevFields = finalAggregate(k, previousAggResults(k))
retractions.append(addRetractionFlag(prevFields, outputSchema))
val newFields = finalAggregate(k, partialObjectsPerKey(k))
insertions.append(addInsertionFlag(newFields, outputSchema))
}
})

val results = retractions ++ insertions

results.iterator
}

// apply partial aggregation's incremental update to the final aggregation state
private def insertToFinalAggState(t: Tuple): Unit = {
val key =
if (groupByKeys == null || groupByKeys.isEmpty) List()
else groupByKeys.map(k => t.getField[Object](k))

val partialObjects =
aggFuncs.indices.map(i => t.getField[Object](internalAggObjKey(i))).toList
if (!partialObjectsPerKey.contains(key)) {
partialObjectsPerKey += (key -> partialObjects)
} else {
val updatedPartialObjects = aggFuncs.indices
.map(i => {
val aggFunc = aggFuncs(i)
val partial1 = partialObjectsPerKey(key)(i)
val partial2 = partialObjects(i)
aggFunc.merge(partial1, partial2)
})
.toList
partialObjectsPerKey += (key -> updatedPartialObjects)
}
}

private def finalAggregate(key: List[Object], value: List[Object]): Array[Object] = {
val finalAggValues = aggFuncs.indices.map(i => aggFuncs(i).finalAgg(value(i)))
(key ++ finalAggValues).toArray
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class PartialAggregateOpExec(

var partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]()

// for incremental computation
// the time interval that partial aggregate operator emits incremental update to final aggregate
val UPDATE_INTERVAL_MS = 500
// the timestamp of the last incremental update
private var lastUpdatedTime: Long = 0

override def open(): Unit = {}
override def close(): Unit = {}

Expand All @@ -53,27 +59,47 @@ class PartialAggregateOpExec(
}
tuple match {
case Left(t) =>
val key =
if (groupByKeys == null || groupByKeys.isEmpty) List()
else groupByKeys.map(k => t.getField[Object](k))

if (!partialObjectsPerKey.contains(key))
partialObjectsPerKey.put(key, aggFuncs.map(aggFunc => aggFunc.init()))

val partialObjects =
partialObjectsPerKey.getOrElseUpdate(key, aggFuncs.map(aggFunc => aggFunc.init()))
val updatedPartialObjects = aggFuncs.zip(partialObjects).map {
case (aggFunc, partial) =>
aggFunc.iterate(partial, t)
}
partialObjectsPerKey.put(key, updatedPartialObjects)
Iterator()
insertToPartialAggState(t)
if (shouldEmitOutput()) emitOutputAndResetState() else Iterator()
case Right(_) =>
partialObjectsPerKey.iterator.map(pair => {
val tupleFields = pair._1 ++ pair._2
Tuple.newBuilder(schema).addSequentially(tupleFields.toArray).build()
})
emitOutputAndResetState()
}
}

private def shouldEmitOutput(): Boolean = {
System.currentTimeMillis - lastUpdatedTime > UPDATE_INTERVAL_MS
}

private def emitOutputAndResetState(): scala.Iterator[Tuple] = {
lastUpdatedTime = System.currentTimeMillis
val resultIterator = getPartialOutputs()
this.partialObjectsPerKey = new mutable.HashMap[List[Object], List[Object]]()
resultIterator
}
Comment on lines +69 to +78
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see similar code for partial and final aggregate operators to do time-based snapshots to push partial results out. If the time-based snapshot is a universal strategy for incremental operators to push out partial results, is it better to make it a standard framework?


private def getPartialOutputs(): scala.Iterator[Tuple] = {
partialObjectsPerKey.iterator.map(pair => {
val tupleFields = pair._1 ++ pair._2
Tuple.newBuilder(schema).addSequentially(tupleFields.toArray).build()
})
}

private def insertToPartialAggState(t: Tuple): Unit = {
val key =
if (groupByKeys == null || groupByKeys.isEmpty) List()
else groupByKeys.map(k => t.getField[Object](k))

if (!partialObjectsPerKey.contains(key))
partialObjectsPerKey.put(key, aggFuncs.map(aggFunc => aggFunc.init()))

val partialObjects =
partialObjectsPerKey.getOrElseUpdate(key, aggFuncs.map(aggFunc => aggFunc.init()))
val updatedPartialObjects = aggFuncs.zip(partialObjects).map {
case (aggFunc, partial) =>
aggFunc.iterate(partial, t)
}
partialObjectsPerKey.put(key, updatedPartialObjects)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package edu.uci.ics.texera.workflow.common.operators.consolidate

import edu.uci.ics.amber.engine.architecture.deploysemantics.layer.OpExecConfig
import edu.uci.ics.texera.workflow.common.ProgressiveUtils
import edu.uci.ics.texera.workflow.common.metadata.{
InputPort,
OperatorGroupConstants,
OperatorInfo,
OutputPort
}
import edu.uci.ics.texera.workflow.common.operators.OperatorDescriptor
import edu.uci.ics.texera.workflow.common.tuple.schema.{OperatorSchemaInfo, Schema}

import scala.collection.JavaConverters.asScalaBuffer
import scala.collection.immutable.List

class ConsolidateOpDesc extends OperatorDescriptor {
override def operatorInfo: OperatorInfo = {
OperatorInfo(
"Consolidate",
"Consolidate retractable inputs, collect all of them and output append-only data",
OperatorGroupConstants.UTILITY_GROUP,
List(InputPort("")),
List(OutputPort("")),
supportRetractableInput = true
)
}

override def getOutputSchema(schemas: Array[Schema]): Schema = {
val newAttrs = asScalaBuffer(schemas(0).getAttributes)
.filter(attr => attr != ProgressiveUtils.insertRetractFlagAttr)
Schema.newBuilder().add(newAttrs.toArray: _*).build()
}

override def operatorExecutor(operatorSchemaInfo: OperatorSchemaInfo): OpExecConfig = {
OpExecConfig.manyToOneLayer(operatorIdentifier, _ => new ConsolidateOpExec(operatorSchemaInfo))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package edu.uci.ics.texera.workflow.common.operators.consolidate

import edu.uci.ics.amber.engine.architecture.worker.PauseManager
import edu.uci.ics.amber.engine.common.InputExhausted
import edu.uci.ics.amber.engine.common.rpc.AsyncRPCClient
import edu.uci.ics.texera.workflow.common.ProgressiveUtils
import edu.uci.ics.texera.workflow.common.operators.OperatorExecutor
import edu.uci.ics.texera.workflow.common.tuple.Tuple
import edu.uci.ics.texera.workflow.common.tuple.schema.OperatorSchemaInfo

import scala.collection.mutable.ArrayBuffer

class ConsolidateOpExec(operatorSchemaInfo: OperatorSchemaInfo) extends OperatorExecutor {

private val results = new ArrayBuffer[Tuple]()

override def processTexeraTuple(
tuple: Either[Tuple, InputExhausted],
input: Int,
pauseManager: PauseManager,
asyncRPCClient: AsyncRPCClient
): Iterator[Tuple] = {

tuple match {
case Left(t) =>
val (isInsertion, tupleValue) =
ProgressiveUtils.getTupleFlagAndValue(t, operatorSchemaInfo)
if (isInsertion) {
results += tupleValue
} else {
results -= tupleValue
}
Iterator()
case Right(_) =>
results.iterator
}

}

override def open(): Unit = {}

override def close(): Unit = {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ public BuilderV2 add(String attributeName, AttributeType attributeType, Object f
*/
public BuilderV2 addSequentially(Object[] fields) {
checkNotNull(fields);
checkSchemaMatchesFields(schema.getAttributes(), Lists.newArrayList(fields));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need such an assertion for the normal tuple fields. if we need to add new fields (e.g., retraction or not), we can treat it separately? If so, I can do it in a future PR.

int startIndex = this.fieldNameMap.size();
for (int i = 0; i < fields.length; i++) {
this.add(schema.getAttributes().get(i), fields[i]);
this.add(schema.getAttributes().get(startIndex + i), fields[i]);
}
return this;
}
Expand Down
Loading