Skip to content

Commit

Permalink
Make Event Engine code internal.
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-cebo committed Oct 25, 2023
1 parent 2c461d0 commit 0cc0511
Show file tree
Hide file tree
Showing 54 changed files with 75 additions and 75 deletions.
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/eventengine/Effect.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.pubnub.api.eventengine

interface Effect {
internal interface Effect {
fun runEffect()
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class EffectDispatcher<T : EffectInvocation>(
internal class EffectDispatcher<T : EffectInvocation>(
private val effectFactory: EffectFactory<T>,
private val effectSource: Source<T>,
private val managedEffects: ConcurrentHashMap<String, ManagedEffect> = ConcurrentHashMap(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.pubnub.api.eventengine

interface EffectFactory<T : EffectInvocation> {
internal interface EffectFactory<T : EffectInvocation> {
fun create(effectInvocation: T): Effect?
}
10 changes: 5 additions & 5 deletions src/main/kotlin/com/pubnub/api/eventengine/EffectInvocation.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package com.pubnub.api.eventengine

interface EffectInvocation {
internal interface EffectInvocation {
val id: String
val type: EffectInvocationType
}

sealed interface EffectInvocationType
internal sealed interface EffectInvocationType

data class Cancel(val idToCancel: String) : EffectInvocationType
internal data class Cancel(val idToCancel: String) : EffectInvocationType

object Managed : EffectInvocationType
internal object Managed : EffectInvocationType

object NonManaged : EffectInvocationType
internal object NonManaged : EffectInvocationType
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/eventengine/Event.kt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package com.pubnub.api.eventengine

interface Event
internal interface Event
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/eventengine/EventEngine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.pubnub.api.eventengine
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class EventEngine<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>>(
internal class EventEngine<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>>(
private val effectSink: Sink<Ei>,
private val eventSource: Source<Ev>,
private var currentState: S,
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/com/pubnub/api/eventengine/EventEngineConf.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.pubnub.api.eventengine

interface EventEngineConf<Ei : EffectInvocation, Ev : Event> {
internal interface EventEngineConf<Ei : EffectInvocation, Ev : Event> {
val eventSink: Sink<Ev>
val eventSource: Source<Ev>
val effectSink: Sink<Ei>
val effectSource: Source<Ei>
}

class QueueEventEngineConf<Ei : EffectInvocation, Ev : Event>(
internal class QueueEventEngineConf<Ei : EffectInvocation, Ev : Event>(
effectSinkSource: SinkSource<Ei> = QueueSinkSource(),
eventSinkSource: SinkSource<Ev> = QueueSinkSource()
) : EventEngineConf<Ei, Ev> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.pubnub.api.eventengine

class EventEngineManager<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>, Ee : EventEngine<Ei, Ev, S>>(
internal class EventEngineManager<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>, Ee : EventEngine<Ei, Ev, S>>(
private val eventEngine: Ee,
private val effectDispatcher: EffectDispatcher<Ei>,
private val eventSink: Sink<Ev>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.pubnub.api.eventengine

interface ManagedEffect : Effect {
internal interface ManagedEffect : Effect {
fun cancel()
}
4 changes: 2 additions & 2 deletions src/main/kotlin/com/pubnub/api/eventengine/QueueSinkSource.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package com.pubnub.api.eventengine
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

interface SinkSource<T> : Sink<T>, Source<T>
internal interface SinkSource<T> : Sink<T>, Source<T>

class QueueSinkSource<T>(private val queue: BlockingQueue<T> = LinkedBlockingQueue()) : SinkSource<T> {
internal class QueueSinkSource<T>(private val queue: BlockingQueue<T> = LinkedBlockingQueue()) : SinkSource<T> {
override fun take(): T {
return queue.take()
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/eventengine/Sink.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.pubnub.api.eventengine

interface Sink<T> {
internal interface Sink<T> {
fun add(item: T)
}
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/eventengine/Source.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package com.pubnub.api.eventengine

interface Source<T> {
internal interface Source<T> {
fun take(): T
}
6 changes: 3 additions & 3 deletions src/main/kotlin/com/pubnub/api/eventengine/State.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.pubnub.api.eventengine

interface State<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> {
internal interface State<Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> {
open fun onEntry(): Set<Ei> = setOf()
open fun onExit(): Set<Ei> = setOf()
abstract fun transition(event: Ev): Pair<S, Set<Ei>>
}

fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransition(): Pair<S, Set<Ei>> = Pair(this, emptySet())
fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.transitionTo(
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.noTransition(): Pair<S, Set<Ei>> = Pair(this, emptySet())
internal fun <Ei : EffectInvocation, Ev : Event, S : State<Ei, Ev, S>> S.transitionTo(
state: S,
vararg invocations: Ei
): Pair<S, Set<Ei>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import com.pubnub.api.presence.eventengine.effect.PresenceEffectInvocation
import com.pubnub.api.presence.eventengine.event.PresenceEvent
import com.pubnub.api.presence.eventengine.state.PresenceState

typealias PresenceEventEngineManager = EventEngineManager<PresenceEffectInvocation, PresenceEvent, PresenceState, PresenceEventEngine>
internal typealias PresenceEventEngineManager = EventEngineManager<PresenceEffectInvocation, PresenceEvent, PresenceState, PresenceEventEngine>
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import com.pubnub.api.subscribe.eventengine.effect.SubscribeEffectInvocation
import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent
import com.pubnub.api.subscribe.eventengine.state.SubscribeState

typealias SubscribeEventEngineManager = EventEngineManager<SubscribeEffectInvocation, SubscribeEvent, SubscribeState, SubscribeEventEngine>
internal typealias SubscribeEventEngineManager = EventEngineManager<SubscribeEffectInvocation, SubscribeEvent, SubscribeState, SubscribeEventEngine>
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/presence/Presence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.pubnub.api.subscribe.eventengine.effect.RetryPolicy
import java.time.Duration
import java.util.concurrent.Executors

interface Presence {
internal interface Presence {
companion object {
internal fun create(
heartbeatProvider: HeartbeatProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.pubnub.api.presence.eventengine.effect.PresenceEffectInvocation
import com.pubnub.api.presence.eventengine.event.PresenceEvent
import com.pubnub.api.presence.eventengine.state.PresenceState

typealias PresenceEventEngine = EventEngine<PresenceEffectInvocation, PresenceEvent, PresenceState>
internal typealias PresenceEventEngine = EventEngine<PresenceEffectInvocation, PresenceEvent, PresenceState>

fun PresenceEventEngine(
internal fun PresenceEventEngine(
effectSink: Sink<PresenceEffectInvocation>,
eventSource: Source<PresenceEvent>,
currentState: PresenceState = PresenceState.HeartbeatInactive,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.pubnub.api.eventengine.Sink
import com.pubnub.api.presence.eventengine.event.PresenceEvent
import org.slf4j.LoggerFactory

class HeartbeatEffect(
internal class HeartbeatEffect(
private val heartbeatRemoteAction: RemoteAction<Boolean>,
private val presenceEventSink: Sink<PresenceEvent>
) : Effect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.pubnub.api.endpoints.remoteaction.RemoteAction
import com.pubnub.api.eventengine.Effect
import org.slf4j.LoggerFactory

class LeaveEffect(
internal class LeaveEffect(
private val leaveRemoteAction: RemoteAction<Boolean>
) : Effect {
private val log = LoggerFactory.getLogger(LeaveEffect::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.pubnub.api.eventengine.EffectInvocationType
import com.pubnub.api.eventengine.Managed
import com.pubnub.api.eventengine.NonManaged

sealed class PresenceEffectInvocation(override val type: EffectInvocationType) : EffectInvocation {
internal sealed class PresenceEffectInvocation(override val type: EffectInvocationType) : EffectInvocation {
override val id: String = "any value for NonManged and Cancel effect"

data class Heartbeat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture

class WaitEffect( // todo handle pubnub.configuration.heartbeatInterval <= 0
private val heartbeatInterval: Duration, // todo if the interval is 0 or less, do not start the timer
private val presenceEventSink: Sink<PresenceEvent>, // todo this should be check at PresenceEventEngineCreation
internal class WaitEffect(
private val heartbeatInterval: Duration,
private val presenceEventSink: Sink<PresenceEvent>,
private val executorService: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
) : ManagedEffect {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package com.pubnub.api.presence.eventengine.effect.effectprovider

import com.pubnub.api.endpoints.remoteaction.RemoteAction

fun interface HeartbeatProvider {
internal fun interface HeartbeatProvider {
fun getHeartbeatRemoteAction(channels: Set<String>, channelGroups: Set<String>): RemoteAction<Boolean>
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package com.pubnub.api.presence.eventengine.effect.effectprovider

import com.pubnub.api.endpoints.remoteaction.RemoteAction

fun interface LeaveProvider {
internal fun interface LeaveProvider {
fun getLeaveRemoteAction(channels: Set<String>, channelGroups: Set<String>): RemoteAction<Boolean>
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.pubnub.api.presence.eventengine.event
import com.pubnub.api.PubNubException
import com.pubnub.api.eventengine.Event

sealed class PresenceEvent : Event {
internal sealed class PresenceEvent : Event {
data class Joined(val channels: Set<String>, val channelGroups: Set<String>) : PresenceEvent()
data class Left(val channels: Set<String>, val channelGroups: Set<String>) : PresenceEvent()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.pubnub.api.eventengine.transitionTo
import com.pubnub.api.presence.eventengine.effect.PresenceEffectInvocation
import com.pubnub.api.presence.eventengine.event.PresenceEvent

sealed class PresenceState : State<PresenceEffectInvocation, PresenceEvent, PresenceState> {
internal sealed class PresenceState : State<PresenceEffectInvocation, PresenceEvent, PresenceState> {
object HeartbeatInactive : PresenceState() {
override fun transition(event: PresenceEvent): Pair<PresenceState, Set<PresenceEffectInvocation>> {
return when (event) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/com/pubnub/api/subscribe/Subscribe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.Executors

private const val PRESENCE_CHANNEL_SUFFIX = "-pnpres"

class Subscribe(
internal class Subscribe(
private val subscribeEventEngineManager: SubscribeEventEngineManager,
private val subscriptionData: SubscriptionData = SubscriptionData()
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import com.pubnub.api.subscribe.eventengine.effect.SubscribeEffectInvocation
import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent
import com.pubnub.api.subscribe.eventengine.state.SubscribeState

typealias SubscribeEventEngine = EventEngine<SubscribeEffectInvocation, SubscribeEvent, SubscribeState>
internal typealias SubscribeEventEngine = EventEngine<SubscribeEffectInvocation, SubscribeEvent, SubscribeState>

fun SubscribeEventEngine(
internal fun SubscribeEventEngine(
effectSink: Sink<SubscribeEffectInvocation>,
eventSource: Source<SubscribeEvent>,
currentState: SubscribeState = SubscribeState.Unsubscribed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.pubnub.api.presence.eventengine.event.PresenceEvent
import com.pubnub.api.subscribe.eventengine.effect.SubscribeEffectInvocation
import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent

class EventEnginesConf(
internal class EventEnginesConf(
val subscribe: EventEngineConf<SubscribeEffectInvocation, SubscribeEvent> = QueueEventEngineConf(),
val presence: EventEngineConf<PresenceEffectInvocation, PresenceEvent> = QueueEventEngineConf()
)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.pubnub.api.subscribe.eventengine.data

class SubscriptionData {
internal class SubscriptionData {
internal val channels: MutableSet<String> = mutableSetOf()
internal val channelGroups: MutableSet<String> = mutableSetOf()
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import com.pubnub.api.models.consumer.pubsub.message_actions.PNMessageActionResu
import com.pubnub.api.models.consumer.pubsub.objects.PNObjectEventResult
import org.slf4j.LoggerFactory

class EmitMessagesEffect(
internal class EmitMessagesEffect(
private val messagesConsumer: MessagesConsumer,
private val messages: List<PNEvent>
) : Effect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.pubnub.api.eventengine.Effect
import com.pubnub.api.models.consumer.PNStatus
import org.slf4j.LoggerFactory

class EmitStatusEffect(
internal class EmitStatusEffect(
private val statusConsumer: StatusConsumer,
private val status: PNStatus
) : Effect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent
import com.pubnub.api.subscribe.eventengine.event.SubscriptionCursor
import org.slf4j.LoggerFactory

class HandshakeEffect(
internal class HandshakeEffect(
private val handshakeRemoteAction: RemoteAction<SubscriptionCursor>,
private val subscribeEventSink: Sink<SubscribeEvent>,
) : ManagedEffect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

class HandshakeReconnectEffect(
internal class HandshakeReconnectEffect(
private val handshakeRemoteAction: RemoteAction<SubscriptionCursor>,
private val subscribeEventSink: Sink<SubscribeEvent>,
private val policy: RetryPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.pubnub.api.models.consumer.pubsub.files.PNFileEventResult
import com.pubnub.api.models.consumer.pubsub.message_actions.PNMessageActionResult
import com.pubnub.api.models.consumer.pubsub.objects.PNObjectEventResult

interface MessagesConsumer {
internal interface MessagesConsumer {
fun announce(message: PNMessageResult)

fun announce(presence: PNPresenceEventResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.pubnub.api.eventengine.Sink
import com.pubnub.api.subscribe.eventengine.event.SubscribeEvent
import org.slf4j.LoggerFactory

class ReceiveMessagesEffect(
internal class ReceiveMessagesEffect(
private val receiveMessagesRemoteAction: RemoteAction<ReceiveMessagesResult>,
private val subscribeEventSink: Sink<SubscribeEvent>,
) : ManagedEffect {
Expand All @@ -21,7 +21,7 @@ class ReceiveMessagesEffect(
subscribeEventSink.add(
SubscribeEvent.ReceiveFailure(
status.exception
?: PubNubException("Unknown error") // todo check it that can happen
?: PubNubException("Unknown error")
)
)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit

class ReceiveReconnectEffect(
internal class ReceiveReconnectEffect(
private val receiveMessagesRemoteAction: RemoteAction<ReceiveMessagesResult>,
private val subscribeEventSink: Sink<SubscribeEvent>,
private val policy: RetryPolicy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.pubnub.api.subscribe.eventengine.effect
import java.time.Duration
import kotlin.math.pow

abstract class RetryPolicy {
internal abstract class RetryPolicy {
protected abstract val maxRetries: Int
protected abstract fun computeDelay(count: Int): Duration
fun nextDelay(attempt: Int): Duration? {
Expand All @@ -14,18 +14,18 @@ abstract class RetryPolicy {
}
}

object NoRetriesPolicy : RetryPolicy() {
internal object NoRetriesPolicy : RetryPolicy() {
override val maxRetries: Int = 0
override fun computeDelay(count: Int): Duration = Duration.ZERO
}

class LinearPolicy(
internal class LinearPolicy(
override val maxRetries: Int = 5, // LinearPolicy is created in PNConfiguration default maxRetries is -1 which is unlimited
private val fixedDelay: Duration = Duration.ofSeconds(3)
) : RetryPolicy() {
override fun computeDelay(count: Int): Duration = fixedDelay
}

class ExponentialPolicy(override val maxRetries: Int = 5) : RetryPolicy() { // LinearPolicy is created in PNConfiguration default maxRetries is -1 which is unlimited
internal class ExponentialPolicy(override val maxRetries: Int = 5) : RetryPolicy() { // LinearPolicy is created in PNConfiguration default maxRetries is -1 which is unlimited
override fun computeDelay(count: Int): Duration = Duration.ofSeconds((2.0.pow(count - 1)).toLong())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ package com.pubnub.api.subscribe.eventengine.effect

import com.pubnub.api.models.consumer.PNStatus

interface StatusConsumer {
internal interface StatusConsumer {
fun announce(status: PNStatus)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import com.pubnub.api.subscribe.eventengine.event.SubscriptionCursor
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService

data class ReceiveMessagesResult(
internal data class ReceiveMessagesResult(
val messages: List<PNEvent>,
val subscriptionCursor: SubscriptionCursor
)
Expand Down
Loading

0 comments on commit 0cc0511

Please sign in to comment.