Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: simplify migration scripts and remove store legacy code #2894

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import
factory/builder,
common/utils/nat,
waku_relay,
waku_store/common,
],
./config_chat2

Expand Down Expand Up @@ -468,22 +469,30 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())

node.mountLegacyStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec)
node.mountStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

proc storeHandler(response: HistoryResponse) {.gcsafe.} =
proc storeHandler(response: StoreQueryResponse) {.gcsafe.} =
for msg in response.messages:
let payload =
if msg.message.isSome():
msg.message.get().payload
else:
newSeq[byte](0)

let
pb = Chat2Message.init(msg.payload)
pb = Chat2Message.init(payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(msg.payload)
string.fromBytes(payload)
echo &"{chatLine}"
info "Hit store handler"

let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic]))
let queryRes = await node.query(
StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
)
if queryRes.isOk():
storeHandler(queryRes.value)

Expand Down
85 changes: 6 additions & 79 deletions migrations/message_store_postgres/content_script_version_6.nim
Original file line number Diff line number Diff line change
@@ -1,85 +1,12 @@
const ContentScriptVersion_6* =
"""
-- Rename old table
ALTER TABLE IF EXISTS MESSAGES
RENAME TO OLD_MESSAGES;
-- we can drop the timestamp column because this data is also kept in the storedAt column
ALTER TABLE messages DROP COLUMN timestamp;

-- Remove old message index
ALTER TABLE IF EXISTS OLD_MESSAGES
DROP CONSTRAINT MESSAGEINDEX;

-- Create new empty table
CREATE TABLE IF NOT EXISTS NEW_MESSAGES (
MESSAGEHASH VARCHAR NOT NULL,
PUBSUBTOPIC VARCHAR NOT NULL,
CONTENTTOPIC VARCHAR NOT NULL,
PAYLOAD VARCHAR,
VERSION INTEGER NOT NULL,
TIMESTAMP BIGINT NOT NULL,
META VARCHAR,
CONSTRAINT MESSAGEINDEX PRIMARY KEY (TIMESTAMP, MESSAGEHASH)
)
PARTITION BY
RANGE (TIMESTAMP);

DO $$
DECLARE
partition_name TEXT;
partition_count numeric;
min_timestamp numeric;
max_timestamp numeric;
BEGIN
FOR partition_name in
(SELECT child.relname AS partition_name FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='old_messages'
ORDER BY partition_name ASC)
LOOP

-- Get the number of rows of this partition
EXECUTE format('SELECT COUNT(1) FROM %I', partition_name) INTO partition_count;

IF partition_count > 0 THEN

-- Get the smallest timestamp of this partition
EXECUTE format('SELECT MIN(timestamp) FROM %I', partition_name) INTO min_timestamp;

-- Get the largest timestamp of this partition
EXECUTE format('SELECT MAX(timestamp) FROM %I', partition_name) INTO max_timestamp;

-- Rename old partition
EXECUTE format('ALTER TABLE %I RENAME TO old_%I', partition_name, partition_name);

-- Create new partition with the same name and bounds
EXECUTE format('CREATE TABLE %I PARTITION OF new_messages FOR VALUES FROM (%L) TO (%L)', partition_name, min_timestamp, max_timestamp + 1);

-- Insert partition rows into new table
EXECUTE format('INSERT INTO %I (messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta, id, storedAt)
SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta, id, storedAt
FROM old_%I', partition_name, partition_name);

-- Drop old partition.
EXECUTE format('DROP TABLE old_%I', partition_name);

END IF;

END LOOP;
END $$;

-- Remove old table
DROP TABLE IF EXISTS OLD_MESSAGES;

-- Rename new table
ALTER TABLE IF EXISTS NEW_MESSAGES
RENAME TO MESSAGES;
-- from now on we are only interested in the message timestamp
ALTER TABLE messages RENAME COLUMN storedAt TO timestamp;

-- Update to new version
UPDATE VERSION
SET
VERSION = 6
WHERE
VERSION = 5;
UPDATE version SET version = 6 WHERE version = 5;

"""
117 changes: 0 additions & 117 deletions migrations/message_store_postgres/content_script_version_6_manual.nim

This file was deleted.

63 changes: 33 additions & 30 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -223,35 +223,29 @@ proc setupProtocols(
except CatchableError:
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store and conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)

let retPolicyRes =
legacy_policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes =
node.mountLegacyArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

# Store setup
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))

try:
await mountLegacyStore(node, rateLimitSetting)
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

if conf.store and not conf.legacyStore:
if conf.store:
if conf.legacyStore:
let archiveDriverRes = waitFor legacy_driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
)
if archiveDriverRes.isErr():
return err("failed to setup legacy archive driver: " & archiveDriverRes.error)

let retPolicyRes =
legacy_policy.RetentionPolicy.new(conf.storeMessageRetentionPolicy)
if retPolicyRes.isErr():
return err("failed to create retention policy: " & retPolicyRes.error)

let mountArcRes =
node.mountLegacyArchive(archiveDriverRes.get(), retPolicyRes.get())
if mountArcRes.isErr():
return err("failed to mount waku legacy archive protocol: " & mountArcRes.error)

## For now we always mount the future archive driver but if the legacy one is mounted,
## then the legacy will be in charge of performing the archiving.
## Regarding storage, the only diff between the current/future archive driver and the legacy
## one, is that the legacy stores an extra field: the id (message digest.)
let archiveDriverRes = waitFor driver.ArchiveDriver.new(
conf.storeMessageDbUrl, conf.storeMessageDbVacuum, conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections, onFatalErrorAction,
Expand All @@ -267,9 +261,18 @@ proc setupProtocols(
if mountArcRes.isErr():
return err("failed to mount waku archive protocol: " & mountArcRes.error)

# Store setup
let rateLimitSetting: RateLimitSetting =
(conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod))

if conf.legacyStore:
# Store legacy setup
try:
await mountLegacyStore(node, rateLimitSetting)
except CatchableError:
return
err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg())

# Store setup
try:
await mountStore(node, rateLimitSetting)
except CatchableError:
Expand Down
5 changes: 5 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,11 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
await node.wakuFilter.handleMessage(topic, msg)

proc archiveHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if not node.wakuLegacyArchive.isNil():
## we try to store with legacy archive
await node.wakuLegacyArchive.handleMessage(topic, msg)
return

if node.wakuArchive.isNil():
return

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## This module is aimed to handle the creation and truncation of partition tables
## in order to limit the space occupied in disk by the database.
##
## The created partitions are referenced by the 'storedAt' field.
## The created partitions are referenced by the 'timestamp' field.
##

import std/[deques, times]
Expand Down
13 changes: 9 additions & 4 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type PostgresDriver* = ref object of ArchiveDriver

const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition =
"""INSERT INTO messages (messageHash, pubsubTopic, contentTopic, payload,
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, CASE WHEN $7 = '' THEN NULL ELSE $7 END) ON CONFLICT DO NOTHING;"""
"""INSERT INTO messages (id, messageHash, pubsubTopic, contentTopic, payload,
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, CASE WHEN $8 = '' THEN NULL ELSE $8 END) ON CONFLICT DO NOTHING;"""

const SelectClause =
"""SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta FROM messages """
Expand Down Expand Up @@ -297,11 +297,16 @@ method put*(
trace "put PostgresDriver",
messageHash, contentTopic, payload, version, timestamp, meta

## this is not needed for store-v3. Nevertheless, we will keep that temporarily
## until we completely remove the store/archive-v2 logic
let fakeId = "0"

return await s.writeConnPool.runStmt(
InsertRowStmtName,
InsertRowStmtDefinition,
@[messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta],
@[fakeId, messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta],
@[
int32(fakeId.len),
int32(messageHash.len),
int32(pubsubTopic.len),
int32(contentTopic.len),
Expand All @@ -310,7 +315,7 @@ method put*(
int32(timestamp.len),
int32(meta.len),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)

method getAllMessages*(
Expand Down
Loading
Loading