From 029768f271c499bebcd42f9c6815db0c4f34abde Mon Sep 17 00:00:00 2001 From: Eric Chapman Date: Fri, 6 Sep 2019 16:17:11 -0500 Subject: [PATCH 1/2] added logic to cancel in-flight promises when connection drops --- Sources/Redis/Client/RedisClient.swift | 16 +++++++++++++ Tests/RedisTests/RedisDatabaseTests.swift | 28 +++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/Sources/Redis/Client/RedisClient.swift b/Sources/Redis/Client/RedisClient.swift index 177277a..c216736 100644 --- a/Sources/Redis/Client/RedisClient.swift +++ b/Sources/Redis/Client/RedisClient.swift @@ -20,6 +20,9 @@ public final class RedisClient: DatabaseConnection, BasicWorker { /// The channel private let channel: Channel + + /// Stores the inflight promises so they can be fulfilled when the channel drops + var inflightPromises: [String:EventLoopPromise] = [:] /// Creates a new Redis client on the provided data source and sink. init(queue: RedisCommandHandler, channel: Channel) { @@ -28,6 +31,12 @@ public final class RedisClient: DatabaseConnection, BasicWorker { self.extend = [:] self.isClosed = false channel.closeFuture.always { + // send closed error for the promises that have not been fulfilled + for promise in self.inflightPromises.values { + promise.fail(error: ChannelError.ioOnClosedChannel) + } + self.inflightPromises.removeAll() + self.isClosed = true } } @@ -55,6 +64,13 @@ public final class RedisClient: DatabaseConnection, BasicWorker { // create a new promise to fulfill later let promise = eventLoop.newPromise(RedisData.self) + // logic to store in-flight requests that can be cancelled when connection drops + let key = UUID().uuidString + self.inflightPromises[key] = promise + promise.futureResult.always { + self.inflightPromises.removeValue(forKey: key) + } + // write the message and the promise to the channel, which the `RequestResponseHandler` will capture return self.channel.writeAndFlush((message, promise)) .flatMap { return promise.futureResult } diff --git a/Tests/RedisTests/RedisDatabaseTests.swift b/Tests/RedisTests/RedisDatabaseTests.swift index 9fdc6cc..7b1562b 100644 --- a/Tests/RedisTests/RedisDatabaseTests.swift +++ b/Tests/RedisTests/RedisDatabaseTests.swift @@ -26,6 +26,34 @@ class RedisDatabaseTests: XCTestCase { try redis.delete("hello").wait() XCTAssertNil(try redis.get("hello", as: String.self).wait()) } + + func testDroppedConnection() throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let config = RedisClientConfig.makeTest() + let database = try RedisDatabase(config: config) + let redis = try database.newConnection(on: group).wait() + defer { redis.close() } + + let timeout: UInt32 = 1 + var dataReceived = false + var errorReceived = false + + let command = RedisData.array(["brpop", "hello", "\(timeout)"].map { RedisData(bulk: $0) }) + _ = redis.send(command).do { data in + dataReceived = true + }.catch { error in + errorReceived = true + } + + // Close the connection + redis.close() + + // Sleep for an extra second seconds to give the transaction time to complete + sleep(timeout+1) + + XCTAssertEqual(dataReceived, false) + XCTAssertEqual(errorReceived, true) + } func testSelect() throws { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) From 2abb592e4854239dd3eddf07a933396aa45f4579 Mon Sep 17 00:00:00 2001 From: Eric Chapman Date: Sat, 7 Sep 2019 07:15:27 -0500 Subject: [PATCH 2/2] moved connection closed logic to RequestResponseHandler --- Sources/Redis/Client/RedisClient.swift | 16 ---------------- Sources/Redis/RequestResponseHandler.swift | 10 ++++++++++ 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/Sources/Redis/Client/RedisClient.swift b/Sources/Redis/Client/RedisClient.swift index c216736..168219e 100644 --- a/Sources/Redis/Client/RedisClient.swift +++ b/Sources/Redis/Client/RedisClient.swift @@ -21,9 +21,6 @@ public final class RedisClient: DatabaseConnection, BasicWorker { /// The channel private let channel: Channel - /// Stores the inflight promises so they can be fulfilled when the channel drops - var inflightPromises: [String:EventLoopPromise] = [:] - /// Creates a new Redis client on the provided data source and sink. init(queue: RedisCommandHandler, channel: Channel) { self.queue = queue @@ -31,12 +28,6 @@ public final class RedisClient: DatabaseConnection, BasicWorker { self.extend = [:] self.isClosed = false channel.closeFuture.always { - // send closed error for the promises that have not been fulfilled - for promise in self.inflightPromises.values { - promise.fail(error: ChannelError.ioOnClosedChannel) - } - self.inflightPromises.removeAll() - self.isClosed = true } } @@ -64,13 +55,6 @@ public final class RedisClient: DatabaseConnection, BasicWorker { // create a new promise to fulfill later let promise = eventLoop.newPromise(RedisData.self) - // logic to store in-flight requests that can be cancelled when connection drops - let key = UUID().uuidString - self.inflightPromises[key] = promise - promise.futureResult.always { - self.inflightPromises.removeValue(forKey: key) - } - // write the message and the promise to the channel, which the `RequestResponseHandler` will capture return self.channel.writeAndFlush((message, promise)) .flatMap { return promise.futureResult } diff --git a/Sources/Redis/RequestResponseHandler.swift b/Sources/Redis/RequestResponseHandler.swift index 854d76f..762459d 100644 --- a/Sources/Redis/RequestResponseHandler.swift +++ b/Sources/Redis/RequestResponseHandler.swift @@ -116,4 +116,14 @@ final class RequestResponseHandler: ChannelDuplexHandler { ctx.write(self.wrapOutboundOut(request), promise: promise) } } + + public func handlerAdded(ctx: ChannelHandlerContext) { + // handles returning "closed channel" for promises in flight when connection closes + ctx.channel.closeFuture.always { + for promise in self.promiseBuffer { + promise.fail(error: ChannelError.ioOnClosedChannel) + } + self.promiseBuffer.removeAll() + } + } }