
nextflow 24.04.4 never exit due to incomplete file transfer #5363

Open
divinomas-gh opened this issue Oct 4, 2024 · 25 comments


@divinomas-gh

Bug report

Expected behavior and actual behavior

I was using Nextflow 22.10.6.5843, which ran smoothly. After updating to v24.04.4, the same script hangs with some file transfers never finishing. The files to be transferred total around 50 GB.

Steps to reproduce the problem

Program output

Oct-03 12:42:12.157 [main] DEBUG nextflow.Session - Session await > all processes finished
Oct-03 12:42:17.082 [Task monitor] DEBUG n.processor.TaskPollingMonitor - <<< barrier arrives (monitor: slurm) - terminating tasks monitor poll loop
Oct-03 12:42:17.082 [main] DEBUG nextflow.Session - Session await > all barriers passed
Oct-03 12:42:17.093 [main] DEBUG nextflow.util.ThreadPoolManager - Thread pool 'TaskFinalizer' shutdown completed (hard=false)
Oct-03 12:42:22.095 [main] INFO nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (7 files)
Oct-03 12:43:22.102 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (7 files)
Oct-03 12:44:22.104 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (6 files)
Oct-03 12:45:22.106 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (6 files)
Oct-03 12:46:22.108 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (6 files)
Oct-03 12:47:22.110 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (6 files)
Oct-03 12:48:22.112 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (4 files)
Oct-03 12:49:22.114 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (3 files)
Oct-03 12:50:22.116 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (3 files)
.......
.......
.......
Oct-04 00:41:23.430 [main] DEBUG nextflow.util.ThreadPoolHelper - Waiting for file transfers to complete (3 files)
Oct-04 00:42:18.432 [main] WARN nextflow.util.ThreadPoolHelper - Exiting before file transfers were completed -- Some files may be lost
Oct-04 00:42:18.432 [main] DEBUG nextflow.util.ThreadPoolManager - Thread pool 'PublishDir' shutdown completed (hard=false)
Oct-04 00:42:18.463 [main] DEBUG n.trace.WorkflowStatsObserver - Workflow completed > WorkflowStats[succeededCount=32; failedCount=0; ignoredCount=0; cachedCount=0; pendingCount=0; submittedCount=0; runningCount=0; retriesCount=0; abortedCount=0; succeedDuration=226d 11h 31m 9s; failedDuration=0ms; cachedDuration=0ms;loadCpus=0; loadMemory=0; peakRunning=5; peakCpus=125; peakMemory=0; ]
Oct-04 00:42:18.733 [main] DEBUG nextflow.cache.CacheDB - Closing CacheDB done
Oct-04 00:42:18.820 [main] DEBUG nextflow.util.ThreadPoolManager - Thread pool 'FileTransfer' shutdown completed (hard=false)
Oct-04 00:42:18.820 [main] DEBUG nextflow.script.ScriptRunner - > Execution complete -- Goodbye

Environment

  • Nextflow version: 24.04.4
  • Java version: openjdk version "20.0.2-internal" 2023-07-18
  • Operating system: linux
  • Bash version: GNU bash, version 5.1.16(1)-release

Additional context

@bentsherman
Member

Did it hang, or did it just exit without finishing all of the file transfers? Your issue title suggests the former, but your log suggests the latter.

@divinomas-gh
Author

Did it hang, or did it just exit without finishing all of the file transfers? Your issue title suggests the former, but your log suggests the latter.

It hangs for ~12 hours, then shows the "Exiting before file transfers were completed -- Some files may be lost" message, and then hangs without exiting for days.

@bentsherman
Member

Then it looks like one of the file uploads hung. Nextflow will time out after 12 hours, so that is the expected behavior. As for the file upload, it's hard to know the root cause. I would first see if it happens consistently. If not, it might be some intermittent networking issue.

@matthdsm
Contributor

We are encountering the same issue with both 24.04 and the latest edge release. Is there an option to "retry" the file transfer after it has hung for a given amount of time?

@bentsherman
Member

@matthdsm does it happen consistently? And are you saying it doesn't happen with other versions?

@matthdsm
Contributor

It happens often, but not consistently. We started noticing the phenomenon after updating to 24.04, but I'm not 100% sure it didn't happen before.

@tverbeiren

I'm experiencing the same issue. It does not happen for every workflow, but it seems to happen consistently for one of the workflows we run.

@matthdsm What did you do in the end to resolve the issue?

@pditommaso
Member

Are you able to include the jstack of the hanging (Linux) process?
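For reference, a thread dump can be captured with the standard JDK tools on the machine running the Nextflow head job (assuming jps and jstack are on the PATH):

jps -l                           # find the PID of the Nextflow JVM
jstack <PID> > jstack-dump.txt   # dump all thread stacks to a file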

@tverbeiren

See attached. Does that suffice, @pditommaso?

jstack-failed-publishdir.txt

@pditommaso
Member

There are two threads in waiting state while publishing data. Still don't know the reason:

"PublishDir-2" #22947 prio=5 os_prio=0 cpu=12174.17ms elapsed=48025.04s tid=0x00007fecd80e7010 nid=0x5b05 waiting on condition  [0x00007fed75ef4000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.12/Native Method)
	- parking to wait for  <0x0000000579d180e0> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.12/LockSupport.java:211)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.12/AbstractQueuedSynchronizer.java:715)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.12/AbstractQueuedSynchronizer.java:1047)
	at java.util.concurrent.CountDownLatch.await(java.base@17.0.12/CountDownLatch.java:230)
	at com.amazonaws.services.s3.transfer.MultipleFileTransferStateChangeListener.transferStateChanged(MultipleFileTransferStateChangeListener.java:40)
	at com.amazonaws.services.s3.transfer.internal.AbstractTransfer.setState(AbstractTransfer.java:165)
	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:144)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:115)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:45)
	at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(java.base@17.0.12/ThreadPoolExecutor.java:2037)
	at java.util.concurrent.ThreadPoolExecutor.reject(java.base@17.0.12/ThreadPoolExecutor.java:833)
	at java.util.concurrent.ThreadPoolExecutor.execute(java.base@17.0.12/ThreadPoolExecutor.java:1365)
	at java.util.concurrent.AbstractExecutorService.submit(java.base@17.0.12/AbstractExecutorService.java:145)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.create(UploadMonitor.java:95)
	at com.amazonaws.services.s3.transfer.TransferManager.doUpload(TransferManager.java:701)
	at com.amazonaws.services.s3.transfer.TransferManager.uploadFileList(TransferManager.java:1935)
	at com.amazonaws.services.s3.transfer.TransferManager.uploadDirectory(TransferManager.java:1693)
	at nextflow.cloud.aws.nio.S3Client.uploadDirectory(S3Client.java:635)
	at nextflow.cloud.aws.nio.S3FileSystemProvider.upload(S3FileSystemProvider.java:335)
	at nextflow.file.FileHelper.copyPath(FileHelper.groovy:998)
	at nextflow.processor.PublishDir.processFileImpl(PublishDir.groovy:507)
	at nextflow.processor.PublishDir.processFile(PublishDir.groovy:406)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.12/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.12/NativeMethodAccessorImpl.java:77)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.12/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@17.0.12/Method.java:569)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1333)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at org.codehaus.groovy.runtime.InvokerHelper.invokePogoMethod(InvokerHelper.java:645)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethod(InvokerHelper.java:628)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethodSafe(InvokerHelper.java:82)
	at nextflow.processor.PublishDir$_retryableProcessFile_closure2.doCall(PublishDir.groovy:397)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.12/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.12/NativeMethodAccessorImpl.java:77)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.12/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@17.0.12/Method.java:569)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:279)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at groovy.lang.Closure.call(Closure.java:433)
	at org.codehaus.groovy.runtime.ConvertedClosure.invokeCustom(ConvertedClosure.java:52)
	at org.codehaus.groovy.runtime.ConversionHandler.invoke(ConversionHandler.java:113)
	at jdk.proxy2.$Proxy67.get(jdk.proxy2/Unknown Source)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.Functions$$Lambda$643/0x00007fed7d176bc0.apply(Unknown Source)
	at dev.failsafe.internal.RetryPolicyExecutor.lambda$apply$0(RetryPolicyExecutor.java:75)
	at dev.failsafe.internal.RetryPolicyExecutor$$Lambda$647/0x00007fed7d1778e8.apply(Unknown Source)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:176)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:437)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:129)
	at nextflow.processor.PublishDir.retryableProcessFile(PublishDir.groovy:396)
	at nextflow.processor.PublishDir.safeProcessFile(PublishDir.groovy:367)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.12/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.12/NativeMethodAccessorImpl.java:77)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.12/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@17.0.12/Method.java:569)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1333)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at org.codehaus.groovy.runtime.InvokerHelper.invokePogoMethod(InvokerHelper.java:645)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethod(InvokerHelper.java:628)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethodSafe(InvokerHelper.java:82)
	at nextflow.processor.PublishDir$_apply1_closure1.doCall(PublishDir.groovy:342)
	at nextflow.processor.PublishDir$_apply1_closure1.call(PublishDir.groovy)
	at groovy.lang.Closure.run(Closure.java:505)
	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
	at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)

"PublishDir-4" #23750 prio=5 os_prio=0 cpu=688.14ms elapsed=47228.99s tid=0x00007fed08014bc0 nid=0x5e2a waiting on condition  [0x00007fec8acfb000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.12/Native Method)
	- parking to wait for  <0x000000057c9a96d8> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.12/LockSupport.java:211)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@17.0.12/AbstractQueuedSynchronizer.java:715)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@17.0.12/AbstractQueuedSynchronizer.java:1047)
	at java.util.concurrent.CountDownLatch.await(java.base@17.0.12/CountDownLatch.java:230)
	at com.amazonaws.services.s3.transfer.MultipleFileTransferStateChangeListener.transferStateChanged(MultipleFileTransferStateChangeListener.java:40)
	at com.amazonaws.services.s3.transfer.internal.AbstractTransfer.setState(AbstractTransfer.java:165)
	at com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:144)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:115)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:45)
	at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy.rejectedExecution(java.base@17.0.12/ThreadPoolExecutor.java:2037)
	at java.util.concurrent.ThreadPoolExecutor.reject(java.base@17.0.12/ThreadPoolExecutor.java:833)
	at java.util.concurrent.ThreadPoolExecutor.execute(java.base@17.0.12/ThreadPoolExecutor.java:1365)
	at java.util.concurrent.AbstractExecutorService.submit(java.base@17.0.12/AbstractExecutorService.java:145)
	at com.amazonaws.services.s3.transfer.internal.UploadMonitor.create(UploadMonitor.java:95)
	at com.amazonaws.services.s3.transfer.TransferManager.doUpload(TransferManager.java:701)
	at com.amazonaws.services.s3.transfer.TransferManager.uploadFileList(TransferManager.java:1935)
	at com.amazonaws.services.s3.transfer.TransferManager.uploadDirectory(TransferManager.java:1693)
	at nextflow.cloud.aws.nio.S3Client.uploadDirectory(S3Client.java:635)
	at nextflow.cloud.aws.nio.S3FileSystemProvider.upload(S3FileSystemProvider.java:335)
	at nextflow.file.FileHelper.copyPath(FileHelper.groovy:998)
	at nextflow.processor.PublishDir.processFileImpl(PublishDir.groovy:507)
	at nextflow.processor.PublishDir.processFile(PublishDir.groovy:406)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.12/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.12/NativeMethodAccessorImpl.java:77)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.12/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@17.0.12/Method.java:569)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1333)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at org.codehaus.groovy.runtime.InvokerHelper.invokePogoMethod(InvokerHelper.java:645)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethod(InvokerHelper.java:628)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethodSafe(InvokerHelper.java:82)
	at nextflow.processor.PublishDir$_retryableProcessFile_closure2.doCall(PublishDir.groovy:397)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.12/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.12/NativeMethodAccessorImpl.java:77)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.12/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@17.0.12/Method.java:569)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:279)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at groovy.lang.Closure.call(Closure.java:433)
	at org.codehaus.groovy.runtime.ConvertedClosure.invokeCustom(ConvertedClosure.java:52)
	at org.codehaus.groovy.runtime.ConversionHandler.invoke(ConversionHandler.java:113)
	at jdk.proxy2.$Proxy67.get(jdk.proxy2/Unknown Source)
	at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
	at dev.failsafe.Functions$$Lambda$643/0x00007fed7d176bc0.apply(Unknown Source)
	at dev.failsafe.internal.RetryPolicyExecutor.lambda$apply$0(RetryPolicyExecutor.java:75)
	at dev.failsafe.internal.RetryPolicyExecutor$$Lambda$647/0x00007fed7d1778e8.apply(Unknown Source)
	at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:176)
	at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:437)
	at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:129)
	at nextflow.processor.PublishDir.retryableProcessFile(PublishDir.groovy:396)
	at nextflow.processor.PublishDir.safeProcessFile(PublishDir.groovy:367)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@17.0.12/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@17.0.12/NativeMethodAccessorImpl.java:77)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@17.0.12/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@17.0.12/Method.java:569)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1333)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at org.codehaus.groovy.runtime.InvokerHelper.invokePogoMethod(InvokerHelper.java:645)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethod(InvokerHelper.java:628)
	at org.codehaus.groovy.runtime.InvokerHelper.invokeMethodSafe(InvokerHelper.java:82)
	at nextflow.processor.PublishDir$_apply1_closure1.doCall(PublishDir.groovy:342)
	at nextflow.processor.PublishDir$_apply1_closure1.call(PublishDir.groovy)
	at groovy.lang.Closure.run(Closure.java:505)
	at java.util.concurrent.Executors$RunnableAdapter.call(java.base@17.0.12/Executors.java:539)
	at java.util.concurrent.FutureTask.run(java.base@17.0.12/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.12/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.12/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(java.base@17.0.12/Thread.java:840)

@pditommaso
Member

Interestingly, both are executed via CallerRunsPolicy.rejectedExecution, which may break the CountDownLatch synchronization.

@jorgee
Contributor

jorgee commented Jan 22, 2025

This is what I think is happening:
The workflow is uploading a directory containing many files. One of the task submissions for this upload is rejected (I guess because of thread and queue limits). The CallerRunsPolicy then runs com.amazonaws.services.s3.transfer.internal.UploadMonitor.call synchronously in the same thread, but the AWS client always expects it to be executed asynchronously, at least when uploading directories via uploadFileList. That code has a CountDownLatch to wait for all executions; it is the one blocked on await, and it can never be counted down because the code never reaches that point.
Increasing the queue size for the S3 thread pool could mitigate this, but maybe we could change the default RejectedExecutionHandler to something like the AbortPolicy. It will produce an exception and the transfer will be retried by the failsafe management.
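A minimal, self-contained sketch of that failure mode (hypothetical Groovy code, not the actual AWS SDK internals): a bounded pool with CallerRunsPolicy runs a rejected task in the submitting thread, and if that task awaits a CountDownLatch that only the submitter can count down, the process hangs forever.

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

// 1 worker thread + queue of 1, so the third submission is rejected
def pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(1),
        new ThreadPoolExecutor.CallerRunsPolicy())

def allSubmitted = new CountDownLatch(1)

10.times { i ->
    // once the pool and its queue are full, CallerRunsPolicy executes
    // the task synchronously in the submitting (main) thread, which
    // then blocks on await() -- but only main ever calls countDown()
    pool.execute {
        allSubmitted.await()
        println "task $i running"
    }
}
allSubmitted.countDown()   // never reached: main is stuck in await()
pool.shutdown()

With AbortPolicy instead, the third execute() call would throw a RejectedExecutionException, which the failsafe retry logic could then handle.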

@matthdsm
Contributor

I don't think it has anything to do with the S3 transfer, because we're seeing this issue on a shared FS too.

@pditommaso
Member

@jorgee was suspecting something like that. What about using maxThreads and queueSize? The idea would be to block the submitting thread instead of queueing too many upload tasks and causing the invocation of the rejection policy.

.withMaxSize(maxThreads)
.withQueueSize(maxQueueSize)

@bentsherman
Member

Would it help to break up directory publishing by publishing individual files instead? See #3933

@tverbeiren

Thanks for your feedback!

@bentsherman Your PR is still open. Do you happen to have a nightly build or something I can use to test it? Reconfiguring the workflow so it outputs individual files is not an option, I'm afraid; we just spent quite some time tuning the output structure to our needs.

@pditommaso @jorgee What could I try setting then?

  • aws.client.uploadMaxThreads
  • (in pre-run script) NXF_OPTS="-Dnxf.pool.maxThreads=???"
  • executor.queueSize

And what would be good values for these?

@tverbeiren

(...) but maybe we could change the default RejectedExecutionHandler to something like the AbortPolicy. It will produce an exception and the transfer will be retried by the failsafe management.

I would very much be in favour of this option. The situation right now is the worst possible one: users think the pipeline ran successfully, but no data is written out. And when they resume the pipeline (in order to pick up cached tasks and just re-publish the files), it turns out no caching information is available because the head job did not finish properly.

@pditommaso
Member

@tverbeiren can you please try the following settings?

threadPool.S3TransferManager.maxThreads = <num cpus * 3>
threadPool.S3TransferManager.maxQueueSize = <num cpus * 3>
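For example, on a head node with 16 CPUs (the CPU count here is only illustrative), that would translate to:

// nextflow.config
threadPool.S3TransferManager.maxThreads = 48     // 16 cpus * 3
threadPool.S3TransferManager.maxQueueSize = 48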

@jorgee
Contributor

jorgee commented Jan 22, 2025

These are the current defaults; maxQueueSize must be bigger in your case.

DEFAULT_MIN_THREAD = 10
DEFAULT_MAX_THREAD = Math.max(DEFAULT_MIN_THREAD, Runtime.runtime.availableProcessors()*3)
DEFAULT_QUEUE_SIZE = 10_000

@pditommaso
Member

pditommaso commented Jan 22, 2025

I claim maxQueueSize should be the same as maxThreads.

@jorgee
Contributor

jorgee commented Jan 22, 2025

If I understood the rejection mechanism correctly, rejection mainly happens when both the threads and the queue are full. So, if we reduce the queue size, the rejection should happen earlier.

@pditommaso
Member

I think you are right. My assumption was that it used a blocking queue that prevents more jobs from being added once it's full.

We may need to recover this implementation:

class BlockingBlockingQueue<E> extends LinkedBlockingQueue<E> {
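The fragment above is truncated; as a sketch of the pattern (not necessarily the original implementation), the idea is a queue whose offer() blocks instead of returning false when full, so that ThreadPoolExecutor.execute() back-pressures the submitting thread rather than invoking the rejection handler:

import java.util.concurrent.LinkedBlockingQueue

class BlockingBlockingQueue<E> extends LinkedBlockingQueue<E> {

    BlockingBlockingQueue(int capacity) { super(capacity) }

    @Override
    boolean offer(E e) {
        put(e)        // block until space becomes available
        return true   // report the task as accepted to the executor
    }
}

ThreadPoolExecutor.execute() calls offer() when all core threads are busy, so blocking there throttles submission instead of overflowing into the rejection path.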

@pditommaso
Member

Pushed a tentative solution: #5700

@tverbeiren

Using the following configuration, all files are properly published:

threadPool.S3TransferManager.maxThreads = <cpus * 3>
threadPool.S3TransferManager.maxQueueSize = 100000

Do you see any disadvantages in setting this for all our workflows (with a proper solution pending)?

@pditommaso pditommaso changed the title nextflow 24.0.4.4 never exit due to incomplete file transfer nextflow 24.04.4 never exit due to incomplete file transfer Jan 23, 2025
@pditommaso
Member

Do you see any disadvantages in setting this for all our workflows (with a proper solution pending)?

It should be a valid workaround.
