Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Projection implementation for MySQL support #177

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ jobs:
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root --password=root --database=mysql < ddl-scripts/create_tables_mysql.sql

- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} core/${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}

test-docs:
name: Docs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ final class R2dbcSettings(config: Config) {

val durableStateAssertSingleWriter: Boolean = config.getBoolean("state.assert-single-writer")

val dialect: Dialect = toRootLowerCase(config.getString("dialect")) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres].")
}
val dialect: Dialect = Dialect.fromString(config.getString("dialect"))

val querySettings = new QuerySettings(config.getConfig("query"))

Expand Down Expand Up @@ -96,6 +90,18 @@ object Dialect {

/** @since 1.1.0 */
case object MySQL extends Dialect

/** @since 1.1.0 */
def fromString(value: String): Dialect = {
toRootLowerCase(value) match {
case "yugabyte" => Dialect.Yugabyte
case "postgres" => Dialect.Postgres
case "mysql" => Dialect.MySQL
case other =>
throw new IllegalArgumentException(
s"Unknown dialect [$other]. Supported dialects are [yugabyte, postgres, mysql].")
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object Sql {
* INTERNAL API
*/
@InternalApi
private[r2dbc] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
private[pekko] implicit class DialectInterpolation(val sc: StringContext) extends AnyVal {
def sql(args: Any*)(implicit dialect: Dialect): String =
dialect.replaceParameters(trimLineBreaks(sc.s(args: _*)))
}
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ object Dependencies {
pekkoPersistenceQuery,
r2dbcSpi,
r2dbcPool,
r2dbcMysql % "provided,test",
pekkoProjectionCore,
TestDeps.pekkoProjectionEventSourced,
TestDeps.pekkoProjectionDurableState,
Expand Down
2 changes: 1 addition & 1 deletion projection/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

//#projection-config
pekko.projection.r2dbc {
# postgres or yugabyte
# postgres, yugabyte or mysql
dialect = ${pekko.persistence.r2dbc.dialect}

offset-store {
Expand Down
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are breaking backwards compatibility here, can we convert from case class to class too? Similar to how R2dbcSettings is implemented.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course this class is different from R2dbcSettings in a way that users can manually construct it - it is not constructed by config only, so will have to keep the means to set/modify its attributes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pjfanning Do you have a preferred solution for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the TODO on the class - so I don't mind a rewrite for 1.1.0.

It would still be nice to minimise source incompatible changes - i.e. code with against 1.0.0 would ideally still be compilable with 1.1.0.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ import java.time.{ Duration => JDuration }
import java.util.Locale

import scala.concurrent.duration._

import com.typesafe.config.Config
import org.apache.pekko
import pekko.util.JavaDurationConverters._
import pekko.actor.typed.ActorSystem
import com.typesafe.config.Config
import pekko.persistence.r2dbc.Dialect
import pekko.util.JavaDurationConverters._

object R2dbcProjectionSettings {

Expand All @@ -44,11 +44,37 @@ object R2dbcProjectionSettings {
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
deleteInterval = config.getDuration("offset-store.delete-interval"),
logDbCallsExceeding)
logDbCallsExceeding,
dialect = Dialect.fromString(config.getString("dialect")))
}

def apply(system: ActorSystem[_]): R2dbcProjectionSettings =
apply(system.settings.config.getConfig(DefaultConfigPath))

def apply(
schema: Option[String],
offsetTable: String,
timestampOffsetTable: String,
managementTable: String,
useConnectionFactory: String,
timeWindow: JDuration,
keepNumberOfEntries: Int,
evictInterval: JDuration,
deleteInterval: JDuration,
logDbCallsExceeding: FiniteDuration
): R2dbcProjectionSettings = R2dbcProjectionSettings(
schema,
offsetTable,
timestampOffsetTable,
managementTable,
useConnectionFactory,
timeWindow,
keepNumberOfEntries,
evictInterval,
deleteInterval,
logDbCallsExceeding,
Dialect.Postgres
)
}

// FIXME remove case class, and add `with` methods
Expand All @@ -62,7 +88,8 @@ final case class R2dbcProjectionSettings(
keepNumberOfEntries: Int,
evictInterval: JDuration,
deleteInterval: JDuration,
logDbCallsExceeding: FiniteDuration) {
logDbCallsExceeding: FiniteDuration,
dialect: Dialect) {
val offsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + offsetTable
val timestampOffsetTableWithSchema: String = schema.map(_ + ".").getOrElse("") + timestampOffsetTable
val managementTableWithSchema: String = schema.map(_ + ".").getOrElse("") + managementTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
Expand All @@ -38,8 +37,9 @@ import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.typed.PersistenceId
import pekko.projection.BySlicesSourceProvider
import pekko.projection.MergeableOffset
Expand All @@ -49,6 +49,7 @@ import pekko.projection.internal.OffsetSerialization
import pekko.projection.internal.OffsetSerialization.MultipleOffsets
import pekko.projection.internal.OffsetSerialization.SingleOffset
import pekko.projection.r2dbc.R2dbcProjectionSettings
import pekko.projection.r2dbc.internal.mysql.MySQLR2dbcOffsetStore
import io.r2dbc.spi.Connection
import io.r2dbc.spi.Statement
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -154,6 +155,22 @@ object R2dbcOffsetStore {
val FutureDone: Future[Done] = Future.successful(Done)
val FutureTrue: Future[Boolean] = Future.successful(true)
val FutureFalse: Future[Boolean] = Future.successful(false)

def fromConfig(
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: R2dbcProjectionSettings,
r2dbcExecutor: R2dbcExecutor,
clock: Clock = Clock.systemUTC()
): R2dbcOffsetStore = {
settings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock)
case Dialect.MySQL =>
new MySQLR2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock)
}
}
}

/**
Expand All @@ -170,6 +187,9 @@ private[projection] class R2dbcOffsetStore(

import R2dbcOffsetStore._

implicit protected val dialect: Dialect = settings.dialect
protected lazy val timestampSql: String = "transaction_timestamp()"

// FIXME include projectionId in all log messages
private val logger = LoggerFactory.getLogger(this.getClass)

Expand All @@ -181,8 +201,8 @@ private[projection] class R2dbcOffsetStore(
import offsetSerialization.toStorageRepresentation

private val timestampOffsetTable = settings.timestampOffsetTableWithSchema
private val offsetTable = settings.offsetTableWithSchema
private val managementTable = settings.managementTableWithSchema
protected val offsetTable = settings.offsetTableWithSchema
protected val managementTable = settings.managementTableWithSchema

private[projection] implicit val executionContext: ExecutionContext = system.executionContext

Expand All @@ -195,7 +215,7 @@ private[projection] class R2dbcOffsetStore(
private val insertTimestampOffsetSql: String = sql"""
INSERT INTO $timestampOffsetTable
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
VALUES (?,?,?,?,?,?, transaction_timestamp())"""
VALUES (?,?,?,?,?,?, $timestampSql)"""

// delete less than a timestamp
private val deleteOldTimestampOffsetSql: String =
Expand All @@ -211,7 +231,7 @@ private[projection] class R2dbcOffsetStore(
private val selectOffsetSql: String =
sql"SELECT projection_key, current_offset, manifest, mergeable FROM $offsetTable WHERE projection_name = ?"

private val upsertOffsetSql: String = sql"""
protected val upsertOffsetSql: String = sql"""
INSERT INTO $offsetTable
(projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
VALUES (?,?,?,?,?,?)
Expand Down Expand Up @@ -518,8 +538,10 @@ private[projection] class R2dbcOffsetStore(
} else {
// TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
val boundStatement =
records.foldLeft(statement) { (stmt, rec) =>
stmt.add()
records.zipWithIndex.foldLeft(statement) { case (stmt, (rec, idx)) =>
if (idx != 0) {
stmt.add()
}
bindRecord(stmt, rec)
}
R2dbcExecutor.updateBatchInTx(boundStatement)
Expand Down Expand Up @@ -979,11 +1001,7 @@ private[projection] class R2dbcOffsetStore(
.bind(2, paused)
.bind(3, Instant.now(clock).toEpochMilli)
}
.flatMap {
case i if i == 1 => Future.successful(Done)
case _ =>
Future.failed(new RuntimeException(s"Failed to update management table for $projectionId"))
}
.map(_ => Done)(ExecutionContexts.parasitic)
}

private def createRecordWithOffset[Envelope](envelope: Envelope): Option[RecordWithOffset] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ private[projection] object R2dbcProjectionImpl {
connectionFactory: ConnectionFactory)(implicit system: ActorSystem[_]) = {
val r2dbcExecutor =
new R2dbcExecutor(connectionFactory, log, settings.logDbCallsExceeding)(system.executionContext, system)
new R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor)
R2dbcOffsetStore.fromConfig(projectionId, sourceProvider, system, settings, r2dbcExecutor)
}

private val loadEnvelopeCounter = new AtomicLong
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pekko.projection.r2dbc.internal.mysql

import java.time.Clock

import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.projection.BySlicesSourceProvider
import pekko.projection.ProjectionId
import pekko.projection.r2dbc.R2dbcProjectionSettings
import pekko.projection.r2dbc.internal.R2dbcOffsetStore

/**
* INTERNAL API
*/
@InternalApi
private[projection] class MySQLR2dbcOffsetStore(
projectionId: ProjectionId,
sourceProvider: Option[BySlicesSourceProvider],
system: ActorSystem[_],
settings: R2dbcProjectionSettings,
r2dbcExecutor: R2dbcExecutor,
clock: Clock = Clock.systemUTC())
extends R2dbcOffsetStore(projectionId, sourceProvider, system, settings, r2dbcExecutor, clock) {

override lazy val timestampSql: String = "NOW(6)"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a dialect method, can we abstract a class 'Dialect'? like Hibernate SQL Dialects does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have an ADT for Dialect, I guess more FP-ish way would be to add a private extension class with these methods, although these kinds of attributes are already present in core.


override val upsertOffsetSql: String = sql"""
INSERT INTO $offsetTable
(projection_name, projection_key, current_offset, manifest, mergeable, last_updated)
VALUES (?,?,?,?,?,?) AS excluded
ON DUPLICATE KEY UPDATE
current_offset = excluded.current_offset,
manifest = excluded.manifest,
mergeable = excluded.mergeable,
last_updated = excluded.last_updated"""

override val updateManagementStateSql: String = sql"""
INSERT INTO $managementTable
(projection_name, projection_key, paused, last_updated)
VALUES (?,?,?,?) AS excluded
ON DUPLICATE KEY UPDATE
paused = excluded.paused,
last_updated = excluded.last_updated"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class EventSourcedChaosSpec
(1 to expectedEventCounts).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case of failure
try {
processed :+= processedProbe.receiveMessage(15.seconds)
processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._

import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
Expand All @@ -29,8 +28,9 @@ import pekko.actor.typed.ActorSystem
import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.internal.Sql.Interpolation
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
Expand Down Expand Up @@ -152,6 +152,7 @@ class EventSourcedEndToEndSpec
// to be able to store events with specific timestamps
private def writeEvent(persistenceId: String, seqNr: Long, timestamp: Instant, event: String): Unit = {
log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr: java.lang.Long, event, timestamp)
implicit val dialect: Dialect = projectionSettings.dialect
val insertEventSql = sql"""
INSERT INTO ${journalSettings.journalTableWithSchema}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
Expand Down Expand Up @@ -269,7 +270,7 @@ class EventSourcedEndToEndSpec
(1 to numberOfEvents).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case of failure
try {
processed :+= processedProbe.receiveMessage(15.seconds)
processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class EventSourcedPubSubSpec
(1 to numberOfEvents).foreach { _ =>
// not using receiveMessages(expectedEvents) for better logging in case of failure
try {
processed :+= processedProbe.receiveMessage(25.seconds)
processed :+= processedProbe.receiveMessage(30.seconds)
} catch {
case e: AssertionError =>
val missing = expectedEvents.diff(processed.map(_.envelope.event))
Expand Down
Loading
Loading