-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sourcerer2: Moving to string based versions, introducing JDBC driver #22
base: sourcerer2
Are you sure you want to change the base?
Conversation
…g typing for versions and ids
|
||
/** | ||
* Throw to indicate that the current version of a stream does not match the expected one. | ||
*/ | ||
public class UnexpectedVersionException extends IllegalStateException { | ||
private final Integer currentVersion; | ||
private final StreamVersion currentVersion; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
/** | ||
* Throw to indicate that the current version of a stream does not match the expected one. | ||
*/ | ||
public class UnexpectedVersionException extends IllegalStateException { | ||
private final Integer currentVersion; | ||
private final StreamVersion currentVersion; | ||
private final ExpectedVersion expectedVersion; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
private fun serializeMetadata(metadata: Map<String, String>): String { | ||
// TODO: Handle errors more nicely |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import org.reactivestreams.Publisher | ||
import org.slf4j.LoggerFactory | ||
|
||
internal class DbstoreEventRepository<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
private fun serializeEvent(event: T): String { | ||
// TODO: Handle errors more nicely |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
private fun parseEvent(data: String): T { | ||
// TODO: Handle errors more nicely |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
private fun parseMetadata(metadata: String): ImmutableMap<String, String> { | ||
// TODO: Handle errors more nicely |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streamId: StreamId, | ||
category: String, | ||
fromVersion: DbstoreStreamVersion? = null, | ||
maxEvents: Int = 4096 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
category: String, | ||
shard: Int?, | ||
fromVersion: DbstoreRepositoryVersion? = null, | ||
maxEvents: Int = 4096 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return "ExpectedVersion.AnyExisting" | ||
} | ||
} | ||
object Any : ExpectedVersion() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+ "' with 'not created'", | ||
baseVersion, newVersion) | ||
else -> | ||
throw RuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExpectedVersion.Any -> 0 | ||
ExpectedVersion.AnyExisting -> 1 | ||
is ExpectedVersion.Exactly -> 2 | ||
ExpectedVersion.NotCreated -> 3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExpectedVersion.NotCreated -> when (newVersion) { | ||
ExpectedVersion.NotCreated -> baseVersion | ||
else -> | ||
throw RuntimeException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} else baseVersion ?: (newVersion ?: ExpectedVersion.any()) | ||
} | ||
|
||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return "ExpectedVersion.NotCreated" | ||
} | ||
} | ||
object AnyExisting : ExpectedVersion() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throw IllegalArgumentException("Version is not in expected format") | ||
} | ||
|
||
val timestampStr = version.substring(0, 24) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
sealed class EventSubscriptionUpdate<T> { | ||
class CaughtUp<T> : EventSubscriptionUpdate<T>() | ||
data class Event<T>(val event: EventRecord<T>): EventSubscriptionUpdate<T>() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
import java.time.Instant; | ||
import java.util.List; | ||
import java.util.UUID; | ||
import java.util.stream.IntStream; | ||
|
||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class EventSubscriptionManagerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SonarQube analysis reported 52 issues Watch the comments in this conversation to review them. 7 extra issuesNote: The following issues were found on lines that were not modified in the pull request. Because these issues can't be reported as line comments, they are summarized here:
|
That way we know it doesn't blow up when it doesn't exist.
To make the options explicit.
Allow for noop:ing a create if aggregate already exists
Prepare for adding retry-support.
If an update was made to the aggregate after it was read but before it was updated, retry (according to policy).
Don't say "current version null" when current version is unknown, as it looks like current version is notExisting. Keep UnexpectedVersionException in stack trace for AtomicWriteException.
Instead use RetryHandler and RetryPolicy directly.
Don't add variance.
Add support for retrying atomic write failures
Which supports preferRandomNode
Update esjc dependency to 1.8.1
…default timeout to 1000ms
…store-health-probe Prometheus monitoring for EventStore health probe
Upgrading to ESJC 2.x (with 64 bit positions)
This change is