2424import java .io .OutputStream ;
2525import java .net .URI ;
2626import java .net .URISyntaxException ;
27+ import java .util .ArrayList ;
28+ import java .util .Collections ;
2729import java .util .Comparator ;
30+ import java .util .LinkedHashMap ;
2831import java .util .List ;
32+ import java .util .Map ;
33+ import java .util .concurrent .locks .Lock ;
2934import java .util .regex .Matcher ;
30- import java .util .stream .Collectors ;
3135
3236import org .apache .commons .logging .Log ;
3337import org .apache .commons .logging .LogFactory ;
5357import org .springframework .integration .file .support .FileUtils ;
5458import org .springframework .integration .metadata .MetadataStore ;
5559import org .springframework .integration .metadata .SimpleMetadataStore ;
60+ import org .springframework .integration .support .locks .DefaultLockRegistry ;
5661import org .springframework .messaging .MessagingException ;
5762import org .springframework .util .Assert ;
63+ import org .springframework .util .CollectionUtils ;
5864import org .springframework .util .ObjectUtils ;
5965import org .springframework .util .StringUtils ;
6066
@@ -86,6 +92,19 @@ public abstract class AbstractInboundFileSynchronizer<F>
8692
8793 private final RemoteFileTemplate <F > remoteFileTemplate ;
8894
95+ private final DefaultLockRegistry lockRegistry = new DefaultLockRegistry ();
96+
97+ @ SuppressWarnings ("serial" )
98+ private final Map <String , List <F >> fetchCache =
99+ Collections .synchronizedMap (new LinkedHashMap <>(100 , 0.75f , true ) {
100+
101+ @ Override
102+ protected boolean removeEldestEntry (Map .Entry <String , List <F >> eldest ) {
103+ return size () > 100 ;
104+ }
105+
106+ });
107+
89108 @ SuppressWarnings ("NullAway.Init" )
90109 private EvaluationContext evaluationContext ;
91110
@@ -331,9 +350,6 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
331350 return ;
332351 }
333352 String remoteDirectory = this .remoteDirectoryExpression .getValue (this .evaluationContext , String .class );
334- if (this .logger .isTraceEnabled ()) {
335- this .logger .trace ("Synchronizing " + remoteDirectory + " to " + localDirectory );
336- }
337353 try {
338354 Integer transferred = this .remoteFileTemplate .execute (session ->
339355 transferFilesFromRemoteToLocal (remoteDirectory , localDirectory , maxFetchSize , session ));
@@ -350,94 +366,142 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
350366 private Integer transferFilesFromRemoteToLocal (@ Nullable String remoteDirectory , File localDirectory ,
351367 int maxFetchSize , Session <F > session ) throws IOException {
352368
353- F [] files = session .list (remoteDirectory );
354- if (!ObjectUtils .isEmpty (files )) {
355- files = FileUtils .purgeUnwantedElements (files , e -> !isFile (e ), this .comparator );
369+ String remoteDirectoryKey = remoteDirectory == null ? "" : remoteDirectory ;
370+
371+ Lock remoteDirectoryLock = null ;
372+ if (maxFetchSize > 0 ) {
373+ // The result of session.list is going to be sliced by this maxFetchSize and cached.
374+ // Therefore, a lock for the directory to avoid race condition from different threads.
375+ // The perfomance degration is minimal since session.list is filtered once,
376+ // and all the later slices are handled only from the in-memory cache.
377+ remoteDirectoryLock = this .lockRegistry .obtain (remoteDirectoryKey );
378+ remoteDirectoryLock .lock ();
356379 }
357- if (!ObjectUtils .isEmpty (files )) {
358- boolean filteringOneByOne = this .filter != null && this .filter .supportsSingleFileFiltering ();
359- List <F > filteredFiles = applyFilter (files , this .filter != null , filteringOneByOne , maxFetchSize );
380+ else {
381+ // The cache makes sense only for maxFetchSize > 0.
382+ this .fetchCache .remove (remoteDirectoryKey );
383+ }
384+
385+ try {
386+ List <F > remoteFiles = null ;
387+ if (maxFetchSize > 0 ) {
388+ remoteFiles = this .fetchCache .get (remoteDirectoryKey );
389+ }
390+
391+ if (CollectionUtils .isEmpty (remoteFiles )) {
392+ // The session.list and filter all the files only once.
393+ // If maxFetchSize > 0, the rest of filtered files are going to be cached
394+ // for subsequent fetches.
395+ // If no maxFetchSize, all the files are transferred at once anyway.
396+ remoteFiles = listAndFilterFiles (remoteDirectory , session );
397+ }
360398
361- int copied = filteredFiles .size ();
362- int accepted = 0 ;
399+ List <F > sliceToTransfer = remoteFiles ;
400+ List <F > remoteFilesToCache = null ;
401+ if (!CollectionUtils .isEmpty (remoteFiles ) && maxFetchSize > 0 ) {
402+ remoteFilesToCache = remoteFiles ;
403+ sliceToTransfer = remoteFiles .stream ().limit (maxFetchSize ).toList ();
404+ remoteFilesToCache .removeAll (sliceToTransfer );
405+ }
363406
364- for (F file : filteredFiles ) {
365- F fileToCopy = file ;
366- if (filteringOneByOne ) {
367- if ((maxFetchSize < 0 || accepted < maxFetchSize )
368- && this .filter != null && this .filter .accept (fileToCopy )) {
407+ int copied = 0 ;
369408
370- accepted ++;
409+ for (int i = 0 ; i < sliceToTransfer .size (); i ++) {
410+ F file = sliceToTransfer .get (i );
411+ boolean transferred = false ;
412+ try {
413+ if (transferFile (remoteDirectory , localDirectory , session , file )) {
414+ copied ++;
415+ }
416+ }
417+ catch (RuntimeException | IOException ex ) {
418+ // The filtering has happened before transfer, so if it fails,
419+ // all the following files have to be rest from the filter.
420+ if (this .filter != null && this .filter .supportsSingleFileFiltering ()) {
421+ for (int j = i ; j < remoteFiles .size (); j ++) {
422+ F fileToReset = remoteFiles .get (j );
423+ resetFilterIfNecessary (fileToReset );
424+ }
371425 }
372426 else {
373- fileToCopy = null ;
374- copied --;
427+ rollbackFromFileToListEnd (remoteFiles , file );
375428 }
429+
430+ if (maxFetchSize > 0 ) {
431+ // When trasfer fails, reset the cache as well
432+ // for a fresh session.list on the next synchronization.
433+ this .fetchCache .remove (remoteDirectoryKey );
434+ }
435+
436+ throw ex ;
376437 }
377- copied =
378- copyIfNotNull (remoteDirectory , localDirectory , session , filteringOneByOne ,
379- filteredFiles , copied , fileToCopy );
380438 }
439+
440+ if (maxFetchSize > 0 ) {
441+ if (!CollectionUtils .isEmpty (remoteFilesToCache )) {
442+ this .fetchCache .put (remoteDirectoryKey , remoteFilesToCache );
443+ }
444+ else {
445+ this .fetchCache .remove (remoteDirectoryKey );
446+ }
447+ }
448+
381449 return copied ;
382450 }
383- else {
384- return 0 ;
451+ finally {
452+ if (remoteDirectoryLock != null ) {
453+ remoteDirectoryLock .unlock ();
454+ }
385455 }
386456 }
387457
388- private int copyIfNotNull (@ Nullable String remoteDirectory , File localDirectory ,
389- Session <F > session , boolean filteringOneByOne ,
390- List <F > filteredFiles , int copied , @ Nullable F file ) throws IOException {
391-
392- boolean renamedFailed = false ;
393- EvaluationContext localFileEvaluationContext = null ;
394- if (this .localFilenameGeneratorExpression != null ) {
395- localFileEvaluationContext = ExpressionUtils .createStandardEvaluationContext (this .beanFactory );
396- localFileEvaluationContext .setVariable ("remoteDirectory" , remoteDirectory );
458+ private List <F > listAndFilterFiles (@ Nullable String remoteDirectory , Session <F > session ) throws IOException {
459+ F [] files = session .list (remoteDirectory );
460+ if (!ObjectUtils .isEmpty (files )) {
461+ files = FileUtils .purgeUnwantedElements (files , e -> !isFile (e ), this .comparator );
397462 }
398- try {
399- if (file != null &&
400- !copyFileToLocalDirectory (remoteDirectory , localFileEvaluationContext , file , localDirectory ,
401- session )) {
402463
403- renamedFailed = true ;
404- }
405- }
406- catch (RuntimeException | IOException e1 ) {
407- if (filteringOneByOne ) {
408- resetFilterIfNecessary (file );
464+ if (!ObjectUtils .isEmpty (files )) {
465+ List <F > filteredFiles ;
466+ if (this .filter != null ) {
467+ if (this .filter .supportsSingleFileFiltering ()) {
468+ filteredFiles = new ArrayList <>(files .length );
469+ for (F file : files ) {
470+ if (this .filter .accept (file )) {
471+ filteredFiles .add (file );
472+ }
473+ }
474+ }
475+ else {
476+ filteredFiles = filterFiles (files );
477+ }
409478 }
410479 else {
411- rollbackFromFileToListEnd (filteredFiles , file );
480+ filteredFiles = new ArrayList <>();
481+ Collections .addAll (filteredFiles , files );
412482 }
413- throw e1 ;
483+
484+ return filteredFiles ;
414485 }
415- return renamedFailed ? copied - 1 : copied ;
486+
487+ return Collections .emptyList ();
416488 }
417489
418- private List <F > applyFilter (F [] files , boolean haveFilter , boolean filteringOneByOne , int maxFetchSize ) {
419- List <F > filteredFiles ;
420- if (!filteringOneByOne && haveFilter ) {
421- filteredFiles = filterFiles (files );
422- }
423- else {
424- filteredFiles = List .of (files );
425- }
426- if (maxFetchSize >= 0 && filteredFiles .size () > maxFetchSize && !filteringOneByOne ) {
427- if (haveFilter ) {
428- rollbackFromFileToListEnd (filteredFiles , filteredFiles .get (maxFetchSize ));
429- }
430- filteredFiles = filteredFiles .stream ()
431- .limit (maxFetchSize )
432- .collect (Collectors .toList ());
490+ private boolean transferFile (@ Nullable String remoteDirectory , File localDirectory , Session <F > session , F file )
491+ throws IOException {
492+
493+ EvaluationContext localFileEvaluationContext = null ;
494+ if (this .localFilenameGeneratorExpression != null ) {
495+ localFileEvaluationContext = ExpressionUtils .createStandardEvaluationContext (this .beanFactory );
496+ localFileEvaluationContext .setVariable ("remoteDirectory" , remoteDirectory );
433497 }
434- return filteredFiles ;
498+
499+ return copyFileToLocalDirectory (remoteDirectory , localFileEvaluationContext , file , localDirectory , session );
435500 }
436501
437502 protected void rollbackFromFileToListEnd (List <F > filteredFiles , F file ) {
438- if (this .filter instanceof ReversibleFileListFilter ) {
439- ((ReversibleFileListFilter <F >) this .filter )
440- .rollback (file , filteredFiles );
503+ if (this .filter instanceof ReversibleFileListFilter <F > reversibleFileListFilter ) {
504+ reversibleFileListFilter .rollback (file , filteredFiles );
441505 }
442506 }
443507
@@ -530,12 +594,12 @@ else if (this.logger.isWarnEnabled()) {
530594 }
531595
532596 private void resetFilterIfNecessary (F remoteFile ) {
533- if (this .filter instanceof ResettableFileListFilter ) {
597+ if (this .filter instanceof ResettableFileListFilter < F > resettableFileListFilter ) {
534598 if (this .logger .isInfoEnabled ()) {
535599 this .logger .info ("Removing the remote file '" + remoteFile +
536600 "' from the filter for a subsequent transfer attempt" );
537601 }
538- (( ResettableFileListFilter < F >) this . filter ) .remove (remoteFile );
602+ resettableFileListFilter .remove (remoteFile );
539603 }
540604 }
541605
0 commit comments