Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "open" and "close" function for stateful Java operators #3251

Merged
merged 9 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,19 @@ import scala.collection.mutable
*/
class AggregateOpExec(descString: String) extends OperatorExecutor {
private val desc: AggregateOpDesc = objectMapper.readValue(descString, classOf[AggregateOpDesc])
private val keyedPartialAggregates = new mutable.HashMap[List[Object], List[Object]]()
private var keyedPartialAggregates: mutable.HashMap[List[Object], List[Object]] = _
private var distributedAggregations: List[DistributedAggregation[Object]] = _

/** Resets per-execution state: no cached aggregations and a fresh, empty partial-aggregate map. */
override def open(): Unit = {
  // Dropped here; the note in processTuple says it is initialized there when not yet set.
  distributedAggregations = null
  keyedPartialAggregates = mutable.HashMap.empty[List[Object], List[Object]]
}

/**
 * Releases the state held by this executor.
 *
 * The map field is null until `open()` assigns it, so it is checked before clearing;
 * this keeps `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (keyedPartialAggregates != null) {
    keyedPartialAggregates.clear()
  }
  distributedAggregations = null
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {

// Initialize distributedAggregations if it's not yet initialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,22 @@ package edu.uci.ics.amber.operator.cartesianProduct
import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import edu.uci.ics.amber.operator.hashJoin.JoinUtils

import scala.collection.mutable.ArrayBuffer

/**
* Executes a Cartesian Product operation between tuples from two input streams.
*/
class CartesianProductOpExec extends OperatorExecutor {

private var leftTuples: ArrayBuffer[Tuple] = _

/** Starts each execution with a fresh, empty buffer for the left-side tuples. */
override def open(): Unit = {
  leftTuples = ArrayBuffer.empty[Tuple]
}

/**
 * Empties the buffered left-side tuples.
 *
 * `leftTuples` is null until `open()` assigns it, so it is checked first to keep
 * `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (leftTuples != null) {
    leftTuples.clear()
  }
}

/**
* Processes incoming tuples from either the left or right input stream.
* Tuples from the left stream are collected until the stream is exhausted.
Expand All @@ -30,12 +36,5 @@ class CartesianProductOpExec extends OperatorExecutor {
} else {
leftTuples.map(leftTuple => JoinUtils.joinTuples(leftTuple, tuple)).iterator
}

}

override def open(): Unit = {
leftTuples = ArrayBuffer[Tuple]()
}

override def close(): Unit = leftTuples.clear()
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import java.io.StringReader
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

class DictionaryMatcherOpExec(
descString: String
) extends MapOpExec {
class DictionaryMatcherOpExec(descString: String) extends MapOpExec {
private val desc: DictionaryMatcherOpDesc =
objectMapper.readValue(descString, classOf[DictionaryMatcherOpDesc])
// this is needed for the matching types Phrase and Conjunction
Expand All @@ -25,7 +23,7 @@ class DictionaryMatcherOpExec(
/** An unmodifiable set containing some common URL words that are not usually useful
* for searching.
*/
final val URL_STOP_WORDS_SET = List[String](
private final val URL_STOP_WORDS_SET = List[String](
"http",
"https",
"org",
Expand All @@ -51,7 +49,9 @@ class DictionaryMatcherOpExec(
}

override def close(): Unit = {
tokenizedDictionaryEntries = null
if (tokenizedDictionaryEntries != null) {
tokenizedDictionaryEntries.clear()
}
dictionaryEntries = null
luceneAnalyzer = null
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@ import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import scala.collection.mutable

class DifferenceOpExec extends OperatorExecutor {
private var leftHashSet: mutable.HashSet[Tuple] = _
private var rightHashSet: mutable.HashSet[Tuple] = _
private var exhaustedCounter: Int = _

/** Resets the exhausted-input counter and replaces both hash sets with empty ones. */
override def open(): Unit = {
  exhaustedCounter = 0
  leftHashSet = mutable.HashSet.empty[Tuple]
  rightHashSet = mutable.HashSet.empty[Tuple]
}

private val leftHashSet: mutable.HashSet[Tuple] = new mutable.HashSet()
private val rightHashSet: mutable.HashSet[Tuple] = new mutable.HashSet()
private var exhaustedCounter: Int = 0
/**
 * Empties both hash sets.
 *
 * Each set is null until `open()` assigns it, so each is checked before clearing;
 * this keeps `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (leftHashSet != null) {
    leftHashSet.clear()
  }
  if (rightHashSet != null) {
    rightHashSet.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
if (port == 1) { // right input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ import scala.collection.mutable
* It uses a `LinkedHashSet` to preserve the input order while removing duplicates.
*/
class DistinctOpExec extends OperatorExecutor {
private val seenTuples: mutable.LinkedHashSet[Tuple] = mutable.LinkedHashSet()
private var seenTuples: mutable.LinkedHashSet[Tuple] = _

/** Starts each execution with an empty, insertion-ordered set of seen tuples. */
override def open(): Unit = {
  seenTuples = mutable.LinkedHashSet.empty[Tuple]
}

/**
 * Empties the set of seen tuples.
 *
 * `seenTuples` is null until `open()` assigns it, so it is checked first to keep
 * `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (seenTuples != null) {
    seenTuples.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
seenTuples.add(tuple)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ class HashJoinBuildOpExec[K](descString: String) extends OperatorExecutor {
objectMapper.readValue(descString, classOf[HashJoinOpDesc[K]])
var buildTableHashMap: mutable.HashMap[K, ListBuffer[Tuple]] = _

/** Starts each execution with an empty build table. */
override def open(): Unit = {
  buildTableHashMap = mutable.HashMap.empty[K, mutable.ListBuffer[Tuple]]
}

/**
 * Empties the build table.
 *
 * `buildTableHashMap` is null until `open()` assigns it, so it is checked first to keep
 * `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (buildTableHashMap != null) {
    buildTableHashMap.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {

val key = tuple.getField(desc.buildAttributeName).asInstanceOf[K]
Expand All @@ -24,12 +32,4 @@ class HashJoinBuildOpExec[K](descString: String) extends OperatorExecutor {
case (k, v) => v.map(t => TupleLike(List(k) ++ t.getFields))
}
}

override def open(): Unit = {
buildTableHashMap = new mutable.HashMap[K, mutable.ListBuffer[Tuple]]()
}

override def close(): Unit = {
buildTableHashMap.clear()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ class HashJoinProbeOpExec[K](
objectMapper.readValue(descString, classOf[HashJoinOpDesc[K]])
var buildTableHashMap: mutable.HashMap[K, (ListBuffer[Tuple], Boolean)] = _

/** Starts each execution with an empty build table of (tuples, flag) entries. */
override def open(): Unit = {
  buildTableHashMap = mutable.HashMap.empty[K, (mutable.ListBuffer[Tuple], Boolean)]
}

/**
 * Empties the build table.
 *
 * `buildTableHashMap` is null until `open()` assigns it, so it is checked first to keep
 * `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (buildTableHashMap != null) {
    buildTableHashMap.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] =
if (port == 0) {
// Load build hash map
Expand Down Expand Up @@ -122,13 +130,4 @@ class HashJoinProbeOpExec[K](
.map(attributeName => attributeName -> tuple.getField(attributeName)): _*
)
)

override def open(): Unit = {
buildTableHashMap = new mutable.HashMap[K, (mutable.ListBuffer[Tuple], Boolean)]()
}

override def close(): Unit = {
buildTableHashMap.clear()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@ import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}
import scala.collection.mutable

class IntersectOpExec extends OperatorExecutor {
private val leftSet = new mutable.HashSet[Tuple]()
private val rightSet = new mutable.HashSet[Tuple]()
private var leftSet: mutable.HashSet[Tuple] = _
private var rightSet: mutable.HashSet[Tuple] = _
private var exhaustedCounter: Int = _

/** Resets the exhausted-input counter and replaces both tuple sets with empty ones. */
override def open(): Unit = {
  exhaustedCounter = 0
  leftSet = mutable.HashSet.empty[Tuple]
  rightSet = mutable.HashSet.empty[Tuple]
}

private var exhaustedCounter: Int = 0
/**
 * Empties both tuple sets.
 *
 * Each set is null until `open()` assigns it, so each is checked before clearing;
 * this keeps `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (leftSet != null) {
    leftSet.clear()
  }
  if (rightSet != null) {
    rightSet.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,30 @@ import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.tuple.{AttributeType, Tuple, TupleLike}
import edu.uci.ics.amber.operator.hashJoin.JoinUtils
import edu.uci.ics.amber.util.JSONUtils.objectMapper

import java.sql.Timestamp
import scala.collection.mutable.ListBuffer

/** This operator makes two assumptions:
* 1. The tuples in both inputs arrive in ascending order.
* 2. The left input's join key is treated as a point; the join condition is: left key in the range (right key, right key + constant).
*/
class IntervalJoinOpExec(
descString: String
) extends OperatorExecutor {
class IntervalJoinOpExec(descString: String) extends OperatorExecutor {
private val desc: IntervalJoinOpDesc =
objectMapper.readValue(descString, classOf[IntervalJoinOpDesc])
var leftTable: ListBuffer[Tuple] = new ListBuffer[Tuple]()
var rightTable: ListBuffer[Tuple] = new ListBuffer[Tuple]()
private var leftTable: ListBuffer[Tuple] = _
private var rightTable: ListBuffer[Tuple] = _

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
/** Starts each execution with empty left and right tuple buffers. */
override def open(): Unit = {
  rightTable = ListBuffer.empty[Tuple]
  leftTable = ListBuffer.empty[Tuple]
}

/**
 * Empties both tuple buffers.
 *
 * Each buffer is null until `open()` assigns it, so each is checked before clearing;
 * this keeps `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (leftTable != null) {
    leftTable.clear()
  }
  if (rightTable != null) {
    rightTable.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
if (port == 0) {
leftTable += tuple
if (rightTable.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ import edu.uci.ics.amber.util.JSONUtils.objectMapper

class LimitOpExec(descString: String) extends OperatorExecutor {
private val desc: LimitOpDesc = objectMapper.readValue(descString, classOf[LimitOpDesc])
var count = 0
var count: Int = _

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
/** Resets the running tuple counter to zero at the start of an execution. */
override def open(): Unit = { count = 0 }

/**
 * Passes the tuple through while fewer than `desc.limit` tuples have been emitted;
 * afterwards emits nothing.
 *
 * @param tuple the incoming tuple
 * @param port  the input port (not used here)
 * @return a single-element iterator with `tuple`, or an empty iterator once the limit is reached
 */
override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
  if (count >= desc.limit) {
    // Limit already reached: drop the tuple.
    Iterator.empty
  } else {
    count += 1
    Iterator(tuple)
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@ class ReservoirSamplingOpExec(descString: String, idx: Int, workerCount: Int)
private val desc: ReservoirSamplingOpDesc =
objectMapper.readValue(descString, classOf[ReservoirSamplingOpDesc])
private val count: Int = equallyPartitionGoal(desc.k, workerCount)(idx)
private var n: Int = 0
private val reservoir: Array[Tuple] = Array.ofDim(count)
private var n: Int = _
private var reservoir: Array[Tuple] = _
private val rand: Random = new Random(workerCount)

/** Allocates a fresh reservoir of `count` slots and resets the counter `n` to zero. */
override def open(): Unit = {
  reservoir = new Array[Tuple](count)
  n = 0
}

/** Drops the reference to the reservoir array. */
override def close(): Unit = { reservoir = null }

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {

if (n < count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,19 @@ import edu.uci.ics.amber.util.JSONUtils.objectMapper

import scala.collection.mutable.ArrayBuffer

class SortPartitionsOpExec(
descString: String
) extends OperatorExecutor {
class SortPartitionsOpExec(descString: String) extends OperatorExecutor {
private val desc: SortPartitionsOpDesc =
objectMapper.readValue(descString, classOf[SortPartitionsOpDesc])
private var unorderedTuples: ArrayBuffer[Tuple] = _

/** Starts each execution with an empty buffer of not-yet-sorted tuples. */
override def open(): Unit = {
  unorderedTuples = ArrayBuffer.empty[Tuple]
}

/**
 * Empties the buffered tuples.
 *
 * `unorderedTuples` is null until `open()` assigns it, so it is checked first to keep
 * `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (unorderedTuples != null) {
    unorderedTuples.clear()
  }
}

/** Returns an iterator over a sorted copy of the buffered tuples, ordered by `compareTuples`. */
private def sortTuples(): Iterator[TupleLike] = {
  val ordered = unorderedTuples.sortWith(compareTuples)
  ordered.iterator
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
Expand All @@ -36,13 +42,4 @@ class SortPartitionsOpExec(
true // unsupported type
}
}

override def open(): Unit = {
unorderedTuples = new ArrayBuffer[Tuple]()
}

override def close(): Unit = {
unorderedTuples.clear()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,15 @@ class SplitOpExec(
descString: String
) extends OperatorExecutor {
val desc: SplitOpDesc = objectMapper.readValue(descString, classOf[SplitOpDesc])
lazy val random: Random = if (desc.random) new Random() else new Random(desc.seed)
var random: Random = _

/**
 * Creates the random generator for this execution: unseeded when `desc.random` is set,
 * otherwise seeded with `desc.seed`.
 */
override def open(): Unit = {
  val generator: Random =
    if (desc.random) new Random()
    else new Random(desc.seed)
  random = generator
}

/** Drops the reference to the random generator. */
override def close(): Unit = { random = null }

override def processTupleMultiPort(
tuple: Tuple,
Expand All @@ -24,5 +32,4 @@ class SplitOpExec(
}

// Left unimplemented (`???` throws NotImplementedError when called).
// NOTE(review): presumably callers use processTupleMultiPort, which this class overrides — confirm.
override def processTuple(tuple: Tuple, port: Int): Iterator[Tuple] = ???

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@ package edu.uci.ics.amber.operator.symmetricDifference

import edu.uci.ics.amber.core.executor.OperatorExecutor
import edu.uci.ics.amber.core.tuple.{Tuple, TupleLike}

import scala.collection.mutable

class SymmetricDifferenceOpExec extends OperatorExecutor {
private val leftSet = new mutable.HashSet[Tuple]()
private val rightSet = new mutable.HashSet[Tuple]()
private var leftSet: mutable.HashSet[Tuple] = _
private var rightSet: mutable.HashSet[Tuple] = _
private var exhaustedCounter: Int = _

/** Resets the exhausted-input counter and replaces both tuple sets with empty ones. */
override def open(): Unit = {
  exhaustedCounter = 0
  leftSet = mutable.HashSet.empty[Tuple]
  rightSet = mutable.HashSet.empty[Tuple]
}

private var exhaustedCounter: Int = 0
/**
 * Empties both tuple sets.
 *
 * Each set is null until `open()` assigns it, so each is checked before clearing;
 * this keeps `close()` from throwing when `open()` never ran.
 */
override def close(): Unit = {
  if (leftSet != null) {
    leftSet.clear()
  }
  if (rightSet != null) {
    rightSet.clear()
  }
}

override def processTuple(tuple: Tuple, port: Int): Iterator[TupleLike] = {
// add the tuple to corresponding set
Expand Down
Loading