Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: simplify migration scripts and remove store legacy code #2894

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ import
waku_lightpush/rpc,
waku_enr,
discovery/waku_dnsdisc,
waku_store_legacy,
waku_node,
node/waku_metrics,
node/peer_manager,
factory/builder,
common/utils/nat,
waku_relay,
waku_store/common,
],
./config_chat2

Expand Down Expand Up @@ -468,22 +468,30 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())

node.mountLegacyStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec)
node.mountStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

proc storeHandler(response: HistoryResponse) {.gcsafe.} =
proc storeHandler(response: StoreQueryResponse) {.gcsafe.} =
for msg in response.messages:
let payload =
if msg.message.isSome():
msg.message.get().payload
else:
newSeq[byte](0)

let
pb = Chat2Message.init(msg.payload)
pb = Chat2Message.init(payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(msg.payload)
string.fromBytes(payload)
echo &"{chatLine}"
info "Hit store handler"

let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic]))
let queryRes = await node.query(
StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
)
if queryRes.isOk():
storeHandler(queryRes.value)

Expand Down
86 changes: 8 additions & 78 deletions migrations/message_store_postgres/content_script_version_6.nim
Original file line number Diff line number Diff line change
@@ -1,85 +1,15 @@
const ContentScriptVersion_6* =
"""
-- Rename old table
ALTER TABLE IF EXISTS MESSAGES
RENAME TO OLD_MESSAGES;
-- we can drop the timestamp column because this data is also kept in the storedAt column
ALTER TABLE messages DROP COLUMN timestamp;

-- Remove old message index
ALTER TABLE IF EXISTS OLD_MESSAGES
DROP CONSTRAINT MESSAGEINDEX;
-- drop unused column
ALTER TABLE messages DROP COLUMN id;

-- Create new empty table
CREATE TABLE IF NOT EXISTS NEW_MESSAGES (
MESSAGEHASH VARCHAR NOT NULL,
PUBSUBTOPIC VARCHAR NOT NULL,
CONTENTTOPIC VARCHAR NOT NULL,
PAYLOAD VARCHAR,
VERSION INTEGER NOT NULL,
TIMESTAMP BIGINT NOT NULL,
META VARCHAR,
CONSTRAINT MESSAGEINDEX PRIMARY KEY (TIMESTAMP, MESSAGEHASH)
)
PARTITION BY
RANGE (TIMESTAMP);

DO $$
DECLARE
partition_name TEXT;
partition_count numeric;
min_timestamp numeric;
max_timestamp numeric;
BEGIN
FOR partition_name in
(SELECT child.relname AS partition_name FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='old_messages'
ORDER BY partition_name ASC)
LOOP

-- Get the number of rows of this partition
EXECUTE format('SELECT COUNT(1) FROM %I', partition_name) INTO partition_count;

IF partition_count > 0 THEN

-- Get the smallest timestamp of this partition
EXECUTE format('SELECT MIN(timestamp) FROM %I', partition_name) INTO min_timestamp;

-- Get the largest timestamp of this partition
EXECUTE format('SELECT MAX(timestamp) FROM %I', partition_name) INTO max_timestamp;

-- Rename old partition
EXECUTE format('ALTER TABLE %I RENAME TO old_%I', partition_name, partition_name);

-- Create new partition with the same name and bounds
EXECUTE format('CREATE TABLE %I PARTITION OF new_messages FOR VALUES FROM (%L) TO (%L)', partition_name, min_timestamp, max_timestamp + 1);

-- Insert partition rows into new table
EXECUTE format('INSERT INTO %I (messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta, id, storedAt)
SELECT messageHash, pubsubTopic, contentTopic, payload, version, timestamp, meta, id, storedAt
FROM old_%I', partition_name, partition_name);

-- Drop old partition.
EXECUTE format('DROP TABLE old_%I', partition_name);

END IF;

END LOOP;
END $$;

-- Remove old table
DROP TABLE IF EXISTS OLD_MESSAGES;

-- Rename new table
ALTER TABLE IF EXISTS NEW_MESSAGES
RENAME TO MESSAGES;
-- from now on we are only interested in the message timestamp
ALTER TABLE messages RENAME COLUMN storedAt TO timestamp;

-- Update to new version
UPDATE VERSION
SET
VERSION = 6
WHERE
VERSION = 5;
UPDATE version SET version = 6 WHERE version = 5;

"""
117 changes: 0 additions & 117 deletions migrations/message_store_postgres/content_script_version_6_manual.nim

This file was deleted.

23 changes: 1 addition & 22 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@ import
./waku_archive/test_driver_sqlite,
./waku_archive/test_retention_policy,
./waku_archive/test_waku_archive,
./waku_archive/test_partition_manager,
./waku_archive_legacy/test_driver_queue_index,
./waku_archive_legacy/test_driver_queue_pagination,
./waku_archive_legacy/test_driver_queue_query,
./waku_archive_legacy/test_driver_queue,
./waku_archive_legacy/test_driver_sqlite_query,
./waku_archive_legacy/test_driver_sqlite,
./waku_archive_legacy/test_retention_policy,
./waku_archive_legacy/test_waku_archive
./waku_archive/test_partition_manager

const os* {.strdefine.} = ""
when os == "Linux" and
Expand All @@ -36,8 +28,6 @@ when os == "Linux" and
import
./waku_archive/test_driver_postgres_query,
./waku_archive/test_driver_postgres,
#./waku_archive_legacy/test_driver_postgres_query,
#./waku_archive_legacy/test_driver_postgres,
./factory/test_node_factory,
./wakunode_rest/test_rest_store

Expand All @@ -48,17 +38,6 @@ import
./waku_store/test_waku_store,
./waku_store/test_wakunode_store

# Waku legacy store test suite
import
./waku_store_legacy/test_client,
./waku_store_legacy/test_rpc_codec,
./waku_store_legacy/test_waku_store,
./waku_store_legacy/test_wakunode_store

when defined(waku_exp_store_resume):
# TODO: Review store resume test cases (#1282)
import ./waku_store_legacy/test_resume

import
./node/test_all,
./waku_filter_v2/test_all,
Expand Down
1 change: 0 additions & 1 deletion tests/node/test_all.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ import
./test_wakunode_lightpush,
./test_wakunode_peer_exchange,
./test_wakunode_store,
./test_wakunode_legacy_store,
./test_wakunode_peer_manager
Loading
Loading