Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: invalid cursor returning messages #2724

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,20 @@ suite "Postgres driver - queries":
)
).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let cursor = ArchiveCursor(hash: fakeCursor)

## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
includeData = true,
contentTopicSeq = @[DefaultContentTopic],
pubsubTopic = none(PubsubTopic),
cursor = some(cursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = @[],
maxPageSize = 5,
ascendingOrder = true,
)

## Then
Expand Down
13 changes: 11 additions & 2 deletions tests/waku_archive/test_driver_queue_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -637,11 +637,20 @@ suite "Queue driver - query by cursor":
)
require retFut.isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let cursor = ArchiveCursor(hash: fakeCursor)

## When
let res = waitFor driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
includeData = true,
contentTopic = @[DefaultContentTopic],
pubsubTopic = none(PubsubTopic),
cursor = some(cursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = @[],
maxPageSize = 5,
ascendingOrder = true,
)

## Then
Expand Down
17 changes: 13 additions & 4 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -701,17 +701,26 @@ suite "SQLite driver - query by cursor":
)
).isOk()

let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage())
let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let cursor = ArchiveCursor(hash: fakeCursor)

## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
includeData = true,
contentTopic = @[DefaultContentTopic],
pubsubTopic = none(PubsubTopic),
cursor = some(cursor),
startTime = none(Timestamp),
endTime = none(Timestamp),
hashes = @[],
maxPageSize = 5,
ascendingOrder = true,
)

## Then
check:
res.isOk()
res.value.len == 0
res.isErr()
res.error == "cursor not found"

## Cleanup
(await driver.close()).expect("driver to close")
Expand Down
87 changes: 84 additions & 3 deletions tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{.used.}

import
std/[options, times, sugar],
std/[options, sugar],
stew/shims/net as stewNet,
chronicles,
chronos/timer,
testutils/unittests,
eth/keys,
presto,
Expand All @@ -24,8 +25,10 @@ import
../../../waku/waku_api/rest/store/types,
../../../waku/waku_archive,
../../../waku/waku_archive/driver/queue_driver,
../../../waku/waku_archive/driver/sqlite_driver,
../../../waku/common/databases/db_sqlite,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_store as waku_store,
../../../waku/common/base64,
../testlib/wakucore,
../testlib/wakunode

Expand All @@ -42,7 +45,7 @@ proc put(
if message.timestamp > 0:
message.timestamp
else:
getNanosecondTime(getTime().toUnixFloat())
getNowInNanosecondTime()

store.put(pubsubTopic, message, digest, msgHash, receivedTime)

Expand Down Expand Up @@ -84,6 +87,84 @@ procSuite "Waku Rest API - Store v3":
check:
expected.get() == msgHashRes.get().get().toRestStringWakuMessageHash()

asyncTest "invalid cursor":
let node = testWakuNode()
await node.start()
await node.mountRelay()

let restPort = Port(58011)
let restAddress = parseIpAddress("0.0.0.0")
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()

installStoreApiHandlers(restServer.router, node)
restServer.start()

# WakuStore setup
let db: SqliteDatabase =
SqliteDatabase.new(string.none().get(":memory:")).expect("valid DB")
let driver: ArchiveDriver = SqliteDriver.new(db).expect("valid driver")
let mountArchiveRes = node.mountArchive(driver)
assert mountArchiveRes.isOk(), mountArchiveRes.error

await node.mountStore()
node.mountStoreClient()

let key = generateEcdsaKey()
var peerSwitch = newStandardSwitch(some(key))
await peerSwitch.start()

peerSwitch.mount(node.wakuStore)

await sleepAsync(1.seconds())

# Now prime it with some history before tests
let msgList =
@[
fakeWakuMessage(@[byte 0], contentTopic = ContentTopic("ct1"), ts = 0),
fakeWakuMessage(@[byte 1], ts = 1),
fakeWakuMessage(@[byte 1, byte 2], ts = 2),
fakeWakuMessage(@[byte 1], ts = 3),
fakeWakuMessage(@[byte 1], ts = 4),
fakeWakuMessage(@[byte 1], ts = 5),
fakeWakuMessage(@[byte 1], ts = 6),
fakeWakuMessage(@[byte 9], contentTopic = ContentTopic("c2"), ts = 9),
]
for msg in msgList:
require (await driver.put(DefaultPubsubTopic, msg)).isOk()

let client = newRestHttpClient(initTAddress(restAddress, restPort))

let remotePeerInfo = peerSwitch.peerInfo.toRemotePeerInfo()
let fullAddr = $remotePeerInfo.addrs[0] & "/p2p/" & $remotePeerInfo.peerId

await sleepAsync(1.seconds())

let fakeCursor = computeMessageHash(DefaultPubsubTopic, fakeWakuMessage())
let encodedCursor = fakeCursor.toRestStringWakuMessageHash()

# Apply filter by start and end timestamps
var response = await client.getStoreMessagesV3(
encodeUrl(fullAddr),
"true", # include data
"", # pubsub topic
"ct1,c2", # empty content topics.
"", # start time
"", # end time
"", # hashes
encodedCursor, # base64-encoded hash
"true", # ascending
"5", # empty implies default page size
)

check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 0

await restServer.stop()
await restServer.closeWait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Did you need that to avoid a possible crash?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. It's just a copy pasta of another test.

await node.stop()

asyncTest "Filter by start and end time":
let node = testWakuNode()
await node.start()
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string]
let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
return err("waku message hash parsing error: " & error)

if decodedBytes.len != 32:
return err("waku message hash parsing error: invalid hash length: " & $decodedBytes.len)

let hash: WakuMessageHash = fromBytes(decodedBytes)

return ok(some(hash))
Expand Down
25 changes: 15 additions & 10 deletions waku/waku_archive/driver/sqlite_driver/queries.nim
Original file line number Diff line number Diff line change
Expand Up @@ -674,26 +674,31 @@ proc selectMessagesByStoreQueryWithLimit*(
if cursor.isSome() and cursor.get() != EmptyWakuMessageHash:
let hash: WakuMessageHash = cursor.get()

var wakuMessage: WakuMessage
var wakuMessage: Option[WakuMessage]

proc queryRowCallback(s: ptr sqlite3_stmt) =
wakuMessage = queryRowWakuMessageCallback(
s,
contentTopicCol = 0,
payloadCol = 1,
versionCol = 2,
senderTimestampCol = 3,
metaCol = 4,
wakuMessage = some(
queryRowWakuMessageCallback(
s,
contentTopicCol = 0,
payloadCol = 1,
versionCol = 2,
senderTimestampCol = 3,
metaCol = 4,
)
)

let query = selectMessageByHashQuery()
let dbStmt = ?db.prepareStmt(query)
?dbStmt.execSelectMessageByHash(hash, queryRowCallback)
dbStmt.dispose()

let time: Timestamp = wakuMessage.timestamp
if wakuMessage.isSome():
let time = wakuMessage.get().timestamp

some((time, hash))
some((time, hash))
else:
return err("cursor not found")
else:
none((Timestamp, WakuMessageHash))

Expand Down
Loading