fix: flaky H2ScalaAllPersistenceIdsTest #199
Conversation
core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
@@ -119,7 +117,9 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
  override def persistenceIds(): Source[String, NotUsed] =
    Source
      .repeat(0)
      .flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))
I rolled this change back and increased readJournalConfig.refreshInterval by 10x, but the test was still flaky, so I made a change like this instead.
It seems that throttle is more effective than Source.tick(), which can also be seen in how frequently the log line is printed.
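For reference, here is a minimal, self-contained sketch of the throttle-based poll loop described above. This is hypothetical demo code, not the project's implementation: fakeQuery and the object name are made up, and fakeQuery just stands in for currentPersistenceIds() while counting how often the query would run.

import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.duration._

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

object ThrottledPollDemo extends App {
  implicit val system: ActorSystem = ActorSystem("throttle-demo")
  import system.dispatcher

  val refreshInterval = 1.second
  val queryCount = new AtomicInteger(0)

  // Stand-in for currentPersistenceIds(): counts each "db query" and emits two ids.
  def fakeQuery(): Source[String, NotUsed] = {
    val n = queryCount.incrementAndGet()
    Source(List(s"pid-$n-a", s"pid-$n-b"))
  }

  // Throttle the outer repeat loop so the query runs at most once per interval,
  // no matter how fast the downstream consumes elements.
  Source
    .repeat(0)
    .throttle(1, refreshInterval)
    .flatMapConcat(_ => fakeQuery())
    .take(10)
    .runWith(Sink.foreach(println))
    .onComplete { _ =>
      println(s"execute query, total count: ${queryCount.get()}")
      system.terminate()
    }
}

Note that the two-argument throttle overload defaults to ThrottleMode.Shaping, so excess demand is delayed rather than failing the stream.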
With the throttle in this PR, the actual number of db executions per test run is typically around 10.
execute query, total count: 10
2024-05-29 08:52:18,885 - org.apache.pekko.actor.CoordinatedShutdown -> INFO [test-pekko.actor.default-dispatcher-5] [ test CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
execute query, total count: 11
2024-05-29 08:53:01,234 - org.apache.pekko.actor.CoordinatedShutdown -> INFO [test-pekko.actor.default-dispatcher-5] [ test CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
However, with the original delaySource, most runs executed the db query more than 1,000 times, roughly 100x the new approach. I suspect the original approach didn't actually limit the flow.
execute query, total count: 1042
2024-05-29 08:53:40,691 - org.apache.pekko.actor.CoordinatedShutdown -> INFO [test-pekko.actor.default-dispatcher-4] [ test CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
execute query, total count: 1316
2024-05-29 08:55:42,865 - org.apache.pekko.actor.CoordinatedShutdown -> INFO [test-pekko.actor.default-dispatcher-6] [ test CoordinatedShutdown - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
lgtm - we might need to backport this to the 1.0.x branch because #182 was also already backported - the delaySource that you removed has already caused us problems.
@@ -94,7 +94,7 @@ class JdbcReadJournal(config: Config, configPath: String)(implicit val system: E
    JournalSequenceActor.props(readJournalDao, readJournalConfig.journalSequenceRetrievalConfiguration),
    s"$configPath.pekko-persistence-jdbc-journal-sequence-actor")
  private val delaySource =
-   Source.tick(0.seconds, readJournalConfig.refreshInterval, 0).take(1)
+   Source.tick(readJournalConfig.refreshInterval, readJournalConfig.refreshInterval, 0).take(1)
I did some testing locally and solved the issue of delaySource not truly limiting the flow with the following replacement.
Source
.repeat(0)
+ .buffer(1, OverflowStrategy.backpressure)
+ .throttle(1, readJournalConfig.refreshInterval)
+ .flatMapConcat(_ => currentPersistenceIds())
- .flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds()))
However, it may bring another problem: repeat + buffer + throttle may cause the stream to never end (I guess because there is always a buffered element waiting to be processed).
In conclusion, after spending some time investigating, I found that in persistenceIds() each execution of flatMapConcat(_ => delaySource.flatMapConcat(_ => currentPersistenceIds())) materializes a new delaySource; because of take(1) (and the zero initialDelay) that source emits right away, so it provides no real delay.
Removing take(1) or changing the initialDelay parameter of tick() to refreshInterval achieves the desired flow control effect.
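To make that concrete, here is a small sketch (assumed demo code, not from the repository; the object name, loop helper, and durations are made up) showing that each iteration re-materializes the delay source, so only its initialDelay matters before take(1) completes it.

import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

object DelaySourceDemo extends App {
  implicit val system: ActorSystem = ActorSystem("delay-demo")
  import system.dispatcher

  val refreshInterval = 500.millis

  // Runs five iterations of the repeat + flatMapConcat loop and records when
  // each "query" would happen.
  def loop(delaySource: Source[Int, _]) =
    Source
      .repeat(0)
      .flatMapConcat(_ => delaySource.flatMapConcat(_ => Source.single(System.nanoTime())))
      .take(5)
      .runWith(Sink.seq)

  // Old: take(1) completes on the immediate first tick -> effectively no delay.
  val immediate = Source.tick(0.seconds, refreshInterval, 0).take(1)
  // Fixed: the first tick only fires after refreshInterval -> one pause per iteration.
  val delayed = Source.tick(refreshInterval, refreshInterval, 0).take(1)

  for {
    fast <- loop(immediate)
    slow <- loop(delayed)
  } yield {
    def span(ts: Seq[Long]) = (ts.last - ts.head).nanos.toMillis
    println(s"5 iterations, initialDelay = 0: ${span(fast)} ms")
    println(s"5 iterations, initialDelay = refreshInterval: ${span(slow)} ms")
    system.terminate()
  }
}

With initialDelay = 0 the five iterations finish almost instantly, while with initialDelay = refreshInterval they are spaced roughly one interval apart, which matches the query counts observed in the logs above.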
@He-Pin can you take a look? Thanks.
let us merge this and then move on to the release.
* fix: flaky H2ScalaAllPersistenceIdsTest
* use Shaping ThrottleMode
* rollback to original stream ways...
* slow down the db query
* fix delaySource flaky
* eager fetch
* not need config override
Resolves: #184
Link to the investigation: #184 (comment)