Static types for process inputs/outputs #4553

Draft
wants to merge 38 commits into base: master

Commits (38)
e974900
Refactor ast xform classes
bentsherman Nov 19, 2023
4612de3
Move process and workflow DSLs into separate classes
bentsherman Nov 20, 2023
5ad9813
Add ProcessFn annotation
bentsherman Nov 30, 2023
01ef1db
Rename ProcessDsl -> ProcessBuilder, add separate builder for process…
bentsherman Nov 30, 2023
c465000
Add WorkflowFn annotation
bentsherman Nov 30, 2023
8f2c090
Add support for native processes, use reflection to invoke workflows
bentsherman Dec 1, 2023
48fdfc2
Separate process input channel logic from task processor
bentsherman Dec 1, 2023
041e10a
Remove params from WorkflowFn
bentsherman Dec 1, 2023
a52a829
Simplify ProcessFn param names
bentsherman Dec 2, 2023
570892c
Separate `InParam`s from task config
bentsherman Dec 2, 2023
cf0e4b2
Fix process input channel logic
bentsherman Dec 2, 2023
0c490e8
Fix bugs
bentsherman Dec 6, 2023
8733ba6
Refactor process inputs and outputs
bentsherman Dec 8, 2023
d2268b2
Refactor process inputs/outputs DSL
bentsherman Dec 9, 2023
bca231b
Move ProcessBuilder#applyConfig() into subclass
bentsherman Dec 9, 2023
872a3e2
Add CombineManyOp to combine process input channels
bentsherman Dec 9, 2023
72b54f6
Save variable refs in ProcessFn
bentsherman Dec 9, 2023
dfd5aea
Fix bugs
bentsherman Dec 9, 2023
1e77a22
Fix task hash (resume still not working)
bentsherman Dec 10, 2023
c00ee3f
Update tests
bentsherman Dec 13, 2023
f7b3fa8
Move annotation API to separate branch
bentsherman Dec 13, 2023
c300f00
Minor edits
bentsherman Dec 13, 2023
ce2de32
Minor edits
bentsherman Dec 13, 2023
47a85be
Fix storeDir warning and task context caching
bentsherman Dec 17, 2023
cc2c08e
Merge upstream changes
bentsherman Dec 17, 2023
8b5fbb6
Merge branch 'master' into ben-programmatic-api
bentsherman Dec 17, 2023
48423e4
Fix failing integration tests
bentsherman Dec 18, 2023
36510f4
Fix failing integration tests, minor changes
bentsherman Dec 18, 2023
353493e
Update tests
bentsherman Dec 18, 2023
177120c
Add comments
bentsherman Dec 18, 2023
8441989
Fix stdout evaluation
bentsherman Dec 18, 2023
700ea34
Merge branch 'master' into ben-programmatic-api
bentsherman Mar 28, 2024
1f6705b
Move LazyHelper to script package, update copyright
bentsherman Mar 28, 2024
ecdaaa4
cleanup
bentsherman Mar 28, 2024
4509b28
Infer staging of file inputs from input types
bentsherman Mar 29, 2024
8efcfc0
Update docs
bentsherman Mar 29, 2024
8a3a827
Fix error with legacy syntax
bentsherman Mar 29, 2024
1cd6fce
Rename CombineManyOp -> MergeWithEachOp
bentsherman Mar 29, 2024
Add comments
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
bentsherman committed Dec 18, 2023
commit 177120cb2c3a013547f91c34dda6665e7841833e
Original file line number Diff line number Diff line change
@@ -26,8 +26,8 @@ import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel
/**
- * Operator for combining many source channels into a single channel,
- * with the option to only merge channels that are not marked as "iterators".
+ * Operator for merging many source channels into a single channel,
+ * with the option to combine channels that are marked as "iterators".
*
* @see ProcessDef#collectInputs(Object[])
*
@@ -41,12 +41,26 @@ class CombineManyOp {

private List<Integer> iterators

/**
* List of queues to receive values from source channels.
*/
private List<List> queues = []

/**
* Mask of source channels that are singletons.
*/
private List<Boolean> singletons

/**
* True when all source channels are singletons and therefore
* the operator should emit a singleton channel.
*/
private boolean emitSingleton

/**
* True when all source channels are iterators and therefore
* the operator should simply emit the combinations.
*/
private boolean emitCombination

private transient List<List> combinations
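
As a reading aid for the doc comment above, here is a minimal sketch of the merge-with-each idea, assuming that non-iterator sources are merged element by element and that every merged tuple is then combined with each value of the iterator sources. It models channels as plain lists. The helper name `mergeWithEach`, the truncation to the shortest merged source, and the emission order are assumptions made for illustration. The real operator works asynchronously on dataflow channels and also handles singleton sources, which this sketch ignores.

```groovy
// Hypothetical helper, not part of Nextflow: channels are modeled as plain lists.
List<List> mergeWithEach(List<List> sources, List<Integer> iterators) {
    // Indices of the sources that are merged element by element (non-iterators).
    def mergedIdx = (0..<sources.size()).findAll { !iterators.contains(it) }
    // Assumption: the shortest merged source bounds the number of rows.
    // When every source is an iterator, a single row yields the pure combinations.
    int n = mergedIdx ? mergedIdx.collect { sources[it].size() }.min() : 1

    List<List> result = []
    for (int i = 0; i < n; i++) {
        // Partial tuples for row i, extended one source at a time.
        List<List> partial = [[]]
        for (int j = 0; j < sources.size(); j++) {
            // An iterator contributes all of its values, a merged source only its i-th value.
            def choices = iterators.contains(j) ? sources[j] : [sources[j][i]]
            partial = partial.collectMany { prefix -> choices.collect { prefix + [it] } }
        }
        result.addAll(partial)
    }
    return result
}

// Sources 0 and 1 are merged, source 2 is an iterator.
assert mergeWithEach([[1, 2], ['a', 'b'], ['x', 'y']], [2]) ==
    [[1, 'a', 'x'], [1, 'a', 'y'], [2, 'b', 'x'], [2, 'b', 'y']]
```

When every source is listed as an iterator, the sketch degenerates to a plain cross product, which corresponds to the `emitCombination` case described in the field comments.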
Original file line number Diff line number Diff line change
@@ -28,15 +28,28 @@ import nextflow.util.LazyHelper
@CompileStatic
class ProcessFileInput implements PathArityAware {

/**
* Lazy expression (e.g. lazy var, closure, GString) which
* defines which files to stage in terms of the task inputs.
* It is evaluated for each task against the task context.
*/
private Object value

/**
* Optional name which, if specified, will be added to the task
* context as an escape-aware list of paths.
*/
private String name

/**
* Flag to support legacy `file` input.
*/
private boolean pathQualifier

/**
* File pattern which defines how the input files should be named
* when they are staged into a task directory.
*/
private Object filePattern

ProcessFileInput(Object value, String name, boolean pathQualifier, Map<String,?> opts) {
Original file line number Diff line number Diff line change
@@ -35,6 +35,11 @@ import nextflow.util.LazyHelper
@CompileStatic
class ProcessFileOutput implements PathArityAware {

/**
* Lazy expression (e.g. lazy var, closure, GString) which
* defines which files to unstage from the task directory.
* It will be evaluated for each task against the task directory.
*/
private Object target

/**
Original file line number Diff line number Diff line change
@@ -32,8 +32,16 @@ import nextflow.extension.ToListOp
@CompileStatic
class ProcessInput implements Cloneable {

/**
* Parameter name under which the input value for each task
* will be added to the task context.
*/
private String name

/**
* Input channel which is created when the process is invoked
* in a workflow.
*/
private DataflowReadChannel channel

/**
Original file line number Diff line number Diff line change
@@ -28,12 +28,31 @@ class ProcessInputs implements List<ProcessInput>, Cloneable {
@Delegate
private List<ProcessInput> params = []

/**
* Input variables which will be evaluated for each task
* in terms of the task inputs and added to the task context.
*/
private Map<String,?> vars = [:]

/**
* Environment variables which will be evaluated for each
* task against the task context and added to the task
* environment.
*/
private Map<String,?> env = [:]

/**
* Input files which will be evaluated for each task
* against the task context and staged into the task
* directory.
*/
private List<ProcessFileInput> files = []

/**
* Lazy expression which will be evaluated for each task
* against the task context and provided as the standard
* input to the task.
*/
Object stdin

@Override
Original file line number Diff line number Diff line change
@@ -32,16 +32,41 @@ import nextflow.util.LazyHelper
@CompileStatic
class ProcessOutput implements Cloneable {

/**
* List of declared outputs of the parent process.
*/
private ProcessOutputs declaredOutputs

/**
* Lazy expression (e.g. lazy var, closure, GString) which
* defines the output value in terms of the task context,
* including environment variables, files, and standard output.
* It will be evaluated for each task after it is executed.
*/
private Object target

/**
* Optional parameter name under which the output channel
* is made available in the process outputs (i.e. `.out`).
*/
private String name

/**
* Optional channel topic which this output channel will
* be sent to.
*/
private String topic

/**
* When true, a task will not fail if any environment
* vars or files for this output are missing.
*/
private boolean optional

/**
* Output channel which is created when the process is invoked
* in a workflow.
*/
private DataflowWriteChannel channel

ProcessOutput(ProcessOutputs declaredOutputs, Object target, Map<String,?> opts) {
Original file line number Diff line number Diff line change
@@ -30,8 +30,18 @@ class ProcessOutputs implements List<ProcessOutput>, Cloneable {
@Delegate
private List<ProcessOutput> params = []

/**
* Environment variables which will be exported from the
* task environment for each task and made available to
* process outputs.
*/
private Map<String,String> env = [:]

/**
* Output files which will be unstaged from the task
* directory for each task and made available to process
* outputs.
*/
private Map<String,ProcessFileOutput> files = [:]

@Override
12 changes: 6 additions & 6 deletions modules/nextflow/src/main/groovy/nextflow/util/LazyHelper.groovy
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
/**
- * Helper methods for lazy binding and resolution.
+ * Helper methods for lazy evaluation.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
* @author Ben Sherman <bentshermann@gmail.com>
@@ -29,7 +29,7 @@ import groovy.transform.ToString
class LazyHelper {

/**
- * Resolve a lazy value against a given binding.
+ * Evaluate a lazy expression against a given binding.
*
* @param binding
* @param value
@@ -50,14 +50,14 @@ class LazyHelper {
}

/**
- * Interface for types that can be lazily resolved
+ * Interface for types that can be lazily evaluated
*/
interface LazyAware {
Object resolve(Object binding)
}

/**
- * A list that can be lazily resolved
+ * A list that can be lazily evaluated
*/
@CompileStatic
class LazyList implements LazyAware, List {
@@ -88,7 +88,7 @@ class LazyList implements LazyAware, List {
}

/**
- * A map whose values can be lazily resolved
+ * A map whose values can be lazily evaluated
*/
@CompileStatic
class LazyMap implements Map<String,Object> {
@@ -263,7 +263,7 @@ class LazyMap implements Map<String,Object> {
}

/**
- * A variable that can be lazily resolved
+ * A variable that can be lazily evaluated
*/
@CompileStatic
@EqualsAndHashCode
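
To make "evaluate a lazy expression against a given binding" concrete, the following is a minimal sketch and not the actual `LazyHelper` code. It assumes the binding is a plain `Map` (such as a task context) and that lazy expressions are closures, possibly nested inside lists or maps. The function name `evaluate` and the sample keys are hypothetical. The real class dispatches through `LazyAware#resolve(Object)` and its own `LazyList`, `LazyMap`, and lazy variable types, which are not reproduced here.

```groovy
// Hypothetical sketch: evaluate closures against a map-based context.
Object evaluate(Map ctx, Object value) {
    if (value instanceof Closure) {
        // Clone so the shared closure is not mutated, then resolve
        // free variables against the context before the owner.
        def c = (Closure) value.clone()
        c.delegate = ctx
        c.resolveStrategy = Closure.DELEGATE_FIRST
        return c.call()
    }
    // Containers are evaluated recursively, element by element.
    if (value instanceof List)
        return value.collect { evaluate(ctx, it) }
    if (value instanceof Map)
        return value.collectEntries { k, v -> [(k): evaluate(ctx, v)] }
    // Anything else is already a concrete value.
    return value
}

def ctx = [sample_id: 'S1', reads: 'S1.fastq']
def lazy = [ { "${sample_id}.bam" }, { reads }, 42 ]

// Each closure only sees its variables once the context is supplied.
assert evaluate(ctx, lazy)*.toString() == ['S1.bam', 'S1.fastq', '42']
```

Deferring evaluation this way is what lets a single declaration, such as a file pattern or an output expression, be evaluated once per task against that task's own context.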