feat: add start and stop time to bulk rpcs with large volumes of data (#2255)

We want to be able to restrict the reconciliation job's queries to a
time range (e.g. the past 1-2 weeks) to reduce the amount of read load on
hubs. The most expensive part of the read operation is serializing the
protobufs to send back to the caller, and that's what we want to save on.
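
For reference, a caller could derive such a window roughly as follows (an
illustrative sketch only, assuming `@farcaster/core`'s `toFarcasterTime`
helper; the two-week constant and the fallback value are placeholder choices,
not part of this change):

```typescript
// Sketch: compute a "past two weeks" lower bound in farcaster time (seconds
// since the Farcaster epoch) for the new optional startTimestamp field.
// Leaving stopTimestamp unset means "no upper bound".
import { toFarcasterTime } from "@farcaster/core";

const TWO_WEEKS_MS = 14 * 24 * 60 * 60 * 1000;

// toFarcasterTime converts unix milliseconds to farcaster time and returns a
// HubResult; fall back to 0 (i.e. no lower bound) if the conversion fails.
const startTimestamp = toFarcasterTime(Date.now() - TWO_WEEKS_MS).unwrapOr(0);
```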

Notes for reviewers:
- There are ways to optimize the time query by (1) using the indexes or (2)
starting at the common prefix of the start and stop time rather than
iterating over all the results for the fid. I don't think it's worth pursuing
these optimizations until we have evidence that we need them. The simple
solution implemented here will eliminate the protobuf serialization and is
significantly less risky than the more optimized solutions.
- `getUserDataByFid` and `getAllUserDataMessagesByFid` have the exact
same implementation. I added time ranges to the one we classified as a
bulk rpc but not the other one. We should consider eliminating one of
these rpcs.
- I didn't add a time range for the link compaction bulk query because
(1) we don't query for link compaction messages in reconciliation and (2) it
seems like we don't store a lot of link compaction state: each new link
compaction message for an fid replaces the older one.
- I'm not sure we need the time filter for user data, but I added it
anyway. Happy to remove if it seems like a bad complexity tradeoff.
- The rpc protocol change is backwards compatible. I copied the format
for `TimestampFidRequest` from `FidRequest` and added new optional
parameters (see the sketch after this list).
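
A minimal usage sketch, assuming the generated `TimestampFidRequest` type and
the existing `getInsecureHubRpcClient` helper are exported from
`@farcaster/hub-nodejs`; the fid and timestamp values are placeholders:

```typescript
import { getInsecureHubRpcClient, TimestampFidRequest } from "@farcaster/hub-nodejs";

const client = getInsecureHubRpcClient("127.0.0.1:2283");

// (Inside an async context.) Omitting startTimestamp/stopTimestamp preserves
// the old unbounded behavior, which is what keeps the change backwards compatible.
const result = await client.getAllCastMessagesByFid(
  TimestampFidRequest.create({
    fid: 2,
    startTimestamp: 100_000_000, // farcaster time, seconds since the Farcaster epoch
    stopTimestamp: 101_000_000,
  }),
);

result.match(
  (page) => console.log(`messages in range: ${page.messages.length}`),
  (err) => console.error(err),
);
```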

## Merge Checklist

_Choose all relevant options below by adding an `x` now or at any time
before submitting for review_

- [x] PR title adheres to the [conventional
commits](https://www.conventionalcommits.org/en/v1.0.0/) standard
- [x] PR has a
[changeset](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#35-adding-changesets)
- [x] PR has been tagged with a change label(s) (i.e. documentation,
feature, bugfix, or chore)
- [ ] PR includes
[documentation](https://github.com/farcasterxyz/hub-monorepo/blob/main/CONTRIBUTING.md#32-writing-docs)
if necessary.

<!-- start pr-codex -->

---

## PR-Codex overview
This PR adds start/stop time filters for bulk queries and validation
methods for time inputs.

### Detailed summary
- Added start/stop time filters for bulk queries in various modules
- Implemented validation methods for farcaster time inputs

> The following files were skipped due to too many changes:
`apps/hubble/src/addon/src/store/store.rs`,
`apps/hubble/src/rpc/server.ts`,
`packages/hub-web/src/generated/request_response.ts`,
`packages/hub-nodejs/src/generated/request_response.ts`,
`packages/core/src/protobufs/generated/request_response.ts`,
`packages/hub-web/src/generated/rpc.ts`,
`apps/hubble/src/storage/engine/index.ts`,
`packages/hub-nodejs/src/generated/rpc.ts`,
`apps/hubble/src/rpc/test/bulkService.test.ts`

> ✨ Ask PR-Codex anything about this PR by commenting with `/codex {your
question}`

<!-- end pr-codex -->
aditiharini authored Aug 20, 2024
1 parent fbd3ba5 commit cc0d0a3
Showing 25 changed files with 1,068 additions and 174 deletions.
8 changes: 8 additions & 0 deletions .changeset/rude-garlics-doubt.md
@@ -0,0 +1,8 @@
---
"@farcaster/hub-nodejs": patch
"@farcaster/hub-web": patch
"@farcaster/core": patch
"@farcaster/hubble": patch
---

feat: added start/stop time filters for bulk queries
4 changes: 0 additions & 4 deletions apps/hubble/src/addon/src/lib.rs
@@ -149,10 +149,6 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("getLinksByTarget", LinkStore::js_get_links_by_target)?;
cx.export_function("getLinkAddsByFid", LinkStore::js_get_link_adds_by_fid)?;
cx.export_function("getLinkRemovesByFid", LinkStore::js_get_link_removes_by_fid)?;
cx.export_function(
"getAllLinkMessagesByFid",
LinkStore::js_get_all_link_messages_by_fid,
)?;
cx.export_function(
"getLinkCompactStateMessageByFid",
LinkStore::js_get_link_compact_state_message_by_fid,
31 changes: 0 additions & 31 deletions apps/hubble/src/addon/src/store/link_store.rs
@@ -160,14 +160,6 @@ impl LinkStore {
)
}

pub fn get_all_link_messages_by_fid(
store: &Store,
fid: u32,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
store.get_all_messages_by_fid(fid, page_options)
}

pub fn get_link_compact_state_message_by_fid(
store: &Store,
fid: u32,
@@ -672,29 +664,6 @@ impl LinkStore {
Ok(promise)
}

pub fn js_get_all_link_messages_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
let store = get_store(&mut cx)?;

let fid = cx.argument::<JsNumber>(0).unwrap().value(&mut cx) as u32;
let page_options = get_page_options(&mut cx, 1)?;

// fid must be specified
if fid == 0 {
return cx.throw_error("fid is required");
}

let channel = cx.channel();
let (deferred, promise) = cx.promise();

THREAD_POOL.lock().unwrap().execute(move || {
let messages = Self::get_all_link_messages_by_fid(&store, fid, &page_options);

deferred_settle_messages(deferred, &channel, messages);
});

Ok(promise)
}

pub fn js_get_link_compact_state_message_by_fid(
mut cx: FunctionContext,
) -> JsResult<JsPromise> {
16 changes: 16 additions & 0 deletions apps/hubble/src/addon/src/store/message.rs
@@ -415,3 +415,19 @@ pub fn delete_message_transaction(

Ok(())
}

pub fn is_message_in_time_range(
start_time: Option<u32>,
stop_time: Option<u32>,
message: &MessageProto,
) -> bool {
let start_time = start_time.unwrap_or(std::u32::MIN);
let stop_time = stop_time.unwrap_or(std::u32::MAX);
match &message.data {
None => {
// We expect all valid messages to have data
return false;
}
Some(data) => return data.timestamp >= start_time && data.timestamp <= stop_time,
};
}
35 changes: 27 additions & 8 deletions apps/hubble/src/addon/src/store/store.rs
@@ -1,6 +1,7 @@
use super::{
bytes_compare, delete_message_transaction, get_message, hub_error_to_js_throw,
make_message_primary_key, message, message_decode, message_encode, put_message_transaction,
is_message_in_time_range, make_message_primary_key, message, message_decode, message_encode,
put_message_transaction,
utils::{self, encode_messages_to_js_object, get_page_options, get_store, vec_to_u8_24},
MessagesPage, StoreEventHandler, TS_HASH_LENGTH,
};
@@ -1014,14 +1015,17 @@ impl Store {
pub fn get_all_messages_by_fid(
&self,
fid: u32,
start_time: Option<u32>,
stop_time: Option<u32>,
page_options: &PageOptions,
) -> Result<MessagesPage, HubError> {
let prefix = make_message_primary_key(fid, self.store_def.postfix(), None);
let messages =
message::get_messages_page_by_prefix(&self.db, &prefix, &page_options, |message| {
self.store_def.is_add_type(&message)
|| (self.store_def.remove_type_supported()
&& self.store_def.is_remove_type(&message))
is_message_in_time_range(start_time, stop_time, message)
&& (self.store_def.is_add_type(&message)
|| (self.store_def.remove_type_supported()
&& self.store_def.is_remove_type(&message)))
})?;

Ok(messages)
@@ -1263,15 +1267,30 @@ impl Store {

let fid = cx.argument::<JsNumber>(0).unwrap().value(&mut cx) as u32;
let page_options = get_page_options(&mut cx, 1)?;
let start_time = match cx.argument_opt(2) {
Some(arg) => match arg.downcast::<JsNumber, _>(&mut cx) {
Ok(v) => Some(v.value(&mut cx) as u32),
_ => None,
},
None => None,
};
let stop_time = match cx.argument_opt(3) {
Some(arg) => match arg.downcast::<JsNumber, _>(&mut cx) {
Ok(v) => Some(v.value(&mut cx) as u32),
_ => None,
},
None => None,
};

let channel = cx.channel();
let (deferred, promise) = cx.promise();

deferred.settle_with(&channel, move |mut tcx| {
let messages = match store.get_all_messages_by_fid(fid, &page_options) {
Ok(messages) => messages,
Err(e) => return tcx.throw_error(format!("{}/{}", e.code, e.message)),
};
let messages =
match store.get_all_messages_by_fid(fid, start_time, stop_time, &page_options) {
Ok(messages) => messages,
Err(e) => return tcx.throw_error(format!("{}/{}", e.code, e.message)),
};

encode_messages_to_js_object(&mut tcx, messages)
});
34 changes: 31 additions & 3 deletions apps/hubble/src/addon/src/store/user_data_store.rs
@@ -1,6 +1,6 @@
use super::{
bytes_compare, encode_messages_to_js_object, get_page_options, get_store,
hub_error_to_js_throw, make_user_key,
hub_error_to_js_throw, is_message_in_time_range, make_user_key,
name_registry_events::{
delete_username_proof_transaction, get_fname_proof_by_fid, get_username_proof,
put_username_proof_transaction,
@@ -226,17 +226,45 @@ impl UserDataStore {
store: &Store,
fid: u32,
page_options: &PageOptions,
start_time: Option<u32>,
stop_time: Option<u32>,
) -> Result<MessagesPage, HubError> {
store.get_adds_by_fid::<fn(&protos::Message) -> bool>(fid, page_options, None)
store.get_adds_by_fid(
fid,
page_options,
Some(|message: &Message| {
return is_message_in_time_range(start_time, stop_time, message);
}),
)
}

pub fn js_get_user_data_adds_by_fid(mut cx: FunctionContext) -> JsResult<JsPromise> {
let store = get_store(&mut cx)?;

let fid = cx.argument::<JsNumber>(0)?.value(&mut cx) as u32;
let page_options = get_page_options(&mut cx, 1)?;
let start_time = match cx.argument_opt(2) {
Some(arg) => match arg.downcast::<JsNumber, _>(&mut cx) {
Ok(v) => Some(v.value(&mut cx) as u32),
_ => None,
},
None => None,
};
let stop_time = match cx.argument_opt(3) {
Some(arg) => match arg.downcast::<JsNumber, _>(&mut cx) {
Ok(v) => Some(v.value(&mut cx) as u32),
_ => None,
},
None => None,
};

let messages = match Self::get_user_data_adds_by_fid(&store, fid, &page_options) {
let messages = match Self::get_user_data_adds_by_fid(
&store,
fid,
&page_options,
start_time,
stop_time,
) {
Ok(messages) => messages,
Err(e) => return hub_error_to_js_throw(&mut cx, e),
};
85 changes: 55 additions & 30 deletions apps/hubble/src/rpc/server.ts
@@ -1201,12 +1201,17 @@ export default class Server {
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getAllCastMessagesByFid", req: call.request }, `RPC call from ${peer}`);

const { fid, pageSize, pageToken, reverse } = call.request;
const result = await this.engine?.getAllCastMessagesByFid(fid, {
pageSize,
pageToken,
reverse,
});
const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request;
const result = await this.engine?.getAllCastMessagesByFid(
fid,
{
pageSize,
pageToken,
reverse,
},
startTimestamp,
stopTimestamp,
);
result?.match(
(page: MessagesPage<CastAddMessage | CastRemoveMessage>) => {
callback(null, messagesPageToResponse(page));
@@ -1220,12 +1225,17 @@
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getAllReactionMessagesByFid", req: call.request }, `RPC call from ${peer}`);

const { fid, pageSize, pageToken, reverse } = call.request;
const result = await this.engine?.getAllReactionMessagesByFid(fid, {
pageSize,
pageToken,
reverse,
});
const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request;
const result = await this.engine?.getAllReactionMessagesByFid(
fid,
{
pageSize,
pageToken,
reverse,
},
startTimestamp,
stopTimestamp,
);
result?.match(
(page: MessagesPage<ReactionAddMessage | ReactionRemoveMessage>) => {
callback(null, messagesPageToResponse(page));
@@ -1239,12 +1249,17 @@
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getAllVerificationMessagesByFid", req: call.request }, `RPC call from ${peer}`);

const { fid, pageSize, pageToken, reverse } = call.request;
const result = await this.engine?.getAllVerificationMessagesByFid(fid, {
pageSize,
pageToken,
reverse,
});
const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request;
const result = await this.engine?.getAllVerificationMessagesByFid(
fid,
{
pageSize,
pageToken,
reverse,
},
startTimestamp,
stopTimestamp,
);
result?.match(
(page: MessagesPage<VerificationAddAddressMessage | VerificationRemoveMessage>) => {
callback(null, messagesPageToResponse(page));
@@ -1258,12 +1273,17 @@
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getAllUserDataMessagesByFid", req: call.request }, `RPC call from ${peer}`);

const { fid, pageSize, pageToken, reverse } = call.request;
const result = await this.engine?.getUserDataByFid(fid, {
pageSize,
pageToken,
reverse,
});
const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request;
const result = await this.engine?.getUserDataByFid(
fid,
{
pageSize,
pageToken,
reverse,
},
startTimestamp,
stopTimestamp,
);
result?.match(
(page: MessagesPage<UserDataAddMessage>) => {
callback(null, messagesPageToResponse(page));
@@ -1277,12 +1297,17 @@
const peer = Result.fromThrowable(() => call.getPeer())().unwrapOr("unknown");
log.debug({ method: "getAllLinkMessagesByFid", req: call.request }, `RPC call from ${peer}`);

const { fid, pageSize, pageToken, reverse } = call.request;
const result = await this.engine?.getAllLinkMessagesByFid(fid, {
pageSize,
pageToken,
reverse,
});
const { fid, pageSize, pageToken, reverse, startTimestamp, stopTimestamp } = call.request;
const result = await this.engine?.getAllLinkMessagesByFid(
fid,
{
pageSize,
pageToken,
reverse,
},
startTimestamp,
stopTimestamp,
);
result?.match(
(page: MessagesPage<LinkAddMessage | LinkRemoveMessage>) => {
callback(null, messagesPageToResponse(page));