Skip to content

Commit

Permalink
Improve tests setup
Browse files Browse the repository at this point in the history
* no parallel in tests
* cleanup code in tests
* use auto port in tests
* simpler limiter implementation
* implement logging for multiplexed transport
  • Loading branch information
whyoleg committed Mar 13, 2024
1 parent 91a07cf commit 8d53cb7
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 99 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
fail-fast: false
matrix:
os: [ 'ubuntu-latest' ]
target: [ 'jvmAll', 'js', 'native' ]
target: [ 'jvm', 'jvm11', 'jvm17', 'jvm21', 'js', 'native' ]
include:
- os: 'macos-14'
target: 'macos'
Expand All @@ -43,8 +43,11 @@ jobs:
- uses: actions/checkout@v4
- uses: ./.github/actions/setup-gradle

- run: ./gradlew ${{ matrix.target }}Test --continue
timeout-minutes: 30
- name: Build tests
run: ./gradlew ${{ matrix.target }}Test --continue -Prsocketbuild.skipTests=true

- name: Run tests
run: ./gradlew ${{ matrix.target }}Test --continue --no-configuration-cache --max-workers=1

- if: always() && !cancelled()
uses: actions/upload-artifact@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TestConnection : RSocketTransportSession.Sequential, RSocketClientTarget {

init {
coroutineContext.job.invokeOnCompletion {
sendChannel.close(it)
sendChannel.cancelWithCause(it)
receiveChannel.cancelWithCause(it)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
}
requestChannel { init, payloads ->
init.close()
payloads.onEach { it.close() }.launchIn(this)
flow { repeat(10) { emitOrClose(payload("server got -> [$it]")) } }
flow {
coroutineScope {
payloads.onEach { it.close() }.launchIn(this)
repeat(10) { emitOrClose(payload("server got -> [$it]")) }
}
}
}
}
}
Expand Down Expand Up @@ -251,10 +255,10 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
}
})
val request = flow<Payload> { error("test") }
//TODO
kotlin.runCatching {
// TODO: should requester fail if there was a failure in `request`?
assertFailsWith(IllegalStateException::class) {
requester.requestChannel(Payload.Empty, request).collect()
}.also(::println)
}
val e = error.await()
assertTrue(e is RSocketError.ApplicationError)
assertEquals("test", e.message)
Expand Down Expand Up @@ -376,10 +380,10 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
requesterReceiveChannel.cancel()
delay(1000)

assertTrue(requesterSendChannel.isClosedForSend)
assertTrue(responderSendChannel.isClosedForSend)
assertTrue(requesterReceiveChannel.isClosedForReceive)
assertTrue(responderReceiveChannel.isClosedForReceive)
assertTrue(requesterSendChannel.isClosedForSend, "requesterSendChannel.isClosedForSend")
assertTrue(responderSendChannel.isClosedForSend, "responderSendChannel.isClosedForSend")
assertTrue(requesterReceiveChannel.isClosedForReceive, "requesterReceiveChannel.isClosedForReceive")
assertTrue(responderReceiveChannel.isClosedForReceive, "responderReceiveChannel.isClosedForReceive")
}

private suspend fun initRequestChannel(
Expand Down Expand Up @@ -413,7 +417,7 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {

private suspend inline fun cancel(
requesterChannel: SendChannel<Payload>,
responderChannel: ReceiveChannel<Payload>
responderChannel: ReceiveChannel<Payload>,
) {
responderChannel.cancel()
delay(100)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class RSocketRequesterTest : TestWithConnection(), TestWithLeakCheck {
@Test
fun testRequestReplyWithCancel() = test {
connection.test {
withTimeoutOrNull(100) { requester.requestResponse(Payload.Empty) }
assertTrue(withTimeoutOrNull(100) { requester.requestResponse(Payload.Empty) } == null)

awaitFrame { assertTrue(it is RequestFrame) }
awaitFrame { assertTrue(it is CancelFrame) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import io.rsocket.kotlin.ktor.client.*
import io.rsocket.kotlin.ktor.server.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.tests.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*
Expand All @@ -38,7 +37,6 @@ import io.rsocket.kotlin.ktor.client.RSocketSupport as ClientRSocketSupport
import io.rsocket.kotlin.ktor.server.RSocketSupport as ServerRSocketSupport

class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {
private val port = PortProvider.next()
private val client = HttpClient(ClientCIO) {
install(ClientWebSockets)
install(ClientRSocketSupport) {
Expand All @@ -52,7 +50,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {

private var responderJob: Job? = null

private val server = embeddedServer(ServerCIO, port) {
private val server = embeddedServer(ServerCIO, port = 0) {
install(ServerWebSockets)
install(ServerRSocketSupport) {
server = TestServer()
Expand Down Expand Up @@ -87,7 +85,7 @@ class WebSocketConnectionTest : SuspendTest, TestWithLeakCheck {

@Test
fun testWorks() = test {
val rSocket = client.rSocket(port = port)
val rSocket = client.rSocket(port = server.resolvedConnectors().single().port)
val requesterJob = rSocket.coroutineContext.job

rSocket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds

//TODO: need to somehow rework those tests, as now they are super flaky
// there is some issue in K/N tcp...
abstract class TransportTest : SuspendTest, TestWithLeakCheck {
override val testTimeout: Duration = 3.minutes
override val testTimeout: Duration = 5.minutes

private val testJob = SupervisorJob()
protected val testContext = testJob + TestExceptionHandler
Expand Down Expand Up @@ -60,25 +61,25 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
@Test
fun fireAndForget10() = test {
(1..10).map { async { client.fireAndForget(payload(it)) } }.awaitAll()
delay(1000) //TODO: leak check
delay(100) // TODO: leak check
}

@Test
open fun largePayloadFireAndForget10() = test {
(1..10).map { async { client.fireAndForget(requesterLargePayload) } }.awaitAll()
delay(1000) //TODO: leak check
delay(100) // TODO: leak check
}

@Test
fun metadataPush10() = test {
(1..10).map { async { client.metadataPush(packet(requesterData)) } }.awaitAll()
delay(1000) //TODO: leak check
delay(100) // TODO: leak check
}

@Test
open fun largePayloadMetadataPush10() = test {
(1..10).map { async { client.metadataPush(packet(requesterLargeData)) } }.awaitAll()
delay(1000) //TODO: leak check
delay(100) // TODO: leak check
}

@Test
Expand All @@ -89,44 +90,50 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {

@Test
fun requestChannel1() = test(10.seconds) {
val list = client.requestChannel(payload(0), flowOf(payload(0))).onEach { it.close() }.toList()
assertEquals(1, list.size)
val count =
client.requestChannel(payload(0), flowOf(payload(0)))
.onEach { it.close() }
.count()
assertEquals(1, count)
}

@Test
fun requestChannel3() = test {
val request = flow {
repeat(3) { emit(payload(it)) }
}
val list =
client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(3, 0)).onEach { it.close() }.toList()
assertEquals(3, list.size)
val count =
client.requestChannel(payload(0), request)
.flowOn(PrefetchStrategy(3, 0))
.onEach { it.close() }
.count()
assertEquals(3, count)
}

@Test
open fun largePayloadRequestChannel200() = test {
val request = flow {
repeat(200) { emit(requesterLargePayload) }
}
val list =
val count =
client.requestChannel(requesterLargePayload, request)
.flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.onEach { it.close() }
.toList()
assertEquals(200, list.size)
.count()
assertEquals(200, count)
}

@Test
@Ignore //flaky, ignore for now
@IgnoreNative //flaky, ignore for now
fun requestChannel20000() = test {
val request = flow {
repeat(20_000) { emit(payload(7)) }
}
val list = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach {
val count = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach {
assertEquals(requesterData, it.data.readText())
assertEquals(requesterMetadata, it.metadata?.readText())
}.toList()
assertEquals(20_000, list.size)
}.count()
assertEquals(20_000, count)
}

@Test
Expand All @@ -135,23 +142,26 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
val request = flow {
repeat(200_000) { emit(payload(it)) }
}
val list =
client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(10000, 0)).onEach { it.close() }.toList()
assertEquals(200_000, list.size)
val count =
client.requestChannel(payload(0), request)
.flowOn(PrefetchStrategy(10000, 0))
.onEach { it.close() }
.count()
assertEquals(200_000, count)
}

@Test
@Ignore //flaky, ignore for now
@IgnoreNative //flaky, ignore for now
fun requestChannel16x256() = test {
val request = flow {
repeat(256) {
emit(payload(it))
}
}
(0..16).map {
async(Dispatchers.Default) {
val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList()
assertEquals(256, list.size)
async {
val count = client.requestChannel(payload(0), request).onEach { it.close() }.count()
assertEquals(256, count)
}
}.awaitAll()
}
Expand All @@ -165,30 +175,30 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
}
}
(0..256).map {
async(Dispatchers.Default) {
val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList()
assertEquals(512, list.size)
async {
val count = client.requestChannel(payload(0), request).onEach { it.close() }.count()
assertEquals(512, count)
}
}.awaitAll()
}

@Test
@Ignore //flaky, ignore for now
@IgnoreNative //flaky, ignore for now
fun requestChannel500NoLeak() = test {
val request = flow {
repeat(10_000) { emitOrClose(payload(3)) }
}
val list =
val count =
client
.requestChannel(payload(3), request)
.flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.take(500)
.onEach {
assertEquals(requesterData, it.data.readText())
assertEquals(requesterMetadata, it.metadata?.readText())
}.toList()
assertEquals(500, list.size)
delay(1000) //TODO: leak check
}
.count()
assertEquals(500, count)
}

@Test
Expand All @@ -202,6 +212,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
}

@Test
@IgnoreNative
fun requestResponse100() = test {
(1..100).map { async { client.requestResponse(payload(it)).let(Companion::checkPayload) } }.awaitAll()
}
Expand All @@ -212,7 +223,7 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
}

@Test
@Ignore //flaky, ignore for now
@IgnoreNative //flaky, ignore for now
fun requestResponse10000() = test {
(1..10000).map { async { client.requestResponse(payload(3)).let(Companion::checkPayload) } }.awaitAll()
}
Expand All @@ -225,29 +236,29 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {

@Test
fun requestStream5() = test {
val list =
client.requestStream(payload(3)).flowOn(PrefetchStrategy(5, 0)).take(5).onEach { checkPayload(it) }.toList()
assertEquals(5, list.size)
val count =
client.requestStream(payload(3)).flowOn(PrefetchStrategy(5, 0)).take(5).onEach { checkPayload(it) }.count()
assertEquals(5, count)
}

@Test
@IgnoreNative
fun requestStream10000() = test {
val list = client.requestStream(payload(3)).onEach { checkPayload(it) }.toList()
assertEquals(10000, list.size)
val count = client.requestStream(payload(3)).onEach { checkPayload(it) }.count()
assertEquals(10000, count)
}

@Test
@Ignore //flaky, ignore for now
@IgnoreNative
fun requestStream500NoLeak() = test {
val list =
val count =
client
.requestStream(payload(3))
.flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.take(500)
.onEach { checkPayload(it) }
.toList()
assertEquals(500, list.size)
delay(1000) //TODO: leak check
.count()
assertEquals(500, count)
}

companion object {
Expand Down
Loading

0 comments on commit 8d53cb7

Please sign in to comment.