Skip to content

Commit

Permalink
Schema changes for tags and using Akka Serialization for payloads (#467)
Browse files Browse the repository at this point in the history
  • Loading branch information
chbatey authored Jan 8, 2021
1 parent 89613f8 commit 0d2f1ee
Show file tree
Hide file tree
Showing 87 changed files with 1,956 additions and 659 deletions.
32 changes: 31 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ script:
jobs:
include:
- stage: check
env: CMD="verifyCodeStyle; mimaReportBinaryIssues"
env: CMD="verifyCodeStyle" # mima removed until 5.0 M0 created
name: "Code style and binary compatibility check. Run locally with: sbt 'verifyCodeStyle; mimaReportBinaryIssues'"

- env: CMD="+Test/compile"
name: "Compile all code with Scala 2.12 and 2.13"
- env: CMD="docs/paradox"
Expand Down Expand Up @@ -74,6 +75,35 @@ jobs:
- PRE_CMD="./scripts/launch-sqlserver.sh"
- CMD="++2.13.1 It/testOnly akka.persistence.jdbc.integration.SqlServer*"

- name: "MySQL integration tests with Scala 2.13 and AdoptOpenJDK 11 (old dao)"
env:
- JDK="adopt@~1.11-0"
- _JAVA_OPTIONS="-Djdbc-journal.dao=akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao -Djdbc-snapshot-store.dao=akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao -Djdbc-read-journal.dao=akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao"
- PRE_CMD="./scripts/launch-mysql.sh"
- CMD="++2.13.1 It/testOnly akka.persistence.jdbc.integration.MySQL*"

- name: "Oracle integration tests with Scala 2.13 and AdoptOpenJDK 11 (old dao)"
env:
- JDK="adopt@~1.11-0"
- _JAVA_OPTIONS="-Djdbc-journal.dao=akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao -Djdbc-snapshot-store.dao=akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao -Djdbc-read-journal.dao=akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao"
- PRE_CMD="./scripts/launch-oracle.sh"
- CMD="++2.13.1 It/testOnly akka.persistence.jdbc.integration.Oracle*"

- name: "PostgreSQL integration tests with Scala 2.13 and AdoptOpenJDK 11 (old dao)"
env:
- JDK="adopt@~1.11-0"
- _JAVA_OPTIONS="-Djdbc-journal.dao=akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao -Djdbc-snapshot-store.dao=akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao -Djdbc-read-journal.dao=akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao"
- PRE_CMD="./scripts/launch-postgres.sh"
- CMD="++2.13.1 It/testOnly akka.persistence.jdbc.integration.Postgres*"

- name: "SQL Server integration tests with Scala 2.13 and AdoptOpenJDK 11 (old dao)"
env:
- JDK="adopt@~1.11-0"
- _JAVA_OPTIONS="-Djdbc-journal.dao=akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao -Djdbc-snapshot-store.dao=akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao -Djdbc-read-journal.dao=akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao"
- PRE_CMD="./scripts/launch-sqlserver.sh"
- CMD="++2.13.1 It/testOnly akka.persistence.jdbc.integration.SqlServer*"


- stage: whitesource
script: git branch -f "$TRAVIS_BRANCH" && git checkout "$TRAVIS_BRANCH" && sbt whitesourceCheckPolicies whitesourceUpdate
name: "Check licenses via WhiteSource"
Expand Down
80 changes: 63 additions & 17 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,12 @@ jdbc-journal {
class = "akka.persistence.jdbc.journal.JdbcAsyncWriteJournal"

tables {
journal {

# Only used in pre 5.0.0 Dao
legacy_journal {
tableName = "journal"
schemaName = ""

columnNames {
ordering = "ordering"
deleted = "deleted"
Expand All @@ -131,14 +134,48 @@ jdbc-journal {
message = "message"
}
}

event_journal {
tableName = "event_journal"
schemaName = ""

columnNames {
ordering = "ordering"
deleted = "deleted"
persistenceId = "persistence_id"
sequenceNumber = "sequence_number"
writer = "writer"
writeTimestamp = "write_timestamp"
adapterManifest = "adapter_manifest"
eventPayload = "event_payload"
eventSerId = "event_ser_id"
eventSerManifest = "event_ser_manifest"
metaPayload = "meta_payload"
metaSerId = "meta_ser_id"
metaSerManifest = "meta_ser_manifest"
}
}

event_tag {
tableName = "event_tag"
schemaName = ""

columnNames {
eventId = "event_id"
tag = "tag"
}
}
}

# The tag separator to use when tagging events with more than one tag.
# should not be configured directly, but through property akka-persistence-jdbc.tagSeparator
# in order to keep consistent behavior over write/read sides
# Only used for the legacy schema
tagSeparator = ${akka-persistence-jdbc.tagSeparator}

dao = "akka.persistence.jdbc.journal.dao.ByteArrayJournalDao"
# If you have data from pre 5.0.0 use the legacy akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao
# Dao. Migration to the new dao will be added in the future.
dao = "akka.persistence.jdbc.journal.dao.DefaultJournalDao"

# The size of the buffer used when queueing up events for batch writing. This number must be bigger then the number
# of events that may be written concurrently. In other words this number must be bigger than the number of persistent
Expand Down Expand Up @@ -247,7 +284,7 @@ jdbc-snapshot-store {
class = "akka.persistence.jdbc.snapshot.JdbcSnapshotStore"

tables {
snapshot {
legacy_snapshot {
tableName = "snapshot"
schemaName = ""
columnNames {
Expand All @@ -257,6 +294,24 @@ jdbc-snapshot-store {
snapshot = "snapshot"
}
}

snapshot {
tableName = "snapshot"
schemaName = ""
columnNames {
persistenceId = "persistence_id"
sequenceNumber = "sequence_number"
created = "created"

snapshotPayload = "snapshot_payload"
snapshotSerId = "snapshot_ser_id"
snapshotSerManifest = "snapshot_ser_manifest"

metaPayload = "meta_payload"
metaSerId = "meta_ser_id"
metaSerManifest = "meta_ser_manifest"
}
}
}

# This setting can be used to configure usage of a shared database.
Expand All @@ -269,7 +324,7 @@ jdbc-snapshot-store {
# to the same value for these other journals.
use-shared-db = null

dao = "akka.persistence.jdbc.snapshot.dao.ByteArraySnapshotDao"
dao = "akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao"

slick {

Expand Down Expand Up @@ -375,7 +430,7 @@ jdbc-read-journal {
# to the same value for these other journals.
use-shared-db = null

dao = "akka.persistence.jdbc.query.dao.ByteArrayReadJournalDao"
dao = "akka.persistence.jdbc.query.dao.DefaultReadJournalDao"

# if true, queries will include logically deleted events
# should not be configured directly, but through property akka-persistence-jdbc.logicalDelete.enable
Expand All @@ -401,18 +456,9 @@ jdbc-read-journal {
}

tables {
journal {
tableName = "journal"
schemaName = ""
columnNames {
ordering = "ordering"
persistenceId = "persistence_id"
sequenceNumber = "sequence_number"
created = "created"
tags = "tags"
message = "message"
}
}
legacy_journal = ${jdbc-journal.tables.legacy_journal}
event_journal = ${jdbc-journal.tables.event_journal}
event_tag = ${jdbc-journal.tables.event_tag}
}

# The tag separator to use when tagging events with more than one tag.
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/resources/schema/h2/h2-create-schema-legacy.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS PUBLIC."journal" (
"ordering" BIGINT AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"deleted" BOOLEAN DEFAULT FALSE NOT NULL,
"tags" VARCHAR(255) DEFAULT NULL,
"message" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
);
CREATE UNIQUE INDEX IF NOT EXISTS "journal_ordering_idx" ON PUBLIC."journal"("ordering");

CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"created" BIGINT NOT NULL,
"snapshot" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
);
55 changes: 39 additions & 16 deletions core/src/main/resources/schema/h2/h2-create-schema.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,41 @@
CREATE TABLE IF NOT EXISTS PUBLIC."journal" (
"ordering" BIGINT AUTO_INCREMENT,
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"deleted" BOOLEAN DEFAULT FALSE NOT NULL,
"tags" VARCHAR(255) DEFAULT NULL,
"message" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
);
CREATE UNIQUE INDEX IF NOT EXISTS "journal_ordering_idx" ON PUBLIC."journal"("ordering");
CREATE TABLE IF NOT EXISTS "event_journal" (
"ordering" BIGINT NOT NULL AUTO_INCREMENT,
"deleted" BOOLEAN DEFAULT false NOT NULL,
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"writer" VARCHAR NOT NULL,
"write_timestamp" BIGINT NOT NULL,
"adapter_manifest" VARCHAR NOT NULL,
"event_payload" BLOB NOT NULL,
"event_ser_id" INTEGER NOT NULL,
"event_ser_manifest" VARCHAR NOT NULL,
"meta_payload" BLOB,
"meta_ser_id" INTEGER,
"meta_ser_manifest" VARCHAR,
PRIMARY KEY("persistence_id","sequence_number")
);

CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering");

CREATE TABLE IF NOT EXISTS PUBLIC."snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"created" BIGINT NOT NULL,
"snapshot" BYTEA NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
CREATE TABLE IF NOT EXISTS "event_tag" (
"event_id" BIGINT NOT NULL,
"tag" VARCHAR NOT NULL,
PRIMARY KEY("event_id", "tag"),
CONSTRAINT fk_event_journal
FOREIGN KEY("event_id")
REFERENCES "event_journal"("ordering")
ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS "snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" BIGINT NOT NULL,
"created" BIGINT NOT NULL,"snapshot_ser_id" INTEGER NOT NULL,
"snapshot_ser_manifest" VARCHAR NOT NULL,
"snapshot_payload" BLOB NOT NULL,
"meta_ser_id" INTEGER,
"meta_ser_manifest" VARCHAR,
"meta_payload" BLOB,
PRIMARY KEY("persistence_id","sequence_number")
);

2 changes: 2 additions & 0 deletions core/src/main/resources/schema/h2/h2-drop-schema-legacy.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS PUBLIC."journal";
DROP TABLE IF EXISTS PUBLIC."snapshot";
3 changes: 2 additions & 1 deletion core/src/main/resources/schema/h2/h2-drop-schema.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
DROP TABLE IF EXISTS PUBLIC."journal";
DROP TABLE IF EXISTS PUBLIC."event_tag";
DROP TABLE IF EXISTS PUBLIC."event_journal";
DROP TABLE IF EXISTS PUBLIC."snapshot";
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE TABLE IF NOT EXISTS journal (
ordering SERIAL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
deleted BOOLEAN DEFAULT FALSE NOT NULL,
tags VARCHAR(255) DEFAULT NULL,
message BLOB NOT NULL,
PRIMARY KEY(persistence_id, sequence_number)
);
CREATE UNIQUE INDEX journal_ordering_idx ON journal(ordering);

CREATE TABLE IF NOT EXISTS snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
created BIGINT NOT NULL,
snapshot BLOB NOT NULL,
PRIMARY KEY (persistence_id, sequence_number)
);
50 changes: 35 additions & 15 deletions core/src/main/resources/schema/mysql/mysql-create-schema.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
CREATE TABLE IF NOT EXISTS journal (
ordering SERIAL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
deleted BOOLEAN DEFAULT FALSE NOT NULL,
tags VARCHAR(255) DEFAULT NULL,
message BLOB NOT NULL,
PRIMARY KEY(persistence_id, sequence_number)
CREATE TABLE IF NOT EXISTS event_journal(
ordering SERIAL,
deleted BOOLEAN DEFAULT false NOT NULL,
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
writer TEXT NOT NULL,
write_timestamp BIGINT NOT NULL,
adapter_manifest TEXT NOT NULL,
event_payload BLOB NOT NULL,
event_ser_id INTEGER NOT NULL,
event_ser_manifest TEXT NOT NULL,
meta_payload BLOB,
meta_ser_id INTEGER,meta_ser_manifest TEXT,
PRIMARY KEY(persistence_id,sequence_number)
);
CREATE UNIQUE INDEX journal_ordering_idx ON journal(ordering);

CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);

CREATE TABLE IF NOT EXISTS event_tag (
event_id BIGINT UNSIGNED NOT NULL,
tag VARCHAR(255) NOT NULL,
PRIMARY KEY(event_id, tag),
FOREIGN KEY (event_id)
REFERENCES event_journal(ordering)
ON DELETE CASCADE
);

CREATE TABLE IF NOT EXISTS snapshot (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
created BIGINT NOT NULL,
snapshot BLOB NOT NULL,
PRIMARY KEY (persistence_id, sequence_number)
);
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
created BIGINT NOT NULL,
snapshot_ser_id INTEGER NOT NULL,
snapshot_ser_manifest TEXT NOT NULL,
snapshot_payload BLOB NOT NULL,
meta_ser_id INTEGER,
meta_ser_manifest TEXT,
meta_payload BLOB,
PRIMARY KEY (persistence_id, sequence_number));
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE IF EXISTS journal;
DROP TABLE IF EXISTS snapshot;
3 changes: 2 additions & 1 deletion core/src/main/resources/schema/mysql/mysql-drop-schema.sql
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
DROP TABLE IF EXISTS journal;
DROP TABLE IF EXISTS event_tag;
DROP TABLE IF EXISTS event_journal;
DROP TABLE IF EXISTS snapshot;
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
CREATE SEQUENCE "ordering_seq" START WITH 1 INCREMENT BY 1 NOMAXVALUE
/

CREATE TABLE "journal" (
"ordering" NUMERIC,
"deleted" char check ("deleted" in (0,1)) NOT NULL,
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" NUMERIC NOT NULL,
"tags" VARCHAR(255) DEFAULT NULL,
"message" BLOB NOT NULL,
PRIMARY KEY("persistence_id", "sequence_number")
)
/

CREATE UNIQUE INDEX "journal_ordering_idx" ON "journal"("ordering")
/

CREATE OR REPLACE TRIGGER "ordering_seq_trigger"
BEFORE INSERT ON "journal"
FOR EACH ROW
BEGIN
SELECT "ordering_seq".NEXTVAL INTO :NEW."ordering" FROM DUAL;
END;
/

CREATE OR REPLACE PROCEDURE "reset_sequence"
IS
l_value NUMBER;
BEGIN
EXECUTE IMMEDIATE 'SELECT "ordering_seq".nextval FROM dual' INTO l_value;
EXECUTE IMMEDIATE 'ALTER SEQUENCE "ordering_seq" INCREMENT BY -' || l_value || ' MINVALUE 0';
EXECUTE IMMEDIATE 'SELECT "ordering_seq".nextval FROM dual' INTO l_value;
EXECUTE IMMEDIATE 'ALTER SEQUENCE "ordering_seq" INCREMENT BY 1 MINVALUE 0';
END;
/

CREATE TABLE "snapshot" (
"persistence_id" VARCHAR(255) NOT NULL,
"sequence_number" NUMERIC NOT NULL,
"created" NUMERIC NOT NULL,
"snapshot" BLOB NOT NULL,
PRIMARY KEY ("persistence_id", "sequence_number")
)
/
Loading

0 comments on commit 0d2f1ee

Please sign in to comment.