From ed45314d5ac354c7eecbe1d2b046d6d779137808 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 17 Dec 2024 08:12:56 -0500 Subject: [PATCH 1/9] common types & codec --- tests/waku_store_sync/sync_utils.nim | 60 ++++++ tests/waku_store_sync/test_codec.nim | 197 +++++++++++++++++++ waku/waku_store_sync/codec.nim | 283 +++++++++++++++++++++++++++ waku/waku_store_sync/common.nim | 80 ++++++++ 4 files changed, 620 insertions(+) create mode 100644 tests/waku_store_sync/sync_utils.nim create mode 100644 tests/waku_store_sync/test_codec.nim create mode 100644 waku/waku_store_sync/codec.nim create mode 100644 waku/waku_store_sync/common.nim diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim new file mode 100644 index 0000000000..b949b89bf2 --- /dev/null +++ b/tests/waku_store_sync/sync_utils.nim @@ -0,0 +1,60 @@ +{.used.} + +import std/[options, random], chronos, chronicles + +import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore + +randomize() + +proc randomHash*(rng: var Rand): WakuMessageHash = + var hash = EmptyWakuMessageHash + + for i in 0 ..< hash.len: + hash[i] = rng.rand(uint8) + + return hash + +proc newTestWakuRecon*( + switch: Switch, + idsRx: AsyncQueue[ID], + wantsTx: AsyncQueue[(PeerId, Fingerprint)], + needsTx: AsyncQueue[(PeerId, Fingerprint)], +): Future[SyncReconciliation] {.async.} = + let peerManager = PeerManager.new(switch) + + let res = await SyncReconciliation.new( + peerManager = peerManager, + wakuArchive = nil, + relayJitter = 0.seconds, + idsRx = idsRx, + wantsTx = wantsTx, + needsTx = needsTx, + ) + + let proto = res.get() + + proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuTransfer*( + switch: Switch, + idsTx: AsyncQueue[ID], + wantsRx: AsyncQueue[(PeerId, Fingerprint)], + needsRx: AsyncQueue[(PeerId, Fingerprint)], +): SyncTransfer = + let peerManager = PeerManager.new(switch) + + let proto = SyncTransfer.new( + peerManager = peerManager, + wakuArchive = nil, + idsTx = idsTx, + wantsRx = wantsRx, + needsRx = needsRx, + ) + + proto.start() + switch.mount(proto) + + return proto diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim new file mode 100644 index 0000000000..132c67c679 --- /dev/null +++ b/tests/waku_store_sync/test_codec.nim @@ -0,0 +1,197 @@ +{.used.} + +import std/[options, random], testutils/unittests, chronos + +import + ../../waku/waku_core, + ../../waku/waku_core/message/digest, + ../../waku/waku_core/time, + ../../waku/waku_store_sync, + ../../waku/waku_store_sync/common, + ../../waku/waku_store_sync/codec, + ./sync_utils + +proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet = + var + elements = newSeqOfCap[ID](count) + lastTime = startTime + + for i in 0 ..< count: + let diff = rng.rand(9.uint8) + 1 + + let timestamp = lastTime + diff * 1_000_000_000 + lastTime = timestamp + + let hash = randomHash(rng) + + let id = ID(time: Timestamp(timestamp), fingerprint: hash) + + elements.add(id) + + return ItemSet(elements: elements, reconciled: true) + +proc randomSetRange( + count: int, startTime: Timestamp, rng: var Rand +): (Slice[ID], ItemSet) = + let itemSet = randomItemSet(count, startTime, rng) + + var + lb = itemSet.elements[0] + ub = itemSet.elements[^1] + + #for test check equality + lb.fingerprint = EmptyFingerprint + ub.fingerprint = EmptyFingerprint + + let bounds = lb .. ub + + return (bounds, itemSet) + +suite "Waku Store Sync Codec": + test "empty item set encoding roundtrip": + var origItemSet = ItemSet() + + origItemSet.reconciled = true + + var encodedSet = origItemSet.deltaEncode() + + var itemSet = ItemSet() + let _ = deltaDecode(itemSet, encodedSet, 0) + + check: + origItemSet == itemSet + + test "item set encoding roundtrip": + let + count = 10 + time = getNowInNanosecondTime() + + var rng = initRand() + + let origItemSet = randomItemSet(count, time, rng) + var encodedSet = origItemSet.deltaEncode() + + #faking a longer payload + let pad: seq[byte] = + @[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + encodedSet &= pad + + var itemSet = ItemSet() + let _ = deltaDecode(itemSet, encodedSet, count) + + check: + origItemSet == itemSet + + test "payload item set encoding roundtrip": + let count = 5 + + var + rng = initRand() + time = getNowInNanosecondTime() + + let (bounds1, itemSet1) = randomSetRange(count, time, rng) + let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng) + let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng) + let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng) + + let range1 = (bounds1, RangeType.itemSetRange) + let range2 = (bounds2, RangeType.itemSetRange) + let range3 = (bounds3, RangeType.itemSetRange) + let range4 = (bounds4, RangeType.itemSetRange) + + let payload = SyncPayload( + ranges: @[range1, range2, range3, range4], + fingerprints: @[], + itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4], + ) + + let encodedPayload = payload.deltaEncode() + + let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + + check: + payload.ranges[0][0].b == decodedPayload.ranges[0][0].b + payload.ranges[1][0].b == decodedPayload.ranges[1][0].b + payload.ranges[2][0].b == decodedPayload.ranges[2][0].b + payload.ranges[3][0].b == decodedPayload.ranges[3][0].b + payload.itemSets == decodedPayload.itemSets + + test "payload fingerprint encoding roundtrip": + let count = 4 + + var + rng = initRand() + lastTime = getNowInNanosecondTime() + ranges = newSeqOfCap[(Slice[ID], RangeType)](4) + + for i in 0 ..< count: + let lb = ID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) + #echo "lower bound: " & $lastTime + let nowTime = lastTime + 10_000_000_000 # 10s + #echo "upper bound: " & $nowTime + lastTime = nowTime + let ub = ID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) + let bounds = lb .. ub + let range = (bounds, RangeType.fingerprintRange) + + ranges.add(range) + + let payload = SyncPayload( + ranges: ranges, + fingerprints: + @[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)], + itemSets: @[], + ) + + let encodedPayload = payload.deltaEncode() + #echo "encoding done!" + let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + + check: + payload.ranges[0][0].b == decodedPayload.ranges[0][0].b + payload.ranges[1][0].b == decodedPayload.ranges[1][0].b + payload.ranges[2][0].b == decodedPayload.ranges[2][0].b + payload.ranges[3][0].b == decodedPayload.ranges[3][0].b + payload.fingerprints == decodedPayload.fingerprints + + test "payload mixed encoding roundtrip": + let count = 2 + + var + rng = initRand() + lastTime = getNowInNanosecondTime() + ranges = newSeqOfCap[(Slice[ID], RangeType)](4) + itemSets = newSeqOfCap[ItemSet](4) + fingerprints = newSeqOfCap[Fingerprint](4) + + for i in 1 .. count: + let lb = ID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) + let nowTime = lastTime + 10_000_000_000 # 10s + lastTime = nowTime + let ub = ID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) + let bounds = lb .. ub + let range = (bounds, RangeType.fingerprintRange) + + ranges.add(range) + fingerprints.add(randomHash(rng)) + + let (bound, itemSet) = randomSetRange(5, lastTime, rng) + lastTime += 50_000_000_000 # 50s + + ranges.add((bound, RangeType.itemSetRange)) + itemSets.add(itemSet) + + let payload = + SyncPayload(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets) + + let encodedPayload = payload.deltaEncode() + #echo "encoding done!" + let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + + check: + payload.ranges[0][0].b == decodedPayload.ranges[0][0].b + payload.ranges[1][0].b == decodedPayload.ranges[1][0].b + payload.ranges[2][0].b == decodedPayload.ranges[2][0].b + payload.ranges[3][0].b == decodedPayload.ranges[3][0].b + payload.fingerprints == decodedPayload.fingerprints + payload.itemSets == decodedPayload.itemSets diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim new file mode 100644 index 0000000000..d8594503c5 --- /dev/null +++ b/waku/waku_store_sync/codec.nim @@ -0,0 +1,283 @@ +{.push raises: [].} + +import std/sequtils, stew/leb128 + +import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common + +proc encode*(value: WakuMessagePayload): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, value.pubsub) + pb.write3(2, value.message.encode()) + + return pb + +proc deltaEncode*(itemSet: ItemSet): seq[byte] = + let capacity = 1 + (itemSet.elements.len * 41) + + var + output = newSeqOfCap[byte](capacity) + lastTime = Timestamp(0) + buf = Leb128Buf[uint64]() + + for id in itemSet.elements: + let timeDiff = uint64(id.time) - uint64(lastTime) + lastTime = id.time + + # encode timestamp + buf = timediff.toBytes(Leb128) + output &= @buf + + output &= id.fingerprint + + output &= byte(itemSet.reconciled) + + return output + +proc deltaEncode*(value: SyncPayload): seq[byte] = + if value.ranges.len == 0: + return @[0] + + var + output = newSeqOfCap[byte](1000) + buf = Leb128Buf[uint64]() + lastTimestamp: Timestamp + lastHash: Fingerprint + i = 0 + j = 0 + + # the first range is implicit but must be explicit when encoded + let (bound, _) = value.ranges[0] + + lastTimestamp = bound.a.time + lastHash = bound.a.fingerprint + + # encode first timestamp + buf = uint64(lastTimestamp).toBytes(Leb128) + output &= @buf + + #echo "First Timestamp: " & $lastTimestamp + + # implicit first fingerprint is always 0 and range type is always skip + + for (bound, rangeType) in value.ranges: + let timeDiff = uint64(bound.b.time) - uint64(lastTimestamp) + lastTimestamp = bound.b.time + + # encode timestamp + buf = timeDiff.toBytes(Leb128) + output &= @buf + + #echo "Timestamp: " & $timeDiff + + if timeDiff == 0: + var sameBytes = 0 + for (byte1, byte2) in zip(lastHash, bound.b.fingerprint): + sameBytes.inc() + + if byte1 != byte2: + break + + # encode number of same bytes + output &= byte(sameBytes) + + #echo "Same Bytes: " & $sameBytes + + # encode fingerprint bytes + output &= bound.b.fingerprint[0 ..< sameBytes] + + # encode rangeType + output &= byte(rangeType) + + case rangeType + of skipRange: + #echo "Skip Range" + continue + of fingerprintRange: + let fingerprint = value.fingerprints[i] + i.inc() + + #echo "Encode Fingerprint" + + output &= fingerprint + of itemSetRange: + let itemSet = value.itemSets[j] + j.inc() + + # encode how many elements are in the set + buf = uint64(itemSet.elements.len).toBytes(Leb128) + output &= @buf + + #echo "Set elements: " & $itemSet.elements.len + + let encodedSet = itemSet.deltaEncode() + + #echo "Encoded Set" + + output &= encodedSet + + continue + + return output + +proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int = + var + lastTime = Timestamp(0) + val = 0.uint64 + len = 0.int8 + idx = 0 + + while itemSet.elements.len < setLength: + var slice = buffer[idx ..< idx + 9] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + let time = lastTime + Timestamp(val) + lastTime = time + + #echo "Timestamp: " & $time + #echo "IDX: " & $idx + + slice = buffer[idx ..< idx + 32] + idx += 32 + var fingerprint = EmptyFingerprint + for i, bytes in slice: + fingerprint[i] = bytes + + #echo "Hash: " & $fingerprint + #echo "IDX: " & $idx + + let id = ID(time: time, fingerprint: fingerprint) + + itemSet.elements.add(id) + + itemSet.reconciled = bool(buffer[idx]) + idx += 1 + + #echo "Reconciled: " & $itemSet.reconciled + #echo "IDX: " & $idx + + return idx + +proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = + #echo "buffer length: " & $buffer.len + + if buffer.len == 1: + return SyncPayload() + + var + payload = SyncPayload() + lastTime = Timestamp(0) + val = 0.uint64 + len = 0.int8 + idx = 0 + slice = buffer[idx ..< idx + 9] + + # first timestamp + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + lastTime = Timestamp(val) + + #echo "First Range Timestamp: " & $lastTime + #echo "IDX: " & $idx + + # implicit first fingerprint is always 0 + # implicit first range mode is alway skip + + while idx < buffer.len - 1: + let lowerRangeBound = ID(time: lastTime, fingerprint: EmptyFingerprint) + + # decode timestamp diff + let min = min(idx + 9, buffer.len) + slice = buffer[idx ..< min] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + let timeDiff = Timestamp(val) + + #echo "Range Timestamp diff: " & $timeDiff + #echo "IDX: " & $idx + + var fingerprint = EmptyFingerprint + if timeDiff == 0: + # decode number of same bytes + let sameBytes = int(buffer[idx]) + idx += 1 + + #echo "Same Bytes Count: " & $sameBytes + #echo "IDX: " & $idx + + # decode same bytes + slice = buffer[idx ..< idx + sameBytes] + idx += sameBytes + for i, bytes in slice: + fingerprint[i] = bytes + + #echo "Same Bytes: " & $fingerprint + #echo "IDX: " & $idx + + let thisTime = lastTime + timeDiff + lastTime = thisTime + + #echo "Range Timestamp: " & $thisTime + + let upperRangeBound = ID(time: thisTime, fingerprint: fingerprint) + + let bounds = lowerRangeBound .. upperRangeBound + + # decode range type + let rangeType = RangeType(buffer[idx]) + idx += 1 + + #echo "Range Type: " & $rangeType + #echo "IDX: " & $idx + + payload.ranges.add((bounds, rangeType)) + + if rangeType == fingerprintRange: + # decode fingerprint + slice = buffer[idx ..< idx + 32] + idx += 32 + var fingerprint = EmptyFingerprint + for i, bytes in slice: + fingerprint[i] = bytes + + #echo "Fingerprint: " & $fingerprint + #echo "IDX: " & $idx + + payload.fingerprints.add(fingerprint) + elif rangeType == itemSetRange: + # decode item set length + let min = min(idx + 9, buffer.len) + slice = buffer[idx ..< min] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + let itemSetLength = int(val) + + #echo "Set length: " & $itemSetLength + #echo "IDX: " & $idx + + # decode item set + var itemSet = ItemSet() + slice = buffer[idx ..< buffer.len] + let count = deltaDecode(itemSet, slice, itemSetLength) + idx += count + + #echo "Set Decoded" + #echo "IDX: " & $idx + + payload.itemSets.add(itemSet) + + return payload + +proc decode*(T: type WakuMessagePayload, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + + var pubsub: string + discard ?pb.getField(1, pubsub) + + var proto: ProtoBuffer + discard ?pb.getField(2, proto) + + let message = ?WakuMessage.decode(proto.buffer) + + return ok(WakuMessagePayload(pubsub: pubsub, message: message)) diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim new file mode 100644 index 0000000000..fd0bc8faab --- /dev/null +++ b/waku/waku_store_sync/common.nim @@ -0,0 +1,80 @@ +{.push raises: [].} + +import std/[options], chronos, stew/[byteutils] + +import ../waku_core + +const + DefaultSyncInterval*: Duration = 5.minutes + DefaultSyncRange*: Duration = 1.hours + RetryDelay*: Duration = 30.seconds + SyncReconciliationCodec* = "/vac/waku/reconciliation/1.0" + SyncTransferCodec* = "/vac/waku/transfer/1.0" + DefaultGossipSubJitter*: Duration = 20.seconds + +type + Fingerprint* = array[32, byte] + + ID* = object + time*: Timestamp + fingerprint*: Fingerprint + + ItemSet* = object + elements*: seq[ID] + reconciled*: bool + + RangeType* = enum + skipRange = 0 + fingerprintRange = 1 + itemSetRange = 2 + + SyncPayload* = object + ranges*: seq[(Slice[ID], RangeType)] + fingerprints*: seq[Fingerprint] + itemSets*: seq[ItemSet] + + WakuMessagePayload* = object + pubsub*: string + message*: WakuMessage + +const EmptyFingerprint*: Fingerprint = [ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, +] + +const FullFingerprint*: Fingerprint = [ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, +] + +proc high*(T: type ID): T = + return ID(time: Timestamp(high(int64)), fingerprint: FullFingerprint) + +proc low*(T: type ID): T = + return ID(time: Timestamp(low(int64)), fingerprint: EmptyFingerprint) + +proc `$`*(value: ID): string = + return '(' & $value.time & ", " & $value.fingerprint & ')' + +proc cmp(x, y: Fingerprint): int = + if x < y: + return -1 + elif x == y: + return 0 + + return 1 + +proc cmp*(x, y: ID): int = + if x.time == y.time: + return cmp(x.fingerprint, y.fingerprint) + + if x.time < y.time: + return -1 + + return 1 + +proc `<`*(x, y: ID): bool = + cmp(x, y) == -1 + +proc `>`*(x, y: ID): bool = + cmp(x, y) == 1 From ee5d4af631950e1e1f694bd46bbfe6f0875b96a6 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 6 Jan 2025 08:55:31 -0500 Subject: [PATCH 2/9] remove echo :P --- waku/waku_store_sync/codec.nim | 50 ---------------------------------- 1 file changed, 50 deletions(-) diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index d8594503c5..cf3ea9dde2 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -56,8 +56,6 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = buf = uint64(lastTimestamp).toBytes(Leb128) output &= @buf - #echo "First Timestamp: " & $lastTimestamp - # implicit first fingerprint is always 0 and range type is always skip for (bound, rangeType) in value.ranges: @@ -68,8 +66,6 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = buf = timeDiff.toBytes(Leb128) output &= @buf - #echo "Timestamp: " & $timeDiff - if timeDiff == 0: var sameBytes = 0 for (byte1, byte2) in zip(lastHash, bound.b.fingerprint): @@ -81,8 +77,6 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = # encode number of same bytes output &= byte(sameBytes) - #echo "Same Bytes: " & $sameBytes - # encode fingerprint bytes output &= bound.b.fingerprint[0 ..< sameBytes] @@ -91,14 +85,11 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = case rangeType of skipRange: - #echo "Skip Range" continue of fingerprintRange: let fingerprint = value.fingerprints[i] i.inc() - #echo "Encode Fingerprint" - output &= fingerprint of itemSetRange: let itemSet = value.itemSets[j] @@ -108,12 +99,8 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = buf = uint64(itemSet.elements.len).toBytes(Leb128) output &= @buf - #echo "Set elements: " & $itemSet.elements.len - let encodedSet = itemSet.deltaEncode() - #echo "Encoded Set" - output &= encodedSet continue @@ -135,18 +122,12 @@ proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int let time = lastTime + Timestamp(val) lastTime = time - #echo "Timestamp: " & $time - #echo "IDX: " & $idx - slice = buffer[idx ..< idx + 32] idx += 32 var fingerprint = EmptyFingerprint for i, bytes in slice: fingerprint[i] = bytes - #echo "Hash: " & $fingerprint - #echo "IDX: " & $idx - let id = ID(time: time, fingerprint: fingerprint) itemSet.elements.add(id) @@ -154,14 +135,9 @@ proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int itemSet.reconciled = bool(buffer[idx]) idx += 1 - #echo "Reconciled: " & $itemSet.reconciled - #echo "IDX: " & $idx - return idx proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = - #echo "buffer length: " & $buffer.len - if buffer.len == 1: return SyncPayload() @@ -178,9 +154,6 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = idx += len lastTime = Timestamp(val) - #echo "First Range Timestamp: " & $lastTime - #echo "IDX: " & $idx - # implicit first fingerprint is always 0 # implicit first range mode is alway skip @@ -194,32 +167,21 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = idx += len let timeDiff = Timestamp(val) - #echo "Range Timestamp diff: " & $timeDiff - #echo "IDX: " & $idx - var fingerprint = EmptyFingerprint if timeDiff == 0: # decode number of same bytes let sameBytes = int(buffer[idx]) idx += 1 - #echo "Same Bytes Count: " & $sameBytes - #echo "IDX: " & $idx - # decode same bytes slice = buffer[idx ..< idx + sameBytes] idx += sameBytes for i, bytes in slice: fingerprint[i] = bytes - #echo "Same Bytes: " & $fingerprint - #echo "IDX: " & $idx - let thisTime = lastTime + timeDiff lastTime = thisTime - #echo "Range Timestamp: " & $thisTime - let upperRangeBound = ID(time: thisTime, fingerprint: fingerprint) let bounds = lowerRangeBound .. upperRangeBound @@ -228,9 +190,6 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = let rangeType = RangeType(buffer[idx]) idx += 1 - #echo "Range Type: " & $rangeType - #echo "IDX: " & $idx - payload.ranges.add((bounds, rangeType)) if rangeType == fingerprintRange: @@ -241,9 +200,6 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = for i, bytes in slice: fingerprint[i] = bytes - #echo "Fingerprint: " & $fingerprint - #echo "IDX: " & $idx - payload.fingerprints.add(fingerprint) elif rangeType == itemSetRange: # decode item set length @@ -253,18 +209,12 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = idx += len let itemSetLength = int(val) - #echo "Set length: " & $itemSetLength - #echo "IDX: " & $idx - # decode item set var itemSet = ItemSet() slice = buffer[idx ..< buffer.len] let count = deltaDecode(itemSet, slice, itemSetLength) idx += count - #echo "Set Decoded" - #echo "IDX: " & $idx - payload.itemSets.add(itemSet) return payload From 02b216789eb00c400a7d05344e5323bc4c893199 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Wed, 8 Jan 2025 08:27:30 -0500 Subject: [PATCH 3/9] renaming and stuff --- waku/waku_core/codecs.nim | 3 +- waku/waku_core/message/digest.nim | 8 +++++ waku/waku_store_sync/codec.nim | 57 +++++++++++++++++-------------- waku/waku_store_sync/common.nim | 10 +++--- 4 files changed, 45 insertions(+), 33 deletions(-) diff --git a/waku/waku_core/codecs.nim b/waku/waku_core/codecs.nim index 2b5b8f7b49..3436b42dfb 100644 --- a/waku/waku_core/codecs.nim +++ b/waku/waku_core/codecs.nim @@ -4,7 +4,8 @@ const WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1" WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1" WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" - WakuSyncCodec* = "/vac/waku/sync/1.0.0" + WakuSyncReconciliationCodec* = "/vac/waku/reconciliation/1.0.0" + WakuSyncTransferCodec* = "/vac/waku/transfer/1.0.0" WakuMetadataCodec* = "/vac/waku/metadata/1.0.0" WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1" WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4" diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index cb4f5b0141..0aa1ce610c 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -48,3 +48,11 @@ proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessag ctx.update(toBytesBE(uint64(msg.timestamp))) return ctx.finish() # Computes the hash + +proc cmp*(x, y: WakuMessageHash): int = + if x < y: + return -1 + elif x == y: + return 0 + + return 1 diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index cf3ea9dde2..a9212c15ff 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -4,7 +4,11 @@ import std/sequtils, stew/leb128 import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common -proc encode*(value: WakuMessagePayload): ProtoBuffer = +const + HashLen = 32 + VarIntLen = 9 + +proc encode*(value: WakuMessageAndTopic): ProtoBuffer = var pb = initProtoBuffer() pb.write3(1, value.pubsub) @@ -13,7 +17,8 @@ proc encode*(value: WakuMessagePayload): ProtoBuffer = return pb proc deltaEncode*(itemSet: ItemSet): seq[byte] = - let capacity = 1 + (itemSet.elements.len * 41) + # 1 byte for resolved bool and 32 bytes hash plus 9 bytes varint per elements + let capacity = 1 + (itemSet.elements.len * (VarIntLen + HashLen)) var output = newSeqOfCap[byte](capacity) @@ -28,7 +33,7 @@ proc deltaEncode*(itemSet: ItemSet): seq[byte] = buf = timediff.toBytes(Leb128) output &= @buf - output &= id.fingerprint + output &= id.hash output &= byte(itemSet.reconciled) @@ -50,13 +55,13 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = let (bound, _) = value.ranges[0] lastTimestamp = bound.a.time - lastHash = bound.a.fingerprint + lastHash = bound.a.hash # encode first timestamp buf = uint64(lastTimestamp).toBytes(Leb128) output &= @buf - # implicit first fingerprint is always 0 and range type is always skip + # implicit first hash is always 0 and range type is always skip for (bound, rangeType) in value.ranges: let timeDiff = uint64(bound.b.time) - uint64(lastTimestamp) @@ -68,7 +73,7 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = if timeDiff == 0: var sameBytes = 0 - for (byte1, byte2) in zip(lastHash, bound.b.fingerprint): + for (byte1, byte2) in zip(lastHash, bound.b.hash): sameBytes.inc() if byte1 != byte2: @@ -77,8 +82,8 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = # encode number of same bytes output &= byte(sameBytes) - # encode fingerprint bytes - output &= bound.b.fingerprint[0 ..< sameBytes] + # encode hash bytes + output &= bound.b.hash[0 ..< sameBytes] # encode rangeType output &= byte(rangeType) @@ -115,20 +120,20 @@ proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int idx = 0 while itemSet.elements.len < setLength: - var slice = buffer[idx ..< idx + 9] + var slice = buffer[idx ..< idx + VarIntLen] (val, len) = uint64.fromBytes(slice, Leb128) idx += len let time = lastTime + Timestamp(val) lastTime = time - slice = buffer[idx ..< idx + 32] - idx += 32 - var fingerprint = EmptyFingerprint + slice = buffer[idx ..< idx + HashLen] + idx += HashLen + var hash = EmptyWakuMessageHash for i, bytes in slice: - fingerprint[i] = bytes + hash[i] = bytes - let id = ID(time: time, fingerprint: fingerprint) + let id = ID(time: time, hash: hash) itemSet.elements.add(id) @@ -147,27 +152,27 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = val = 0.uint64 len = 0.int8 idx = 0 - slice = buffer[idx ..< idx + 9] + slice = buffer[idx ..< idx + VarIntLen] # first timestamp (val, len) = uint64.fromBytes(slice, Leb128) idx += len lastTime = Timestamp(val) - # implicit first fingerprint is always 0 + # implicit first hash is always 0 # implicit first range mode is alway skip while idx < buffer.len - 1: - let lowerRangeBound = ID(time: lastTime, fingerprint: EmptyFingerprint) + let lowerRangeBound = ID(time: lastTime, hash: EmptyWakuMessageHash) # decode timestamp diff - let min = min(idx + 9, buffer.len) + let min = min(idx + VarIntLen, buffer.len) slice = buffer[idx ..< min] (val, len) = uint64.fromBytes(slice, Leb128) idx += len let timeDiff = Timestamp(val) - var fingerprint = EmptyFingerprint + var hash = EmptyWakuMessageHash if timeDiff == 0: # decode number of same bytes let sameBytes = int(buffer[idx]) @@ -177,12 +182,12 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = slice = buffer[idx ..< idx + sameBytes] idx += sameBytes for i, bytes in slice: - fingerprint[i] = bytes + hash[i] = bytes let thisTime = lastTime + timeDiff lastTime = thisTime - let upperRangeBound = ID(time: thisTime, fingerprint: fingerprint) + let upperRangeBound = ID(time: thisTime, hash: hash) let bounds = lowerRangeBound .. upperRangeBound @@ -194,8 +199,8 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = if rangeType == fingerprintRange: # decode fingerprint - slice = buffer[idx ..< idx + 32] - idx += 32 + slice = buffer[idx ..< idx + HashLen] + idx += HashLen var fingerprint = EmptyFingerprint for i, bytes in slice: fingerprint[i] = bytes @@ -203,7 +208,7 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = payload.fingerprints.add(fingerprint) elif rangeType == itemSetRange: # decode item set length - let min = min(idx + 9, buffer.len) + let min = min(idx + VarIntLen, buffer.len) slice = buffer[idx ..< min] (val, len) = uint64.fromBytes(slice, Leb128) idx += len @@ -219,7 +224,7 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = return payload -proc decode*(T: type WakuMessagePayload, buffer: seq[byte]): ProtobufResult[T] = +proc decode*(T: type WakuMessageAndTopic, buffer: seq[byte]): ProtobufResult[T] = let pb = initProtoBuffer(buffer) var pubsub: string @@ -230,4 +235,4 @@ proc decode*(T: type WakuMessagePayload, buffer: seq[byte]): ProtobufResult[T] = let message = ?WakuMessage.decode(proto.buffer) - return ok(WakuMessagePayload(pubsub: pubsub, message: message)) + return ok(WakuMessageAndTopic(pubsub: pubsub, message: message)) diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim index fd0bc8faab..8d1a80ff9e 100644 --- a/waku/waku_store_sync/common.nim +++ b/waku/waku_store_sync/common.nim @@ -8,8 +8,6 @@ const DefaultSyncInterval*: Duration = 5.minutes DefaultSyncRange*: Duration = 1.hours RetryDelay*: Duration = 30.seconds - SyncReconciliationCodec* = "/vac/waku/reconciliation/1.0" - SyncTransferCodec* = "/vac/waku/transfer/1.0" DefaultGossipSubJitter*: Duration = 20.seconds type @@ -17,7 +15,7 @@ type ID* = object time*: Timestamp - fingerprint*: Fingerprint + hash*: WakuMessageHash ItemSet* = object elements*: seq[ID] @@ -33,7 +31,7 @@ type fingerprints*: seq[Fingerprint] itemSets*: seq[ItemSet] - WakuMessagePayload* = object + WakuMessageAndTopic* = object pubsub*: string message*: WakuMessage @@ -54,7 +52,7 @@ proc low*(T: type ID): T = return ID(time: Timestamp(low(int64)), fingerprint: EmptyFingerprint) proc `$`*(value: ID): string = - return '(' & $value.time & ", " & $value.fingerprint & ')' + return '(' & $value.time & ", " & $value.hash & ')' proc cmp(x, y: Fingerprint): int = if x < y: @@ -66,7 +64,7 @@ proc cmp(x, y: Fingerprint): int = proc cmp*(x, y: ID): int = if x.time == y.time: - return cmp(x.fingerprint, y.fingerprint) + return cmp(x.hash, y.hash) if x.time < y.time: return -1 From 380598f355f3dcc663f110819e472593d8488f36 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Wed, 8 Jan 2025 08:39:02 -0500 Subject: [PATCH 4/9] fix caps --- waku/waku_enr/capabilities.nim | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/waku/waku_enr/capabilities.nim b/waku/waku_enr/capabilities.nim index 2f09796c8b..37fca67e72 100644 --- a/waku/waku_enr/capabilities.nim +++ b/waku/waku_enr/capabilities.nim @@ -26,11 +26,10 @@ const capabilityToCodec = { Capabilities.Store: WakuStoreCodec, Capabilities.Filter: WakuFilterSubscribeCodec, Capabilities.Lightpush: WakuLightPushCodec, - Capabilities.Sync: WakuSyncCodec, }.toTable func init*( - T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false + T: type CapabilitiesBitfield, lightpush, filter, store, relay: bool = false ): T = ## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/) var bitfield: uint8 @@ -42,8 +41,6 @@ func init*( bitfield.setBit(2) if lightpush: bitfield.setBit(3) - if sync: - bitfield.setBit(4) CapabilitiesBitfield(bitfield) func init*(T: type CapabilitiesBitfield, caps: varargs[Capabilities]): T = From 0bff8bbbfd4a62a2f05e5587099526b25f1a5fdd Mon Sep 17 00:00:00 2001 From: SionoiS Date: Wed, 8 Jan 2025 08:43:28 -0500 Subject: [PATCH 5/9] revert cap fix --- waku/waku_core/codecs.nim | 5 +++-- waku/waku_enr/capabilities.nim | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/waku/waku_core/codecs.nim b/waku/waku_core/codecs.nim index 3436b42dfb..35a050b721 100644 --- a/waku/waku_core/codecs.nim +++ b/waku/waku_core/codecs.nim @@ -4,8 +4,9 @@ const WakuFilterSubscribeCodec* = "/vac/waku/filter-subscribe/2.0.0-beta1" WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1" WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1" - WakuSyncReconciliationCodec* = "/vac/waku/reconciliation/1.0.0" - WakuSyncTransferCodec* = "/vac/waku/transfer/1.0.0" + WakuSyncCodec* = "/vac/waku/sync/1.0.0" + WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0" + WakuTransferCodec* = "/vac/waku/transfer/1.0.0" WakuMetadataCodec* = "/vac/waku/metadata/1.0.0" WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1" WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4" diff --git a/waku/waku_enr/capabilities.nim b/waku/waku_enr/capabilities.nim index 37fca67e72..2f09796c8b 100644 --- a/waku/waku_enr/capabilities.nim +++ b/waku/waku_enr/capabilities.nim @@ -26,10 +26,11 @@ const capabilityToCodec = { Capabilities.Store: WakuStoreCodec, Capabilities.Filter: WakuFilterSubscribeCodec, Capabilities.Lightpush: WakuLightPushCodec, + Capabilities.Sync: WakuSyncCodec, }.toTable func init*( - T: type CapabilitiesBitfield, lightpush, filter, store, relay: bool = false + T: type CapabilitiesBitfield, lightpush, filter, store, relay, sync: bool = false ): T = ## Creates an waku2 ENR flag bit field according to RFC 31 (https://rfc.vac.dev/spec/31/) var bitfield: uint8 @@ -41,6 +42,8 @@ func init*( bitfield.setBit(2) if lightpush: bitfield.setBit(3) + if sync: + bitfield.setBit(4) CapabilitiesBitfield(bitfield) func init*(T: type CapabilitiesBitfield, caps: varargs[Capabilities]): T = From 7337a9fb795c67c912307a850701bef11c883f73 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 16 Jan 2025 09:48:23 -0500 Subject: [PATCH 6/9] remaning & fixes --- tests/waku_store_sync/sync_utils.nim | 6 ++-- tests/waku_store_sync/test_codec.nim | 18 ++++++------ waku/waku_store_sync/codec.nim | 16 +++++------ waku/waku_store_sync/common.nim | 41 +++++++++++++++------------- 4 files changed, 41 insertions(+), 40 deletions(-) diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index b949b89bf2..e32d0fefa4 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -1,5 +1,3 @@ -{.used.} - import std/[options, random], chronos, chronicles import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore @@ -16,7 +14,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash = proc newTestWakuRecon*( switch: Switch, - idsRx: AsyncQueue[ID], + idsRx: AsyncQueue[SyncID], wantsTx: AsyncQueue[(PeerId, Fingerprint)], needsTx: AsyncQueue[(PeerId, Fingerprint)], ): Future[SyncReconciliation] {.async.} = @@ -40,7 +38,7 @@ proc newTestWakuRecon*( proc newTestWakuTransfer*( switch: Switch, - idsTx: AsyncQueue[ID], + idsTx: AsyncQueue[SyncID], wantsRx: AsyncQueue[(PeerId, Fingerprint)], needsRx: AsyncQueue[(PeerId, Fingerprint)], ): SyncTransfer = diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim index 132c67c679..3d5f02e9f7 100644 --- a/tests/waku_store_sync/test_codec.nim +++ b/tests/waku_store_sync/test_codec.nim @@ -13,7 +13,7 @@ import proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet = var - elements = newSeqOfCap[ID](count) + elements = newSeqOfCap[SyncID](count) lastTime = startTime for i in 0 ..< count: @@ -24,7 +24,7 @@ proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet = let hash = randomHash(rng) - let id = ID(time: Timestamp(timestamp), fingerprint: hash) + let id = SyncID(time: Timestamp(timestamp), fingerprint: hash) elements.add(id) @@ -32,7 +32,7 @@ proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet = proc randomSetRange( count: int, startTime: Timestamp, rng: var Rand -): (Slice[ID], ItemSet) = +): (Slice[SyncID], ItemSet) = let itemSet = randomItemSet(count, startTime, rng) var @@ -122,15 +122,15 @@ suite "Waku Store Sync Codec": var rng = initRand() lastTime = getNowInNanosecondTime() - ranges = newSeqOfCap[(Slice[ID], RangeType)](4) + ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4) for i in 0 ..< count: - let lb = ID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) + let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) #echo "lower bound: " & $lastTime let nowTime = lastTime + 10_000_000_000 # 10s #echo "upper bound: " & $nowTime lastTime = nowTime - let ub = ID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) + let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) let bounds = lb .. ub let range = (bounds, RangeType.fingerprintRange) @@ -160,15 +160,15 @@ suite "Waku Store Sync Codec": var rng = initRand() lastTime = getNowInNanosecondTime() - ranges = newSeqOfCap[(Slice[ID], RangeType)](4) + ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4) itemSets = newSeqOfCap[ItemSet](4) fingerprints = newSeqOfCap[Fingerprint](4) for i in 1 .. count: - let lb = ID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) + let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) let nowTime = lastTime + 10_000_000_000 # 10s lastTime = nowTime - let ub = ID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) + let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) let bounds = lb .. ub let range = (bounds, RangeType.fingerprintRange) diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index a9212c15ff..6996a120c3 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -89,14 +89,14 @@ proc deltaEncode*(value: SyncPayload): seq[byte] = output &= byte(rangeType) case rangeType - of skipRange: + of RangeType.Skip: continue - of fingerprintRange: + of RangeType.Fingerprint: let fingerprint = value.fingerprints[i] i.inc() output &= fingerprint - of itemSetRange: + of RangeType.ItemSet: let itemSet = value.itemSets[j] j.inc() @@ -133,7 +133,7 @@ proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int for i, bytes in slice: hash[i] = bytes - let id = ID(time: time, hash: hash) + let id = SyncID(time: time, hash: hash) itemSet.elements.add(id) @@ -163,7 +163,7 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = # implicit first range mode is alway skip while idx < buffer.len - 1: - let lowerRangeBound = ID(time: lastTime, hash: EmptyWakuMessageHash) + let lowerRangeBound = SyncID(time: lastTime, hash: EmptyWakuMessageHash) # decode timestamp diff let min = min(idx + VarIntLen, buffer.len) @@ -187,7 +187,7 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = let thisTime = lastTime + timeDiff lastTime = thisTime - let upperRangeBound = ID(time: thisTime, hash: hash) + let upperRangeBound = SyncID(time: thisTime, hash: hash) let bounds = lowerRangeBound .. upperRangeBound @@ -197,7 +197,7 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = payload.ranges.add((bounds, rangeType)) - if rangeType == fingerprintRange: + if rangeType == RangeType.Fingerprint: # decode fingerprint slice = buffer[idx ..< idx + HashLen] idx += HashLen @@ -206,7 +206,7 @@ proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = fingerprint[i] = bytes payload.fingerprints.add(fingerprint) - elif rangeType == itemSetRange: + elif rangeType == RangeType.ItemSet: # decode item set length let min = min(idx + VarIntLen, buffer.len) slice = buffer[idx ..< min] diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim index 8d1a80ff9e..7774e09543 100644 --- a/waku/waku_store_sync/common.nim +++ b/waku/waku_store_sync/common.nim @@ -7,32 +7,31 @@ import ../waku_core const DefaultSyncInterval*: Duration = 5.minutes DefaultSyncRange*: Duration = 1.hours - RetryDelay*: Duration = 30.seconds DefaultGossipSubJitter*: Duration = 20.seconds type Fingerprint* = array[32, byte] - ID* = object + SyncID* = object time*: Timestamp hash*: WakuMessageHash ItemSet* = object - elements*: seq[ID] + elements*: seq[SyncID] reconciled*: bool - RangeType* = enum - skipRange = 0 - fingerprintRange = 1 - itemSetRange = 2 + RangeType* {.pure.} = enum + Skip = 0 + Fingerprint = 1 + ItemSet = 2 SyncPayload* = object - ranges*: seq[(Slice[ID], RangeType)] - fingerprints*: seq[Fingerprint] - itemSets*: seq[ItemSet] + ranges*: seq[(Slice[SyncID], RangeType)] + fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order + itemSets*: seq[ItemSet] # Range type itemset stored here in order WakuMessageAndTopic* = object - pubsub*: string + pubsub*: PubSubTopic message*: WakuMessage const EmptyFingerprint*: Fingerprint = [ @@ -45,13 +44,17 @@ const FullFingerprint*: Fingerprint = [ 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, ] -proc high*(T: type ID): T = - return ID(time: Timestamp(high(int64)), fingerprint: FullFingerprint) +proc high*(T: type SyncID): T = + ## Same as high(int) but for IDs -proc low*(T: type ID): T = - return ID(time: Timestamp(low(int64)), fingerprint: EmptyFingerprint) + return SyncID(time: Timestamp(high(int64)), fingerprint: FullFingerprint) -proc `$`*(value: ID): string = +proc low*(T: type SyncID): T = + ## Same as low(int) but for IDs + + return SyncID(time: Timestamp(low(int64)), fingerprint: EmptyFingerprint) + +proc `$`*(value: SyncID): string = return '(' & $value.time & ", " & $value.hash & ')' proc cmp(x, y: Fingerprint): int = @@ -62,7 +65,7 @@ proc cmp(x, y: Fingerprint): int = return 1 -proc cmp*(x, y: ID): int = +proc cmp*(x, y: SyncID): int = if x.time == y.time: return cmp(x.hash, y.hash) @@ -71,8 +74,8 @@ proc cmp*(x, y: ID): int = return 1 -proc `<`*(x, y: ID): bool = +proc `<`*(x, y: SyncID): bool = cmp(x, y) == -1 -proc `>`*(x, y: ID): bool = +proc `>`*(x, y: SyncID): bool = cmp(x, y) == 1 From f2f0e648236eb5bad4745208368defe93585329f Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 16 Jan 2025 10:11:58 -0500 Subject: [PATCH 7/9] more renaming --- tests/waku_store_sync/test_codec.nim | 26 +++++++++++++------------- waku/waku_store_sync/codec.nim | 8 ++++---- waku/waku_store_sync/common.nim | 2 +- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim index 3d5f02e9f7..db25cc1684 100644 --- a/tests/waku_store_sync/test_codec.nim +++ b/tests/waku_store_sync/test_codec.nim @@ -94,12 +94,12 @@ suite "Waku Store Sync Codec": let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng) let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng) - let range1 = (bounds1, RangeType.itemSetRange) - let range2 = (bounds2, RangeType.itemSetRange) - let range3 = (bounds3, RangeType.itemSetRange) - let range4 = (bounds4, RangeType.itemSetRange) + let range1 = (bounds1, RangeType.ItemSet) + let range2 = (bounds2, RangeType.ItemSet) + let range3 = (bounds3, RangeType.ItemSet) + let range4 = (bounds4, RangeType.ItemSet) - let payload = SyncPayload( + let payload = RangesData( ranges: @[range1, range2, range3, range4], fingerprints: @[], itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4], @@ -107,7 +107,7 @@ suite "Waku Store Sync Codec": let encodedPayload = payload.deltaEncode() - let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + let decodedPayload = RangesData.deltaDecode(encodedPayload) check: payload.ranges[0][0].b == decodedPayload.ranges[0][0].b @@ -132,11 +132,11 @@ suite "Waku Store Sync Codec": lastTime = nowTime let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) let bounds = lb .. ub - let range = (bounds, RangeType.fingerprintRange) + let range = (bounds, RangeType.Fingerprint) ranges.add(range) - let payload = SyncPayload( + let payload = RangesData( ranges: ranges, fingerprints: @[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)], @@ -145,7 +145,7 @@ suite "Waku Store Sync Codec": let encodedPayload = payload.deltaEncode() #echo "encoding done!" - let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + let decodedPayload = RangesData.deltaDecode(encodedPayload) check: payload.ranges[0][0].b == decodedPayload.ranges[0][0].b @@ -170,7 +170,7 @@ suite "Waku Store Sync Codec": lastTime = nowTime let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) let bounds = lb .. ub - let range = (bounds, RangeType.fingerprintRange) + let range = (bounds, RangeType.Fingerprint) ranges.add(range) fingerprints.add(randomHash(rng)) @@ -178,15 +178,15 @@ suite "Waku Store Sync Codec": let (bound, itemSet) = randomSetRange(5, lastTime, rng) lastTime += 50_000_000_000 # 50s - ranges.add((bound, RangeType.itemSetRange)) + ranges.add((bound, RangeType.ItemSet)) itemSets.add(itemSet) let payload = - SyncPayload(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets) + RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets) let encodedPayload = payload.deltaEncode() #echo "encoding done!" - let decodedPayload = SyncPayload.deltaDecode(encodedPayload) + let decodedPayload = RangesData.deltaDecode(encodedPayload) check: payload.ranges[0][0].b == decodedPayload.ranges[0][0].b diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index 6996a120c3..c972fdba3f 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -39,7 +39,7 @@ proc deltaEncode*(itemSet: ItemSet): seq[byte] = return output -proc deltaEncode*(value: SyncPayload): seq[byte] = +proc deltaEncode*(value: RangesData): seq[byte] = if value.ranges.len == 0: return @[0] @@ -142,12 +142,12 @@ proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int return idx -proc deltaDecode*(T: type SyncPayload, buffer: seq[byte]): T = +proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T = if buffer.len == 1: - return SyncPayload() + return RangesData() var - payload = SyncPayload() + payload = RangesData() lastTime = Timestamp(0) val = 0.uint64 len = 0.int8 diff --git a/waku/waku_store_sync/common.nim b/waku/waku_store_sync/common.nim index 7774e09543..2795450786 100644 --- a/waku/waku_store_sync/common.nim +++ b/waku/waku_store_sync/common.nim @@ -25,7 +25,7 @@ type Fingerprint = 1 ItemSet = 2 - SyncPayload* = object + RangesData* = object ranges*: seq[(Slice[SyncID], RangeType)] fingerprints*: seq[Fingerprint] # Range type fingerprint stored here in order itemSets*: seq[ItemSet] # Range type itemset stored here in order From c9ac7d9c7ce8fd4dc664521fdbbe535f1c3e994c Mon Sep 17 00:00:00 2001 From: SionoiS Date: Mon, 20 Jan 2025 10:42:58 -0500 Subject: [PATCH 8/9] refactor --- waku/waku_store_sync/codec.nim | 156 +++++++++++++++++++-------------- 1 file changed, 89 insertions(+), 67 deletions(-) diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index c972fdba3f..108ce6e5ca 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -7,6 +7,7 @@ import ../common/protobuf, ../waku_core/message, ../waku_core/time, ./common const HashLen = 32 VarIntLen = 9 + AvgCapacity = 1000 proc encode*(value: WakuMessageAndTopic): ProtoBuffer = var pb = initProtoBuffer() @@ -44,7 +45,7 @@ proc deltaEncode*(value: RangesData): seq[byte] = return @[0] var - output = newSeqOfCap[byte](1000) + output = newSeqOfCap[byte](AvgCapacity) buf = Leb128Buf[uint64]() lastTimestamp: Timestamp lastHash: Fingerprint @@ -92,10 +93,8 @@ proc deltaEncode*(value: RangesData): seq[byte] = of RangeType.Skip: continue of RangeType.Fingerprint: - let fingerprint = value.fingerprints[i] + output &= value.fingerprints[i] i.inc() - - output &= fingerprint of RangeType.ItemSet: let itemSet = value.itemSets[j] j.inc() @@ -112,33 +111,94 @@ proc deltaEncode*(value: RangesData): seq[byte] = return output +proc getItemSet(idx: var int, buffer: seq[byte], itemSetLength: int): ItemSet = + var itemSet = ItemSet() + let slice = buffer[idx ..< buffer.len] + let count = deltaDecode(itemSet, slice, itemSetLength) + idx += count + +proc getItemSetLength(idx: var int, buffer: seq[byte]): int = + # decode item set length + let min = min(idx + VarIntLen, buffer.len) + let slice = buffer[idx ..< min] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + return int(val) + +proc getFingerprint(idx: var int, buffer: seq[byte]): Fingerprint = + # decode fingerprint + let slice = buffer[idx ..< idx + HashLen] + idx += HashLen + var fingerprint = EmptyFingerprint + for i, bytes in slice: + fingerprint[i] = bytes + + return fingerprint + +proc getRangeType(idx: var int, buffer: seq[byte]): RangeType = + let rangeType = RangeType(buffer[idx]) + idx += 1 + + return rangeType + +proc updateHash(idx: var int, buffer: seq[byte], hash: var WakuMessageHash) = + let sameBytes = int(buffer[idx]) + idx += 1 + + let slice = buffer[idx ..< idx + sameBytes] + idx += sameBytes + + for i, bytes in slice: + hash[i] = bytes + +proc getTimeDiff(idx: var int, buffer: seq[byte]): Timestamp = + let min = min(idx + VarIntLen, buffer.len) + let slice = buffer[idx ..< min] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + return Timestamp(val) + +proc getTimestamp(idx: var int, buffer: seq[byte]): Timestamp = + let slice = buffer[idx ..< idx + VarIntLen] + (val, len) = uint64.fromBytes(slice, Leb128) + idx += len + + return Timestamp(val) + +proc getHash(idx: var int, buffer: seq[byte]): WakuMessageHash = + let slice = buffer[idx ..< idx + HashLen] + idx += HashLen + var hash = EmptyWakuMessageHash + for i, bytes in slice: + hash[i] = bytes + + return hash + +proc getReconciled(idx: var int, buffer: seq[byte]): bool = + let recon = bool(buffer[idx]) + idx += 1 + + return recon + proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int = var lastTime = Timestamp(0) - val = 0.uint64 - len = 0.int8 idx = 0 while itemSet.elements.len < setLength: - var slice = buffer[idx ..< idx + VarIntLen] - (val, len) = uint64.fromBytes(slice, Leb128) - idx += len - - let time = lastTime + Timestamp(val) + let timeDiff = getTimestamp(idx, buffer) + let time = lastTime + timeDiff lastTime = time - slice = buffer[idx ..< idx + HashLen] - idx += HashLen - var hash = EmptyWakuMessageHash - for i, bytes in slice: - hash[i] = bytes + let hash = getHash(idx, buffer) let id = SyncID(time: time, hash: hash) itemSet.elements.add(id) - itemSet.reconciled = bool(buffer[idx]) - idx += 1 + itemSet.reconciled = getReconciled(idx, buffer) return idx @@ -149,15 +209,9 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T = var payload = RangesData() lastTime = Timestamp(0) - val = 0.uint64 - len = 0.int8 idx = 0 - slice = buffer[idx ..< idx + VarIntLen] - # first timestamp - (val, len) = uint64.fromBytes(slice, Leb128) - idx += len - lastTime = Timestamp(val) + lastTime = getTimestamp(idx, buffer) # implicit first hash is always 0 # implicit first range mode is alway skip @@ -165,61 +219,27 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T = while idx < buffer.len - 1: let lowerRangeBound = SyncID(time: lastTime, hash: EmptyWakuMessageHash) - # decode timestamp diff - let min = min(idx + VarIntLen, buffer.len) - slice = buffer[idx ..< min] - (val, len) = uint64.fromBytes(slice, Leb128) - idx += len - let timeDiff = Timestamp(val) + let timeDiff = getTimeDiff(idx, buffer) var hash = EmptyWakuMessageHash if timeDiff == 0: - # decode number of same bytes - let sameBytes = int(buffer[idx]) - idx += 1 - - # decode same bytes - slice = buffer[idx ..< idx + sameBytes] - idx += sameBytes - for i, bytes in slice: - hash[i] = bytes + updateHash(idx, buffer, hash) let thisTime = lastTime + timeDiff lastTime = thisTime let upperRangeBound = SyncID(time: thisTime, hash: hash) - let bounds = lowerRangeBound .. upperRangeBound - # decode range type - let rangeType = RangeType(buffer[idx]) - idx += 1 - + let rangeType = getRangeType(idx, buffer) payload.ranges.add((bounds, rangeType)) if rangeType == RangeType.Fingerprint: - # decode fingerprint - slice = buffer[idx ..< idx + HashLen] - idx += HashLen - var fingerprint = EmptyFingerprint - for i, bytes in slice: - fingerprint[i] = bytes - + let fingerprint = getFingerprint(idx, buffer) payload.fingerprints.add(fingerprint) elif rangeType == RangeType.ItemSet: - # decode item set length - let min = min(idx + VarIntLen, buffer.len) - slice = buffer[idx ..< min] - (val, len) = uint64.fromBytes(slice, Leb128) - idx += len - let itemSetLength = int(val) - - # decode item set - var itemSet = ItemSet() - slice = buffer[idx ..< buffer.len] - let count = deltaDecode(itemSet, slice, itemSetLength) - idx += count - + let itemSetLength = getItemSetLength(idx, buffer) + let itemSet = getItemSet(idx, buffer, itemSetLength) payload.itemSets.add(itemSet) return payload @@ -228,10 +248,12 @@ proc decode*(T: type WakuMessageAndTopic, buffer: seq[byte]): ProtobufResult[T] let pb = initProtoBuffer(buffer) var pubsub: string - discard ?pb.getField(1, pubsub) + if not ?pb.getField(1, pubsub): + return err(ProtobufError.missingRequiredField("pubsub")) var proto: ProtoBuffer - discard ?pb.getField(2, proto) + if not ?pb.getField(2, proto): + return err(ProtobufError.missingRequiredField("msg")) let message = ?WakuMessage.decode(proto.buffer) From 81fb58c1b41d557b6cbb8d60eb8dee8327b69493 Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 21 Jan 2025 10:48:25 -0500 Subject: [PATCH 9/9] decode error handling --- tests/waku_store_sync/test_codec.nim | 23 +++++++---- waku/waku_store_sync/codec.nim | 61 +++++++++++++++++++--------- 2 files changed, 57 insertions(+), 27 deletions(-) diff --git a/tests/waku_store_sync/test_codec.nim b/tests/waku_store_sync/test_codec.nim index db25cc1684..9e7a85924e 100644 --- a/tests/waku_store_sync/test_codec.nim +++ b/tests/waku_store_sync/test_codec.nim @@ -107,7 +107,10 @@ suite "Waku Store Sync Codec": let encodedPayload = payload.deltaEncode() - let decodedPayload = RangesData.deltaDecode(encodedPayload) + let res = RangesData.deltaDecode(encodedPayload) + assert res.isOk(), $res.error + + let decodedPayload = res.get() check: payload.ranges[0][0].b == decodedPayload.ranges[0][0].b @@ -126,9 +129,9 @@ suite "Waku Store Sync Codec": for i in 0 ..< count: let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint) - #echo "lower bound: " & $lastTime + let nowTime = lastTime + 10_000_000_000 # 10s - #echo "upper bound: " & $nowTime + lastTime = nowTime let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint) let bounds = lb .. ub @@ -144,8 +147,11 @@ suite "Waku Store Sync Codec": ) let encodedPayload = payload.deltaEncode() - #echo "encoding done!" - let decodedPayload = RangesData.deltaDecode(encodedPayload) + + let res = RangesData.deltaDecode(encodedPayload) + assert res.isOk(), $res.error + + let decodedPayload = res.get() check: payload.ranges[0][0].b == decodedPayload.ranges[0][0].b @@ -185,8 +191,11 @@ suite "Waku Store Sync Codec": RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets) let encodedPayload = payload.deltaEncode() - #echo "encoding done!" - let decodedPayload = RangesData.deltaDecode(encodedPayload) + + let res = RangesData.deltaDecode(encodedPayload) + assert res.isOk(), $res.error + + let decodedPayload = res.get() check: payload.ranges[0][0].b == decodedPayload.ranges[0][0].b diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index 108ce6e5ca..c02702bd1e 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -118,7 +118,6 @@ proc getItemSet(idx: var int, buffer: seq[byte], itemSetLength: int): ItemSet = idx += count proc getItemSetLength(idx: var int, buffer: seq[byte]): int = - # decode item set length let min = min(idx + VarIntLen, buffer.len) let slice = buffer[idx ..< min] (val, len) = uint64.fromBytes(slice, Leb128) @@ -126,26 +125,37 @@ proc getItemSetLength(idx: var int, buffer: seq[byte]): int = return int(val) -proc getFingerprint(idx: var int, buffer: seq[byte]): Fingerprint = - # decode fingerprint +proc getFingerprint(idx: var int, buffer: seq[byte]): Result[Fingerprint, string] = + if idx + HashLen > buffer.len: + return err("Cannot decode fingerprint") + let slice = buffer[idx ..< idx + HashLen] idx += HashLen var fingerprint = EmptyFingerprint for i, bytes in slice: fingerprint[i] = bytes - return fingerprint + return ok(fingerprint) + +proc getRangeType(idx: var int, buffer: seq[byte]): Result[RangeType, string] = + if idx >= buffer.len: + return err("Cannot decode range type") -proc getRangeType(idx: var int, buffer: seq[byte]): RangeType = let rangeType = RangeType(buffer[idx]) idx += 1 - return rangeType + return ok(rangeType) proc updateHash(idx: var int, buffer: seq[byte], hash: var WakuMessageHash) = + if idx >= buffer.len: + return + let sameBytes = int(buffer[idx]) idx += 1 + if idx + sameBytes > buffer.len: + return + let slice = buffer[idx ..< idx + sameBytes] idx += sameBytes @@ -160,49 +170,60 @@ proc getTimeDiff(idx: var int, buffer: seq[byte]): Timestamp = return Timestamp(val) -proc getTimestamp(idx: var int, buffer: seq[byte]): Timestamp = +proc getTimestamp(idx: var int, buffer: seq[byte]): Result[Timestamp, string] = + if idx + VarIntLen > buffer.len: + return err("Cannot decode timestamp") + let slice = buffer[idx ..< idx + VarIntLen] (val, len) = uint64.fromBytes(slice, Leb128) idx += len - return Timestamp(val) + return ok(Timestamp(val)) + +proc getHash(idx: var int, buffer: seq[byte]): Result[WakuMessageHash, string] = + if idx + HashLen > buffer.len: + return err("Cannot decode hash") -proc getHash(idx: var int, buffer: seq[byte]): WakuMessageHash = let slice = buffer[idx ..< idx + HashLen] idx += HashLen var hash = EmptyWakuMessageHash for i, bytes in slice: hash[i] = bytes - return hash + return ok(hash) + +proc getReconciled(idx: var int, buffer: seq[byte]): Result[bool, string] = + if idx >= buffer.len: + return err("Cannot decode reconciled") -proc getReconciled(idx: var int, buffer: seq[byte]): bool = let recon = bool(buffer[idx]) idx += 1 - return recon + return ok(recon) -proc deltaDecode*(itemSet: var ItemSet, buffer: seq[byte], setLength: int): int = +proc deltaDecode*( + itemSet: var ItemSet, buffer: seq[byte], setLength: int +): Result[int, string] = var lastTime = Timestamp(0) idx = 0 while itemSet.elements.len < setLength: - let timeDiff = getTimestamp(idx, buffer) + let timeDiff = ?getTimestamp(idx, buffer) let time = lastTime + timeDiff lastTime = time - let hash = getHash(idx, buffer) + let hash = ?getHash(idx, buffer) let id = SyncID(time: time, hash: hash) itemSet.elements.add(id) - itemSet.reconciled = getReconciled(idx, buffer) + itemSet.reconciled = ?getReconciled(idx, buffer) return idx -proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T = +proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] = if buffer.len == 1: return RangesData() @@ -211,7 +232,7 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T = lastTime = Timestamp(0) idx = 0 - lastTime = getTimestamp(idx, buffer) + lastTime = ?getTimestamp(idx, buffer) # implicit first hash is always 0 # implicit first range mode is alway skip @@ -231,11 +252,11 @@ proc deltaDecode*(T: type RangesData, buffer: seq[byte]): T = let upperRangeBound = SyncID(time: thisTime, hash: hash) let bounds = lowerRangeBound .. upperRangeBound - let rangeType = getRangeType(idx, buffer) + let rangeType = ?getRangeType(idx, buffer) payload.ranges.add((bounds, rangeType)) if rangeType == RangeType.Fingerprint: - let fingerprint = getFingerprint(idx, buffer) + let fingerprint = ?getFingerprint(idx, buffer) payload.fingerprints.add(fingerprint) elif rangeType == RangeType.ItemSet: let itemSetLength = getItemSetLength(idx, buffer)