From f65ba256b13b637a3b315837949c195c2f5a3eeb Mon Sep 17 00:00:00 2001 From: Daniel Rees Date: Wed, 22 May 2019 13:44:31 -0400 Subject: [PATCH] Moving heartbeat task to repeat at fixed rate (#52) --- .../org/phoenixframework/DispatchQueue.kt | 17 +++++++ .../kotlin/org/phoenixframework/Socket.kt | 10 ++--- .../kotlin/org/phoenixframework/SocketTest.kt | 6 +-- .../org/phoenixframework/TestUtilities.kt | 44 ++++++++++++++++--- .../WebSocketTransportTest.kt | 4 +- 5 files changed, 64 insertions(+), 17 deletions(-) diff --git a/src/main/kotlin/org/phoenixframework/DispatchQueue.kt b/src/main/kotlin/org/phoenixframework/DispatchQueue.kt index 7881c3a..bd996f4 100644 --- a/src/main/kotlin/org/phoenixframework/DispatchQueue.kt +++ b/src/main/kotlin/org/phoenixframework/DispatchQueue.kt @@ -36,6 +36,13 @@ import java.util.concurrent.TimeUnit interface DispatchQueue { /** Queue a Runnable to be executed after a given time unit delay */ fun queue(delay: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem + + /** + * Creates and executes a periodic action that becomes enabled first after the given initial + * delay, and subsequently with the given period; that is, executions will commence after + * initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on. + */ + fun queueAtFixedRate(delay: Long, period: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem } /** Abstracts away a future task */ @@ -64,6 +71,16 @@ class ScheduledDispatchQueue(poolSize: Int = 8) : DispatchQueue { val scheduledFuture = scheduledThreadPoolExecutor.schedule(runnable, delay, unit) return ScheduledDispatchWorkItem(scheduledFuture) } + + override fun queueAtFixedRate( + delay: Long, + period: Long, + unit: TimeUnit, + runnable: () -> Unit + ): DispatchWorkItem { + val scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(runnable, delay, period, unit) + return ScheduledDispatchWorkItem(scheduledFuture) + } } /** diff --git a/src/main/kotlin/org/phoenixframework/Socket.kt b/src/main/kotlin/org/phoenixframework/Socket.kt index 4e667ab..15f920b 100644 --- a/src/main/kotlin/org/phoenixframework/Socket.kt +++ b/src/main/kotlin/org/phoenixframework/Socket.kt @@ -54,11 +54,6 @@ const val WS_CLOSE_NORMAL = 1000 /** The socket was closed due to a SocketException. Likely the client lost connectivity */ const val WS_CLOSE_SOCKET_EXCEPTION = 4000 -/** The socket was closed due to an SSLException. Likely the client lost connectivity */ -const val WS_CLOSE_SSL_EXCEPTION = 4001 - -/** The socket was closed due to an EOFException. Likely the server abruptly closed */ -const val WS_CLOSE_EOF_EXCEPTION = 4002 /** * Connects to a Phoenix Server @@ -366,8 +361,11 @@ class Socket( // Do not start up the heartbeat timer if skipHeartbeat is true if (skipHeartbeat) return + val delay = heartbeatInterval + val period = heartbeatInterval + heartbeatTask = - dispatchQueue.queue(heartbeatInterval, TimeUnit.MILLISECONDS) { sendHeartbeat() } + dispatchQueue.queueAtFixedRate(delay, period, TimeUnit.MILLISECONDS) { sendHeartbeat() } } internal fun sendHeartbeat() { diff --git a/src/test/kotlin/org/phoenixframework/SocketTest.kt b/src/test/kotlin/org/phoenixframework/SocketTest.kt index 8edf55d..620d501 100644 --- a/src/test/kotlin/org/phoenixframework/SocketTest.kt +++ b/src/test/kotlin/org/phoenixframework/SocketTest.kt @@ -503,7 +503,7 @@ class SocketTest { socket.onConnectionOpened() verify(mockTask).cancel() - verify(mockDispatchQueue).queue(any(), any(), any()) + verify(mockDispatchQueue).queueAtFixedRate(any(), any(), any(), any()) } @Test @@ -541,7 +541,7 @@ class SocketTest { @Test fun `resetHeartbeat() creates a future heartbeat task`() { val mockTask = mock() - whenever(mockDispatchQueue.queue(any(), any(), any())).thenReturn(mockTask) + whenever(mockDispatchQueue.queueAtFixedRate(any(), any(), any(), any())).thenReturn(mockTask) whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN) socket.connect() @@ -552,7 +552,7 @@ class SocketTest { assertThat(socket.heartbeatTask).isNotNull() argumentCaptor<() -> Unit> { - verify(mockDispatchQueue).queue(eq(5_000L), eq(TimeUnit.MILLISECONDS), capture()) + verify(mockDispatchQueue).queueAtFixedRate(eq(5_000L), eq(5_000L), eq(TimeUnit.MILLISECONDS), capture()) // fire the task allValues.first().invoke() diff --git a/src/test/kotlin/org/phoenixframework/TestUtilities.kt b/src/test/kotlin/org/phoenixframework/TestUtilities.kt index 3671821..3b4d193 100644 --- a/src/test/kotlin/org/phoenixframework/TestUtilities.kt +++ b/src/test/kotlin/org/phoenixframework/TestUtilities.kt @@ -24,7 +24,7 @@ class ManualDispatchQueue : DispatchQueue { // Filter all work items that are due to be fired and have not been // cancelled. Return early if there are no items to fire - val pastDueWorkItems = workItems.filter { it.deadline <= this.tickTime && !it.isCancelled } + val pastDueWorkItems = workItems.filter { it.isPastDue(tickTime) && !it.isCancelled } // if no items are due, then return early if (pastDueWorkItems.isEmpty()) return @@ -33,10 +33,9 @@ class ManualDispatchQueue : DispatchQueue { pastDueWorkItems.forEach { it.perform() } // Remove all work items that are past due or canceled - workItems.removeAll { it.deadline <= this.tickTime || it.isCancelled } + workItems.removeAll { it.isPastDue(tickTime) || it.isCancelled } } - override fun queue(delay: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem { // Converts the given unit and delay to the unit used by this class val delayInMs = tickTimeUnit.convert(delay, unit) @@ -47,6 +46,23 @@ class ManualDispatchQueue : DispatchQueue { return workItem } + + override fun queueAtFixedRate( + delay: Long, + period: Long, + unit: TimeUnit, + runnable: () -> Unit + ): DispatchWorkItem { + + val delayInMs = tickTimeUnit.convert(delay, unit) + val periodInMs = tickTimeUnit.convert(period, unit) + val deadline = tickTime + delayInMs + + val workItem = ManualDispatchWorkItem(runnable, deadline, periodInMs) + workItems.add(workItem) + + return workItem + } } //------------------------------------------------------------------------------ @@ -54,16 +70,32 @@ class ManualDispatchQueue : DispatchQueue { //------------------------------------------------------------------------------ class ManualDispatchWorkItem( private val runnable: () -> Unit, - val deadline: Long + private var deadline: Long, + private val period: Long = 0 ) : DispatchWorkItem { - override var isCancelled: Boolean = false + private var performCount = 0 - override fun cancel() { this.isCancelled = true } + + // Test + fun isPastDue(tickTime: Long): Boolean { + return this.deadline <= tickTime + } fun perform() { if (isCancelled) return runnable.invoke() + performCount += 1 + + // If the task is repeatable, then schedule the next deadline after the given period + deadline += (performCount * period) + } + + // DispatchWorkItem + override var isCancelled: Boolean = false + + override fun cancel() { + this.isCancelled = true } } diff --git a/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt b/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt index 881b542..cc2353b 100644 --- a/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt +++ b/src/test/kotlin/org/phoenixframework/WebSocketTransportTest.kt @@ -107,7 +107,7 @@ class WebSocketTransportTest { val throwable = SSLException("t") transport.onFailure(mockWebSocket, throwable, mockResponse) verify(mockOnError).invoke(throwable, mockResponse) - verify(mockOnClose).invoke(4001) + verify(mockOnClose).invoke(4000) } @Test @@ -120,7 +120,7 @@ class WebSocketTransportTest { val throwable = EOFException() transport.onFailure(mockWebSocket, throwable, mockResponse) verify(mockOnError).invoke(throwable, mockResponse) - verify(mockOnClose).invoke(4002) + verify(mockOnClose).invoke(4000) } @Test