diff --git a/migrations/message_store/00008_updatePrimaryKey_add_col.up.sql b/migrations/message_store/00008_updatePrimaryKey_add_col.up.sql new file mode 100644 index 0000000000..f933d08496 --- /dev/null +++ b/migrations/message_store/00008_updatePrimaryKey_add_col.up.sql @@ -0,0 +1,28 @@ +ALTER TABLE message RENAME TO message_backup; + +CREATE TABLE IF NOT EXISTS message ( + pubsubTopic BLOB NOT NULL, + contentTopic BLOB NOT NULL, + payload BLOB, + version INTEGER NOT NULL, + timestamp INTEGER NOT NULL, + id BLOB, + messageHash BLOB, -- Newly added, this will be populated with a counter value + storedAt INTEGER NOT NULL, + CONSTRAINT messageIndex PRIMARY KEY (messageHash) +) WITHOUT ROWID; + + +INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) +SELECT + mb.pubsubTopic, + mb.contentTopic, + mb.payload, + mb.version, + mb.timestamp, + mb.id, + randomblob(32), -- to populate 32-byte random blob + mb.storedAt +FROM message_backup AS mb; + +DROP TABLE message_backup; \ No newline at end of file diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 4709d4588f..90dbf7eeb2 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -8,6 +8,7 @@ import ../../../waku/waku_archive, ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../testlib/wakucore proc now():int64 = getTime().toUnix() @@ -80,7 +81,7 @@ suite "Postgres driver": let computedDigest = computeDigest(msg) - let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp) + let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) assert putRes.isOk(), putRes.error let storedMsg = (await driver.getAllMessages()).tryGet() @@ -113,12 +114,12 @@ suite "Postgres driver": let msg1 = fakeWakuMessage(contentTopic=contentTopic1) - var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp) + var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp) assert putRes.isOk(), putRes.error let msg2 = fakeWakuMessage(contentTopic=contentTopic2) - putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp) + putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), computeMessageHash(pubsubTopic2, msg2), msg2.timestamp) assert putRes.isOk(), putRes.error let countMessagesRes = await driver.getMessagesCount() @@ -197,11 +198,11 @@ suite "Postgres driver": let msg2 = fakeWakuMessage(ts = now) var putRes = await driver.put(DefaultPubsubTopic, - msg1, computeDigest(msg1), msg1.timestamp) + msg1, computeDigest(msg1), computeMessageHash(DefaultPubsubTopic, msg1), msg1.timestamp) assert putRes.isOk(), putRes.error putRes = await driver.put(DefaultPubsubTopic, - msg2, computeDigest(msg2), msg2.timestamp) + msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp) require not putRes.isOk() (await driver.close()).expect("driver to close") diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index cbc1d16d91..f93c7be483 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -9,12 +9,15 @@ import ../../../waku/waku_archive, ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../testlib/common, ../testlib/wakucore + logScope: topics = "test archive postgres driver" + ## This whole file is copied from the 'test_driver_sqlite_query.nim' file ## and it tests the same use cases but using the postgres driver. @@ -65,7 +68,7 @@ suite "Postgres driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -107,7 +110,7 @@ suite "Postgres driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -150,7 +153,7 @@ suite "Postgres driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -195,7 +198,7 @@ suite "Postgres driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -233,7 +236,7 @@ suite "Postgres driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -260,7 +263,7 @@ suite "Postgres driver - query by content topic": for t in 0..<40: let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -306,7 +309,7 @@ suite "Postgres driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -351,7 +354,7 @@ suite "Postgres driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -396,7 +399,7 @@ suite "Postgres driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -443,7 +446,7 @@ suite "Postgres driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -488,7 +491,7 @@ suite "Postgres driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -531,7 +534,7 @@ suite "Postgres driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -575,7 +578,7 @@ suite "Postgres driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) @@ -626,7 +629,7 @@ suite "Postgres driver - query by cursor": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[5][0], expected[5][1]) @@ -678,7 +681,7 @@ suite "Postgres driver - query by cursor": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[6][0], expected[6][1]) @@ -726,7 +729,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -768,7 +771,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -816,7 +819,7 @@ suite "Postgres driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -861,7 +864,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -904,7 +907,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -949,7 +952,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -994,7 +997,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[3]) @@ -1042,7 +1045,7 @@ suite "Postgres driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) @@ -1093,7 +1096,7 @@ suite "Postgres driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1]) @@ -1147,7 +1150,7 @@ suite "Postgres driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[7][0], expected[7][1]) @@ -1201,7 +1204,7 @@ suite "Postgres driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) @@ -1256,7 +1259,7 @@ suite "Postgres driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) @@ -1306,7 +1309,7 @@ suite "Postgres driver - retention policy": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() var res = await driver.getOldestMessageTimestamp() assert res.isOk(), res.error @@ -1341,7 +1344,7 @@ suite "Postgres driver - retention policy": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() var res = await driver.getMessagesCount() assert res.isOk(), res.error @@ -1378,7 +1381,7 @@ suite "Postgres driver - retention policy": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() var res = await driver.getMessagesCount() assert res.isOk(), res.error diff --git a/tests/waku_archive/test_driver_queue_query.nim b/tests/waku_archive/test_driver_queue_query.nim index 5f25d6d9ae..31e25422ef 100644 --- a/tests/waku_archive/test_driver_queue_query.nim +++ b/tests/waku_archive/test_driver_queue_query.nim @@ -9,6 +9,7 @@ import ../../../waku/waku_archive, ../../../waku/waku_archive/driver/queue_driver, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../testlib/common, ../testlib/wakucore @@ -58,7 +59,7 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -102,7 +103,7 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -147,7 +148,7 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -194,7 +195,7 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -234,7 +235,7 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -263,7 +264,7 @@ suite "Queue driver - query by content topic": for t in 0..<40: let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -312,7 +313,7 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() ## When @@ -359,7 +360,7 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() ## When @@ -406,7 +407,7 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() ## When @@ -456,7 +457,7 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -503,7 +504,7 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -548,7 +549,7 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -594,7 +595,7 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) @@ -647,7 +648,7 @@ suite "Queue driver - query by cursor": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(expected[5][0], expected[5][1]) @@ -701,7 +702,7 @@ suite "Queue driver - query by cursor": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(expected[6][0], expected[6][1]) @@ -752,7 +753,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -796,7 +797,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg),computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -846,7 +847,7 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() ## When @@ -893,7 +894,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -938,7 +939,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -985,7 +986,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() ## When @@ -1032,7 +1033,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[3]) @@ -1082,7 +1083,7 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) @@ -1135,7 +1136,7 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1]) @@ -1191,7 +1192,7 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(expected[7][0], expected[7][1]) @@ -1247,7 +1248,7 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) @@ -1304,7 +1305,7 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp) require retFut.isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) diff --git a/tests/waku_archive/test_driver_sqlite.nim b/tests/waku_archive/test_driver_sqlite.nim index 7c1bd2e1b7..342f995653 100644 --- a/tests/waku_archive/test_driver_sqlite.nim +++ b/tests/waku_archive/test_driver_sqlite.nim @@ -50,7 +50,7 @@ suite "SQLite driver": let msg = fakeWakuMessage(contentTopic=contentTopic) ## When - let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp) ## Then check: diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index 9cf21cfca7..d63ec7c75c 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -10,6 +10,7 @@ import ../../../waku/waku_archive, ../../../waku/waku_archive/driver/sqlite_driver, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../testlib/common, ../testlib/wakucore @@ -62,7 +63,7 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -105,7 +106,7 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -149,7 +150,7 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -195,7 +196,7 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -234,7 +235,7 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -262,7 +263,7 @@ suite "SQLite driver - query by content topic": for t in 0..<40: let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -310,7 +311,7 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -356,7 +357,7 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -402,7 +403,7 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -451,7 +452,7 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -497,7 +498,7 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -541,7 +542,7 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) @@ -586,7 +587,7 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) @@ -638,7 +639,7 @@ suite "SQLite driver - query by cursor": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[5][0], expected[5][1]) @@ -691,7 +692,7 @@ suite "SQLite driver - query by cursor": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[6][0], expected[6][1]) @@ -741,7 +742,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -784,7 +785,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -833,7 +834,7 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -879,7 +880,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -923,7 +924,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -969,7 +970,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## When let res = await driver.getMessages( @@ -1015,7 +1016,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[3]) @@ -1064,7 +1065,7 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) @@ -1116,7 +1117,7 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1]) @@ -1171,7 +1172,7 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[7][0], expected[7][1]) @@ -1226,7 +1227,7 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) @@ -1282,7 +1283,7 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (await driver.put(topic, msg, computeDigest(msg), computeMessageHash(topic, msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 2aa3866323..fbbcca4218 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -8,6 +8,7 @@ import import ../../../waku/common/databases/db_sqlite, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../../../waku/waku_archive, ../../../waku/waku_archive/driver/sqlite_driver, ../../../waku/waku_archive/retention_policy, @@ -41,7 +42,7 @@ suite "Waku Archive - Retention policy": ## When for i in 1..capacity+excess: let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) - putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)) + putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) discard waitFor allFinished(putFutures) @@ -79,11 +80,12 @@ suite "Waku Archive - Retention policy": require (waitFor driver.performVacuum()).isOk() ## When + ## # create a number of messages so that the size of the DB overshoots for i in 1..excess: let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) - putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)) + putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) # waitFor is used to synchronously wait for the futures to complete. discard waitFor allFinished(putFutures) @@ -137,7 +139,7 @@ suite "Waku Archive - Retention policy": ## When for msg in messages: - require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() require (waitFor retentionPolicy.execute(driver)).isOk() ## Then diff --git a/tests/waku_archive/test_waku_archive.nim b/tests/waku_archive/test_waku_archive.nim index dcac1484b4..eb9c4e3410 100644 --- a/tests/waku_archive/test_waku_archive.nim +++ b/tests/waku_archive/test_waku_archive.nim @@ -9,6 +9,7 @@ import import ../../../waku/common/databases/db_sqlite, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../../../waku/waku_archive/driver/sqlite_driver, ../../../waku/waku_archive, ../testlib/common, @@ -152,7 +153,7 @@ procSuite "Waku Archive - find messages": archive = newTestWakuArchive(driver) for msg in msgListA: - require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() archive @@ -446,7 +447,7 @@ procSuite "Waku Archive - find messages": ] for msg in msgList: - require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)).isOk() ## Given let req = ArchiveQuery(contentTopics: @[DefaultContentTopic]) diff --git a/tests/waku_core/test_message_digest.nim b/tests/waku_core/test_message_digest.nim index 673d4adbf1..64f6696984 100644 --- a/tests/waku_core/test_message_digest.nim +++ b/tests/waku_core/test_message_digest.nim @@ -32,7 +32,7 @@ suite "Waku Message - Deterministic hashing": ) ## When - let messageHash = digest(pubsubTopic, message) + let messageHash = computeMessageHash(pubsubTopic, message) ## Then check: @@ -64,7 +64,7 @@ suite "Waku Message - Deterministic hashing": ) ## When - let messageHash = digest(pubsubTopic, message) + let messageHash = computeMessageHash(pubsubTopic, message) ## Then check: @@ -96,7 +96,7 @@ suite "Waku Message - Deterministic hashing": ) ## When - let messageHash = digest(pubsubTopic, message) + let messageHash = computeMessageHash(pubsubTopic, message) ## Then check: @@ -128,7 +128,7 @@ suite "Waku Message - Deterministic hashing": ) ## When - let messageHash = digest(pubsubTopic, message) + let messageHash = computeMessageHash(pubsubTopic, message) ## Then check: diff --git a/tests/waku_store/test_resume.nim b/tests/waku_store/test_resume.nim index ea918ce3c3..d858158a45 100644 --- a/tests/waku_store/test_resume.nim +++ b/tests/waku_store/test_resume.nim @@ -8,9 +8,11 @@ import libp2p/crypto/crypto import ../../waku/common/databases/db_sqlite, - ../../waku/node/message_store/sqlite_store, + ../../waku/waku_archive/driver, + ../../waku/waku_archive/driver/sqlite_driver/sqlite_driver, ../../waku/node/peer_manager, ../../waku/waku_core, + ../../waku/waku_core/message/digest, ../../waku/waku_store, ./testlib/common, ./testlib/switch @@ -19,12 +21,12 @@ import proc newTestDatabase(): SqliteDatabase = SqliteDatabase.new("memory:").tryGet() -proc newTestArchiveDriver(): ArchiveDriver = +proc newTestArchiveDriver(): ArchiveDriverResult = let database = SqliteDatabase.new(":memory:").tryGet() SqliteDriver.init(database).tryGet() -proc newTestWakuStore(switch: Switch, store=newTestMessageStore()): Future[WakuStore] {.async.} = +proc newTestWakuStore(switch: Switch, store: MessageStore = nil): Future[WakuStore] {.async.} = let peerManager = PeerManager.new(switch) proto = WakuStore.init(peerManager, rng, store) @@ -58,7 +60,7 @@ procSuite "Waku Store - resume store": ] for msg in msgList: - require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp).isOk() store @@ -76,7 +78,7 @@ procSuite "Waku Store - resume store": ] for msg in msgList2: - require store.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require store.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp).isOk() store diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 2eff88e227..ae489fc6a4 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -15,6 +15,7 @@ import import ../../../waku/common/databases/db_sqlite, ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../../../waku/node/peer_manager, ../../../waku/waku_archive, ../../../waku/waku_archive/driver/sqlite_driver, @@ -58,7 +59,8 @@ procSuite "WakuNode - Store": for msg in msgListA: let msg_digest = waku_archive.computeDigest(msg) - require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk() + let msg_hash = computeMessageHash(DefaultPubsubTopic, msg) + require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg_hash, msg.timestamp)).isOk() driver diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_store.nim b/tests/wakunode_jsonrpc/test_jsonrpc_store.nim index de04f83f0a..fe22f8ac18 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_store.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_store.nim @@ -9,6 +9,7 @@ import json_rpc/[rpcserver, rpcclient] import ../../../waku/waku_core, + ../../../waku/waku_core/message/digest, ../../../waku/node/peer_manager, ../../../waku/waku_node, ../../../waku/waku_api/jsonrpc/store/handlers as store_api, @@ -25,10 +26,11 @@ import proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] = let digest = waku_archive.computeDigest(message) + msgHash = computeMessageHash(pubsubTopic, message) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, digest, msgHash, receivedTime) procSuite "Waku v2 JSON-RPC API - Store": diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 64ad5c31e7..ef8fc745d2 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -10,6 +10,7 @@ import libp2p/crypto/crypto import ../../../waku/waku_core/message, + ../../../waku/waku_core/message/digest, ../../../waku/waku_core/topics, ../../../waku/waku_core/time, ../../../waku/waku_node, @@ -33,10 +34,11 @@ logScope: proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] = let digest = waku_archive.computeDigest(message) + msgHash = computeMessageHash(pubsubTopic, message) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, digest, msgHash, receivedTime) # Creates a new WakuNode proc testWakuNode(): WakuNode = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e09d1a27a0..82f72f8e5a 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -210,7 +210,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = trace "waku.relay received", peerId=node.peerId, pubsubTopic=topic, - hash=topic.digest(msg).to0xHex(), + hash=topic.computeMessageHash(msg).to0xHex(), receivedTime=getNowInNanosecondTime(), payloadSizeBytes=msg.payload.len @@ -339,7 +339,7 @@ proc publish*( trace "waku.relay published", peerId=node.peerId, pubsubTopic=pubsubTopic, - hash=pubsubTopic.digest(message).to0xHex(), + hash=pubsubTopic.computeMessageHash(message).to0xHex(), publishTime=getNowInNanosecondTime() proc startRelay*(node: WakuNode) {.async.} = diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index b6e8aad3fa..fef44efb02 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -18,6 +18,7 @@ import ./retention_policy/retention_policy_capacity, ./retention_policy/retention_policy_time, ../waku_core, + ../waku_core/message/digest, ./common, ./archive_metrics @@ -101,12 +102,13 @@ proc handleMessage*(w: WakuArchive, block: let msgDigest = computeDigest(msg) + msgHash = computeMessageHash(pubsubTopic, msg) msgReceivedTime = if msg.timestamp > 0: msg.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest + trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest, messageHash=msgHash - let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime) + let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgHash, msgReceivedTime) if putRes.isErr(): error "failed to insert message", err=putRes.error waku_archive_errors.inc(labelValues = [insertFailure]) diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 64c6b3318a..38cffb372f 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -26,6 +26,7 @@ method put*(driver: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, + messageHash: WakuMessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.base, async.} = discard diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 9ecc1bd3b9..e41f85b77c 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -35,15 +35,16 @@ proc createTableQuery(): string = " version INTEGER NOT NULL," & " timestamp BIGINT NOT NULL," & " id VARCHAR NOT NULL," & + " messageHash VARCHAR NOT NULL," & " storedAt BIGINT NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & ");" const InsertRowStmtName = "InsertRow" const InsertRowStmtDefinition = # TODO: get the sql queries from a file - """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic, - version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);""" + """INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic, + version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8);""" const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc" const SelectNoCursorAscStmtDef = @@ -186,10 +187,12 @@ method put*(s: PostgresDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, + messageHash: WakuMessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = let digest = toHex(digest.data) + let messageHash = toHex(messageHash) let rxTime = $receivedTime let contentTopic = message.contentTopic let payload = toHex(message.payload) @@ -199,6 +202,7 @@ method put*(s: PostgresDriver, return await s.writeConnPool.runStmt(InsertRowStmtName, InsertRowStmtDefinition, @[digest, + messageHash, rxTime, contentTopic, payload, @@ -206,6 +210,7 @@ method put*(s: PostgresDriver, version, timestamp], @[int32(digest.len), + int32(messageHash.len), int32(rxTime.len), int32(contentTopic.len), int32(payload.len), @@ -213,7 +218,7 @@ method put*(s: PostgresDriver, int32(version.len), int32(timestamp.len)], @[int32(0), int32(0), int32(0), int32(0), - int32(0), int32(0), int32(0)]) + int32(0), int32(0), int32(0), int32(0)]) method getAllMessages*(s: PostgresDriver): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index cce7d895bf..948521ccbb 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -228,6 +228,7 @@ method put*(driver: QueueDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, + messageHash: WakuMessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) diff --git a/waku/waku_archive/driver/sqlite_driver/migrations.nim b/waku/waku_archive/driver/sqlite_driver/migrations.nim index 0aa925fda1..4f7fea7bde 100644 --- a/waku/waku_archive/driver/sqlite_driver/migrations.nim +++ b/waku/waku_archive/driver/sqlite_driver/migrations.nim @@ -14,7 +14,7 @@ logScope: topics = "waku archive migration" -const SchemaVersion* = 7 # increase this when there is an update in the database schema +const SchemaVersion* = 8 # increase this when there is an update in the database schema template projectRoot: string = currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".." const MessageStoreMigrationPath: string = projectRoot / "migrations" / "message_store" diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 27ab4de618..e03246c5eb 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -71,8 +71,9 @@ proc createTableQuery(table: string): SqlQueryStr = " version INTEGER NOT NULL," & " timestamp INTEGER NOT NULL," & " id BLOB," & + " messageHash BLOB," & " storedAt INTEGER NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & ") WITHOUT ROWID;" proc createTable*(db: SqliteDatabase): DatabaseResult[void] = @@ -102,17 +103,16 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = ## Insert message -type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) +type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & - " VALUES (?, ?, ?, ?, ?, ?, ?);" + "INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & + " VALUES (?, ?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = let query = insertMessageQuery(DbTable) db.prepareStmt(query, InsertMessageParams, void).expect("this is a valid statement") - ## Count table messages proc countMessagesQuery(table: string): SqlQueryStr = diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 53da379b1a..a6039d4ece 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -13,6 +13,7 @@ import import ../../../common/databases/db_sqlite, ../../../waku_core, + ../../../waku_core/message/digest, ../../common, ../../driver, ./cursor, @@ -61,11 +62,13 @@ method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, + messageHash: WakuMessageHash, receivedTime: Timestamp): Future[ArchiveDriverResult[void]] {.async.} = ## Inserts a message into the store let res = s.insertStmt.exec(( @(digest.data), # id + @(messageHash), # messageHash receivedTime, # storedAt toBytes(message.contentTopic), # contentTopic message.payload, # payload diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index 9db0542e18..953be4f880 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -17,17 +17,16 @@ import ## 14/WAKU2-MESSAGE: Deterministic message hashing ## https://rfc.vac.dev/spec/14/#deterministic-message-hashing -type WakuMessageDigest* = array[32, byte] +type WakuMessageHash* = array[32, byte] -converter toBytesArray*(digest: MDigest[256]): WakuMessageDigest = +converter toBytesArray*(digest: MDigest[256]): WakuMessageHash = digest.data converter toBytes*(digest: MDigest[256]): seq[byte] = toSeq(digest.data) - -proc digest*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessageDigest = +proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessageHash = var ctx: sha256 ctx.init() defer: ctx.clear() diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 5a505960b2..3af3943365 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -163,7 +163,7 @@ proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = await conn.get().writeLp(buffer) proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} = - debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.digest(messagePush.wakuMessage).to0xHex() + debug "pushing message to subscribed peers", pubsubTopic=messagePush.pubsubTopic, contentTopic=messagePush.wakuMessage.contentTopic, peers=peers, hash=messagePush.pubsubTopic.computeMessageHash(messagePush.wakuMessage).to0xHex() let bufferToPublish = messagePush.encode().buffer @@ -210,10 +210,10 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessa wakuMessage: message) if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout): - debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.digest(message).to0xHex() + debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex() waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) else: - debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.digest(message).to0xHex() + debug "pushed message succesfully to all subscribers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic, hash=pubsubTopic.computeMessageHash(message).to0xHex() let diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index 74f51fe7a2..735c01cf57 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -21,6 +21,7 @@ import when defined(waku_exp_store_resume): import std/[sequtils, times] import ../waku_archive + import ../waku_core/message/digest logScope: @@ -154,10 +155,11 @@ when defined(waku_exp_store_resume): proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = let digest = waku_archive.computeDigest(message) + messageHash = computeMessageHash(pubsubTopic, message) receivedTime = if message.timestamp > 0: message.timestamp else: getNanosecondTime(getTime().toUnixFloat()) - store.put(pubsubTopic, message, digest, receivedTime) + store.put(pubsubTopic, message, digest, messageHash, receivedTime) proc resume*(w: WakuStoreClient, peerList = none(seq[RemotePeerInfo]),