-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unit tests for streams manager #45090
Unit tests for streams manager #45090
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎ 1 Skipped Deployment
|
322e28a
to
8a98da0
Compare
8a98da0
to
d635c2c
Compare
@@ -33,68 +35,98 @@ class DefaultStreamsManager( | |||
return streamManagers[stream] ?: throw IllegalArgumentException("Stream not found: $stream") | |||
} | |||
|
|||
override suspend fun awaitAllStreamsComplete() { | |||
override suspend fun awaitAllStreamsClosed() { | |||
streamManagers.forEach { (_, manager) -> manager.awaitStreamClosed() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: error handling
* Count the end-of-stream. Expect this exactly once. Expect no further `countRecordIn`, and | ||
* expect that `markClosed` will always occur after this. | ||
*/ | ||
fun countEndOfStream(): Long |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's a weird name. I'm not really sure what "counting the end of stream" means. Total number of records in the stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand when reading further.
I think the notion of "counting" is off here. I don't expect count
to increment a counter. We can leave it as is for now
streamManagers.forEach { (_, manager) -> manager.awaitStreamClosed() } | ||
} | ||
} | ||
|
||
/** Manages the state of a single stream. */ | ||
interface StreamManager { | ||
fun countRecordIn(sizeBytes: Long): Long | ||
/** Count incoming record and return the record's *index*. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks to me like most of the functions here are setting state on the stream itself. Maybe this is just part of a Stream
?
listOf( | ||
Pair(stream1, SetRecordCount(10)), | ||
Pair(stream1, AddPersisted(0, 9)), | ||
Pair(stream1, ExpectPersistedUntil(9)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit confusing. Is the persistedUntil the message number or message count?
What
@Named
)