Skip to content

Commit

Permalink
Moving heartbeat task to repeat at fixed rate (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsrees authored May 22, 2019
1 parent 86a20a2 commit f65ba25
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 17 deletions.
17 changes: 17 additions & 0 deletions src/main/kotlin/org/phoenixframework/DispatchQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ import java.util.concurrent.TimeUnit
interface DispatchQueue {
/** Queue a Runnable to be executed after a given time unit delay */
fun queue(delay: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem

/**
* Creates and executes a periodic action that becomes enabled first after the given initial
* delay, and subsequently with the given period; that is, executions will commence after
* initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on.
*/
fun queueAtFixedRate(delay: Long, period: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem
}

/** Abstracts away a future task */
Expand Down Expand Up @@ -64,6 +71,16 @@ class ScheduledDispatchQueue(poolSize: Int = 8) : DispatchQueue {
val scheduledFuture = scheduledThreadPoolExecutor.schedule(runnable, delay, unit)
return ScheduledDispatchWorkItem(scheduledFuture)
}

override fun queueAtFixedRate(
delay: Long,
period: Long,
unit: TimeUnit,
runnable: () -> Unit
): DispatchWorkItem {
val scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(runnable, delay, period, unit)
return ScheduledDispatchWorkItem(scheduledFuture)
}
}

/**
Expand Down
10 changes: 4 additions & 6 deletions src/main/kotlin/org/phoenixframework/Socket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ const val WS_CLOSE_NORMAL = 1000
/** The socket was closed due to a SocketException. Likely the client lost connectivity */
const val WS_CLOSE_SOCKET_EXCEPTION = 4000

/** The socket was closed due to an SSLException. Likely the client lost connectivity */
const val WS_CLOSE_SSL_EXCEPTION = 4001

/** The socket was closed due to an EOFException. Likely the server abruptly closed */
const val WS_CLOSE_EOF_EXCEPTION = 4002

/**
* Connects to a Phoenix Server
Expand Down Expand Up @@ -366,8 +361,11 @@ class Socket(

// Do not start up the heartbeat timer if skipHeartbeat is true
if (skipHeartbeat) return
val delay = heartbeatInterval
val period = heartbeatInterval

heartbeatTask =
dispatchQueue.queue(heartbeatInterval, TimeUnit.MILLISECONDS) { sendHeartbeat() }
dispatchQueue.queueAtFixedRate(delay, period, TimeUnit.MILLISECONDS) { sendHeartbeat() }
}

internal fun sendHeartbeat() {
Expand Down
6 changes: 3 additions & 3 deletions src/test/kotlin/org/phoenixframework/SocketTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class SocketTest {

socket.onConnectionOpened()
verify(mockTask).cancel()
verify(mockDispatchQueue).queue(any(), any(), any())
verify(mockDispatchQueue).queueAtFixedRate(any(), any(), any(), any())
}

@Test
Expand Down Expand Up @@ -541,7 +541,7 @@ class SocketTest {
@Test
fun `resetHeartbeat() creates a future heartbeat task`() {
val mockTask = mock<DispatchWorkItem>()
whenever(mockDispatchQueue.queue(any(), any(), any())).thenReturn(mockTask)
whenever(mockDispatchQueue.queueAtFixedRate(any(), any(), any(), any())).thenReturn(mockTask)

whenever(connection.readyState).thenReturn(Transport.ReadyState.OPEN)
socket.connect()
Expand All @@ -552,7 +552,7 @@ class SocketTest {

assertThat(socket.heartbeatTask).isNotNull()
argumentCaptor<() -> Unit> {
verify(mockDispatchQueue).queue(eq(5_000L), eq(TimeUnit.MILLISECONDS), capture())
verify(mockDispatchQueue).queueAtFixedRate(eq(5_000L), eq(5_000L), eq(TimeUnit.MILLISECONDS), capture())

// fire the task
allValues.first().invoke()
Expand Down
44 changes: 38 additions & 6 deletions src/test/kotlin/org/phoenixframework/TestUtilities.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ManualDispatchQueue : DispatchQueue {

// Filter all work items that are due to be fired and have not been
// cancelled. Return early if there are no items to fire
val pastDueWorkItems = workItems.filter { it.deadline <= this.tickTime && !it.isCancelled }
val pastDueWorkItems = workItems.filter { it.isPastDue(tickTime) && !it.isCancelled }

// if no items are due, then return early
if (pastDueWorkItems.isEmpty()) return
Expand All @@ -33,10 +33,9 @@ class ManualDispatchQueue : DispatchQueue {
pastDueWorkItems.forEach { it.perform() }

// Remove all work items that are past due or canceled
workItems.removeAll { it.deadline <= this.tickTime || it.isCancelled }
workItems.removeAll { it.isPastDue(tickTime) || it.isCancelled }
}


override fun queue(delay: Long, unit: TimeUnit, runnable: () -> Unit): DispatchWorkItem {
// Converts the given unit and delay to the unit used by this class
val delayInMs = tickTimeUnit.convert(delay, unit)
Expand All @@ -47,23 +46,56 @@ class ManualDispatchQueue : DispatchQueue {

return workItem
}

override fun queueAtFixedRate(
delay: Long,
period: Long,
unit: TimeUnit,
runnable: () -> Unit
): DispatchWorkItem {

val delayInMs = tickTimeUnit.convert(delay, unit)
val periodInMs = tickTimeUnit.convert(period, unit)
val deadline = tickTime + delayInMs

val workItem = ManualDispatchWorkItem(runnable, deadline, periodInMs)
workItems.add(workItem)

return workItem
}
}

//------------------------------------------------------------------------------
// Work Item
//------------------------------------------------------------------------------
class ManualDispatchWorkItem(
private val runnable: () -> Unit,
val deadline: Long
private var deadline: Long,
private val period: Long = 0
) : DispatchWorkItem {

override var isCancelled: Boolean = false
private var performCount = 0

override fun cancel() { this.isCancelled = true }

// Test
fun isPastDue(tickTime: Long): Boolean {
return this.deadline <= tickTime
}

fun perform() {
if (isCancelled) return
runnable.invoke()
performCount += 1

// If the task is repeatable, then schedule the next deadline after the given period
deadline += (performCount * period)
}

// DispatchWorkItem
override var isCancelled: Boolean = false

override fun cancel() {
this.isCancelled = true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class WebSocketTransportTest {
val throwable = SSLException("t")
transport.onFailure(mockWebSocket, throwable, mockResponse)
verify(mockOnError).invoke(throwable, mockResponse)
verify(mockOnClose).invoke(4001)
verify(mockOnClose).invoke(4000)
}

@Test
Expand All @@ -120,7 +120,7 @@ class WebSocketTransportTest {
val throwable = EOFException()
transport.onFailure(mockWebSocket, throwable, mockResponse)
verify(mockOnError).invoke(throwable, mockResponse)
verify(mockOnClose).invoke(4002)
verify(mockOnClose).invoke(4000)
}

@Test
Expand Down

0 comments on commit f65ba25

Please sign in to comment.