diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 6e7d7cdd..80f310f3 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -185,7 +185,7 @@ jobs: docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql - name: test - run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }} + run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }} test-docs: name: Docs diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala index dd8856f5..5cbfbffb 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala @@ -53,13 +53,7 @@ final class R2dbcSettings(config: Config) { val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer") - val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match { - case "yugabyte" => Dialect.Yugabyte - case "postgres" => Dialect.Postgres - case "mysql" => Dialect.MySQL - case other => - throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].") - } + val dialect: Dialect = Dialect.fromString(config.getString("dialect")) val querySettings = new QuerySettings(config.getConfig("query")) @@ -96,6 +90,18 @@ object Dialect { /** @since 1.1.0 */ case object MySQL extends Dialect + + /** @since 1.1.0 */ + def fromString(value: String): Dialect = { + toRootLowerCase(value) match { + case "yugabyte" => Dialect.Yugabyte + case "postgres" => Dialect.Postgres + case "mysql" => Dialect.MySQL + case other => + throw new IllegalArgumentException( + s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres, mysql].") + } + } } /** diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala index affe56f1..6fc06491 100644 --- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala +++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/Sql.scala @@ -54,7 +54,7 @@ object Sql { * INTERNAL API */ @InternalApi - private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal { + private[pekko] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal { def sql(args: Any*)(implicit dialect: Dialect): String = dialect.replaceParameters(trimLineBreaks(sc.s(args: _*))) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b555938f..6eebdc70 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -89,6 +89,7 @@ object Dependencies { r2dbcSpi, r2dbcPool, r2dbcPostgres % "provided,test", + r2dbcMysql % "provided,test", pekkoProjectionCore, TestDeps.pekkoProjectionEventSourced, TestDeps.pekkoProjectionDurableState, diff --git a/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes b/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes new file mode 100644 index 00000000..8b28128b --- /dev/null +++ b/projection/src/main/mima-filters/1.0.x.backwards.excludes/r2dbcprojectionsettings.excludes @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Converting case class to class +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.canEqual") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productArity") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productPrefix") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElement") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementName") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productElementNames") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.productIterator") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$1") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$2") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$3") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$4") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$5") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$6") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$7") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$8") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$9") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.copy$default$10") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._1") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._2") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._3") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._4") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._5") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._6") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._7") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._8") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._9") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings._10") +ProblemFilters.exclude[MissingTypesProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings$") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.unapply") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings.fromProduct") diff --git a/projection/src/main/resources/reference.conf b/projection/src/main/resources/reference.conf index cf8a83b7..67ffe1d1 100644 --- a/projection/src/main/resources/reference.conf +++ b/projection/src/main/resources/reference.conf @@ -5,7 +5,7 @@ //#projection-config pekko.projection.r2dbc { - # postgres or yugabyte + # postgres, yugabyte or mysql dialect = ${pekko.persistence.r2dbc.dialect} offset-store { diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala index a2423be3..70bc11b8 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSettings.scala @@ -17,11 +17,12 @@ import java.time.{ Duration => JDuration } import java.util.Locale import scala.concurrent.duration._ - +import scala.util.hashing.MurmurHash3 +import com.typesafe.config.Config import org.apache.pekko -import pekko.util.JavaDurationConverters._ import pekko.actor.typed.ActorSystem -import com.typesafe.config.Config +import pekko.persistence.r2dbc.Dialect +import pekko.util.JavaDurationConverters._ object R2dbcProjectionSettings { @@ -34,7 +35,8 @@ object R2dbcProjectionSettings { case _ => config.getDuration("log-db-calls-exceeding").asScala } - R2dbcProjectionSettings( + new R2dbcProjectionSettings( + dialect = Dialect.fromString(config.getString("dialect")), schema = Option(config.getString("offset-store.schema")).filterNot(_.trim.isEmpty), offsetTable = config.getString("offset-store.offset-table"), timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"), @@ -44,25 +46,149 @@ object R2dbcProjectionSettings { keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"), evictInterval = config.getDuration("offset-store.evict-interval"), deleteInterval = config.getDuration("offset-store.delete-interval"), - logDbCallsExceeding) + logDbCallsExceeding + ) } def apply(system: ActorSystem[_]): R2dbcProjectionSettings = apply(system.settings.config.getConfig(DefaultConfigPath)) + + def apply( + schema: Option[String], + offsetTable: String, + timestampOffsetTable: String, + managementTable: String, + useConnectionFactory: String, + timeWindow: JDuration, + keepNumberOfEntries: Int, + evictInterval: JDuration, + deleteInterval: JDuration, + logDbCallsExceeding: FiniteDuration + ): R2dbcProjectionSettings = new R2dbcProjectionSettings( + Dialect.Postgres, + schema, + offsetTable, + timestampOffsetTable, + managementTable, + useConnectionFactory, + timeWindow, + keepNumberOfEntries, + evictInterval, + deleteInterval, + logDbCallsExceeding + ) } -// FIXME remove case class, and add `with` methods -final case class R2dbcProjectionSettings( - schema: Option[String], - offsetTable: String, - timestampOffsetTable: String, - managementTable: String, - useConnectionFactory: String, - timeWindow: JDuration, - keepNumberOfEntries: Int, - evictInterval: JDuration, - deleteInterval: JDuration, - logDbCallsExceeding: FiniteDuration) { +final class R2dbcProjectionSettings private ( + val dialect: Dialect, + val schema: Option[String], + val offsetTable: String, + val timestampOffsetTable: String, + val managementTable: String, + val useConnectionFactory: String, + val timeWindow: JDuration, + val keepNumberOfEntries: Int, + val evictInterval: JDuration, + val deleteInterval: JDuration, + val logDbCallsExceeding: FiniteDuration +) extends Serializable { + + override def toString: String = + s"R2dbcProjectionSettings($dialect, $schema, $offsetTable, $timestampOffsetTable, $managementTable, " + + s"$useConnectionFactory, $timeWindow, $keepNumberOfEntries, $evictInterval, $deleteInterval, $logDbCallsExceeding)" + + override def equals(other: Any): Boolean = + other match { + case that: R2dbcProjectionSettings => + dialect == that.dialect && schema == that.schema && + offsetTable == that.offsetTable && timestampOffsetTable == that.timestampOffsetTable && + managementTable == that.managementTable && useConnectionFactory == that.useConnectionFactory && + timeWindow == that.timeWindow && keepNumberOfEntries == that.keepNumberOfEntries && + evictInterval == that.evictInterval && deleteInterval == that.deleteInterval && + logDbCallsExceeding == that.logDbCallsExceeding + case _ => false + } + + override def hashCode(): Int = { + val values = Seq( + dialect, + schema, + offsetTable, + timestampOffsetTable, + managementTable, + useConnectionFactory, + timeWindow, + keepNumberOfEntries, + evictInterval, + deleteInterval, + logDbCallsExceeding + ) + val h = values.foldLeft(MurmurHash3.productSeed) { case (h, value) => + MurmurHash3.mix(h, value.##) + } + MurmurHash3.finalizeHash(h, values.size) + } + + private[this] def copy( + dialect: Dialect = dialect, + schema: Option[String] = schema, + offsetTable: String = offsetTable, + timestampOffsetTable: String = timestampOffsetTable, + managementTable: String = managementTable, + useConnectionFactory: String = useConnectionFactory, + timeWindow: JDuration = timeWindow, + keepNumberOfEntries: Int = keepNumberOfEntries, + evictInterval: JDuration = evictInterval, + deleteInterval: JDuration = deleteInterval, + logDbCallsExceeding: FiniteDuration = logDbCallsExceeding + ): R2dbcProjectionSettings = + new R2dbcProjectionSettings( + dialect, + schema, + offsetTable, + timestampOffsetTable, + managementTable, + useConnectionFactory, + timeWindow, + keepNumberOfEntries, + evictInterval, + deleteInterval, + logDbCallsExceeding + ) + + def withDialect(dialect: Dialect): R2dbcProjectionSettings = + copy(dialect = dialect) + + def withSchema(schema: Option[String]): R2dbcProjectionSettings = + copy(schema = schema) + + def withOffsetTable(offsetTable: String): R2dbcProjectionSettings = + copy(offsetTable = offsetTable) + + def withTimestampOffsetTable(timestampOffsetTable: String): R2dbcProjectionSettings = + copy(timestampOffsetTable = timestampOffsetTable) + + def withManagementTable(managementTable: String): R2dbcProjectionSettings = + copy(managementTable = managementTable) + + def withUseConnectionFactory(useConnectionFactory: String): R2dbcProjectionSettings = + copy(useConnectionFactory = useConnectionFactory) + + def withTimeWindow(timeWindow: JDuration): R2dbcProjectionSettings = + copy(timeWindow = timeWindow) + + def withKeepNumberOfEntries(keepNumberOfEntries: Int): R2dbcProjectionSettings = + copy(keepNumberOfEntries = keepNumberOfEntries) + + def withEvictInterval(evictInterval: JDuration): R2dbcProjectionSettings = + copy(evictInterval = evictInterval) + + def withDeleteInterval(deleteInterval: JDuration): R2dbcProjectionSettings = + copy(deleteInterval = deleteInterval) + + def withLogDbCallsExceeding(logDbCallsExceeding: FiniteDuration): R2dbcProjectionSettings = + copy(logDbCallsExceeding = logDbCallsExceeding) + val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") + managementTable diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala index 3a0a5526..3c612ee5 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -24,7 +24,6 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future - import org.apache.pekko import pekko.Done import pekko.actor.typed.ActorSystem @@ -38,8 +37,9 @@ import pekko.persistence.query.TimestampOffset import pekko.persistence.query.UpdatedDurableState import pekko.persistence.query.typed.EventEnvelope import pekko.persistence.query.typed.scaladsl.EventTimestampQuery +import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.internal.R2dbcExecutor -import pekko.persistence.r2dbc.internal.Sql.Interpolation +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.typed.PersistenceId import pekko.projection.BySlicesSourceProvider import pekko.projection.MergeableOffset @@ -49,6 +49,7 @@ import pekko.projection.internal.OffsetSerialization import pekko.projection.internal.OffsetSerialization.MultipleOffsets import pekko.projection.internal.OffsetSerialization.SingleOffset import pekko.projection.r2dbc.R2dbcProjectionSettings +import pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore import io.r2dbc.spi.Connection import io.r2dbc.spi.Statement import org.slf4j.LoggerFactory @@ -154,6 +155,22 @@ object R2dbcOffsetStore { val FutureDone: Future[Done] = Future.successful(Done) val FutureTrue: Future[Boolean] = Future.successful(true) val FutureFalse: Future[Boolean] = Future.successful(false) + + def fromConfig( + projectionId: ProjectionId, + sourceProvider: Option[BySlicesSourceProvider], + system: ActorSystem[_], + settings: R2dbcProjectionSettings, + r2dbcExecutor: R2dbcExecutor, + clock: Clock = Clock.systemUTC() + ): R2dbcOffsetStore = { + settings.dialect match { + case Dialect.Postgres | Dialect.Yugabyte => + new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock) + case Dialect.MySQL => + new MySQLR2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock) + } + } } /** @@ -170,6 +187,9 @@ private[projection] class R2dbcOffsetStore( import R2dbcOffsetStore._ + implicit protected val dialect: Dialect = settings.dialect + protected lazy val timestampSql: String = "transaction_timestamp()" + // FIXME include projectionId in all log messages private val logger = LoggerFactory.getLogger(this.getClass) @@ -181,8 +201,8 @@ private[projection] class R2dbcOffsetStore( import offsetSerialization.toStorageRepresentation private val timestampOffsetTable = settings.timestampOffsetTableWithSchema - private val offsetTable = settings.offsetTableWithSchema - private val managementTable = settings.managementTableWithSchema + protected val offsetTable = settings.offsetTableWithSchema + protected val managementTable = settings.managementTableWithSchema private[projection] implicit val executionContext: ExecutionContext = system.executionContext @@ -195,7 +215,7 @@ private[projection] class R2dbcOffsetStore( private val insertTimestampOffsetSql: String = sql""" INSERT INTO $timestampOffsetTable (projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed) - VALUES (?,?,?,?,?,?, transaction_timestamp())""" + VALUES (?,?,?,?,?,?, $timestampSql)""" // delete less than a timestamp private val deleteOldTimestampOffsetSql: String = @@ -211,7 +231,7 @@ private[projection] class R2dbcOffsetStore( private val selectOffsetSql: String = sql"SELECT projection_key, current_offset, manifest, mergeable FROM $offsetTable WHERE projection_name = ?" - private val upsertOffsetSql: String = sql""" + protected val upsertOffsetSql: String = sql""" INSERT INTO $offsetTable (projection_name, projection_key, current_offset, manifest, mergeable, last_updated) VALUES (?,?,?,?,?,?) @@ -518,8 +538,10 @@ private[projection] class R2dbcOffsetStore( } else { // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low. val boundStatement = - records.foldLeft(statement) { (stmt, rec) => - stmt.add() + records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) => + if (idx != 0) { + stmt.add() + } bindRecord(stmt, rec) } R2dbcExecutor.updateBatchInTx(boundStatement) @@ -979,11 +1001,7 @@ private[projection] class R2dbcOffsetStore( .bind(2, paused) .bind(3, Instant.now(clock).toEpochMilli) } - .flatMap { - case i if i == 1 => Future.successful(Done) - case _ => - Future.failed(new RuntimeException(s"Failed to update management table for $projectionId")) - } + .map(_ => Done)(ExecutionContexts.parasitic) } private def createRecordWithOffset[Envelope](envelope: Envelope): Option[RecordWithOffset] = { diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala index 5e41ebd5..610b442d 100644 --- a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -87,7 +87,7 @@ private[projection] object R2dbcProjectionImpl { connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) = { val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(system.executionContext, system) - new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, sourceProvider, system, settings, r2dbcExecutor) } private val loadEnvelopeCounter = new AtomicLong diff --git a/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala new file mode 100644 index 00000000..fe988c05 --- /dev/null +++ b/projection/src/main/scala/org/apache/pekko/projection/r2dbc/internal/mysql/MySQLR2dbcOffsetStore.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pekko.projection.r2dbc.internal.mysql + +import java.time.Clock + +import org.apache.pekko +import pekko.actor.typed.ActorSystem +import pekko.annotation.InternalApi +import pekko.persistence.r2dbc.internal.R2dbcExecutor +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation +import pekko.projection.BySlicesSourceProvider +import pekko.projection.ProjectionId +import pekko.projection.r2dbc.R2dbcProjectionSettings +import pekko.projection.r2dbc.internal.R2dbcOffsetStore + +/** + * INTERNAL API + */ +@InternalApi +private[projection] class MySQLR2dbcOffsetStore( + projectionId: ProjectionId, + sourceProvider: Option[BySlicesSourceProvider], + system: ActorSystem[_], + settings: R2dbcProjectionSettings, + r2dbcExecutor: R2dbcExecutor, + clock: Clock = Clock.systemUTC()) + extends R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock) { + + override lazy val timestampSql: String = "NOW(6)" + + override val upsertOffsetSql: String = sql""" + INSERT INTO $offsetTable + (projection_name, projection_key, current_offset, manifest, mergeable, last_updated) + VALUES (?,?,?,?,?,?) AS excluded + ON DUPLICATE KEY UPDATE + current_offset = excluded.current_offset, + manifest = excluded.manifest, + mergeable = excluded.mergeable, + last_updated = excluded.last_updated""" + + override val updateManagementStateSql: String = sql""" + INSERT INTO $managementTable + (projection_name, projection_key, paused, last_updated) + VALUES (?,?,?,?) AS excluded + ON DUPLICATE KEY UPDATE + paused = excluded.paused, + last_updated = excluded.last_updated""" +} diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala index a05f5108..2e57692d 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedChaosSpec.scala @@ -201,7 +201,7 @@ class EventSourcedChaosSpec (1 to expectedEventCounts).foreach { _ => // not using receiveMessages(expectedEvents) for better logging in case of failure try { - processed :+= processedProbe.receiveMessage(15.seconds) + processed :+= processedProbe.receiveMessage(30.seconds) } catch { case e: AssertionError => val missing = expectedEvents.diff(processed.map(_.envelope.event)) diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala index dd9f430a..f633fc4e 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala @@ -19,7 +19,6 @@ import java.util.UUID import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ - import org.apache.pekko import pekko.Done import pekko.actor.testkit.typed.scaladsl.LogCapturing @@ -29,8 +28,9 @@ import pekko.actor.typed.ActorSystem import pekko.actor.typed.Behavior import pekko.actor.typed.scaladsl.Behaviors import pekko.persistence.query.typed.EventEnvelope +import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.R2dbcSettings -import pekko.persistence.r2dbc.internal.Sql.Interpolation +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import pekko.persistence.typed.PersistenceId import pekko.persistence.typed.scaladsl.Effect @@ -152,6 +152,7 @@ class EventSourcedEndToEndSpec // to be able to store events with specific timestamps private def writeEvent(persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = { log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr: java.lang.Long, event, timestamp) + implicit val dialect: Dialect = projectionSettings.dialect val insertEventSql = sql""" INSERT INTO ${journalSettings.journalTableWithSchema} (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload) @@ -269,7 +270,7 @@ class EventSourcedEndToEndSpec (1 to numberOfEvents).foreach { _ => // not using receiveMessages(expectedEvents) for better logging in case of failure try { - processed :+= processedProbe.receiveMessage(15.seconds) + processed :+= processedProbe.receiveMessage(30.seconds) } catch { case e: AssertionError => val missing = expectedEvents.diff(processed.map(_.envelope.event)) diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala index e3e2e0c3..9c884771 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala @@ -149,7 +149,7 @@ class EventSourcedPubSubSpec (1 to numberOfEvents).foreach { _ => // not using receiveMessages(expectedEvents) for better logging in case of failure try { - processed :+= processedProbe.receiveMessage(25.seconds) + processed :+= processedProbe.receiveMessage(30.seconds) } catch { case e: AssertionError => val missing = expectedEvents.diff(processed.map(_.envelope.event)) diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala index b9af9940..b957eb7c 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcOffsetStoreSpec.scala @@ -17,14 +17,14 @@ import java.time.Instant import java.util.UUID import scala.concurrent.ExecutionContext - import org.apache.pekko import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.actor.typed.ActorSystem import pekko.persistence.query.Sequence import pekko.persistence.query.TimeBasedUUID -import pekko.persistence.r2dbc.internal.Sql.Interpolation +import pekko.persistence.r2dbc.Dialect +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.projection.MergeableOffset import pekko.projection.ProjectionId import pekko.projection.internal.ManagementState @@ -46,12 +46,13 @@ class R2dbcOffsetStoreSpec private val settings = R2dbcProjectionSettings(testKit.system) private def createOffsetStore(projectionId: ProjectionId) = - new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor, clock) + R2dbcOffsetStore.fromConfig(projectionId, None, system, settings, r2dbcExecutor, clock) private val table = settings.offsetTableWithSchema private implicit val ec: ExecutionContext = system.executionContext + implicit val dialect: Dialect = settings.dialect def selectLastSql: String = sql"SELECT * FROM $table WHERE projection_name = ? AND projection_key = ?" diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala index 0560568f..90c606d1 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcProjectionSpec.scala @@ -23,7 +23,6 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -32,8 +31,9 @@ import pekko.actor.testkit.typed.scaladsl.LogCapturing import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import pekko.actor.typed.ActorRef import pekko.actor.typed.ActorSystem -import pekko.persistence.r2dbc.internal.Sql.Interpolation +import pekko.persistence.r2dbc.Dialect import pekko.persistence.r2dbc.internal.R2dbcExecutor +import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation import pekko.projection.HandlerRecoveryStrategy import pekko.projection.OffsetVerification import pekko.projection.OffsetVerification.VerificationFailure @@ -97,18 +97,21 @@ object R2dbcProjectionSpec { val table = "projection_spec_model" val createTableSql: String = - s"""|CREATE table IF NOT EXISTS "$table" ( + s"""|CREATE table IF NOT EXISTS $table ( | id VARCHAR(255) NOT NULL, | concatenated VARCHAR(255) NOT NULL, | PRIMARY KEY(id) |);""".stripMargin } - final case class TestRepository(session: R2dbcSession)(implicit ec: ExecutionContext, system: ActorSystem[_]) { + final case class TestRepository(session: R2dbcSession, settings: R2dbcProjectionSettings)( + implicit ec: ExecutionContext, system: ActorSystem[_]) { import TestRepository.table private val logger = LoggerFactory.getLogger(this.getClass) + implicit private val dialect: Dialect = settings.dialect + def concatToText(id: String, payload: String): Future[Done] = { val savedStrOpt = findById(id) @@ -134,14 +137,23 @@ object R2dbcProjectionSpec { private def upsert(concatStr: ConcatStr): Future[Done] = { logger.debug("TestRepository.upsert: [{}]", concatStr) - val stmtSql = - sql""" - INSERT INTO "$table" (id, concatenated) VALUES (?, ?) + val stmtSql = dialect match { + case Dialect.Postgres | Dialect.Yugabyte => + sql""" + INSERT INTO $table (id, concatenated) VALUES (?, ?) ON CONFLICT (id) DO UPDATE SET id = excluded.id, concatenated = excluded.concatenated """ + case Dialect.MySQL => + sql""" + INSERT INTO $table (id, concatenated) VALUES (?, ?) AS excluded + ON DUPLICATE KEY UPDATE + id = excluded.id, + concatenated = excluded.concatenated + """ + } val stmt = session .createStatement(stmtSql) .bind(0, concatStr.id) @@ -182,7 +194,7 @@ class R2dbcProjectionSpec private val logger = LoggerFactory.getLogger(getClass) private val settings = R2dbcProjectionSettings(testKit.system) private def createOffsetStore(projectionId: ProjectionId): R2dbcOffsetStore = - new R2dbcOffsetStore(projectionId, None, system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, None, system, settings, r2dbcExecutor) private val projectionTestKit = ProjectionTestKit(system) override protected def beforeAll(): Unit = { @@ -245,7 +257,7 @@ class R2dbcProjectionSpec private def withRepo[R](fun: TestRepository => Future[R]): Future[R] = { r2dbcExecutor.withConnection("test") { conn => val session = new R2dbcSession(conn) - fun(TestRepository(session)) + fun(TestRepository(session, settings)) } } @@ -260,7 +272,7 @@ class R2dbcProjectionSpec throw TestException(concatHandlerFail4Msg + s" after $attempts attempts") } else { logger.debug("handling {}", envelope) - TestRepository(session).concatToText(envelope.id, envelope.message) + TestRepository(session, settings).concatToText(envelope.id, envelope.message) } } } @@ -441,7 +453,7 @@ class R2dbcProjectionSpec val bogusEventHandler = new R2dbcHandler[Envelope] { override def process(session: R2dbcSession, envelope: Envelope): Future[Done] = { - val repo = TestRepository(session) + val repo = TestRepository(session, settings) if (envelope.offset == 4L) repo.updateWithNullValue(envelope.id) else repo.concatToText(envelope.id, envelope.message) } @@ -492,7 +504,7 @@ class R2dbcProjectionSpec verificationProbe.receiveMessage().offset shouldEqual envelope.offset processProbe.ref ! ProbeMessage("process", envelope.offset) - TestRepository(session).concatToText(envelope.id, envelope.message) + TestRepository(session, settings).concatToText(envelope.id, envelope.message) } } @@ -588,7 +600,7 @@ class R2dbcProjectionSpec if (envelopes.isEmpty) Future.successful(Done) else { - val repo = TestRepository(session) + val repo = TestRepository(session, settings) val id = envelopes.head.id repo.findById(id).flatMap { existing => val newConcatStr = envelopes.foldLeft(existing.getOrElse(ConcatStr(id, ""))) { (acc, env) => @@ -925,7 +937,7 @@ class R2dbcProjectionSpec handler = () => R2dbcHandler[Envelope] { (session, envelope) => verifiedProbe.expectMessage(envelope.offset) - TestRepository(session).concatToText(envelope.id, envelope.message) + TestRepository(session, settings).concatToText(envelope.id, envelope.message) }) projectionTestKit.run(projection) { @@ -1292,7 +1304,7 @@ class R2dbcProjectionSpec sourceProvider = sourceProvider(entityId), handler = () => R2dbcHandler[Envelope] { (session, envelope) => - TestRepository(session).concatToText(envelope.id, envelope.message) + TestRepository(session, settings).concatToText(envelope.id, envelope.message) }) offsetShouldBeEmpty() @@ -1324,7 +1336,7 @@ class R2dbcProjectionSpec sourceProvider = sourceProvider(entityId), handler = () => R2dbcHandler[Envelope] { (session, envelope) => - TestRepository(session).concatToText(envelope.id, envelope.message) + TestRepository(session, settings).concatToText(envelope.id, envelope.message) }) offsetShouldBeEmpty() @@ -1357,7 +1369,7 @@ class R2dbcProjectionSpec sourceProvider = sourceProvider(entityId), handler = () => R2dbcHandler[Envelope] { (session, envelope) => - TestRepository(session).concatToText(envelope.id, envelope.message) + TestRepository(session, settings).concatToText(envelope.id, envelope.message) }) offsetShouldBeEmpty() diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala index 36852425..f24b88f8 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetProjectionSpec.scala @@ -14,6 +14,7 @@ package org.apache.pekko.projection.r2dbc import java.time.Instant +import java.time.temporal.ChronoUnit import java.time.{ Duration => JDuration } import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean @@ -25,7 +26,6 @@ import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ - import org.apache.pekko import pekko.Done import pekko.NotUsed @@ -154,7 +154,7 @@ class R2dbcTimestampOffsetProjectionSpec private def createOffsetStore( projectionId: ProjectionId, sourceProvider: TestTimestampSourceProvider): R2dbcOffsetStore = - new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) private val projectionTestKit = ProjectionTestKit(system) override protected def beforeAll(): Unit = { @@ -228,7 +228,7 @@ class R2dbcTimestampOffsetProjectionSpec private def withRepo[R](fun: TestRepository => Future[R]): Future[R] = { r2dbcExecutor.withConnection("test") { conn => val session = new R2dbcSession(conn) - fun(TestRepository(session)) + fun(TestRepository(session, settings)) } } @@ -245,7 +245,7 @@ class R2dbcTimestampOffsetProjectionSpec throw TestException(concatHandlerFail4Msg + s" after $attempts attempts") } else { logger.debug(s"handling {}", envelope) - TestRepository(session).concatToText(envelope.persistenceId, envelope.event) + TestRepository(session, settings).concatToText(envelope.persistenceId, envelope.event) } } @@ -287,8 +287,13 @@ class R2dbcTimestampOffsetProjectionSpec } } + def now(): Instant = { + // supported databases do not store more than 6 fractional digits + Instant.now().truncatedTo(ChronoUnit.MICROS) + } + def createEnvelopesWithDuplicates(pid1: Pid, pid2: Pid): Vector[EventEnvelope[String]] = { - val startTime = Instant.now() + val startTime = now() Vector( createEnvelope(pid1, 1, startTime, "e1-1"), @@ -342,7 +347,7 @@ class R2dbcTimestampOffsetProjectionSpec if (envelopes.isEmpty) Future.successful(Done) else { - val repo = TestRepository(session) + val repo = TestRepository(session, settings) val results = envelopes.groupBy(_.persistenceId).map { case (pid, envs) => repo.findById(pid).flatMap { existing => val newConcatStr = envs.foldLeft(existing.getOrElse(ConcatStr(pid, ""))) { (acc, env) => @@ -552,7 +557,7 @@ class R2dbcTimestampOffsetProjectionSpec val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val startTime = Instant.now() + val startTime = now() val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1, pid2) val sourceProvider1 = createSourceProvider(envelopes1) implicit val offsetStore: R2dbcOffsetStore = createOffsetStore(projectionId, sourceProvider1) @@ -664,7 +669,7 @@ class R2dbcTimestampOffsetProjectionSpec val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val startTime = Instant.now() + val startTime = now() val envelopes1 = createEnvelopesUnknownSequenceNumbers(startTime, pid1, pid2) val sourceProvider1 = createBacktrackingSourceProvider(envelopes1) implicit val offsetStore: R2dbcOffsetStore = createOffsetStore(projectionId, sourceProvider1) @@ -813,10 +818,10 @@ class R2dbcTimestampOffsetProjectionSpec val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val startTime = Instant.now() + val startTime = now() val sourceProvider = new TestSourceProviderWithInput() implicit val offsetStore: R2dbcOffsetStore = - new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) val result1 = new StringBuffer() val result2 = new StringBuffer() @@ -950,10 +955,10 @@ class R2dbcTimestampOffsetProjectionSpec val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val startTime = Instant.now() + val startTime = now() val sourceProvider = new TestSourceProviderWithInput() implicit val offsetStore: R2dbcOffsetStore = - new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) val projectionRef = spawn( ProjectionBehavior( @@ -1154,10 +1159,10 @@ class R2dbcTimestampOffsetProjectionSpec val pid1 = UUID.randomUUID().toString val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val startTime = Instant.now() + val startTime = now() val sourceProvider = new TestSourceProviderWithInput() implicit val offsetStore: R2dbcOffsetStore = - new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) val result1 = new StringBuffer() val result2 = new StringBuffer() @@ -1296,10 +1301,10 @@ class R2dbcTimestampOffsetProjectionSpec val pid2 = UUID.randomUUID().toString val projectionId = genRandomProjectionId() - val startTime = Instant.now() + val startTime = now() val sourceProvider = new TestSourceProviderWithInput() implicit val offsetStore: R2dbcOffsetStore = - new R2dbcOffsetStore(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) + R2dbcOffsetStore.fromConfig(projectionId, Some(sourceProvider), system, settings, r2dbcExecutor) val flowHandler = FlowWithContext[EventEnvelope[String], ProjectionContext] @@ -1393,7 +1398,7 @@ class R2dbcTimestampOffsetProjectionSpec sourceProvider, handler = () => R2dbcHandler[EventEnvelope[String]] { (session, envelope) => - TestRepository(session).concatToText(envelope.persistenceId, envelope.event) + TestRepository(session, settings).concatToText(envelope.persistenceId, envelope.event) }) offsetShouldBeEmpty() @@ -1427,7 +1432,7 @@ class R2dbcTimestampOffsetProjectionSpec sourceProvider, handler = () => R2dbcHandler[EventEnvelope[String]] { (session, envelope) => - TestRepository(session).concatToText(envelope.persistenceId, envelope.event) + TestRepository(session, settings).concatToText(envelope.persistenceId, envelope.event) }) offsetShouldBeEmpty() diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index f1e91cf0..0708d74f 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -81,7 +81,7 @@ class R2dbcTimestampOffsetStoreSpec projectionId: ProjectionId, customSettings: R2dbcProjectionSettings = settings, eventTimestampQueryClock: TestClock = clock) = - new R2dbcOffsetStore( + R2dbcOffsetStore.fromConfig( projectionId, Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, eventTimestampQueryClock)), system, @@ -281,7 +281,7 @@ class R2dbcTimestampOffsetStoreSpec slice4 shouldBe 656 val offsetStore0 = - new R2dbcOffsetStore( + R2dbcOffsetStore.fromConfig( projectionId0, Some(new TestTimestampSourceProvider(0, persistenceExt.numberOfSlices - 1, clock)), system, @@ -302,7 +302,7 @@ class R2dbcTimestampOffsetStoreSpec offsetStore0.saveOffset(offset4).futureValue val offsetStore1 = - new R2dbcOffsetStore( + R2dbcOffsetStore.fromConfig( projectionId1, Some(new TestTimestampSourceProvider(0, 511, clock)), system, @@ -312,7 +312,7 @@ class R2dbcTimestampOffsetStoreSpec offsetStore1.getState().byPid.keySet shouldBe Set(p1, p2) val offsetStore2 = - new R2dbcOffsetStore( + R2dbcOffsetStore.fromConfig( projectionId2, Some(new TestTimestampSourceProvider(512, 1023, clock)), system, @@ -561,7 +561,7 @@ class R2dbcTimestampOffsetStoreSpec "evict old records" in { val projectionId = genRandomProjectionId() - val evictSettings = settings.copy(timeWindow = JDuration.ofSeconds(100), evictInterval = JDuration.ofSeconds(10)) + val evictSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withEvictInterval(JDuration.ofSeconds(10)) import evictSettings._ val offsetStore = createOffsetStore(projectionId, evictSettings) @@ -604,7 +604,7 @@ class R2dbcTimestampOffsetStoreSpec "delete old records" in { val projectionId = genRandomProjectionId() - val deleteSettings = settings.copy(timeWindow = JDuration.ofSeconds(100)) + val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100)) import deleteSettings._ val offsetStore = createOffsetStore(projectionId, deleteSettings) @@ -635,8 +635,7 @@ class R2dbcTimestampOffsetStoreSpec "periodically delete old records" in { val projectionId = genRandomProjectionId() - val deleteSettings = - settings.copy(timeWindow = JDuration.ofSeconds(100), deleteInterval = JDuration.ofMillis(500)) + val deleteSettings = settings.withTimeWindow(JDuration.ofSeconds(100)).withDeleteInterval(JDuration.ofMillis(500)) import deleteSettings._ val offsetStore = createOffsetStore(projectionId, deleteSettings) diff --git a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala index 13b87715..fc48aff4 100644 --- a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala +++ b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala @@ -44,6 +44,21 @@ object TestConfig { database = "yugabyte" } """) + case "mysql" => + ConfigFactory.parseString(""" + pekko.persistence.r2dbc{ + connection-factory { + driver = "mysql" + host = "localhost" + port = 3306 + user = "root" + password = "root" + database = "mysql" + } + db-timestamp-monotonic-increasing = on + use-app-timestamp = on + } + """) } // using load here so that connection-factory can be overridden