indexer: index tx_affected_addresses #19355

Merged
merged 3 commits on Sep 13, 2024

@@ -0,0 +1 @@
DROP TABLE IF EXISTS tx_affected_addresses;
@@ -0,0 +1,9 @@
CREATE TABLE tx_affected_addresses (
tx_sequence_number BIGINT NOT NULL,
affected BYTEA NOT NULL,
sender BYTEA NOT NULL,
PRIMARY KEY(affected, tx_sequence_number)
);

CREATE INDEX tx_affected_addresses_tx_sequence_number_index ON tx_affected_addresses (tx_sequence_number);
CREATE INDEX tx_affected_addresses_sender ON tx_affected_addresses (sender, affected, tx_sequence_number);
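
A minimal sketch of the lookup this schema is shaped for, using a placeholder address literal (real values are 32-byte Sui addresses). The (affected, tx_sequence_number) primary key serves it as a single index scan, with no join or extra sort:

-- All transactions that affected a given address, newest first.
SELECT tx_sequence_number
FROM tx_affected_addresses
WHERE affected = '\xdeadbeef'::bytea  -- placeholder address
ORDER BY tx_sequence_number DESC
LIMIT 50;
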
28 changes: 26 additions & 2 deletions crates/sui-indexer/src/models/tx_indices.rs
@@ -3,8 +3,8 @@

use crate::{
schema::{
tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects,
tx_kinds, tx_recipients, tx_senders,
tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects,
tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
},
types::TxIndex,
};
@@ -22,6 +22,14 @@ pub struct TxDigest {
pub transaction_digest: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_affected_addresses)]
pub struct StoredTxAffected {
pub tx_sequence_number: i64,
pub affected: Vec<u8>,
pub sender: Vec<u8>,
}

#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)]
#[diesel(table_name = tx_senders)]
pub struct StoredTxSenders {
@@ -99,6 +107,7 @@ impl TxIndex {
pub fn split(
self: TxIndex,
) -> (
Vec<StoredTxAffected>,
Vec<StoredTxSenders>,
Vec<StoredTxRecipients>,
Vec<StoredTxInputObject>,
@@ -110,6 +119,20 @@
Vec<StoredTxKind>,
) {
let tx_sequence_number = self.tx_sequence_number as i64;
let tx_affected_addresses = self
.recipients
.iter()
.map(|r| StoredTxAffected {
tx_sequence_number,
affected: r.to_vec(),
sender: self.sender.to_vec(),
})
.chain(std::iter::once(StoredTxAffected {
tx_sequence_number,
affected: self.sender.to_vec(),
sender: self.sender.to_vec(),
}))
.collect();
Contributor:

What's the rationale for having the sender duplicated in each row?

Contributor (author):

It's to support filtering by sender and affected address. Transaction filters in GraphQL all follow the same rough shape:

  • They get a table that maps the filter value to transaction sequence number, with a primary key on the combination of the filter value and transaction sequence number -- this allows us to phrase a filter on just this field as a single index scan.
  • They get a sender column, and a secondary index from the sender and the filter value to transaction sequence number, to support filtering by sender and the filter value together. The alternative is to join against tx_senders, and Postgres cannot perform that join efficiently and reliably. This is why we require a scan limit for any query involving any other combination of filters (anything other than sender + one filter).
  • They get a secondary index on transaction sequence number to support pruning.

By following this pattern, we can give people a simple rule: any single filter is fine on its own, optionally combined with a sender filter, but anything else requires scan limits. Otherwise it is very difficult for users to figure out what they can do from the schema we advertise -- very much like JSON-RPC today, where the schema implies we support any number of queries that we don't, for a variety of reasons.
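
A rough sketch of the two query shapes this pattern keeps cheap, using placeholder byte literals rather than real addresses:

-- Single filter: one scan of the (affected, tx_sequence_number) primary key.
SELECT tx_sequence_number
FROM tx_affected_addresses
WHERE affected = '\xaa'::bytea
ORDER BY tx_sequence_number
LIMIT 50;

-- Sender + filter: one scan of the (sender, affected, tx_sequence_number)
-- secondary index, with no join against tx_senders.
SELECT tx_sequence_number
FROM tx_affected_addresses
WHERE sender = '\xbb'::bytea
  AND affected = '\xaa'::bytea
ORDER BY tx_sequence_number
LIMIT 50;
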

Contributor:

Awesome, thanks for the detailed information. Do we happen to have this knowledge/best practice written down anywhere in the repo?

Contributor (author):

Good question -- the principle around using scan limits is documented in the GraphQL schema:

/// The transaction blocks that exist in the network.
///
/// `scanLimit` restricts the number of candidate transactions scanned when gathering a page of
/// results. It is required for queries that apply more than two complex filters (on function,
/// kind, sender, recipient, input object, changed object, or ids), and can be at most
/// `serviceConfig.maxScanLimit`.
///
/// When the scan limit is reached the page will be returned even if it has fewer than `first`
/// results when paginating forward (`last` when paginating backwards). If there are more
/// transactions to scan, `pageInfo.hasNextPage` (or `pageInfo.hasPreviousPage`) will be set to
/// `true`, and `PageInfo.endCursor` (or `PageInfo.startCursor`) will be set to the last
/// transaction that was scanned as opposed to the last (or first) transaction in the page.
///
/// Requesting the next (or previous) page after this cursor will resume the search, scanning
/// the next `scanLimit` many transactions in the direction of pagination, and so on until all
/// transactions in the scanning range have been visited.
///
/// By default, the scanning range includes all transactions known to GraphQL, but it can be
/// restricted by the `after` and `before` cursors, and the `beforeCheckpoint`,
/// `afterCheckpoint` and `atCheckpoint` filters.

But the insight around the DB schema and transaction patterns is only captured in the graphql-benchmark repo:

https://github.com/MystenLabs/graphql-benchmark/blob/a7bdfad4c0d419cffa8acde24b12b373fcf7e1fa/migration_scripts/bulk_load/src/hybrid_tx.clj#L8-L49

I can add a version of this comment here:

https://github.com/MystenLabs/sui/blob/77b18b45c195103e25d62fb6cd35ab812b9e7f12/crates/sui-graphql-rpc/src/types/transaction_block/tx_lookups.rs
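
Mechanically, the scan limit bounds the range of candidate sequence numbers visited for one page, whatever combination of filters is being intersected. A hedged sketch with hypothetical numbers (the service's real SQL combines several filter tables):

-- One scan-limited page, paginating forward.
SELECT tx_sequence_number
FROM tx_affected_addresses
WHERE affected = '\xaa'::bytea        -- placeholder filter value
  AND tx_sequence_number > 41000000   -- resume cursor
  AND tx_sequence_number <= 41100000  -- cursor + scanLimit
ORDER BY tx_sequence_number
LIMIT 50;                             -- page size (`first`)
-- If the range is exhausted before the page fills, the partial page is
-- returned and endCursor points at the last transaction scanned.
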

Contributor (author):

Ended up splitting this out into its own PR because there were some other docs I wanted to add! #19363

let tx_sender = StoredTxSenders {
tx_sequence_number,
sender: self.sender.to_vec(),
@@ -197,6 +220,7 @@ impl TxIndex {
};

(
tx_affected_addresses,
vec![tx_sender],
tx_recipients,
tx_input_objects,
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/schema.rs
@@ -280,6 +280,14 @@ diesel::table! {
}
}

diesel::table! {
tx_affected_addresses (affected, tx_sequence_number) {
tx_sequence_number -> Int8,
affected -> Bytea,
sender -> Bytea,
}
}

diesel::table! {
tx_calls_fun (package, module, func, tx_sequence_number) {
tx_sequence_number -> Int8,
@@ -375,6 +383,7 @@ diesel::allow_tables_to_appear_in_same_query!(
protocol_configs,
pruner_cp_watermark,
transactions,
tx_affected_addresses,
tx_calls_fun,
tx_calls_mod,
tx_calls_pkg,
123 changes: 75 additions & 48 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -50,8 +50,8 @@ use crate::schema::{
event_senders, event_struct_instantiation, event_struct_module, event_struct_name,
event_struct_package, events, feature_flags, full_objects_history, objects, objects_history,
objects_snapshot, objects_version, packages, protocol_configs, pruner_cp_watermark,
transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests,
tx_input_objects, tx_kinds, tx_recipients, tx_senders,
transactions, tx_affected_addresses, tx_calls_fun, tx_calls_mod, tx_calls_pkg,
tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders,
};
use crate::store::transaction_with_retry;
use crate::types::EventIndex;
@@ -1025,56 +1025,76 @@ impl PgIndexerStore {
.checkpoint_db_commit_latency_tx_indices_chunks
.start_timer();
let len = indices.len();
let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
indices.into_iter().map(|i| i.split()).fold(
let (
affected_addresses,
senders,
recipients,
input_objects,
changed_objects,
pkgs,
mods,
funs,
digests,
kinds,
) = indices.into_iter().map(|i| i.split()).fold(
(
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_affected_addresses,
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
mut tx_mods,
mut tx_funs,
mut tx_digests,
mut tx_kinds,
),
index| {
tx_affected_addresses.extend(index.0);
tx_senders.extend(index.1);
tx_recipients.extend(index.2);
tx_input_objects.extend(index.3);
tx_changed_objects.extend(index.4);
tx_pkgs.extend(index.5);
tx_mods.extend(index.6);
tx_funs.extend(index.7);
tx_digests.extend(index.8);
tx_kinds.extend(index.9);
(
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
Vec::new(),
),
|(
mut tx_senders,
mut tx_recipients,
mut tx_input_objects,
mut tx_changed_objects,
mut tx_pkgs,
mut tx_mods,
mut tx_funs,
mut tx_digests,
mut tx_kinds,
),
index| {
tx_senders.extend(index.0);
tx_recipients.extend(index.1);
tx_input_objects.extend(index.2);
tx_changed_objects.extend(index.3);
tx_pkgs.extend(index.4);
tx_mods.extend(index.5);
tx_funs.extend(index.6);
tx_digests.extend(index.7);
tx_kinds.extend(index.8);
(
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
tx_mods,
tx_funs,
tx_digests,
tx_kinds,
)
},
);
tx_affected_addresses,
tx_senders,
tx_recipients,
tx_input_objects,
tx_changed_objects,
tx_pkgs,
tx_mods,
tx_funs,
tx_digests,
tx_kinds,
)
},
);

transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::insert_into(tx_affected_addresses::table)
.values(&affected_addresses)
.on_conflict_do_nothing()
.execute(conn)
.await?;

diesel::insert_into(tx_senders::table)
.values(&senders)
.on_conflict_do_nothing()
@@ -1383,6 +1403,13 @@ impl PgIndexerStore {
let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
async {
diesel::delete(
tx_affected_addresses::table
.filter(tx_affected_addresses::tx_sequence_number.between(min_tx, max_tx)),
)
.execute(conn)
.await?;

diesel::delete(
tx_senders::table
.filter(tx_senders::tx_sequence_number.between(min_tx, max_tx)),