Skip to content

Commit

Permalink
feat: Improve :tool:execution:parallel. (#2080)
Browse files Browse the repository at this point in the history
* Integrate :tool:execution:parallel with :tool:log.
* Add function for creating Parallel.Type object dynamically.
* Improve error message for Parallel.Context.lazyProperty.
* Move implementations to internal package.
* Add function for checking returned state.
* Add selector for context property
  • Loading branch information
jan-goral authored Jul 13, 2021
1 parent 76d806e commit 1ca2f77
Show file tree
Hide file tree
Showing 19 changed files with 145 additions and 70 deletions.
1 change: 1 addition & 0 deletions tool/execution/parallel/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ repositories {
tasks.withType<KotlinCompile> { kotlinOptions.jvmTarget = "1.8" }

dependencies {
api(project(":tool:log"))
implementation(Dependencies.KOTLIN_COROUTINES_CORE)
testImplementation(Dependencies.JUNIT)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package flank.exection.parallel

import flank.exection.parallel.internal.DynamicType
import flank.exection.parallel.internal.EagerProperties

// ======================= Signature =======================
Expand Down Expand Up @@ -34,9 +35,21 @@ infix fun <R : Any> Parallel.Type<R>.using(

/**
* Factory function for creating special task that can validate arguments before execution.
* Typically the [Parallel.Context] with added [EagerProperties] is used to validate initial state.
* The [Parallel.Context] with added [EagerProperties] can validate if state contains required initial values.
*/
internal fun <C : Parallel.Context> validator(
context: (() -> C)
): Parallel.Task<Unit> =
context() using { context().also { it.state = this }.run { validate() } }

// ======================= Type =======================

/**
* Factory function for creating dynamic [Parallel.Type].
*/
inline fun <reified T : Any> type(): Parallel.Type<T> = type(T::class.java)

/**
* Factory function for creating dynamic [Parallel.Type].
*/
fun <T : Any> type(type: Class<T>): Parallel.Type<T> = DynamicType(type)
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flank.exection.parallel

import flank.exection.parallel.internal.Execution
import flank.exection.parallel.internal.invoke
import flank.exection.parallel.internal.minusContextValidators
import kotlinx.coroutines.flow.Flow

/**
Expand All @@ -12,7 +13,7 @@ import kotlinx.coroutines.flow.Flow
infix operator fun Tasks.invoke(
args: ParallelState
): Flow<ParallelState> =
Execution(this, args).invoke()
Execution(minusContextValidators(), args).invoke()

// ======================= Extensions =======================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package flank.exection.parallel
import flank.exection.parallel.internal.ContextProvider
import flank.exection.parallel.internal.EagerProperties
import flank.exection.parallel.internal.lazyProperty
import java.lang.System.currentTimeMillis
import flank.log.Output

// ======================= Types =======================

Expand All @@ -13,7 +13,7 @@ import java.lang.System.currentTimeMillis
object Parallel {

/**
* Abstraction for execution data provider which is also an context for task execution.
* Abstraction for execution data provider which is also a context for task execution.
* For initialization purpose some properties are exposed as variable.
*/
open class Context : Type<Unit> {
Expand Down Expand Up @@ -46,7 +46,12 @@ object Parallel {
protected operator fun <T : Any> Type<T>.unaryMinus() = lazyProperty(this)

/**
* Internal accessor for initializing (validating) eager properties
* DSL for creating lazy delegate accessor to the state value for a given type with additional property selector.
*/
protected operator fun <T : Any, V> Type<T>.invoke(select: T.() -> V) = lazyProperty(this, select)

/**
* Internal accessor for initializing (validating) eager properties.
*/
internal fun validate() = eager()
}
Expand All @@ -67,8 +72,8 @@ object Parallel {
/**
* The task signature.
*
* @param type A return type of a task
* @param args A set of types for arguments
* @param type A return a type of task.
* @param args A set of argument types.
*/
data class Signature<R : Any>(
val type: Type<R>,
Expand All @@ -81,15 +86,6 @@ object Parallel {
*/
class Function<X : Context>(override val context: () -> X) : ContextProvider<X>()

data class Event internal constructor(
val type: Type<*>,
val data: Any,
val timestamp: Long = currentTimeMillis(),
) {
object Start
object Stop
}

object Logger : Type<Output>

/**
Expand All @@ -111,11 +107,6 @@ object Parallel {
*/
typealias ExecuteTask<R> = suspend ParallelState.() -> R

/**
* Common signature for structural log output.
*/
typealias Output = Any.() -> Unit

/**
* Immutable state for parallel execution.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package flank.exection.parallel

import flank.exection.parallel.internal.contextValidators
import flank.exection.parallel.internal.contextValidatorTypes
import flank.exection.parallel.internal.reduceTo
import flank.exection.parallel.internal.type

/**
* Reduce given [Tasks] by [expected] types to remove unneeded tasks from the graph.
* The returned graph will hold only tasks that are returning selected types, their dependencies and derived dependencies.
* Additionally this is keeping also the validators for initial state.
* Additionally, this is keeping also the validators for initial state.
*
* @return Reduced [Tasks]
*/
operator fun Tasks.invoke(
expected: Set<Parallel.Type<*>>
): Tasks =
reduceTo(expected + contextValidators())
reduceTo(expected + contextValidatorTypes())

/**
* Shortcut for tasks reducing.
*/
operator fun Tasks.invoke(
vararg expected: Parallel.Type<*>
): Tasks = invoke(expected.toSet())

/**
* Remove the [Tasks] by given [types].
*/
operator fun Tasks.minus(
types: Set<Parallel.Type<*>>
): Tasks =
filterNot { task -> task.type in types }.toSet()
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package flank.exection.parallel

/**
* Select value by type.
*/
@Suppress("UNCHECKED_CAST")
fun <T : Any> ParallelState.select(type: Parallel.Type<T>) = get(type) as T
Original file line number Diff line number Diff line change
@@ -1,42 +1,12 @@
package flank.exection.parallel

import flank.exection.parallel.internal.args
import flank.exection.parallel.internal.graph.findCycles
import flank.exection.parallel.internal.graph.findDuplicatedDependencies
import flank.exection.parallel.internal.graph.findMissingDependencies
import flank.exection.parallel.internal.type
import kotlinx.coroutines.runBlocking
import flank.exection.parallel.internal.validateExecutionGraphs

/**
* Validate the given [Tasks] and [ParallelState] for finding missing dependencies or broken paths.
*
* @param initial The initial arguments for tasks execution.
* @return Valid [Tasks] if graph has no broken paths or missing dependencies.
*/
fun Tasks.validate(initial: ParallelState = emptyMap()): Tasks = run {
// Separate initial validators from tasks. Validators are important now but not during the execution.
val (validators, tasks) = splitTasks()

// check if initial state is providing all required values specified in context.
runBlocking { validators.forEach { check -> check.execute(initial) } }

map(Parallel.Task<*>::type).findDuplicatedDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Duplicate(this)
}

val graph = associate { task -> task.type to task.args }

graph.findMissingDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Missing(this)
}

graph.findCycles().run {
if (isNotEmpty()) throw Parallel.DependenciesError.Cycles(this)
}

tasks.toSet()
}

private fun Iterable<Parallel.Task<*>>.splitTasks() = this
.groupBy { task -> task.type is Parallel.Context }
.run { getOrDefault(true, emptyList()) to getOrDefault(false, emptyList()) }
fun Tasks.validate(initial: ParallelState = emptyMap()): Tasks =
validateExecutionGraphs(initial)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package flank.exection.parallel

import flank.exection.parallel.internal.checkThrowableValues

/**
* Verify that given [ParallelState] has no errors.
*/
fun ParallelState.verify(): ParallelState = checkThrowableValues()
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import flank.exection.parallel.validator
/**
* Abstract factory for creating task function.
*/
abstract class ContextProvider<X : Parallel.Context> {
abstract class ContextProvider<X : Parallel.Context> internal constructor() {
protected abstract val context: () -> X

operator fun <R> invoke(body: suspend X.() -> R): ExecuteTask<R> =
{ context().also { it.state = this }.body() }

val validator by lazy { validator(context) }
val validate by lazy { validator(context) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Parallel

internal class DynamicType<T : Any>(val type: Class<T>) : Parallel.Type<T> {
override fun equals(other: Any?): Boolean = (other as? DynamicType<*>)?.type == type
override fun hashCode(): Int = type.hashCode() + javaClass.hashCode()
override fun toString(): String = type.canonicalName
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Output
import flank.exection.parallel.Parallel
import flank.exection.parallel.Parallel.Event
import flank.exection.parallel.Parallel.Logger
import flank.exection.parallel.Parallel.Task
import flank.exection.parallel.Parallel.Type
import flank.exection.parallel.ParallelState
import flank.exection.parallel.Property
import flank.exection.parallel.Tasks
import flank.log.Event
import flank.log.Output
import flank.log.event
import flank.log.normalize
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -221,7 +223,7 @@ private suspend fun Execution.execute(
jobs += type to scope.launch {

// Extend root output for adding additional data.
val out: Output = output?.let { { it(Event(type, this)) } } ?: {}
val out: Output = output?.normalize { type event it } ?: {}

// Log the task was started
Event.Start.out()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import flank.exection.parallel.Tasks

/**
* Get initial state validators.
*
* This is necessary to perform validations of initial state before the execution.
*/
internal fun Tasks.contextValidators(): List<Parallel.Context> =
internal fun Tasks.contextValidatorTypes(): List<Parallel.Context> =
mapNotNull { task -> task.type as? Parallel.Context }

/**
* Return graph without context validation tasks.
*
* Typically, context validation tasks should be used for testing purposes so running them on production is redundant.
* This function can be used to filter them out.
*/
internal fun Tasks.minusContextValidators(): Tasks =
filterNot { task -> task.type is Parallel.Context }.toSet()
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@ import flank.exection.parallel.ParallelState
/**
* Factory function for lazy property delegate.
*/
internal fun <T : Any> Parallel.Context.lazyProperty(type: Parallel.Type<T>) = lazy {
internal fun <T : Any, R> Parallel.Context.lazyProperty(
type: Parallel.Type<T>,
select: T.() -> R
): Lazy<R> = lazy {
fun errorMessage() = "Cannot resolve dependency of type: $type. Make sure is specified as argument"
@Suppress("UNCHECKED_CAST")
state[type] as T
(state[type] as? T ?: throw IllegalStateException(errorMessage())).select()
}

internal fun <T : Any> Parallel.Context.lazyProperty(
type: Parallel.Type<T>
): Lazy<T> =
lazyProperty(type) { this }

/**
* Eager properties provider and initializer.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package flank.exection.parallel.internal

import flank.exection.parallel.Parallel
import flank.exection.parallel.ParallelState
import flank.exection.parallel.Tasks
import flank.exection.parallel.internal.graph.findCycles
import flank.exection.parallel.internal.graph.findDuplicatedDependencies
import flank.exection.parallel.internal.graph.findMissingDependencies
import kotlinx.coroutines.runBlocking

// TODO: Check all cases and collect results, instead of throwing the first encountered error.
internal fun Tasks.validateExecutionGraphs(initial: ParallelState = emptyMap()): Tasks = run {
// Separate initial validators from tasks. Validators are important now but not during the execution.
val (validators, tasks) = this
.groupBy { task -> task.type is Parallel.Context }
.run { getOrDefault(true, emptyList()) to getOrDefault(false, emptyList()) }

// check if initial state is providing all required values specified in context.
runBlocking { validators.forEach { check -> check.execute(initial) } }

map(Parallel.Task<*>::type).findDuplicatedDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Duplicate(this)
}

val graph = associate { task -> task.type to task.args }

graph.findMissingDependencies(initial.keys).run {
if (isNotEmpty()) throw Parallel.DependenciesError.Missing(this)
}

graph.findCycles().run {
if (isNotEmpty()) throw Parallel.DependenciesError.Cycles(this)
}

tasks.toSet()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package flank.exection.parallel.internal

import flank.exection.parallel.ParallelState

internal fun ParallelState.checkThrowableValues(): ParallelState =
onEach { (_, value) -> if (value is Throwable) throw value }
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ fun main() {
)

Example.run {
execute + context.validator
execute + context.validate
}.validate(
// Comment line below to simulate error on context.validator
args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ class ExecuteKtTest {
)
val actual = runBlocking { execute(args).last() }

assert(actual[A] is NullPointerException)
assert(actual[A] is IllegalStateException)
}

/**
Expand Down
Loading

0 comments on commit 1ca2f77

Please sign in to comment.