Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: waku store sync 2.0 storage and tests #3215

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions tests/waku_store_sync/test_storage.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
{.used.}

import std/[options, random], testutils/unittests, chronos

import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_store_sync,
../../waku/waku_store_sync/storage/seq_storage,
./sync_utils

suite "Waku Sync Storage":
  # Every test seeds its RNG with a fixed value so that the generated
  # fingerprints, and therefore any failure, are reproducible across runs.

  test "process fingerprint range":
    # Two storages are built from the same elements. The fingerprint that
    # storage1 computes over the full range is handed to storage2, which is
    # expected to answer with a single skipRange entry and nothing else.
    var rng = initRand(42)
    let count = 10_000
    var elements = newSeqOfCap[ID](count)

    for i in 0 ..< count:
      elements.add(ID(time: Timestamp(i), fingerprint: randomHash(rng)))

    var storage1 = SeqStorage.new(elements)
    var storage2 = SeqStorage.new(elements)

    let bounds = elements[0] .. elements[count - 1]
    let fingerprint1 = storage1.fingerprinting(bounds)

    var outputPayload: SyncPayload

    storage2.processFingerprintRange(bounds, fingerprint1, outputPayload)

    let expected = SyncPayload(
      ranges: @[(bounds, RangeType.skipRange)], fingerprints: @[], itemSets: @[]
    )

    check:
      outputPayload == expected

  test "process item set range":
    # storage1 holds every element. The incoming item set only receives the
    # elements for which `rng.rand(0 .. 9) == 0`; the fingerprints of all
    # other elements are collected in `diffs` and expected back in `toSend`.
    var rng = initRand(42)
    let count = 1000
    var elements1 = newSeqOfCap[ID](count)
    var elements2 = newSeqOfCap[ID](count)
    var diffs: seq[Fingerprint]

    for i in 0 ..< count:
      let id = ID(time: Timestamp(i), fingerprint: randomHash(rng))

      elements1.add(id)
      if rng.rand(0 .. 9) == 0:
        elements2.add(id)
      else:
        diffs.add(id.fingerprint)

    var storage1 = SeqStorage.new(elements1)

    let bounds = elements1[0] .. elements1[count - 1]

    let itemSet2 = ItemSet(elements: elements2, reconciled: true)

    var
      toSend: seq[Fingerprint]
      toRecv: seq[Fingerprint]
      outputPayload: SyncPayload

    storage1.processItemSetRange(bounds, itemSet2, toSend, toRecv, outputPayload)

    check:
      toSend == diffs

  test "insert new element":
    # Inserting two distinct elements must grow the storage by one each time.
    var rng = initRand(42)

    let storage = SeqStorage.new(10)

    let element1 = ID(time: Timestamp(1000), fingerprint: randomHash(rng))
    let element2 = ID(time: Timestamp(2000), fingerprint: randomHash(rng))

    # doAssert (not assert) so these preconditions are still verified when
    # the tests are compiled with assertions turned off.
    let res1 = storage.insert(element1)
    doAssert res1.isOk(), $res1.error
    let count1 = storage.length()

    let res2 = storage.insert(element2)
    doAssert res2.isOk(), $res2.error
    let count2 = storage.length()

    check:
      count1 == 1
      count2 == 2

  test "insert duplicate":
    # Inserting an element that the storage was created with must fail.
    var rng = initRand(42)

    let element = ID(time: Timestamp(1000), fingerprint: randomHash(rng))

    let storage = SeqStorage.new(@[element])

    let res = storage.insert(element)

    check:
      res.isErr()

  test "prune elements":
    # 1000 elements with timestamps 0 ..< 1000; pruning at timestamp 500 is
    # expected to report 500 removed elements and leave 500 behind.
    var rng = initRand(42)
    let count = 1000
    var elements = newSeqOfCap[ID](count)

    for i in 0 ..< count:
      elements.add(ID(time: Timestamp(i), fingerprint: randomHash(rng)))

    let storage = SeqStorage.new(elements)

    let beforeCount = storage.length()

    let pruned = storage.prune(Timestamp(500))

    let afterCount = storage.length()

    check:
      beforeCount == 1000
      pruned == 500
      afterCount == 500

## The disabled tests below are rough benchmarks.
#[ test "10M fingerprint":
var rng = initRand()

let count = 10_000_000

var elements = newSeqOfCap[ID](count)

for i in 0 .. count:
let id = ID(time: Timestamp(i), fingerprint: randomHash(rng))

elements.add(id)

let storage = SeqStorage.new(elements)

let before = getMonoTime()

discard storage.fingerprinting(some(0 .. count))

let after = getMonoTime()

echo "Fingerprint Time: " & $(after - before) ]#

#[ test "random inserts":
var rng = initRand()

let count = 10_000_000

var elements = newSeqOfCap[ID](count)

for i in 0 .. count:
let id = ID(time: Timestamp(i), fingerprint: randomHash(rng))

elements.add(id)

var storage = SeqStorage.new(elements)

var avg: times.Duration
for i in 0 ..< 1000:
let newId =
ID(time: Timestamp(rng.rand(0 .. count)), fingerprint: randomHash(rng))

let before = getMonoTime()

discard storage.insert(newId)

let after = getMonoTime()

avg += after - before

avg = avg div 1000

echo "Avg Time 1K Inserts: " & $avg ]#

#[ test "trim":
var rng = initRand()

let count = 10_000_000

var elements = newSeqOfCap[ID](count)

for i in 0 .. count:
let id = ID(time: Timestamp(i), fingerprint: randomHash(rng))

elements.add(id)

var storage = SeqStorage.new(elements)

let before = getMonoTime()

discard storage.trim(Timestamp(count div 4))

let after = getMonoTime()

echo "Trim Time: " & $(after - before) ]#
54 changes: 54 additions & 0 deletions waku/waku_store_sync/storage/range_processing.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import std/math, chronos

import ../../waku_core/time, ../common

proc calculateTimeRange*(
    jitter: Duration = 20.seconds, syncRange: Duration = 1.hours
): Slice[Timestamp] =
  ## Computes the start and end time of a sync session.
  ##
  ## The range ends `jitter` before the current time and starts `syncRange`
  ## before that end point.

  # Step back from "now" because of message jitter inherent to Relay protocol.
  let windowEnd = getNowInNanosecondTime() - jitter.nanos
  let windowStart = windowEnd - syncRange.nanos

  Timestamp(windowStart) .. Timestamp(windowEnd)

proc equalPartitioning*(slice: Slice[ID], count: int): seq[Slice[ID]] =
  ## Partitions the time span of `slice` into `count` consecutive sub-ranges.
  ##
  ## Each sub-range covers `totalLength div count` time units. The remainder
  ## is distributed equally to the first slices, one extra unit each.
  ## Only the `time` fields of `slice.a` and `slice.b` are used; every
  ## produced bound carries `EmptyFingerprint`. Adjacent sub-ranges share a
  ## boundary: the upper bound of one is the lower bound of the next.
  ##
  ## Returns an empty seq when `count` is not positive or when the time span
  ## is shorter than `count`.

  # A non-positive count cannot be partitioned. Without this guard,
  # `count == 0` reaches the division below and raises DivByZeroDefect.
  if count <= 0:
    return @[]

  let totalLength: int64 = slice.b.time - slice.a.time

  if totalLength < count:
    return @[]

  # Plain `div`/`mod` keep this proc independent of `math.divmod`.
  let parts = totalLength div count
  var rem = totalLength mod count

  var bounds = newSeqOfCap[Slice[ID]](count)

  var lb = slice.a.time

  for _ in 0 ..< count:
    var ub = lb + parts

    # Hand out the remainder one unit at a time to the leading slices.
    if rem > 0:
      ub += 1
      rem -= 1

    let lower = ID(time: lb, fingerprint: EmptyFingerprint)
    let upper = ID(time: ub, fingerprint: EmptyFingerprint)

    bounds.add(lower .. upper)

    # The next slice starts where this one ended.
    lb = ub

  return bounds

#TODO implement exponential partitioning
Loading
Loading