Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: messageHash attribute added in SQLite + testcase #2142

Merged
merged 2 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ suite "Postgres driver":
require:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
let (pubsubTopic, actualMsg, digest, messageHash, storeTimestamp) = item
actualMsg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic and
toHex(computedDigest.data) == toHex(digest) and
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_driver_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, digest, messageHash, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

Expand Down
44 changes: 44 additions & 0 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,50 @@ suite "SQLite driver - query by pubsub topic":

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "pubSubTopic messageHash match":
  ## Given
  ## Two different messages stored under two different pubsub topics.
  ## Their persisted messageHash values must be distinct.
  const pubsubTopic1 = "test-pubsub-topic1"
  const pubsubTopic2 = "test-pubsub-topic2"

  let driver = newTestSqliteDriver()
  var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

  let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(1))
  putFutures.add(driver.put(pubsubTopic1, msg1, computeDigest(msg1, pubsubTopic1), msg1.timestamp))

  let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic, ts=Timestamp(2))
  putFutures.add(driver.put(pubsubTopic2, msg2, computeDigest(msg2, pubsubTopic2), msg2.timestamp))

  ## When
  # Use `await` rather than `waitFor`: `waitFor` blocks the event loop
  # and must not be called from inside an async proc/test.
  discard await allFinished(putFutures)

  # Get the messages back from the database.
  let storedMsg = (await driver.getAllMessages()).tryGet()

  ## Then
  # Exactly the two messages inserted above must be present.
  require:
    storedMsg.len == 2

  # Destructure the rows directly instead of running side-effecting
  # `all do` closures inside a `check:` block just to extract fields.
  # Row shape: (pubsubTopic, message, digest, messageHash, storedAt).
  let (_, _, _, msgHash1, _) = storedMsg[0]
  let (_, _, _, msgHash2, _) = storedMsg[1]

  check:
    # Compare the message hashes; given the context, they must differ.
    msgHash1 != msgHash2

  ## Cleanup
  (await driver.close()).expect("driver to close")


suite "SQLite driver - query by cursor":
Expand Down
2 changes: 1 addition & 1 deletion tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ suite "Waku Archive - Retention policy":
check:
storedMsg.len == capacity
storedMsg.all do (item: auto) -> bool:
let (pubsubTopic, msg, digest, storeTimestamp) = item
let (pubsubTopic, msg, digest, messageHash, storeTimestamp) = item
msg.contentTopic == contentTopic and
pubsubTopic == DefaultPubsubTopic

Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {
## Build last message cursor
## The cursor is built from the last message INCLUDED in the response
## (i.e. the second last message in the rows list)
let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]
let (pubsubTopic, message, digest, messageHash, storeTimestamp) = rows[^2]

# TODO: Improve coherence of MessageDigest type
let messageDigest = block:
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure.}

type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)
type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)

# ArchiveDriver interface

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =
return ok((pubSubTopic,
wakuMessage,
@(digest.toOpenArrayByte(0, digest.high)),
@(digest.toOpenArrayByte(0, digest.high)),
storedAt))

method getAllMessages*(s: PostgresDriver):
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ proc getPage(driver: QueueDriver,

numberOfItems += 1

outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), key.receiverTime))
outSeq.add((key.pubsubTopic, data.msg, @(key.digest.data), @(key.digest.data), key.receiverTime))

currentEntry = if forward: w.next()
else: w.prev()
Expand Down
25 changes: 15 additions & 10 deletions waku/waku_archive/driver/sqlite_driver/queries.nim
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ proc createTableQuery(table: string): SqlQueryStr =
" version INTEGER NOT NULL," &
" timestamp INTEGER NOT NULL," &
" id BLOB," &
" messageHash BLOB NOT NULL,"&
" storedAt INTEGER NOT NULL," &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
" CONSTRAINT messageIndex PRIMARY KEY (storedAt, messageHash)" &
") WITHOUT ROWID;"

proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
Expand Down Expand Up @@ -102,11 +103,11 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =


## Insert message
type InsertMessageParams* = (seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
type InsertMessageParams* = (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)

proc insertMessageQuery(table: string): SqlQueryStr =
"INSERT INTO " & table & "(id, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?);"
"INSERT INTO " & table & "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"

proc prepareInsertMessageStmt*(db: SqliteDatabase): SqliteStmt[InsertMessageParams, void] =
let query = insertMessageQuery(DbTable)
Expand Down Expand Up @@ -197,24 +198,26 @@ proc deleteOldestMessagesNotWithinLimit*(db: SqliteDatabase, limit: int):
## Select all messages

proc selectAllMessagesQuery(table: string): SqlQueryStr =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" &
" FROM " & table &
" ORDER BY storedAt ASC"

proc selectAllMessages*(db: SqliteDatabase): DatabaseResult[seq[(PubsubTopic,
WakuMessage,
seq[byte],
seq[byte],
Timestamp)]] =
## Retrieve all messages from the store.
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)]
var rows: seq[(PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
wakuMessage = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
messageHash = queryRowDigestCallback(s, digestCol=7)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)

rows.add((pubsubTopic, wakuMessage, digest, storedAt))
rows.add((pubsubTopic, wakuMessage, digest, messageHash, storedAt))

let query = selectAllMessagesQuery(DbTable)
let res = db.query(query, queryRowCallback)
Expand Down Expand Up @@ -280,7 +283,7 @@ proc selectMessagesWithLimitQuery(table: string, where: Option[string], limit: u

var query: string

query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id"
query = "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
query &= " FROM " & table

if where.isSome():
Expand Down Expand Up @@ -361,18 +364,20 @@ proc selectMessagesByHistoryQueryWithLimit*(db: SqliteDatabase,
DatabaseResult[seq[(PubsubTopic,
WakuMessage,
seq[byte],
seq[byte],
Timestamp)]] =


var messages: seq[(PubsubTopic, WakuMessage, seq[byte], Timestamp)] = @[]
var messages: seq[(PubsubTopic, WakuMessage, seq[byte], seq[byte], Timestamp)] = @[]
proc queryRowCallback(s: ptr sqlite3_stmt) =
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol=3)
message = queryRowWakuMessageCallback(s, contentTopicCol=1, payloadCol=2, versionCol=4, senderTimestampCol=5)
digest = queryRowDigestCallback(s, digestCol=6)
messageHash = queryRowDigestCallback(s, digestCol=7)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol=0)

messages.add((pubsubTopic, message, digest, storedAt))
messages.add((pubsubTopic, message, digest, messageHash, storedAt))

let query = block:
let where = whereClause(cursor, pubsubTopic, contentTopic, startTime, endTime, ascending)
Expand Down
1 change: 1 addition & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ method put*(s: SqliteDriver,
## Inserts a message into the store
let res = s.insertStmt.exec((
@(digest.data), # id
@(digest.data), # messageHash
Comment on lines 68 to +69
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there is something that I'm missing but messageHash looks the same as id.

I think we need to create a new computeDeterministicMessageHash proc in waku_archive.nim where we create the following attribute: https://rfc.vac.dev/spec/14/#deterministic-message-hashing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, as per the initial approach, we aim to remove the id attribute and replace it with messageHash — not only is the nomenclature better, but it also reflects the way the digest/hash is actually computed.

the computation logic behind messageHash follows the RFC. The digest logic is implemented in the previous PR for the same base issue #2112

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are absolutely right that we have it implemented. I got confused because we have another computeDigest definition, which is different. We need to revisit it and check if we can only use one.

proc computeDigest*(msg: WakuMessage): MessageDigest =

receivedTime, # storedAt
toBytes(message.contentTopic), # contentTopic
message.payload, # payload
Expand Down
Loading