Migration tool #501
@octonato, @patriknw for some weird reason I am getting a type mismatch here: https://travis-ci.com/github/akka/akka-persistence-jdbc/jobs/481103765#L368
@Tochemey, this is looking good.
I like the event adapter idea.
I'm wondering if we should take a similar path as the Cassandra plugin. The Cassandra plugin has a Cleanup tool with different methods that users can combine to achieve what they want.
For instance, do I need to preserve the ordering of events? I may want to do it if I'm using Akka Projections and don't want to replay all my projections.
In that case, preserving the ordering is a must.
If I don't care about the ordering, I may even migrate per persistence id and in parallel.
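For illustration only, a rough sketch of what a per-persistence-id, parallel migration could look like; allPersistenceIds and migrateEvents are hypothetical helpers, not part of this PR:

import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

// Sketch only: migrate each persistence id independently, `parallelism` at a time.
// The global ordering across persistence ids is NOT preserved, which is only acceptable
// when no projection relies on the ordering column.
def migrateInParallel(
    allPersistenceIds: Source[String, NotUsed],   // assumed to stream every persistence id
    migrateEvents: String => Future[Done],        // assumed to migrate one persistence id
    parallelism: Int)(implicit system: ActorSystem): Future[Done] =
  allPersistenceIds
    .mapAsyncUnordered(parallelism)(migrateEvents)
    .runWith(Sink.ignore)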
private def adaptEvents(repr: PersistentRepr): Seq[PersistentRepr] = {
  val adapter: EventAdapter = eventAdapters.get(repr.payload.getClass)
  adapter.fromJournal(repr.payload, repr.manifest).events.map(repr.withPayload)
}
This is neat.
This will allow users to eventually migrate their events to some new format.
There is one thing that we should watch out for, though. If an adapter produces more events than it takes (1 event split into many), that will change the ordering in the new table.
That's not an issue in itself, unless users are tracking the offset using Lagom or Akka Projections.
That's all good, but we need to carefully inform users that their adapters should emit 1:1 in that case, or they should reset their offset table and consume from scratch.
There is one more thing on the adapters approach: we can re-tag the events.
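To make the re-tagging idea concrete, here is a minimal, hedged sketch of a 1:1 adapter; the class name and the tag set are made up for illustration:

import akka.persistence.journal.{ EventAdapter, EventSeq, Tagged }

// Sketch only: a 1:1 adapter that re-tags events on the way into the journal.
// Because it emits exactly one event per input event, it does not change the ordering.
class RetaggingEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = ""

  // write side: wrap the payload with the new set of tags
  override def toJournal(event: Any): Any = Tagged(event, Set("new-tag"))

  // read side: hand the payload back unchanged
  override def fromJournal(event: Any, manifest: String): EventSeq = EventSeq.single(event)
}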
Interesting.
private def allEvents(): Source[PersistentRepr, NotUsed] = {
  legacyReadJournalDao
    .allPersistenceIdsSource(Long.MaxValue)
    .flatMapConcat((persistenceId: String) => {
The mapConcat will change the ordering in the new schema. Instead, we should read the events directly without fetching by persistence id.
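As a hedged sketch of that idea, one could stream the legacy journal table in its global order instead of per persistence id. This assumes the legacy JournalTable exposes its ordering column as `ordering`, and reuses the journaldb, queries and serializer values that appear later in this thread:

import akka.NotUsed
import akka.persistence.PersistentRepr
import akka.stream.scaladsl.Source
import scala.util.{ Failure, Success }

// Sketch only: read the whole legacy journal ordered by the global `ordering` column,
// so the inserts into the new table happen in the original global order.
private def allEventsOrdered(): Source[PersistentRepr, NotUsed] =
  Source
    .fromPublisher(journaldb.stream(queries.JournalTable.sortBy(_.ordering).result))
    .via(serializer.deserializeFlow)
    .map {
      case Success((repr, _, _)) => repr
      case Failure(cause)        => throw cause
    }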
def migrate(): Source[Option[Future[Unit]], NotUsed] = {
  legacyReadJournalDao.allPersistenceIdsSource(Long.MaxValue).mapAsync(1) { persistenceId: String =>
    legacySnapshotDao
      .latestSnapshot(persistenceId)
We may want to offer the option to migrate the last N snapshots. It is common for users to keep more than one snapshot, and in that case they probably want to migrate them as well.
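A possible shape for that, shown only as a sketch; it assumes the snapshotdb, queries, toSnapshotData and defaultSnapshotDao values used elsewhere in this PR, and that the legacy SnapshotTable has persistenceId and sequenceNumber columns:

import akka.Done
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

// Sketch only: migrate the latest `n` snapshots of a single persistence id.
// Requires the Slick profile api._ in scope for the `===` operator.
def migrateLatestN(persistenceId: String, n: Int): Future[Done] =
  Source
    .fromPublisher(
      snapshotdb.stream(
        queries.SnapshotTable
          .filter(_.persistenceId === persistenceId)
          .sortBy(_.sequenceNumber.desc)
          .take(n)
          .result))
    .mapAsync(1) { row =>
      val (metadata, value) = toSnapshotData(row)
      defaultSnapshotDao.save(metadata, value)
    }
    .runWith(Sink.ignore)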
@Tochemey, I have second thoughts about the Flyway part. Maybe we should just drop it. There is too much on our side to have something that would work for all databases: selecting the type, etc. In the end, what is the real added value? Maybe the best is to let users just pick the schema as we publish it in the docs (or use the new SchemaUtils) to create it, then run the tool using the legacy and new DAOs. And that's it. The migration doesn't need to take care of creating the schema for the users.
However, the new SchemaUtils needs to be made a bit more flexible to accommodate a user-defined Postgres schema. At the moment it is tied to the
@octonato
@patriknw some lovely feedback would not be bad. 😁
# Conflicts: # migrator/src/main/scala/akka/persistence/jdbc/migrator/LegacySnapshotDataMigrator.scala
@octonato and @patriknw I am able to migrate all the data from the snapshot and journal into the new tables. However, for the tags migration this is the code I use:

private def events(): Source[PersistentRepr, NotUsed] = {
  Source
    .fromPublisher(journaldb.stream(queries.JournalTable.sortBy(_.sequenceNumber).result))
    .via(serializer.deserializeFlow)
    .mapAsync(1)((reprAndOrdNr: Try[(PersistentRepr, Set[String], Long)]) => Future.fromTry(reprAndOrdNr))
    .map { case (repr, tags, _) =>
      repr.withPayload(Tagged(repr.payload, tags))
    }
}

I don't think we will need an event adapter, since the payload is the raw one that is persisted into the old journal table column, as well as the tags. I strongly believe we just need to move the data.
@Tochemey, I think you are right on this. The good part of this approach is that it doesn't leave the door open for adding more events. An event adapter is capable of taking one event and emitting more than one, which would add more events to the new table.
Thanks @Tochemey.
I think we are going in a good direction. Cool that you managed to read the tags and remove the event adapter.
We will need to think of integration tests.
First start the system with the old DAO, persist some events and snapshots. Stop it, migrate the data and start it again using the new DAOs.
build.sbt
Outdated
@@ -4,7 +4,7 @@ lazy val `akka-persistence-jdbc` = project
   .in(file("."))
   .enablePlugins(ScalaUnidocPlugin)
   .disablePlugins(MimaPlugin, SitePlugin)
-  .aggregate(core, migration, docs)
+  .aggregate(core, migration, docs, migrator)
Let's remove the migration project and keep only the migrator.
.settings(
  name := "akka-persistence-jdbc-migrator",
  libraryDependencies ++= Dependencies.Migration ++ Dependencies.Libraries,
  publish / skip := true)
Add a TODO here so we don't forget to remove it when ready.
We want to publish the tool, but only when it is ready for use. It doesn't need to be production ready. We may decide to publish it as a snapshot to test it out on real-world projects.
.mapAsync(1)((repr: PersistentRepr) => {
  defaultJournalDao.asyncWriteMessages(Seq(AtomicWrite(Seq(repr))))
})
Suggested change:
-.mapAsync(1)((repr: PersistentRepr) => {
-  defaultJournalDao.asyncWriteMessages(Seq(AtomicWrite(Seq(repr))))
-})
+.mapAsync(1) { repr =>
+  defaultJournalDao.asyncWriteMessages(Seq(AtomicWrite(Seq(repr))))
+}
project/Dependencies.scala
Outdated
@@ -33,7 +33,7 @@ object Dependencies {
     "org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Test)

   val Migration: Seq[ModuleID] = Seq(
-    "org.flywaydb" % "flyway-core" % "7.5.1",
+    "org.flywaydb" % "flyway-core" % "7.5.2",
Let's remove this. We don't need it.
  defaultJournalDao.asyncWriteMessages(Seq(AtomicWrite(Seq(repr))))
})
.limit(Long.MaxValue)
.runWith(Sink.seq) // FIXME for performance
Sink.ignore is good enough here and we won't accumulate records in memory.
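For reference, a minimal sketch of how the end of that stream could look with Sink.ignore, reusing the allEvents() source and defaultJournalDao from this PR:

import akka.Done
import akka.persistence.AtomicWrite
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

// Sketch only: run the stream purely for its side effects; nothing is collected in memory
// and the returned Future completes once every event has been written.
def migrateJournal(): Future[Done] =
  allEvents()
    .mapAsync(1)(repr => defaultJournalDao.asyncWriteMessages(Seq(AtomicWrite(Seq(repr)))))
    .runWith(Sink.ignore)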
.mapAsync(1)((reprAndOrdNr: Try[(PersistentRepr, Set[String], Long)]) => Future.fromTry(reprAndOrdNr))
.map { case (repr, tags, _) =>
  repr.withPayload(Tagged(repr.payload, tags))
}
We don't need to go from Try to Future and back to the unwrapped 3-tuple. I think that whole block can be changed to:
.map {
  case Success((repr, tags, _)) if tags.nonEmpty =>
    repr.withPayload(Tagged(repr.payload, tags)) // only wrap in `Tagged` if needed
  case Success((repr, _, _)) => repr             // no-op map
  case Failure(exception)    => throw exception  // blow up on failure
}
Eventually we need to add more logging inside deserializeFlow. If something goes wrong, that will be when deserializing the payload.
def migrateAll(): Future[Seq[Future[Unit]]] = {
  for {
    rows <- snapshotdb.run(queries.SnapshotTable.sortBy(_.sequenceNumber.desc).result)
  } yield rows.map(toSnapshotData).map { case (metadata, value) =>
    defaultSnapshotDao.save(metadata, value)
  }
}
Snapshots tend to have payloads much larger than events. I think this query will fetch all snapshots into memory. It would be better if we stream, like we did for the events.
def migrateLatest(): Future[Option[Future[Unit]]] = {
  for {
    rows <- snapshotdb.run(queries.SnapshotTable.sortBy(_.sequenceNumber.desc).take(1).result)
This will only take the latest snapshot across all persistence ids.
Instead, we should get the distinct list of persistence ids (in a stream), run through it and fetch the latest snapshot per persistenceId.
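A hedged sketch of that suggestion, reusing legacyReadJournalDao.allPersistenceIdsSource and legacySnapshotDao.latestSnapshot from earlier in this thread, and assuming latestSnapshot returns a Future[Option[(SnapshotMetadata, Any)]]:

import akka.Done
import akka.stream.scaladsl.Sink
import scala.concurrent.{ ExecutionContext, Future }

// Sketch only: stream the persistence ids and migrate the latest snapshot of each one.
def migrateLatestPerPersistenceId()(implicit ec: ExecutionContext): Future[Done] =
  legacyReadJournalDao
    .allPersistenceIdsSource(Long.MaxValue)
    .mapAsync(1) { persistenceId =>
      legacySnapshotDao.latestSnapshot(persistenceId).flatMap {
        case Some((metadata, value)) => defaultSnapshotDao.save(metadata, value)
        case None                    => Future.unit
      }
    }
    .runWith(Sink.ignore)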
/**
 * migrate all the legacy snapshot schema data into the new snapshot schema
 */
def migrateAll(): Future[Seq[Future[Unit]]] = {
Future[Seq[Future[Unit]]] will confuse users. When is the migration done? When the outer future completes, I still have a bunch of Futures in flight.
It is better if we return a Future[Done] here and do it using a stream of snapshots: we consume them one by one, finish with Sink.ignore and return a Future[Done].
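Putting the two previous comments together, a hedged sketch of a snapshot migration that streams the rows and returns a single Future[Done] (same assumed snapshotdb, queries, toSnapshotData and defaultSnapshotDao as above):

import akka.Done
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

// Sketch only: stream the snapshot rows instead of loading them all with `run`,
// write them one by one, and complete with a single Future[Done].
def migrateAll(): Future[Done] =
  Source
    .fromPublisher(snapshotdb.stream(queries.SnapshotTable.result))
    .mapAsync(1) { row =>
      val (metadata, value) = toSnapshotData(row)
      defaultSnapshotDao.save(metadata, value)
    }
    .runWith(Sink.ignore)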
def migrate(offset: Int, limit: Int): Future[Seq[Future[Unit]]] = {
  for {
    rows <- snapshotdb.run(queries.SnapshotTable.sortBy(_.sequenceNumber.desc).drop(offset).take(limit).result)
  } yield rows.map(toSnapshotData).map { case (metadata, value) =>
    defaultSnapshotDao.save(metadata, value)
  }
}
Maybe this one is less useful. I see the value in migrating only the latest snapshot like in the following method. But migrating a subset is maybe something that we can skip.
It's also one scenario less to test for us.
@Tochemey, I left a few more comments and a proposal for a test plan.
 * This trait will be implemented by the various RDBMSs supported by akka-persistence-jdbc, since
 * the sequence number is handled differently by the various databases.
 */
sealed trait JournalOrdering {
Oh! I see it now.
So, the basic issue is that we are forcing the new ordering because we are importing the ordering from another table and therefore, when we finish, the new table's sequence/auto-increment needs to be reconfigured to start from the last + 1. Correct?
I remember that at some point you mentioned that you were getting the ordering interleaved, but I think the main reason was that when streaming the results you were calling sortBy(_.sequenceNumber).
Wouldn't we just get the right ordering if we read on one side, respecting the ordering, and insert on the other? At the end of the game, the ordering on the new table should be exactly the same. Or am I missing something obvious?
The JournalOrdering you introduced looks neat if we don't have another solution. But I would first try to just ensure that the ordering is preserved by inserting in the same order. That way we will have less code.
import scala.concurrent.{ ExecutionContext, Future }

package object migrator {
I don't think we need to add a package object. We can just add a DatabaseType.scala and a JournalOrdering.scala file.
That makes them easier to find when just browsing the code in GitHub, a text editor or a file browser. When we add traits and classes to a package.scala, we need to know they live there in order to find them.
The ordering can never be respected because it is generated by the database. So the only way to carry the ordering over from the old journal to the new one is to force-insert it during the insertion into the new journal, because the ordering field is a database auto-increment field. After the data migration, the ordering then has to be reset per database on the new journal, as done here.
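For Postgres, that "reset the ordering" step could look roughly like the following. This is a sketch under assumed defaults (an event_journal table with an event_journal_ordering_seq sequence); every other database needs its own variant:

import slick.jdbc.PostgresProfile.api._

// Sketch only: after force-inserting the ordering values, move the sequence behind the
// new journal's ordering column past the highest migrated value so new writes don't collide.
val resetOrdering: DBIO[Long] =
  sql"""SELECT setval('event_journal_ordering_seq',
                      (SELECT COALESCE(MAX(ordering), 0) FROM event_journal) + 1,
                      false)""".as[Long].head

// journaldb.run(resetOrdering)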
But why wouldn't the ordering be respected if the records are inserted in the same order? True, the ordering is generated by the DB, but it is deterministic, no?
If we have a blank slate and we insert the first event, its ordering will be 1, the second event's ordering will be 2, etc.
What am I missing?
import java.util.UUID
import scala.concurrent.{ ExecutionContext, Future }

abstract class BaseJournalMigratorSpec(config: String) extends SingleActorSystemPerTestSpec(config) {
I think it will be more convenient (and closer to the user experience) if the tests start an EventSourcedBehavior using the legacy tables and fill it by sending commands, instead of going low-level and manipulating the queries directly.
When doing so, we can also configure the behavior to save a snapshot every N events (e.g. N=5) and to keep 3 snapshots, for example.
That's why I was saying that we may need to start two actor systems.
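For illustration, a rough sketch (not code from this PR) of a typed entity configured that way; Command, Event, State and the handlers are placeholders:

import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, RetentionCriteria }

// Sketch only: snapshot every 5 events and keep the last 3 snapshots.
def accountBehavior(accountId: String): EventSourcedBehavior[Command, Event, State] =
  EventSourcedBehavior[Command, Event, State](
    persistenceId = PersistenceId.ofUniqueId(s"Account|$accountId"),
    emptyState = State.empty,
    commandHandler = (state, command) => handleCommand(state, command), // placeholder handler
    eventHandler = (state, event) => handleEvent(state, event))         // placeholder handler
    .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 5, keepNSnapshots = 3))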
I suggest the following test plan:
- start actor system with the legacy config and populate the journal for a few entities, with many events and snapshots. Stop actor system
- run migrator for events and snapshots
- start another actor system using default config. Reload each entity and verify that their state is as expected.
The Account entity is a good one because we can easily test the balance.
Another test could be:
- start the actor system with the legacy config and populate the journal for a few entities, with many events and a snapshot saved on each event. Stop the actor system
- run the migrator only for snapshots
- start another actor system using the default config. Reload each entity and verify that their state is as expected.
In that second test, since we save a snapshot on each event, we don't need to migrate the events, and when verifying the state we prove that we indeed migrated all snapshots.
Finally, we need to test that the tags are correctly migrated.
For that test I think it suffices to create one entity that persists events with tags, migrate the events and consume the tags using eventsByTag from the new table.
In that test, we can simply verify that we consume the same amount of events as we generated through the entity.
WDYT?
Hello, is this still being worked on? Or is it blocked by some underlying issue? We have some production code we would like to upgrade to a newer akka-persistence version, and this PR would be extremely useful for us. We'd be happy to contribute with testing, or help in any other way if necessary. Thanks in advance.
We'd be happy to help. Is there a way we can contribute?
private val bufferSize: Int = journalConfig.daoConfig.bufferSize

// get the journal ordering based upon the schema type used
private val journalOrdering: JournalOrdering = profile match {
Non-exhaustive pattern matching
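A minimal sketch of one way to make the match exhaustive; the concrete JournalOrdering implementations here are hypothetical placeholders, the point is the catch-all branch:

import slick.jdbc.{ JdbcProfile, MySQLProfile, PostgresProfile }

// Sketch only: cover the supported profiles explicitly and fail fast on anything else.
private def journalOrderingFor(profile: JdbcProfile): JournalOrdering = profile match {
  case _: PostgresProfile => postgresOrdering // hypothetical implementation
  case _: MySQLProfile    => mySqlOrdering    // hypothetical implementation
  case other =>
    throw new IllegalArgumentException(s"Unsupported Slick profile for the migrator: $other")
}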
 *
 * @param pr the given PersistentRepr
 */
private def unpackPersistentRepr(pr: PersistentRepr): (PersistentRepr, Set[String]) = {
never used declaration
    tags: Set[String],
    ordering: Long): (JournalAkkaSerializationRow, Set[String]) = {

  val serializedPayload: AkkaSerialization.AkkaSerialized =
val serializedPayload: AkkaSerialization.AkkaSerialized =
AkkaSerialization.serialize(serialization, repr.payload).get
/**
 * the actual event journal table
 */
val tableName: String = s"${journalConfig.eventJournalTableConfiguration.tableName}"
It is enough to have only:
val tableName: String = journalConfig.eventJournalTableConfiguration.tableName
private val legacyJournalDao: ByteArrayReadJournalDao =
  new ByteArrayReadJournalDao(journaldb, profile, readJournalConfig, SerializationExtension(system))

private def toSnapshotData(row: SnapshotRow): (SnapshotMetadata, Any) =
private def toSnapshotData(row: SnapshotRow): (SnapshotMetadata, Any) =
serializer.deserialize(row).get
@flonedo / @gabriele83, I think the best way to contribute to this PR is to check with @Tochemey as he's the original author. There are a few paths we can follow:
I think option 3 is the easiest and will allow others to contribute, but first we need to know from @Tochemey if he will continue working on this.
@gabriele83 The migration code has been used in production already and it was successful (we were only using Postgres and we did run all relevant tests). However, because of the general nature of the lib there might be some edge cases. Honestly, I believe it should work for any of the supported databases, because we migrate the raw data from the old schema onto the new one. The only things that remain, that I can think of at the moment, are the tests (unit and integration), which I have not had the time to write. Also, @octonato will be the best person to give us the edge cases that can be an issue. I am glad to assist whenever necessary.
@gabriele83 I have left a comment on your PR.
fix non-exhaustive pattern matching
@gabriele83 I have added you as a collaborator on my branch. When you are free, kindly fix the conflicts and let us merge it into this PR.
@octonato Hello, a while back you suggested creating two actor systems for test purposes (one using the legacy journal and one using the new one). How can I set up my tests to do so? Do you have any examples?
@flonedo / @gabriele83, we don't have tests using 2 actor systems. That would be the first time we do it. We do have a few tests that start and stop an actor system on each test. This abstract spec offers some helper methods that you can use, for instance:

def withActorSystem(f: ActorSystem => Unit): Unit = {
  implicit val system: ActorSystem = ActorSystem("test", config)
  f(system)
  system.terminate().futureValue
}

Nothing fancy. Just a helper that starts an actor system, runs the passed function and then terminates the system.

An alternative for the migration test could be the following:

def withActorSystem(config: Config)(f: ActorSystem => Unit): Unit = {
  implicit val system: ActorSystem = ActorSystem("test", config)
  f(system)
  system.terminate().futureValue
}

Then the tests:

it should "migrate the event journal" in {
  // start 1st actor system with legacy schema
  withActorSystem(configWithLegacySchema) { sys =>
    // populate db
  }

  // start a new actor system using new schema
  withActorSystem(configWithNewSchema) { sys =>
    // run migration tool
    // instantiate some persistent actors
    // and verify journal is replayed as expected
  }
}

That's what I would try out first. And then, follow the structure we have in our tests.
I will merge this PR into a branch. Note, the code here isn't ready. Tests are not passing and we probably need to run the formatting task and other things. I'm doing so because I understood that @Tochemey won't have time to continue working on it for now, and others (@flonedo / @gabriele83) would like to pick up from here. So, @flonedo / @gabriele83, feel free to open PRs pointing to the new branch.
Ok 👍
@octonato, @ennru and @patriknw kindly take a look and advise.
Refs #317