Skip to content

Commit

Permalink
EventEngine beta flag handling changes
Browse files Browse the repository at this point in the history
Change enableSubscribeBeta flag to enableEventEngine
Add "ee" query parameter to requests if enableEventEngine=true
  • Loading branch information
wkal-pubnub committed Oct 20, 2023
1 parent c65d84b commit 1861577
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ abstract class BaseIntegrationTest {
return pnConfiguration
}

private fun getServerPnConfiguration(): PNConfiguration {
protected fun getServerPnConfiguration(): PNConfiguration {
val pnConfiguration = PNConfiguration(userId = UserId(PubNub.generateUUID()))
pnConfiguration.subscribeKey = Keys.pamSubKey
pnConfiguration.publishKey = Keys.pamPubKey
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import com.pubnub.api.await
import com.pubnub.api.callbacks.SubscribeCallback
import com.pubnub.api.enums.PNHeartbeatNotificationOptions
import com.pubnub.api.enums.PNOperationType
import com.pubnub.api.listen
import com.pubnub.api.models.consumer.PNStatus
import com.pubnub.api.models.consumer.pubsub.PNPresenceEventResult
import com.pubnub.api.subscribeToBlocking
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.logging.HttpLoggingInterceptor
import org.awaitility.Awaitility
import org.awaitility.Durations
import org.hamcrest.core.IsEqual
import org.junit.Assert
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
Expand Down Expand Up @@ -251,4 +255,64 @@ class PresenceIntegrationTests : BaseIntegrationTest() {
subscribeSuccess.get() && heartbeatCallsCount.get() > 2
}
}

@Test
fun testPresenceWithEeQueryParam() {
val success = AtomicBoolean()
val config = getBasicPnConfiguration()
config.enableEventEngine = true
config.heartbeatInterval = 1
var output: String? = null
config.httpLoggingInterceptor = HttpLoggingInterceptor {
if (it.contains("GET https://")) {
output = it
success.set(true)
}
}.apply { level = HttpLoggingInterceptor.Level.BASIC }
val pubnub = PubNub(config)
try {
pubnub.presence(
channels = listOf("a"),
connected = true
)

success.listen()

val url = output?.substringAfter("--> GET ")?.toHttpUrlOrNull()
Assert.assertNotNull(url)
assertTrue(url!!.queryParameterNames.contains("ee"))
} finally {
pubnub.forceDestroy()
}
}

@Test
fun testPresenceWithoutEeQueryParam() {
val success = AtomicBoolean()
val config = getBasicPnConfiguration()
config.enableEventEngine = false
config.heartbeatInterval = 1
var output: String? = null
config.httpLoggingInterceptor = HttpLoggingInterceptor {
if (it.contains("GET https://")) {
output = it
success.set(true)
}
}.apply { level = HttpLoggingInterceptor.Level.BASIC }
val pubnub = PubNub(config)
try {
pubnub.presence(
channels = listOf("a"),
connected = true
)

success.listen()

val url = output?.substringAfter("--> GET ")?.toHttpUrlOrNull()
Assert.assertNotNull(url)
assertFalse(url!!.queryParameterNames.contains("ee"))
} finally {
pubnub.forceDestroy()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import com.pubnub.api.models.consumer.PNStatus
import com.pubnub.api.models.consumer.pubsub.PNMessageResult
import com.pubnub.api.subscribeToBlocking
import com.pubnub.api.unsubscribeFromBlocking
import okhttp3.HttpUrl.Companion.toHttpUrlOrNull
import okhttp3.logging.HttpLoggingInterceptor
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertTrue
import org.junit.Test
import java.util.concurrent.atomic.AtomicBoolean
Expand Down Expand Up @@ -126,4 +130,60 @@ class SubscribeIntegrationTests : BaseIntegrationTest() {

success.listen()
}

@Test
fun testSubscribeWithEeQueryParam() {
val success = AtomicBoolean()
val config = getBasicPnConfiguration()
config.enableEventEngine = true
var output: String? = null
config.httpLoggingInterceptor = HttpLoggingInterceptor {
if (it.contains("GET https://")) {
output = it
success.set(true)
}
}.apply { level = HttpLoggingInterceptor.Level.BASIC }
val pubnub = PubNub(config)
try {
pubnub.subscribe(
channels = listOf("a")
)

success.listen()

val url = output?.substringAfter("--> GET ")?.toHttpUrlOrNull()
assertNotNull(url)
assertTrue(url!!.queryParameterNames.contains("ee"))
} finally {
pubnub.forceDestroy()
}
}

@Test
fun testSubscribeWithoutEeQueryParam() {
val success = AtomicBoolean()
val config = getBasicPnConfiguration()
config.enableEventEngine = false
var output: String? = null
config.httpLoggingInterceptor = HttpLoggingInterceptor {
if (it.contains("GET https://")) {
output = it
success.set(true)
}
}.apply { level = HttpLoggingInterceptor.Level.BASIC }
val pubnub = PubNub(config)
try {
pubnub.subscribe(
channels = listOf("a")
)

success.listen()

val url = output?.substringAfter("--> GET ")?.toHttpUrlOrNull()
assertNotNull(url)
assertFalse(url!!.queryParameterNames.contains("ee"))
} finally {
pubnub.forceDestroy()
}
}
}
34 changes: 12 additions & 22 deletions src/main/kotlin/com/pubnub/api/PNConfiguration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,43 +26,33 @@ import javax.net.ssl.X509ExtendedTrustManager
* A storage for user-provided information which describe further PubNub client behaviour.
* Configuration instance contains additional set of properties which
* allow performing precise PubNub client configuration.
*
*/
open class PNConfiguration(
userId: UserId,
enableSubscribeBeta: Boolean = false
var userId: UserId
) {
var enableEventEngine: Boolean = false

@Deprecated(
replaceWith = ReplaceWith(
"PNConfiguration(userId = UserId(uuid), enableSubscribeBeta = enableSubscribeBeta)",
"PNConfiguration(userId = UserId(uuid))",
"com.pubnub.api.PNConfiguration"
),
level = DeprecationLevel.WARNING,
message = "Use PNConfiguration(UserId, Boolean) instead"
message = "Use PNConfiguration(UserId) instead, and set the enableEventEngine property separately."
)
constructor(uuid: String, enableSubscribeBeta: Boolean = false) : this(UserId(uuid), enableSubscribeBeta)

var enableSubscribeBeta: Boolean = enableSubscribeBeta
internal set
constructor(uuid: String, enableEventEngine: Boolean = false) : this(UserId(uuid)) {
this.enableEventEngine = enableEventEngine
}

@Deprecated(
level = DeprecationLevel.WARNING,
message = """Use UserId instead e.g. config.userId = UserId("uuid")""",
message = """Use UserId instead e.g. config.userId.value""",
replaceWith = ReplaceWith("userId.value")
)
@Volatile
var uuid: String = userId.value
set(value) {
PubNubUtil.require(value.isValid(), PubNubError.UUID_NULL_OR_EMPTY)
field = value
}

@Suppress("DEPRECATION")
var userId: UserId
get() = UserId(uuid)
set(value) {
uuid = value.value
}
var uuid: String
get() = userId.value
set(value) { userId = UserId(value) }

private val log = LoggerFactory.getLogger("PNConfiguration")

Expand Down
22 changes: 11 additions & 11 deletions src/main/kotlin/com/pubnub/api/PubNub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class PubNub internal constructor(
heartbeatProvider = HeartbeatProviderImpl(this),
leaveProvider = LeaveProviderImpl(this),
heartbeatInterval = Duration.ofSeconds(configuration.heartbeatInterval.toLong()),
enableEventEngine = configuration.enableSubscribeBeta,
enableEventEngine = configuration.enableEventEngine,
retryPolicy = configuration.retryPolicy,
eventEngineConf = eventEnginesConf.presence
)
Expand Down Expand Up @@ -321,7 +321,7 @@ class PubNub internal constructor(
channelGroups: List<String> = emptyList(),
withPresence: Boolean = false,
withTimetoken: Long = 0L
) = if (configuration.enableSubscribeBeta) {
) = if (configuration.enableEventEngine) {
subscribe.subscribe(channels.toSet(), channelGroups.toSet(), withPresence, withTimetoken)
presence.joined(channels.toSet(), channelGroups.toSet())
} else {
Expand All @@ -348,7 +348,7 @@ class PubNub internal constructor(
fun unsubscribe(
channels: List<String> = emptyList(),
channelGroups: List<String> = emptyList()
) = if (configuration.enableSubscribeBeta) {
) = if (configuration.enableEventEngine) {
subscribe.unsubscribe(channels.toSet(), channelGroups.toSet())
presence.left(channels.toSet(), channelGroups.toSet())
} else {
Expand All @@ -359,7 +359,7 @@ class PubNub internal constructor(
* Unsubscribe from all channels and all channel groups
*/
fun unsubscribeAll() =
if (configuration.enableSubscribeBeta) {
if (configuration.enableEventEngine) {
subscribe.unsubscribeAll()
presence.leftAll()
} else {
Expand All @@ -372,7 +372,7 @@ class PubNub internal constructor(
* @return A list of channels the client is currently subscribed to.
*/
fun getSubscribedChannels() =
if (configuration.enableSubscribeBeta)
if (configuration.enableEventEngine)
subscribe.getSubscribedChannels()
else
subscriptionManager.getSubscribedChannels()
Expand All @@ -383,7 +383,7 @@ class PubNub internal constructor(
* @return A list of channel groups the client is currently subscribed to.
*/
fun getSubscribedChannelGroups() =
if (configuration.enableSubscribeBeta)
if (configuration.enableEventEngine)
subscribe.getSubscribedChannelGroups()
else
subscriptionManager.getSubscribedChannelGroups()
Expand Down Expand Up @@ -780,7 +780,7 @@ class PubNub internal constructor(
channels: List<String> = emptyList(),
channelGroups: List<String> = emptyList(),
connected: Boolean = false
) = if (configuration.enableSubscribeBeta) {
) = if (configuration.enableEventEngine) {
presence.presence(
channels = channels.toSet(),
channelGroups = channelGroups.toSet(),
Expand Down Expand Up @@ -2030,7 +2030,7 @@ class PubNub internal constructor(
* Force the SDK to try and reach out PubNub. Monitor the results in [SubscribeCallback.status]
*/
fun reconnect() {
if (configuration.enableSubscribeBeta) {
if (configuration.enableEventEngine) {
// todo handle in Subscribe EE
subscribe.reconnect()
} else {
Expand All @@ -2044,7 +2044,7 @@ class PubNub internal constructor(
* Monitor the results in [SubscribeCallback.status]
*/
fun disconnect() {
if (configuration.enableSubscribeBeta) {
if (configuration.enableEventEngine) {
subscribe.disconnect()
} else {
subscriptionManager.disconnect()
Expand All @@ -2055,7 +2055,7 @@ class PubNub internal constructor(
* Frees up threads and allows for a clean exit.
*/
fun destroy() {
if (configuration.enableSubscribeBeta) {
if (configuration.enableEventEngine) {
subscribe.destroy()
retrofitManager.destroy()
// todo add presenceEventEngineDestroy
Expand All @@ -2069,7 +2069,7 @@ class PubNub internal constructor(
* Same as [destroy] but immediately.
*/
fun forceDestroy() {
if (configuration.enableSubscribeBeta) {
if (configuration.enableEventEngine) {
subscribe.destroy()
retrofitManager.destroy(true)
// todo add presenceEventEngineDestroy
Expand Down
6 changes: 6 additions & 0 deletions src/main/kotlin/com/pubnub/api/PubNubUtil.kt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ internal object PubNubUtil {
throw PubNubException(error)
}
}

internal fun maybeAddEeQueryParam(configuration: PNConfiguration, queryParams: MutableMap<String, String>) {
if (configuration.enableEventEngine) {
queryParams["ee"] = ""
}
}
}

internal fun <E> List<E>.toCsv(): String {
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/com/pubnub/api/endpoints/presence/GetState.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.pubnub.api.Endpoint
import com.pubnub.api.PubNub
import com.pubnub.api.PubNubError
import com.pubnub.api.PubNubException
import com.pubnub.api.PubNubUtil
import com.pubnub.api.enums.PNOperationType
import com.pubnub.api.models.consumer.presence.PNGetStateResult
import com.pubnub.api.models.server.Envelope
Expand Down Expand Up @@ -36,6 +37,8 @@ class GetState internal constructor(
override fun doWork(queryParams: HashMap<String, String>): Call<Envelope<JsonElement>> {
addQueryParams(queryParams)

PubNubUtil.maybeAddEeQueryParam(pubnub.configuration, queryParams)

return pubnub.retrofitManager.presenceService.getState(
pubnub.configuration.subscribeKey,
channels.toCsv(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.pubnub.api.Endpoint
import com.pubnub.api.PubNub
import com.pubnub.api.PubNubError
import com.pubnub.api.PubNubException
import com.pubnub.api.PubNubUtil
import com.pubnub.api.enums.PNOperationType
import retrofit2.Call
import retrofit2.Response
Expand Down Expand Up @@ -43,6 +44,8 @@ class Heartbeat internal constructor(
queryParams["state"] = pubnub.mapper.toJson(it)
}

PubNubUtil.maybeAddEeQueryParam(pubnub.configuration, queryParams)

return pubnub.retrofitManager.presenceService.heartbeat(
pubnub.configuration.subscribeKey,
channelsCsv,
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/com/pubnub/api/endpoints/presence/HereNow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.pubnub.api.endpoints.presence
import com.google.gson.JsonElement
import com.pubnub.api.Endpoint
import com.pubnub.api.PubNub
import com.pubnub.api.PubNubUtil
import com.pubnub.api.enums.PNOperationType
import com.pubnub.api.models.consumer.presence.PNHereNowChannelData
import com.pubnub.api.models.consumer.presence.PNHereNowOccupantData
Expand Down Expand Up @@ -32,6 +33,8 @@ class HereNow internal constructor(
override fun doWork(queryParams: HashMap<String, String>): Call<Envelope<JsonElement>> {
addQueryParams(queryParams)

PubNubUtil.maybeAddEeQueryParam(pubnub.configuration, queryParams)

return if (!isGlobalHereNow()) {
pubnub.retrofitManager.presenceService.hereNow(
pubnub.configuration.subscribeKey,
Expand Down
3 changes: 3 additions & 0 deletions src/main/kotlin/com/pubnub/api/endpoints/presence/Leave.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.pubnub.api.Endpoint
import com.pubnub.api.PubNub
import com.pubnub.api.PubNubError
import com.pubnub.api.PubNubException
import com.pubnub.api.PubNubUtil
import com.pubnub.api.enums.PNOperationType
import com.pubnub.api.toCsv
import retrofit2.Call
Expand All @@ -28,6 +29,8 @@ class Leave internal constructor(pubnub: PubNub) : Endpoint<Void, Boolean>(pubnu
override fun doWork(queryParams: HashMap<String, String>): Call<Void> {
queryParams["channel-group"] = channelGroups.toCsv()

PubNubUtil.maybeAddEeQueryParam(pubnub.configuration, queryParams)

return pubnub.retrofitManager.presenceService.leave(
pubnub.configuration.subscribeKey,
channels.toCsv(),
Expand Down
Loading

0 comments on commit 1861577

Please sign in to comment.