Migration tool #501

Merged (42 commits, Oct 11, 2021)

Commits
17490ac  WIP (Tochemey, Feb 5, 2021)
32c7b75  WIP (Tochemey, Feb 5, 2021)
6fc16ae  Refactoring :beers: (Tochemey, Feb 7, 2021)
e824d21  Renaming and fix Java collectors :beers: (Tochemey, Feb 7, 2021)
c49166c  Set explicit type for some weird reasons for Scala 2.12 compilation (Tochemey, Feb 7, 2021)
1e3e8e9  Address some review comments (Tochemey, Feb 13, 2021)
3cd2e3d  Remove schema utils (Tochemey, Feb 13, 2021)
483df26  Fix conversion issue (Tochemey, Feb 13, 2021)
09f32d1  Rename module name to reflect its job (Tochemey, Feb 13, 2021)
e102ae0  Fixes (Tochemey, Feb 13, 2021)
8aa8542  Fix some issues (Tochemey, Feb 14, 2021)
712a2fc  Merge branch 'master' of github.com:Tochemey/akka-persistence-jdbc (Tochemey, Feb 15, 2021)
85cfb9b  Merge branch 'master' of github.com:Tochemey/akka-persistence-jdbc (Tochemey, Feb 15, 2021)
a39b5e8  Wow (Tochemey, Feb 23, 2021)
a228672  Remove event adapter (Tochemey, Feb 23, 2021)
0a60fef  Cleanup (Tochemey, Feb 23, 2021)
9a84ae4  Remove the old migration project (Tochemey, Mar 8, 2021)
1ac0e5e  Remove unnecessary dependencies (Tochemey, Mar 8, 2021)
8f3fac8  Address review comments (Tochemey, Mar 8, 2021)
06d8fa0  Small fix :bulb: (Tochemey, Mar 8, 2021)
98a403f  some refactoring based upon QA (Tochemey, Mar 13, 2021)
b8c17b9  Add Oracle Ordering (Tochemey, Mar 14, 2021)
acf1760  Add Oracle Ordering (Tochemey, Mar 14, 2021)
7b3f3ad  Fix basic check (Tochemey, Mar 14, 2021)
3804bdb  Fix header checker (Tochemey, Mar 14, 2021)
c11aeb1  some enhancement (Tochemey, Mar 14, 2021)
597aa75  add some fixme (Tochemey, Mar 14, 2021)
e3fb374  Perform some cleanup (Tochemey, Mar 14, 2021)
763c424  Address review comments (Tochemey, Mar 15, 2021)
687f571  Address review comments (Tochemey, Mar 16, 2021)
86f676d  Quick fix (Tochemey, Mar 16, 2021)
75559ab  Start preparation for tests (Tochemey, Mar 17, 2021)
a3aa6a2  Buggy implementation (Tochemey, Mar 17, 2021)
e805b48  Basic check (Tochemey, Mar 17, 2021)
3de5100  :ambulance: (Tochemey, Mar 17, 2021)
651e768  refactoring (Tochemey, Mar 23, 2021)
14d0f3d  add license header (Tochemey, Mar 23, 2021)
6095df9  Merge branch 'master' into master (Tochemey, Apr 27, 2021)
c169ea7  checkin (Tochemey, Apr 27, 2021)
d5fad3a  fix non-exhaustive pattern matching (gabriele83, Oct 1, 2021)
f7e483d  Merge pull request #1 from gabriele83/master (Tochemey, Oct 5, 2021)
3531def  Merge branch 'master' into master (octonato, Oct 11, 2021)
Files changed (showing changes from 5 commits)
build.sbt (12 changes: 11 additions & 1 deletion)

@@ -4,7 +4,7 @@ lazy val `akka-persistence-jdbc` = project
   .in(file("."))
   .enablePlugins(ScalaUnidocPlugin)
   .disablePlugins(MimaPlugin, SitePlugin)
-  .aggregate(core, migration, docs)
+  .aggregate(core, migration, docs, tools)
   .settings(publish / skip := true)

 lazy val core = project
@@ -30,6 +30,16 @@ lazy val migration = project
     libraryDependencies ++= Dependencies.Migration,
     publish / skip := true)

+lazy val tools = project
+  .in(file("tools"))
+  .enablePlugins(Publish)
+  .disablePlugins(SitePlugin, MimaPlugin)
+  .settings(
+    name := "akka-persistence-jdbc-schema-utils",
+    libraryDependencies ++= Dependencies.Migration ++ Dependencies.Libraries,
+    publish / skip := true)
Review comment (Member): Add a TODO here so we don't forget to remove it when ready. We want to publish the tool, but only when ready for use. It doesn't have to wait until it is production ready. We may decide to publish it as a snapshot to test it out on real-world projects.

+  .dependsOn(core)
Tochemey marked this conversation as resolved.

 lazy val docs = project
   .enablePlugins(ProjectAutoPlugin, AkkaParadoxPlugin, ParadoxSitePlugin, PreprocessPlugin, PublishRsyncPlugin)
   .disablePlugins(MimaPlugin)
project/Dependencies.scala (2 changes: 1 addition & 1 deletion)

@@ -33,7 +33,7 @@ object Dependencies {
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Test)

val Migration: Seq[ModuleID] = Seq(
"org.flywaydb" % "flyway-core" % "7.5.1",
"org.flywaydb" % "flyway-core" % "7.5.2",
Review comment (Member): Let's remove this. We don't need it.

"com.typesafe" % "config" % "1.4.1",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"org.testcontainers" % "postgresql" % "1.15.1" % Test,
Expand Down
@@ -0,0 +1,56 @@
-- creation of the new akka persistence jdbc schemas

-- event_journal table
CREATE TABLE IF NOT EXISTS ${apj:event_journal}
(
ordering BIGSERIAL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
deleted BOOLEAN DEFAULT FALSE NOT NULL,

writer VARCHAR(255) NOT NULL,
write_timestamp BIGINT,
adapter_manifest VARCHAR(255),

event_ser_id INTEGER NOT NULL,
event_ser_manifest VARCHAR(255) NOT NULL,
event_payload BYTEA NOT NULL,

meta_ser_id INTEGER,
meta_ser_manifest VARCHAR(255),
meta_payload BYTEA,

PRIMARY KEY (persistence_id, sequence_number)
);

CREATE UNIQUE INDEX ${apj:event_journal}_ordering_idx ON ${apj:event_journal} (ordering);

-- event_tag table
CREATE TABLE IF NOT EXISTS ${apj:event_tag}
(
event_id BIGINT,
tag VARCHAR(256),
PRIMARY KEY (event_id, tag),
CONSTRAINT fk_${apj:event_journal}
FOREIGN KEY (event_id)
REFERENCES ${apj:event_journal} (ordering)
ON DELETE CASCADE
);

-- state_snapshot table
CREATE TABLE IF NOT EXISTS ${apj:state_snapshot}
(
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
created BIGINT NOT NULL,

snapshot_ser_id INTEGER NOT NULL,
snapshot_ser_manifest VARCHAR(255) NOT NULL,
snapshot_payload BYTEA NOT NULL,

meta_ser_id INTEGER,
meta_ser_manifest VARCHAR(255),
meta_payload BYTEA,

PRIMARY KEY (persistence_id, sequence_number)
);
tools/src/main/scala/akka/persistence/jdbc/tools/SchemasUtil.scala (65 additions, new file)

@@ -0,0 +1,65 @@
/*
 * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
 * Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.jdbc.tools

import com.typesafe.config.Config
import org.flywaydb.core.Flyway
import org.flywaydb.core.api.Location
import org.flywaydb.core.api.configuration.FluentConfiguration
import org.flywaydb.core.api.output.MigrateResult

import scala.collection.JavaConverters.{ collectionAsScalaIterableConverter, mapAsJavaMapConverter }

/**
 * Executes the database migration.
 *
 * @param config the application config
 */
final case class SchemasUtil(config: Config) {
  private val userKey: String = "jdbc-journal.slick.db.user"
  private val passwordKey: String = "jdbc-journal.slick.db.password"
  private val urlKey: String = "jdbc-journal.slick.db.url"

  // config keys holding the values for the Flyway placeholders
  private val eventJournalPlaceholderValueKey: String = "jdbc-journal.tables.event_journal.tableName"
  private val eventTagPlaceholderValueKey: String = "jdbc-journal.tables.event_tag.tableName"
  private val stateSnapshotPlaceholderValueKey: String = "jdbc-snapshot-store.tables.snapshot.tableName"

  // Flyway placeholders
  private val eventJournalPlaceholder: String = "apj:event_journal"
  private val eventTagPlaceholder: String = "apj:event_tag"
  private val stateSnapshotPlaceholder: String = "apj:state_snapshot"

  private val url: String = config.getString(urlKey)
  private val user: String = config.getString(userKey)
  private val password: String = config.getString(passwordKey)

  /**
   * Creates the persistence schemas if they do not exist.
   *
   * @return the list of migration versions run
   */
  def createIfNotExists(): Seq[String] = {
    val flywayConfig: FluentConfiguration = Flyway.configure
      .dataSource(url, user, password)
      .table("apj_schema_history")
      .locations(new Location("classpath:db/migration/postgres"))
      .ignoreMissingMigrations(true) // in case someone has some missing migrations
      .placeholders(Map(
        eventJournalPlaceholder -> config.getString(eventJournalPlaceholderValueKey),
        eventTagPlaceholder -> config.getString(eventTagPlaceholderValueKey),
        stateSnapshotPlaceholder -> config.getString(stateSnapshotPlaceholderValueKey)).asJava)

    val flyway: Flyway = flywayConfig.load
    flyway.baseline()

    // run the migration
    val result: MigrateResult = flyway.migrate()

    // return the applied migration versions, sorted in descending order
    result.migrations.asScala.toSeq.map(_.version).sortWith(_ > _)
  }
}
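Editor's note: for orientation, a minimal usage sketch of the utility above (not part of this PR; the object name is illustrative). It assumes an application.conf that supplies the jdbc-journal.slick.db.* connection settings plus the three table-name keys whose values fill the ${apj:...} placeholders in the SQL script above.

import akka.persistence.jdbc.tools.SchemasUtil
import com.typesafe.config.{ Config, ConfigFactory }

object CreateSchemasExample extends App {
  // application.conf is expected to define jdbc-journal.slick.db.url / user / password,
  // along with jdbc-journal.tables.event_journal.tableName,
  // jdbc-journal.tables.event_tag.tableName and
  // jdbc-snapshot-store.tables.snapshot.tableName for the Flyway placeholders.
  val config: Config = ConfigFactory.load()

  // runs the Flyway migration and returns the applied versions, newest first
  val applied: Seq[String] = SchemasUtil(config).createIfNotExists()
  println(s"Applied migration versions: ${applied.mkString(", ")}")
}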
@@ -0,0 +1,60 @@
/*
 * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
 * Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.jdbc.tools.migration

import akka.actor.ActorSystem
import akka.persistence.jdbc.config.{ JournalConfig, ReadJournalConfig, SnapshotConfig }
import akka.persistence.jdbc.db.SlickExtension
import akka.persistence.jdbc.journal.dao.DefaultJournalDao
import akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao
import akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao
import akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao
import akka.serialization.SerializationExtension
import com.typesafe.config.Config
import slick.basic.DatabaseConfig
import slick.jdbc.{ JdbcBackend, JdbcProfile }
import slick.jdbc

/**
 * Base class implemented by both the journal and the snapshot migration classes.
 *
 * @param config the application config
 * @param system the actor system
 */
abstract class DataMigrator(config: Config)(implicit system: ActorSystem) {

  import system.dispatcher

  // get an instance of the database and the Jdbc profile
  protected val profile: JdbcProfile = DatabaseConfig.forConfig[JdbcProfile]("slick").profile

  // get the various configurations
  protected val journalConfig = new JournalConfig(config.getConfig("jdbc-journal"))
  protected val readJournalConfig = new ReadJournalConfig(config.getConfig("jdbc-read-journal"))

  protected val journaldb: JdbcBackend.Database =
    SlickExtension(system).database(config.getConfig("jdbc-read-journal")).database
  protected val snapshotdb: jdbc.JdbcBackend.Database =
    SlickExtension(system).database(config.getConfig("jdbc-snapshot-store")).database

  // get an instance of the legacy read journal dao
  protected val legacyReadJournalDao: ByteArrayReadJournalDao =
    new ByteArrayReadJournalDao(journaldb, profile, readJournalConfig, SerializationExtension(system))

  // get an instance of the default journal dao
  protected val defaultJournalDao: DefaultJournalDao =
    new DefaultJournalDao(journaldb, profile, journalConfig, SerializationExtension(system))

  protected val snapshotConfig = new SnapshotConfig(config.getConfig("jdbc-snapshot-store"))

  // get the instance of the legacy snapshot dao
  val legacySnapshotDao: ByteArraySnapshotDao =
    new ByteArraySnapshotDao(snapshotdb, profile, snapshotConfig, SerializationExtension(system))

  // get the instance of the default snapshot dao
  val defaultSnapshotDao: DefaultSnapshotDao =
    new DefaultSnapshotDao(snapshotdb, profile, snapshotConfig, SerializationExtension(system))
}
@@ -0,0 +1,61 @@
/*
 * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
 * Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.jdbc.tools.migration

import akka.NotUsed
import akka.actor.ActorSystem
import akka.persistence.{ AtomicWrite, Persistence, PersistentRepr }
import akka.persistence.journal.EventAdapter
import akka.stream.scaladsl.Source
import com.typesafe.config.Config

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.Try

/**
 * This will help migrate the legacy journal data onto the new journal schema with the
 * appropriate serialization.
 *
 * @param config the application config
 * @param system the actor system
 */
final case class LegacyJournalDataMigrator(config: Config)(implicit system: ActorSystem) extends DataMigrator(config) {

  private val eventAdapters = Persistence(system).adaptersFor("", config)

  private def adaptEvents(repr: PersistentRepr): Seq[PersistentRepr] = {
    val adapter: EventAdapter = eventAdapters.get(repr.payload.getClass)
    adapter.fromJournal(repr.payload, repr.manifest).events.map(repr.withPayload)
  }
Review comment (octonato, Member, Feb 11, 2021):

This is neat.

This will allow users to eventually migrate their events to some new format.

There is one thing that we should watch out for though. If an adapter produces more events than it takes (1 event split into many), that will change the ordering in the new table.

That's not an issue in itself unless users are tracking the offset using Lagom or Akka Projections.

That's all good, but we need to carefully inform users that their adapters should emit 1:1 in that case, or they should reset their offset table and consume from scratch.

One more thing on the adapters approach: we can re-tag the events.

Reply (Contributor, Author): Interesting.
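Editor's note: to make the reviewer's 1:1 point concrete, here is a minimal sketch of an event adapter that preserves ordering. The adapter class and its name are illustrative, not part of this PR; only the EventAdapter API itself comes from Akka.

import akka.persistence.journal.{ EventAdapter, EventSeq }

// Hypothetical adapter for use during migration. Because fromJournal always emits
// exactly one event per stored event (EventSeq.single), the ordering column in the
// new table stays aligned with offsets tracked by Lagom or Akka Projections.
// Returning more than one event here (1:N) would shift that ordering.
class OneToOneMigrationAdapter extends EventAdapter {
  override def manifest(event: Any): String = "" // no manifest needed for this sketch

  // write side: pass events through unchanged
  override def toJournal(event: Any): Any = event

  // read side, called by adaptEvents above through fromJournal
  override def fromJournal(event: Any, manifest: String): EventSeq =
    EventSeq.single(event)
}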


  /**
   * reads all the current events in the legacy journal
   *
   * @return the source of all the events
   */
  private def allEvents(): Source[PersistentRepr, NotUsed] = {
    legacyReadJournalDao
      .allPersistenceIdsSource(Long.MaxValue)
      .flatMapConcat((persistenceId: String) => {
Review comment (Member): The mapConcat will change the ordering in the new schema. Instead, we should read the events directly without fetching by persistence id.

        legacyReadJournalDao
          .messagesWithBatch(persistenceId, 0L, Long.MaxValue, readJournalConfig.maxBufferSize, None)
          .mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr))
          .mapConcat { case (repr: PersistentRepr, _) =>
            adaptEvents(repr)
          }
      })
  }

  /**
   * write all legacy events into the new journal tables applying the proper serialization
   */
  def migrate(): Source[Seq[Try[Unit]], NotUsed] = {
    allEvents().mapAsync(1) { pr: PersistentRepr =>
      defaultJournalDao.asyncWriteMessages(immutable.Seq(AtomicWrite(pr)))
    }
  }
}
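Editor's note: for context, a minimal sketch of running the journal migration to completion. Assumptions: Akka 2.6.x, where the implicit ActorSystem provides the stream materializer, and an application.conf carrying the jdbc-journal, jdbc-read-journal and jdbc-snapshot-store sections the migrators read; the object and system names are illustrative.

import akka.actor.ActorSystem
import akka.persistence.jdbc.tools.migration.LegacyJournalDataMigrator
import akka.stream.scaladsl.Sink
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object RunJournalMigrationExample extends App {
  val config = ConfigFactory.load()
  implicit val system: ActorSystem = ActorSystem("migration", config)

  // drain the stream; each element is the batch of write results for one event
  val done = LegacyJournalDataMigrator(config).migrate().runWith(Sink.ignore)
  Await.result(done, Duration.Inf)

  system.terminate()
}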
@@ -0,0 +1,42 @@
/*
 * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
 * Copyright (C) 2019 - 2021 Lightbend Inc. <https://www.lightbend.com>
 */

package akka.persistence.jdbc.tools.migration

import akka.actor.ActorSystem
import akka.NotUsed
import akka.persistence.SnapshotMetadata
import akka.stream.scaladsl.Source
import com.typesafe.config.Config

import scala.concurrent.Future

/**
 * This will help migrate the legacy snapshot data onto the new snapshot schema with the
 * appropriate serialization.
 *
 * @param config the application config
 * @param system the actor system
 */
case class LegacySnapshotDataMigrator(config: Config)(implicit system: ActorSystem) extends DataMigrator(config) {

  import system.dispatcher

  /**
   * write the latest state snapshot into the new snapshot table applying the proper serialization
   */
  def migrate(): Source[Option[Future[Unit]], NotUsed] = {
    legacyReadJournalDao.allPersistenceIdsSource(Long.MaxValue).mapAsync(1) { persistenceId: String =>
      legacySnapshotDao
        .latestSnapshot(persistenceId)
Review comment (Member): We may want to offer the option to migrate the last N snapshots. It is common for users to keep more than one snapshot, and in that case they probably want to migrate those as well.

        .map((o: Option[(SnapshotMetadata, Any)]) => {
          o.map((result: (SnapshotMetadata, Any)) => {
            val (meta, data) = result
            defaultSnapshotDao.save(meta, data)
          })
        })
    }
  }
}
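Editor's note: a sketch of the reviewer's last-N idea, again under a loud assumption: snapshotsForPersistenceId below is an invented DAO method (this PR only uses latestSnapshot), standing in for a query returning the N most recent snapshots for a persistence id.

  // Hypothetical extension of LegacySnapshotDataMigrator. `snapshotsForPersistenceId`
  // is an assumed method returning Future[Seq[(SnapshotMetadata, Any)]].
  def migrateLastN(n: Int): Source[Unit, NotUsed] =
    legacyReadJournalDao.allPersistenceIdsSource(Long.MaxValue).mapAsync(1) { persistenceId: String =>
      legacySnapshotDao
        .snapshotsForPersistenceId(persistenceId, limit = n)
        .flatMap { snapshots =>
          // write the snapshots one after the other into the new table
          snapshots.foldLeft(Future.successful(())) { case (acc, (meta, data)) =>
            acc.flatMap(_ => defaultSnapshotDao.save(meta, data))
          }
        }
    }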