Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add new DB column messageHash #2202

Merged
merged 9 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions migrations/message_store/00008_updatePrimaryKey_add_col.up.sql
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kinda understand what this does but SQL not my forte.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
ALTER TABLE message RENAME TO message_backup;

CREATE TABLE IF NOT EXISTS message (
pubsubTopic BLOB NOT NULL,
contentTopic BLOB NOT NULL,
payload BLOB,
version INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
id BLOB,
messageHash BLOB, -- Newly added, this will be populated with a counter value
storedAt INTEGER NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash)
) WITHOUT ROWID;


INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt)
SELECT
mb.pubsubTopic,
mb.contentTopic,
mb.payload,
mb.version,
mb.timestamp,
mb.id,
randomblob(32), -- to populate 32-byte random blob
mb.storedAt
FROM message_backup AS mb;

DROP TABLE message_backup;
11 changes: 6 additions & 5 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import
../../../waku/waku_archive,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/wakucore

proc now():int64 = getTime().toUnix()
Expand Down Expand Up @@ -80,7 +81,7 @@ suite "Postgres driver":

let computedDigest = computeDigest(msg)

let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()
Expand Down Expand Up @@ -113,12 +114,12 @@ suite "Postgres driver":

let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp)
assert putRes.isOk(), putRes.error

let msg2 = fakeWakuMessage(contentTopic=contentTopic2)

putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)
putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), computeMessageHash(pubsubTopic2, msg2), msg2.timestamp)
assert putRes.isOk(), putRes.error

let countMessagesRes = await driver.getMessagesCount()
Expand Down Expand Up @@ -197,11 +198,11 @@ suite "Postgres driver":
let msg2 = fakeWakuMessage(ts = now)

var putRes = await driver.put(DefaultPubsubTopic,
msg1, computeDigest(msg1), msg1.timestamp)
msg1, computeDigest(msg1), computeMessageHash(DefaultPubsubTopic, msg1), msg1.timestamp)
assert putRes.isOk(), putRes.error

putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2), msg2.timestamp)
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()

(await driver.close()).expect("driver to close")
63 changes: 33 additions & 30 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import
../../../waku/waku_archive,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/common,
../testlib/wakucore


logScope:
topics = "test archive postgres driver"


## This whole file is copied from the 'test_driver_sqlite_query.nim' file
## and it tests the same use cases but using the postgres driver.

Expand Down Expand Up @@ -65,7 +68,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -107,7 +110,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -150,7 +153,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -195,7 +198,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -233,7 +236,7 @@ suite "Postgres driver - query by content topic":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand All @@ -260,7 +263,7 @@ suite "Postgres driver - query by content topic":

for t in 0..<40:
let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t))
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -306,7 +309,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -351,7 +354,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -396,7 +399,7 @@ suite "Postgres driver - query by pubsub topic":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -443,7 +446,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])

Expand Down Expand Up @@ -488,7 +491,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])

Expand Down Expand Up @@ -531,7 +534,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[4])

Expand Down Expand Up @@ -575,7 +578,7 @@ suite "Postgres driver - query by cursor":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])

Expand Down Expand Up @@ -626,7 +629,7 @@ suite "Postgres driver - query by cursor":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[5][0], expected[5][1])

Expand Down Expand Up @@ -678,7 +681,7 @@ suite "Postgres driver - query by cursor":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[6][0], expected[6][1])

Expand Down Expand Up @@ -726,7 +729,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -768,7 +771,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -816,7 +819,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -861,7 +864,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -904,7 +907,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -949,7 +952,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

## When
let res = await driver.getMessages(
Expand Down Expand Up @@ -994,7 +997,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[3])

Expand Down Expand Up @@ -1042,7 +1045,7 @@ suite "Postgres driver - query by time range":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[6])

Expand Down Expand Up @@ -1093,7 +1096,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1])

Expand Down Expand Up @@ -1147,7 +1150,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[7][0], expected[7][1])

Expand Down Expand Up @@ -1201,7 +1204,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[1][0], expected[1][1])

Expand Down Expand Up @@ -1256,7 +1259,7 @@ suite "Postgres driver - query by time range":

for row in messages:
let (topic, msg) = row
require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk()

let cursor = computeTestCursor(expected[1][0], expected[1][1])

Expand Down Expand Up @@ -1306,7 +1309,7 @@ suite "Postgres driver - retention policy":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

var res = await driver.getOldestMessageTimestamp()
assert res.isOk(), res.error
Expand Down Expand Up @@ -1341,7 +1344,7 @@ suite "Postgres driver - retention policy":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

var res = await driver.getMessagesCount()
assert res.isOk(), res.error
Expand Down Expand Up @@ -1378,7 +1381,7 @@ suite "Postgres driver - retention policy":
debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload)

for msg in messages:
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk()

var res = await driver.getMessagesCount()
assert res.isOk(), res.error
Expand Down
Loading
Loading