Skip to content

Commit

Permalink
GH-3827: Fix RemoteFile GET STREAM session leak
Browse files Browse the repository at this point in the history
Fixes #3827

The `AbstractRemoteFileOutboundGateway.doGet()` for a `STREAM` option
does not close the `session` in case of error.
This may lead to some leaks or exhausted caches

* Close `session` in the `catch()` of the `AbstractRemoteFileOutboundGateway.doGet()`
* Adjust the `SftpServerOutboundTests` to configure a `CachingSessionFactory`
for the `testStream()` to verify there is no leaks attempting to
`GET STREAM` non-existing remote file twice

**Cherry-pick to `5.5.x`**
  • Loading branch information
artembilan authored and garyrussell committed Jul 18, 2022
1 parent d9a42c9 commit 8f44870
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -673,6 +673,7 @@ private Object doGet(final Message<?> requestMessage) {
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session);
}
catch (IOException e) {
session.close();
throw new MessageHandlingException(requestMessage,
"Error handling message in the [" + this
+ "]. Failed to get the remote file [" + remoteFilePath + "] as a stream", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,13 @@
auto-create-directory="true"
remote-file-separator="/" />

<int-sftp:outbound-gateway session-factory="sftpSessionFactory"
<bean id="cachingSessionFactory" class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg name="sessionFactory" ref="sftpSessionFactory"/>
<constructor-arg name="sessionCacheSize" value="1"/>
<property name="sessionWaitTimeout" value="100"/>
</bean>

<int-sftp:outbound-gateway session-factory="cachingSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
Expand Down Expand Up @@ -273,7 +274,7 @@ public void testInt3172LocalDirectoryExpressionMGETRecursive() throws IOExceptio

@Test
@SuppressWarnings("unchecked")
void testLSRecursive() throws IOException {
void testLSRecursive() {
String dir = "sftpSource/";
this.inboundLSRecursive.send(new GenericMessage<Object>(dir));
Message<?> result = this.output.receive(1000);
Expand All @@ -291,7 +292,7 @@ void testLSRecursive() throws IOException {

@Test
@SuppressWarnings("unchecked")
void testLSRecursiveALL() throws IOException {
void testLSRecursiveALL() {
String dir = "sftpSource/";
this.inboundLSRecursiveALL.send(new GenericMessage<Object>(dir));
Message<?> result = this.output.receive(1000);
Expand Down Expand Up @@ -481,7 +482,7 @@ public void testInt3088MPutNotRecursive() throws Exception {
while (output.receive(0) != null) {
// drain
}
this.inboundMPut.send(new GenericMessage<File>(getSourceLocalDirectory()));
this.inboundMPut.send(new GenericMessage<>(getSourceLocalDirectory()));
@SuppressWarnings("unchecked")
Message<List<String>> out = (Message<List<String>>) this.output.receive(1000);
assertThat(out).isNotNull();
Expand Down Expand Up @@ -655,6 +656,15 @@ public void testStream() {
.containsEntry(FileHeaders.REMOTE_DIRECTORY, "sftpSource/")
.containsEntry(FileHeaders.REMOTE_FILE, " sftpSource1.txt");
verify(session).close();

assertThatExceptionOfType(MessageHandlingException.class)
.isThrownBy(() -> this.inboundGetStream.send(new GenericMessage<>(dir + "doesNotExist.txt")))
.withStackTraceContaining("No such file or directory");

// No leak for not closed session after the previous failure
assertThatExceptionOfType(MessageHandlingException.class)
.isThrownBy(() -> this.inboundGetStream.send(new GenericMessage<>(dir + "doesNotExist.txt")))
.withStackTraceContaining("No such file or directory");
}

@Test
Expand Down

0 comments on commit 8f44870

Please sign in to comment.