Skip to content

Commit

Permalink
FIx race condition with workflow events
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
  • Loading branch information
bentsherman committed Jul 11, 2024
1 parent 16f39e0 commit c92b21f
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions plugins/nf-prov/src/main/nextflow/prov/ProvObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package nextflow.prov
import java.nio.file.FileSystems
import java.nio.file.Path
import java.nio.file.PathMatcher
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
Expand Down Expand Up @@ -50,6 +52,8 @@ class ProvObserver implements TraceObserver {

private Map<Path,Path> workflowOutputs = [:]

private Lock lock = new ReentrantLock()

ProvObserver(Map<String,Map> formats, List<String> patterns) {
this.renderers = formats.collect( (name, config) -> createRenderer(name, config) )
this.matchers = patterns.collect( pattern ->
Expand Down Expand Up @@ -82,34 +86,36 @@ class ProvObserver implements TraceObserver {
if( !task.isSuccess() )
return

tasks << task
lock.withLock {
tasks << task
}
}

@Override
void onProcessCached(TaskHandler handler, TraceRecord trace) {
tasks << handler.task
lock.withLock {
tasks << handler.task
}
}

@Override
void onFilePublish(Path destination, Path source) {
boolean match = matchers.isEmpty() || matchers.any { matcher ->
matcher.matches(destination)
}

final match = matchers.isEmpty() || matchers.any { matcher -> matcher.matches(destination) }
if( !match )
return

workflowOutputs[source] = destination
lock.withLock {
workflowOutputs[source] = destination
}
}

@Override
void onFlowComplete() {
if( !session.isSuccess() )
return

renderers.each( renderer ->
for( final renderer : renderers )
renderer.render(session, tasks, workflowOutputs)
)
}

}

0 comments on commit c92b21f

Please sign in to comment.