diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index 149c43aa05..f64925dcbc 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -87,7 +87,7 @@ suite "Postgres driver": require: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, actualMsg, digest, storeTimestamp) = item + let (pubsubTopic, actualMsg, digest, messageHash, storeTimestamp) = item actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and toHex(computedDigest.data) == toHex(digest) and diff --git a/tests/waku_archive/test_driver_sqlite.nim b/tests/waku_archive/test_driver_sqlite.nim index 0fae560e03..24699826e2 100644 --- a/tests/waku_archive/test_driver_sqlite.nim +++ b/tests/waku_archive/test_driver_sqlite.nim @@ -60,7 +60,7 @@ suite "SQLite driver": check: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, digest, messageHash, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index 2ed20f0fac..7b3c224a05 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic": ## Cleanup (await driver.close()).expect("driver to close") + + asyncTest "pubSubTopic messageHash match": + ## Given + const pubsubTopic1 = "test-pubsub-topic1" + const pubsubTopic2 = "test-pubsub-topic2" + # take 2 variables to hold the message hashes + var msgHash1: seq[byte] + var msgHash2: seq[byte] + + let driver = newTestSqliteDriver() + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1)) + putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp)) + + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2)) + putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp)) + + discard waitFor allFinished(putFutures) + + # get the messages from the database + let storedMsg = (waitFor driver.getAllMessages()).tryGet() + + check: + # there needs to be two messages + storedMsg.len > 0 + storedMsg.len == 2 + + # get the individual messages and message hash values + @[storedMsg[0]].all do (item1: auto) -> bool: + let (gotPubsubTopic1, gotMsg1, digest1, messageHash1, timestamp1) = item1 + msgHash1 = messageHash1 + true + + @[storedMsg[1]].all do (item2: auto) -> bool: + let (gotPubsubTopic2, gotMsg2, digest2, messageHash2, timestamp2) = item2 + msgHash2 = messageHash2 + true + + # compare of the messge hashes, given the context, they should be different + msgHash1 != msgHash2 + + ## Cleanup + (await driver.close()).expect("driver to close") suite "SQLite driver - query by cursor": diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 1e6bab4614..f844b24fc2 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy": check: storedMsg.len == capacity storedMsg.all do (item: auto) -> bool: - let (pubsubTopic, msg, digest, storeTimestamp) = item + let (pubsubTopic, msg, digest, messageHash, storeTimestamp) = item msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 8d236caf7e..b8f3e2777f 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -163,7 +163,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] { ## Build last message cursor ## The cursor is built from the last message INCLUDED in the response ## (i.e. the second last message in the rows list) - let (pubsubTopic, message, digest, storeTimestamp) = rows[^2] + let (pubsubTopic, message, digest, messageHash, storeTimestamp) = rows[^2] # TODO: Improve coherence of MessageDigest type let messageDigest = block: diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 64c6b3318a..ed3e848859 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -18,7 +18,7 @@ type ArchiveDriver* = ref object of RootObj OnErrHandler* = proc(errMsg: string) {.gcsafe, closure.} -type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp) +type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp) # ArchiveDriver interface diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 775b348778..90d56a5c0e 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -138,6 +138,7 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] = return ok((pubSubTopic, wakuMessage, @(digest.toOpenArrayByte(0, digest.high)), + @(digest.toOpenArrayByte(0, digest.high)), storedAt)) method getAllMessages*(s: PostgresDriver): diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index cce7d895bf..31cc259620 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -138,7 +138,7 @@ proc getPage(driver: QueueDriver, numberOfItems += 1 - outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime)) + outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), @(key.digest.data), key.receiverTime)) currentEntry = if forward: w.next() else: w.prev() diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 27ab4de618..de0f513c10 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -71,8 +71,9 @@ proc createTableQuery(table: string): SqlQueryStr = " version INTEGER NOT NULL," & " timestamp INTEGER NOT NULL," & " id BLOB," & + " messageHash BLOB NOT NULL,"& " storedAt INTEGER NOT NULL," & - " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" & + " CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)" & ") WITHOUT ROWID;" proc createTable*(db: SqliteDatabase): DatabaseResult[void] = @@ -102,11 +103,11 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = ## Insert message -type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) +type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) proc insertMessageQuery(table: string): SqlQueryStr = - "INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & - " VALUES (?, ?, ?, ?, ?, ?, ?);" + "INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & + " VALUES (?, ?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] = let query = insertMessageQuery(DbTable) @@ -197,24 +198,26 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int): ## Select all messages proc selectAllMessagesQuery(table: string): SqlQueryStr = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" & " FROM " & table & " ORDER BY storedAt ASC" proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], + seq[byte], Timestamp)]] = ## Retrieve all messages from the store. - var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] + var rows: seq[(PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)] proc queryRowCallback(s: ptr sqlite3_stmt) = let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) digest = queryRowDigestCallback(s, digestCol=6) + messageHash = queryRowDigestCallback(s, digestCol=7) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - rows.add((pubsubTopic, wakuMessage, digest, storedAt)) + rows.add((pubsubTopic, wakuMessage, digest, messageHash, storedAt)) let query = selectAllMessagesQuery(DbTable) let res = db.query(query, queryRowCallback) @@ -280,7 +283,7 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u var query: string - query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" + query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" query &= " FROM " & table if where.isSome(): @@ -361,18 +364,20 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase, DatabaseResult[seq[(PubsubTopic, WakuMessage, seq[byte], + seq[byte], Timestamp)]] = - var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[] + var messages: seq[(PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)] = @[] proc queryRowCallback(s: ptr sqlite3_stmt) = let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3) message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5) digest = queryRowDigestCallback(s, digestCol=6) + messageHash = queryRowDigestCallback(s, digestCol=7) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0) - messages.add((pubsubTopic, message, digest, storedAt)) + messages.add((pubsubTopic, message, digest, messageHash, storedAt)) let query = block: let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending) diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 53da379b1a..6d1830ed9f 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -66,6 +66,7 @@ method put*(s: SqliteDriver, ## Inserts a message into the store let res = s.insertStmt.exec(( @(digest.data), # id + @(digest.data), # messageHash receivedTime, # storedAt toBytes(message.contentTopic), # contentTopic message.payload, # payload