Skip to content

Commit

Permalink
remove calls to java stream()
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Apr 24, 2024
1 parent f4993cd commit 741c4e0
Show file tree
Hide file tree
Showing 92 changed files with 561 additions and 1,307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,21 @@ class AirbyteExceptionHandler : Thread.UncaughtExceptionHandler {
// message.
// This assumes that any wrapping exceptions are just noise (e.g. runtime exception).
val deinterpolatableException =
ExceptionUtils.getThrowableList(throwable)
.stream()
.filter { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.stream().anyMatch {
deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
}
ExceptionUtils.getThrowableList(throwable).firstOrNull { t: Throwable ->
THROWABLES_TO_DEINTERPOLATE.any { deinterpolatableClass: Class<out Throwable> ->
deinterpolatableClass.isAssignableFrom(t.javaClass)
}
.findFirst()
}
val messageWasMangled: Boolean
if (deinterpolatableException.isPresent) {
val originalMessage = deinterpolatableException.get().message
if (deinterpolatableException != null) {
val originalMessage = deinterpolatableException.message
mangledMessage =
STRINGS_TO_DEINTERPOLATE
.stream() // Sort the strings longest to shortest, in case any target string is
// Sort the strings longest to shortest, in case any target string is
// a substring of another
// e.g. "airbyte_internal" should be swapped out before "airbyte"
.sorted(Comparator.comparing { obj: String -> obj.length }.reversed())
.reduce(originalMessage) { message: String?, targetString: String? ->
.sortedWith(Comparator.comparing { obj: String -> obj.length }.reversed())
.fold(originalMessage) { message: String?, targetString: String? ->
deinterpolate(message, targetString)
}
messageWasMangled = mangledMessage != originalMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import java.nio.file.Path
import java.util.*
import java.util.concurrent.*
import java.util.function.Consumer
import java.util.function.Predicate
import org.apache.commons.lang3.ThreadUtils
import org.apache.commons.lang3.concurrent.BasicThreadFactory
import org.slf4j.Logger
Expand Down Expand Up @@ -280,7 +279,7 @@ internal constructor(
* stream consumer.
*/
val partitionSize = streamConsumer.parallelism
val partitions = Lists.partition(streams.stream().toList(), partitionSize)
val partitions = Lists.partition(streams.toList(), partitionSize)

// Submit each stream partition for concurrent execution
partitions.forEach(
Expand Down Expand Up @@ -352,11 +351,10 @@ internal constructor(
* active so long as the database connection pool is open.
*/
@VisibleForTesting
// NOTE(review): this capture interleaves the removed Predicate declaration and the added
// function declaration of the same filter; both share the three conditions below.
// The result is true only when all three conditions hold for runningThread.
val ORPHANED_THREAD_FILTER: Predicate<Thread> = Predicate { runningThread: Thread ->
fun filterOrphanedThread(runningThread: Thread): Boolean =
// 1. its name differs from the current thread's name
(runningThread.name != Thread.currentThread().name &&
// 2. it is not a daemon thread
!runningThread.isDaemon &&
// 3. its name differs from TYPE_AND_DEDUPE_THREAD_NAME
TYPE_AND_DEDUPE_THREAD_NAME != runningThread.name)
}

const val INTERRUPT_THREAD_DELAY_MINUTES: Int = 1
const val EXIT_THREAD_DELAY_MINUTES: Int = 2
Expand Down Expand Up @@ -424,8 +422,7 @@ internal constructor(
) {
val currentThread = Thread.currentThread()

val runningThreads =
ThreadUtils.getAllThreads().stream().filter(ORPHANED_THREAD_FILTER).toList()
val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread).toList()
if (runningThreads.isNotEmpty()) {
LOGGER.warn(
"""
Expand Down Expand Up @@ -461,7 +458,7 @@ internal constructor(
scheduledExecutorService.schedule(
{
if (
ThreadUtils.getAllThreads().stream().anyMatch { runningThread: Thread ->
ThreadUtils.getAllThreads().any { runningThread: Thread ->
!runningThread.isDaemon && runningThread.name != currentThread.name
}
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ open class StandardNameTransformer : NamingConventionTransformer {
} else if (root.isArray) {
return Jsons.jsonNode(
MoreIterators.toList(root.elements())
.stream()
.map { r: JsonNode -> formatJsonPath(r) }
.toList()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer
import java.util.stream.Collectors
import kotlin.jvm.optionals.getOrNull

private val logger = KotlinLogging.logger {}
Expand Down Expand Up @@ -155,18 +154,11 @@ class AsyncStreamConsumer(
bufferManager.close()

val streamSyncSummaries =
streamNames
.stream()
.collect(
Collectors.toMap(
{ streamDescriptor: StreamDescriptor -> streamDescriptor },
{ streamDescriptor: StreamDescriptor ->
StreamSyncSummary(
Optional.of(getRecordCounter(streamDescriptor).get()),
)
},
),
streamNames.associateWith { streamDescriptor: StreamDescriptor ->
StreamSyncSummary(
Optional.of(getRecordCounter(streamDescriptor).get()),
)
}
onClose.accept(hasFailed, streamSyncSummaries)

// as this throws an exception, we need to be after all other close functions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.stream.Collectors
import kotlin.math.min

private val logger = KotlinLogging.logger {}
Expand Down Expand Up @@ -194,12 +193,10 @@ internal constructor(
)
val workersWithBatchesSize =
runningWorkerBatchesSizes
.stream()
.filter { obj: Optional<Long> -> obj.isPresent }
.mapToLong { obj: Optional<Long> -> obj.get() }
.sum()
.sumOf { obj: Optional<Long> -> obj.get() }
val workersWithoutBatchesCount =
runningWorkerBatchesSizes.stream().filter { obj: Optional<Long> -> obj.isEmpty }.count()
runningWorkerBatchesSizes.count { obj: Optional<Long> -> obj.isEmpty }
val workersWithoutBatchesSizeEstimate =
(min(
flusher.optimalBatchSizeBytes.toDouble(),
Expand Down Expand Up @@ -232,36 +229,20 @@ internal constructor(
fun orderStreamsByPriority(streams: Set<StreamDescriptor>): List<StreamDescriptor> {
// eagerly pull attributes so that values are consistent throughout comparison
val sdToQueueSize =
streams
.stream()
.collect(
Collectors.toMap(
{ s: StreamDescriptor -> s },
{ streamDescriptor: StreamDescriptor ->
bufferDequeue.getQueueSizeBytes(
streamDescriptor,
)
},
),
streams.associateWith { streamDescriptor: StreamDescriptor ->
bufferDequeue.getQueueSizeBytes(
streamDescriptor,
)
}

val sdToTimeOfLastRecord =
streams
.stream()
.collect(
Collectors.toMap(
{ s: StreamDescriptor -> s },
{ streamDescriptor: StreamDescriptor ->
bufferDequeue.getTimeOfLastRecord(
streamDescriptor,
)
},
),
streams.associateWith { streamDescriptor: StreamDescriptor ->
bufferDequeue.getTimeOfLastRecord(
streamDescriptor,
)

}
return streams
.stream()
.sorted(
.sortedWith(
Comparator.comparing(
{ s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() },
Comparator.reverseOrder(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Consumer
import java.util.stream.Collectors

private val logger = KotlinLogging.logger {}

Expand Down Expand Up @@ -153,14 +152,10 @@ constructor(
)
val stateIdToCount =
batch.data
.stream()
.map(StreamAwareQueue.MessageWithMeta::stateId)
.collect(
Collectors.groupingBy(
{ stateId: Long -> stateId },
Collectors.counting(),
),
)
.groupingBy { it }
.eachCount()
.mapValues { it.value.toLong() }
logger.info {
"Flush Worker (${humanReadableFlushWorkerId(
flushWorkerId,
Expand All @@ -169,14 +164,7 @@ constructor(
)} bytes."
}

flusher.flush(
desc,
batch.data
.stream()
.map(
StreamAwareQueue.MessageWithMeta::message,
),
)
flusher.flush(desc, batch.data.map { it.message }.stream())
batch.flushStates(stateIdToCount, outputRecordCollector)
}
logger.info {
Expand Down Expand Up @@ -207,22 +195,12 @@ constructor(
// wait for all buffers to be flushed.
while (true) {
val streamDescriptorToRemainingRecords =
bufferDequeue.bufferedStreams
.stream()
.collect(
Collectors.toMap(
{ desc: StreamDescriptor -> desc },
{ desc: StreamDescriptor ->
bufferDequeue.getQueueSizeInRecords(desc).orElseThrow()
},
),
)

val anyRecordsLeft =
streamDescriptorToRemainingRecords.values.stream().anyMatch { size: Long ->
size > 0
bufferDequeue.bufferedStreams.associateWith { desc: StreamDescriptor ->
bufferDequeue.getQueueSizeInRecords(desc).orElseThrow()
}

val anyRecordsLeft = streamDescriptorToRemainingRecords.values.any { it > 0 }

if (!anyRecordsLeft) {
break
}
Expand All @@ -234,7 +212,6 @@ constructor(
)
.append(System.lineSeparator())
streamDescriptorToRemainingRecords.entries
.stream()
.filter { entry: Map.Entry<StreamDescriptor, Long> -> entry.value > 0 }
.forEach { entry: Map.Entry<StreamDescriptor, Long> ->
workerInfo.append(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,7 @@ class BufferDequeue(
get() = memoryManager.maxMemoryBytes

// Sum of currentMemoryUsage over every StreamAwareQueue in buffers.values.
// NOTE(review): this capture shows two getters for the same property: the removed
// Java-stream form first, then the added Kotlin sumOf form. Both add up the same values.
val totalGlobalQueueSizeBytes: Long
get() =
buffers.values
.stream()
.map { obj: StreamAwareQueue -> obj.currentMemoryUsage }
.mapToLong { obj: Long -> obj }
.sum()
get() = buffers.values.sumOf { obj: StreamAwareQueue -> obj.currentMemoryUsage }

fun getQueueSizeInRecords(streamDescriptor: StreamDescriptor): Optional<Long> {
return getBuffer(streamDescriptor).map { buf: StreamAwareQueue -> buf.size().toLong() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ class MemoryAwareMessageBatch(
* can be flushed back to platform via stateManager.
*/
fun flushStates(
stateIdToCount: Map<Long?, Long?>,
stateIdToCount: Map<Long, Long>,
outputRecordCollector: Consumer<AirbyteMessage>,
) {
stateIdToCount.forEach { (stateId: Long?, count: Long?) ->
stateIdToCount.forEach { (stateId: Long, count: Long) ->
stateManager.decrement(
stateId!!,
count!!,
stateId,
count,
)
}
stateManager.flushStates(outputRecordCollector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
synchronized(lock) {
aliasIds.addAll(
descToStateIdQ.values
.stream()
.flatMap { obj: LinkedBlockingDeque<Long> -> obj.stream() }
.flatMap { obj: LinkedBlockingDeque<Long> -> obj }
.toList(),
)
descToStateIdQ.clear()
Expand All @@ -292,19 +291,12 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) {
descToStateIdQ[SENTINEL_GLOBAL_DESC] = LinkedBlockingDeque()
descToStateIdQ[SENTINEL_GLOBAL_DESC]!!.add(retroactiveGlobalStateId)

val combinedCounter: Long =
stateIdToCounter.values
.stream()
.mapToLong { obj: AtomicLong -> obj.get() }
.sum()
val combinedCounter: Long = stateIdToCounter.values.sumOf { it.get() }
stateIdToCounter.clear()
stateIdToCounter[retroactiveGlobalStateId] = AtomicLong(combinedCounter)

val statsCounter: Long =
stateIdToCounterForPopulatingDestinationStats.values
.stream()
.mapToLong { obj: AtomicLong -> obj.get() }
.sum()
stateIdToCounterForPopulatingDestinationStats.values.sumOf { it.get() }
stateIdToCounterForPopulatingDestinationStats.clear()
stateIdToCounterForPopulatingDestinationStats.put(
retroactiveGlobalStateId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import io.airbyte.protocol.models.v0.AirbyteStateMessage
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.StreamDescriptor
import java.util.*
import java.util.function.Supplier
import java.util.stream.Collectors

/**
* This [DestStateLifecycleManager] handles any state where the state messages are scoped by stream.
Expand Down Expand Up @@ -125,29 +123,25 @@ class DestStreamStateLifecycleManager(private val defaultNamespace: String?) :
// Returns the AirbyteMessage values of streamToState as a Queue backed by a LinkedList,
// ordered by the stream descriptor's namespace (null namespaces first, then natural
// String order) and, within the same namespace, by the stream descriptor's name.
// NOTE(review): this capture interleaves the removed Java-stream implementation and the
// added Kotlin-collections implementation of the same body; both use the same comparator.
private fun listStatesInOrder(
streamToState: Map<StreamDescriptor, AirbyteMessage>
): Queue<AirbyteMessage> {
return streamToState.entries
.stream() // typically, we sort by namespace and then stream name, so we retain
// that pattern here.
.sorted(
Comparator.comparing<Map.Entry<StreamDescriptor, AirbyteMessage>, String>(
{ entry: Map.Entry<StreamDescriptor, AirbyteMessage> ->
entry.key.namespace
},
Comparator.nullsFirst<String>(Comparator.naturalOrder<String>())
) // namespace is allowed to be null
.thenComparing<String> { entry: Map.Entry<StreamDescriptor, AirbyteMessage>
->
entry.key.name
}
)
// keep only the message of each sorted entry
.map<AirbyteMessage> { obj: Map.Entry<StreamDescriptor, AirbyteMessage> ->
obj.value
}
// collect the messages, in sorted order, into a new LinkedList
.collect(
Collectors.toCollection<AirbyteMessage, LinkedList<AirbyteMessage>>(
Supplier<LinkedList<AirbyteMessage>> { LinkedList() }
return LinkedList(
streamToState.entries

// typically, we sort by namespace and then stream name, so we retain
// that pattern here.
.sortedWith(
Comparator.comparing<Map.Entry<StreamDescriptor, AirbyteMessage>, String>(
{ entry: Map.Entry<StreamDescriptor, AirbyteMessage> ->
entry.key.namespace
},
Comparator.nullsFirst<String>(Comparator.naturalOrder<String>())
) // namespace is allowed to be null
.thenComparing<String> {
entry: Map.Entry<StreamDescriptor, AirbyteMessage> ->
entry.key.name
}
)
// keep only the message of each sorted entry; LinkedList(...) copies them in order
.map { obj: Map.Entry<StreamDescriptor, AirbyteMessage> -> obj.value }
)
}

/**
Expand Down
Loading

0 comments on commit 741c4e0

Please sign in to comment.