Skip to content

Commit

Permalink
RRD manifests bootstrap
Browse files Browse the repository at this point in the history
large
  • Loading branch information
teh-cmc committed Feb 19, 2025
1 parent 9265340 commit b06e556
Show file tree
Hide file tree
Showing 5 changed files with 663 additions and 9 deletions.
3 changes: 3 additions & 0 deletions crates/store/re_format_arrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ pub fn format_record_batch_opts(
}

/// Nicely format this record batch, either with the given fixed width, or with the terminal width (`None`).
///
/// If `transposed` is `true`, the dataframe will be printed transposed on its diagonal axis.
/// This is very useful for wide (i.e. lots of columns), short (i.e. not many rows) datasets.
pub fn format_record_batch_with_width(
batch: &arrow::array::RecordBatch,
width: Option<usize>,
Expand Down
9 changes: 9 additions & 0 deletions crates/store/re_log_types/src/time_point/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ pub enum TimeType {
Sequence,
}

impl std::fmt::Display for TimeType {
    /// Writes the lowercase name of the index type: `"time"` or `"sequence"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Time => "time",
            Self::Sequence => "sequence",
        };
        f.write_str(name)
    }
}

impl TimeType {
#[inline]
fn hash(&self) -> u64 {
Expand Down
193 changes: 189 additions & 4 deletions crates/store/re_protos/proto/rerun/v0/remote_store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,20 @@ package rerun.remote_store.v0;
import "rerun/v0/common.proto";

service StorageNode {
// data API calls
// Data APIs

rpc Query(QueryRequest) returns (stream DataframePart) {}

rpc FetchRecording(FetchRecordingRequest) returns (stream rerun.common.v0.RerunChunk) {}

// Index APIs

rpc CreateIndex(CreateIndexRequest) returns (CreateIndexResponse) {}

rpc ReIndex(ReIndexRequest) returns (ReIndexResponse) {}

rpc GetChunkIds(GetChunkIdsRequest) returns (stream GetChunkIdsResponse) {}

rpc GetChunks(GetChunksRequest) returns (stream rerun.common.v0.RerunChunk) {}

// The response to `SearchIndex` is a RecordBatch with 3 columns:
Expand All @@ -25,17 +31,32 @@ service StorageNode {
// - 'data' column with the data that is returned for the matched timepoints
rpc SearchIndex(SearchIndexRequest) returns (stream DataframePart) {}

// metadata API calls
// Chunk manifest APIs

rpc CreateManifests(CreateManifestsRequest) returns (CreateManifestsResponse) {}

rpc ListManifests(ListManifestsRequest) returns (stream DataframePart) {}

rpc QueryManifest(QueryManifestRequest) returns (stream DataframePart) {}

// TODO(zehiko, cmc): DeleteManifest

// Metadata APIs

rpc QueryCatalog(QueryCatalogRequest) returns (stream DataframePart) {}

rpc UpdateCatalog(UpdateCatalogRequest) returns (UpdateCatalogResponse) {}

rpc GetRecordingSchema(GetRecordingSchemaRequest) returns (GetRecordingSchemaResponse) {}

// Registration APIs

// TODO(zehiko) support registering more than one recording at a time
rpc RegisterRecording(RegisterRecordingRequest) returns (DataframePart) {}

rpc UnregisterRecording(UnregisterRecordingRequest) returns (UnregisterRecordingResponse) {}
rpc UnregisterAllRecordings(UnregisterAllRecordingsRequest)
returns (UnregisterAllRecordingsResponse) {}

rpc UnregisterAllRecordings(UnregisterAllRecordingsRequest) returns (UnregisterAllRecordingsResponse) {}
}

// ---------------- Common response message ------------------
Expand Down Expand Up @@ -192,6 +213,170 @@ message CatalogEntry {
string name = 1;
}


// ---------------- CreateManifests ------------------

// TODO(zehiko, cmc): At some point, this will need to be fully async (i.e. caller gets assigned
// a unique request ID and polls it for completion), but:
// A) Let's wait until we have a real need for this.
// B) That's true of everything in the platform, so this needs to be properly generalized.

// Request for `CreateManifests`: triggers creation of the chunk manifests
// for the given catalog entry.
message CreateManifestsRequest {
  // Which catalog entry do we want to create manifests for?
  CatalogEntry entry = 1;
}

// Response for `CreateManifests`.
//
// Empty for now; kept as a dedicated message (rather than e.g. `google.protobuf.Empty`)
// so that fields can be added later without breaking the RPC signature.
message CreateManifestsResponse {}

// ---------------- ListManifests ------------------

// Request for `ListManifests`.
message ListManifestsRequest {
  // Which catalog entry do we want to list the manifests of?
  CatalogEntry entry = 1;

  // Generic parameters that will influence the behavior of the Lance scanner.
  //
  // TODO(zehiko, cmc): actually support those.
  ScanParameters scan_parameters = 500;
}

// Response for `ListManifests`.
//
// NOTE(review): the `ListManifests` RPC above is declared as returning
// `stream DataframePart`, not this message — confirm whether this type is
// still referenced anywhere or should be removed/merged with `DataframePart`.
message ListManifestsResponse {
  // Encoding used for `payload`.
  rerun.common.v0.EncoderVersion encoder_version = 1;

  // The record batch of the response, encoded according to `encoder_version`.
  bytes payload = 2;
}

// ---------------- QueryManifest ------------------

// TODO(zehiko, cmc): Being able to specify only a collection ID rather than a resource ID could be
// super useful for cross-recording queries (resource_id becomes a column of the result).

// A manifest query will find all the relevant chunk IDs (and optionally a bunch of related metadata)
// for a given Rerun query (latest-at, range, etc).
//
// The result might contain duplicated chunk IDs; it is the responsibility of the caller to
// deduplicate them as needed.
// Request for `QueryManifest`.
//
// Field numbers are grouped by hundreds: 1xx resource selection, 2xx column
// selection, 3xx/4xx query kind (mutually exclusive), 5xx scanner parameters.
message QueryManifestRequest {
  // What resource are we querying the manifest for?
  rerun.common.v0.RecordingId resource_id = 100;

  // What columns of the manifest are we interested in?
  ColumnProjection columns = 200;

  // If true, `columns` will contain the entire schema.
  bool columns_always_include_everything = 210;

  // If true, `columns` always includes `chunk_id`.
  bool columns_always_include_chunk_ids = 220;

  // If true, `columns` always includes `byte_offset` and `byte_size`.
  bool columns_always_include_byte_offsets = 230;

  // If true, `columns` always includes all static component-level indexes.
  bool columns_always_include_static_indexes = 240;

  // If true, `columns` always includes all temporal chunk-level indexes.
  bool columns_always_include_global_indexes = 250;

  // If true, `columns` always includes all component-level indexes.
  bool columns_always_include_component_indexes = 260;

  // If specified, will perform a latest-at query with the given parameters.
  //
  // Incompatible with `range`.
  QueryManifestLatestAtRelevantChunks latest_at = 300;

  // If specified, will perform a range query with the given parameters.
  //
  // Incompatible with `latest_at`.
  QueryManifestRangeRelevantChunks range = 400;

  // Generic parameters that will influence the behavior of the Lance scanner.
  ScanParameters scan_parameters = 500;
}

// Parameters for the latest-at flavor of `QueryManifestRequest`.
message QueryManifestLatestAtRelevantChunks {
  // Which index column should we perform the query on? E.g. `log_time`.
  rerun.common.v0.IndexColumnSelector index = 1;

  // What index value are we looking for?
  //
  // NOTE(review): raw index value — units depend on the selected index
  // (timestamp vs. sequence number); confirm against `IndexColumnSelector`.
  int64 at = 2;

  // Which components are we interested in?
  //
  // If left unspecified, all existing components are considered of interest.
  //
  // This will perform a basic fuzzy match on the available columns' descriptors.
  // The fuzzy logic is a simple case-sensitive `contains()` query.
  // For example, given a `log_tick__SeriesLine:StrokeWidth#width` index, all of the following
  // would match: `SeriesLine:StrokeWidth#width`, `StrokeWidth`, `Stroke`, `Width`, `width`,
  // `SeriesLine`, etc.
  repeated string fuzzy_descriptors = 3;
}

// Parameters for the range flavor of `QueryManifestRequest`.
message QueryManifestRangeRelevantChunks {
  // Which index column should we perform the query on? E.g. `log_time`.
  rerun.common.v0.IndexColumnSelector index = 1;

  // What index range are we looking for?
  rerun.common.v0.TimeRange index_range = 2;

  // Which components are we interested in?
  //
  // If left unspecified, all existing components are considered of interest.
  //
  // This will perform a basic fuzzy match on the available columns' descriptors.
  // The fuzzy logic is a simple case-sensitive `contains()` query.
  // For example, given a `log_tick__SeriesLine:StrokeWidth#width` index, all of the following
  // would match: `SeriesLine:StrokeWidth#width`, `StrokeWidth`, `Stroke`, `Width`, `width`,
  // `SeriesLine`, etc.
  repeated string fuzzy_descriptors = 3;
}

// Generic parameters that will influence the behavior of the Lance scanner.
//
// TODO(zehiko, cmc): This should be available for every endpoint that queries data in
// one way or another.
message ScanParameters {
// An arbitrary filter expression that will be passed to the Lance scanner as-is.
//
// ```
// scanner.filter(filter)
// ```
string filter = 100;

// An arbitrary offset that will be passed to the Lance scanner as-is.
//
// ```
// scanner.limit(_, limit_offset)
// ```
int64 limit_offset = 200;

// An arbitrary limit that will be passed to the Lance scanner as-is.
//
// ```
// scanner.limit(limit_len, _)
// ```
int64 limit_len = 201;

// An arbitrary order clause that will be passed to the Lance scanner as-is.
//
// ```
// scanner.order_by(…)
// ```
ScanParametersOrderClause order_by = 300;

// If set, the output of `scanner.explain_plan` will be dumped to the server's log.
bool explain = 400;
}

// Order clause forwarded to the Lance scanner's `order_by` (see `ScanParameters.order_by`).
message ScanParametersOrderClause {
  // Sort direction: ascending if true, descending otherwise.
  bool ascending = 10;

  // If true, null values sort before non-null values.
  bool nulls_first = 20;

  // Name of the column to order by.
  string column_name = 30;
}

// ---------------- GetRecordingSchema ------------------

message GetRecordingSchemaRequest {
Expand Down
Loading

0 comments on commit b06e556

Please sign in to comment.