Skip to content

Commit

Permalink
Merge 3b1aa4d into a22ee60
Browse files Browse the repository at this point in the history
  • Loading branch information
ABresting authored Nov 22, 2023
2 parents a22ee60 + 3b1aa4d commit 104a717
Show file tree
Hide file tree
Showing 24 changed files with 187 additions and 129 deletions.
28 changes: 28 additions & 0 deletions migrations/message_store/00008_updatePrimaryKey_add_col.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
ALTER TABLE message RENAME TO message_backup;

CREATE TABLE IF NOT EXISTS message (
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
id BLOB,
messageHash BLOB, -- Newly added, this will be populated with a counter value
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash)
) WITHOUT ROWID;


INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt)
SELECT
mb.pubsubTopic,
mb.contentTopic,
mb.payload,
mb.version,
mb.timestamp,
mb.id,
randomblob(32), -- to populate 32-byte random blob
mb.storedAt
FROM message_backup AS mb;

DROP TABLE message_backup;
11 changes: 6 additions & 5 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import
../../../waku/waku_archive,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/wakucore

proc now():int64 = getTime().toUnix()
Expand Down Expand Up @@ -80,7 +81,7 @@ suite "Postgres driver":

let computedDigest = computeDigest(msg)

let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()
Expand Down Expand Up @@ -113,12 +114,12 @@ suite "Postgres driver":

let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp)
assert putRes.isOk(), putRes.error

let msg2 = fakeWakuMessage(contentTopic=contentTopic2)

putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)
putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), computeMessageHash(pubsubTopic2, msg2), msg2.timestamp)
assert putRes.isOk(), putRes.error

let countMessagesRes = await driver.getMessagesCount()
Expand Down Expand Up @@ -197,11 +198,11 @@ suite "Postgres driver":
let msg2 = fakeWakuMessage(ts = now)

var putRes = await driver.put(DefaultPubsubTopic,
msg1, computeDigest(msg1), msg1.timestamp)
msg1, computeDigest(msg1), computeMessageHash(DefaultPubsubTopic, msg1), msg1.timestamp)
assert putRes.isOk(), putRes.error

putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2), msg2.timestamp)
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()

(await driver.close()).expect("driver to close")
63 changes: 33 additions & 30 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import
../../../waku/waku_archive,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/common,
../testlib/wakucore


logScope:
topics = "test archive postgres driver"


## This whole file is copied from the 'test_driver_sqlite_query.nim' file
## and it tests the same use cases but using the postgres driver.

Expand Down Expand Up @@ -65,7 +68,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -107,7 +110,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -150,7 +153,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -195,7 +198,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -233,7 +236,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand All @@ -260,7 +263,7 @@ suite "Postgres driver - query by content topic":

for t in 0..<40:
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -306,7 +309,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -351,7 +354,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -396,7 +399,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -443,7 +446,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])

Expand Down Expand Up @@ -488,7 +491,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])

Expand Down Expand Up @@ -531,7 +534,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])

Expand Down Expand Up @@ -575,7 +578,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])

Expand Down Expand Up @@ -626,7 +629,7 @@ suite "Postgres driver - query by cursor":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[5][0], expected[5][1])

Expand Down Expand Up @@ -678,7 +681,7 @@ suite "Postgres driver - query by cursor":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[6][0], expected[6][1])

Expand Down Expand Up @@ -726,7 +729,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -768,7 +771,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -816,7 +819,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -861,7 +864,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -904,7 +907,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -949,7 +952,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -994,7 +997,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])

Expand Down Expand Up @@ -1042,7 +1045,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])

Expand Down Expand Up @@ -1093,7 +1096,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])

Expand Down Expand Up @@ -1147,7 +1150,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[7][0], expected[7][1])

Expand Down Expand Up @@ -1201,7 +1204,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[1][0], expected[1][1])

Expand Down Expand Up @@ -1256,7 +1259,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[1][0], expected[1][1])

Expand Down Expand Up @@ -1306,7 +1309,7 @@ suite "Postgres driver - retention policy":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

var res = await driver.getOldestMessageTimestamp()
assert res.isOk(), res.error
Expand Down Expand Up @@ -1341,7 +1344,7 @@ suite "Postgres driver - retention policy":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

var res = await driver.getMessagesCount()
assert res.isOk(), res.error
Expand Down Expand Up @@ -1378,7 +1381,7 @@ suite "Postgres driver - retention policy":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

var res = await driver.getMessagesCount()
assert res.isOk(), res.error
Expand Down
Loading

0 comments on commit 104a717

Please sign in to comment.