Skip to content

Commit

Permalink
Introduce APIs for list_recordings and update_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed Nov 26, 2024
1 parent fb034b0 commit 82cf760
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5776,6 +5776,7 @@ dependencies = [
"prost",
"re_arrow2",
"re_dataframe",
"re_log",
"re_log_types",
"thiserror",
"tonic",
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_protos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ version.workspace = true
[dependencies]
re_log_types.workspace = true
re_dataframe.workspace = true
re_log.workspace = true

# External
arrow2 = { workspace = true, features = ["io_ipc"] }
Expand Down
231 changes: 213 additions & 18 deletions pixi.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ python = "=3.11"
[feature.examples-common.pypi-dependencies]
# External deps
jupyter = ">=1.0"
polars = ">=0.12.0"

segment-anything = { git = "https://github.com/facebookresearch/segment-anything.git" }
mesh-to-sdf = { git = "https://github.com/marian42/mesh_to_sdf.git" }
Expand Down
2 changes: 1 addition & 1 deletion rerun_py/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ name = "rerun_bindings" # name of the .so library that the Python module will im


[features]
default = ["extension-module"]
default = ["extension-module", "remote"]

## Extra features that aren't ready to be included in release builds yet.
extra = ["pypi", "remote"]
Expand Down
94 changes: 85 additions & 9 deletions rerun_py/src/remote.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
#![allow(unsafe_op_in_unsafe_fn)]
use arrow::{array::ArrayData, pyarrow::PyArrowType};
use arrow::{
array::{ArrayData, RecordBatch, RecordBatchIterator, RecordBatchReader},
datatypes::Schema,
pyarrow::PyArrowType,
};
// False positive due to #[pyfunction] macro
use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyDict, Bound, PyResult};
use re_chunk::TransportChunk;
use re_protos::v0::{
storage_node_client::StorageNodeClient, EncoderVersion, ListRecordingsRequest,
RecordingMetadata, RecordingType, RegisterRecordingRequest,
storage_node_client::StorageNodeClient, EncoderVersion, ListRecordingsRequest, RecordingId,
RecordingMetadata, RecordingType, RegisterRecordingRequest, UpdateRecordingMetadataRequest,
};

/// Register the `rerun.remote` module.
Expand Down Expand Up @@ -52,8 +56,9 @@ pub struct PyConnection {
#[pymethods]
impl PyConnection {
/// List all recordings registered with the node.
fn list_recordings(&mut self) -> PyResult<Vec<PyRecordingMetadata>> {
self.runtime.block_on(async {
fn list_recordings(&mut self) -> PyResult<PyArrowType<Box<dyn RecordBatchReader + Send>>> {
let reader = self.runtime.block_on(async {
// TODO(jleibs): Support column projection
let request = ListRecordingsRequest {
column_projection: None,
};
Expand All @@ -64,13 +69,33 @@ impl PyConnection {
.await
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

Ok(resp
let transport_chunks = resp
.into_inner()
.recordings
.into_iter()
.map(|recording| PyRecordingMetadata { info: recording })
.collect())
})
.map(|recording| recording.data())
.collect::<Result<Vec<_>, _>>()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

let record_batches: Vec<Result<RecordBatch, arrow::error::ArrowError>> =
transport_chunks
.into_iter()
.map(|tc| tc.try_to_arrow_record_batch())
.collect();

// TODO(jleibs): surfacing this schema is awkward. This should be more explicit in
// the gRPC APIs somehow.
let schema = record_batches
.first()
.and_then(|batch| batch.as_ref().ok().map(|batch| batch.schema()))
.unwrap_or(std::sync::Arc::new(Schema::empty()));

let reader = RecordBatchIterator::new(record_batches, schema);

Ok::<_, PyErr>(reader)
})?;

Ok(PyArrowType(Box::new(reader)))
}

/// Register a recording along with some metadata
Expand Down Expand Up @@ -146,6 +171,57 @@ impl PyConnection {
Ok(recording_id)
})
}

/// Updates the metadata for a recording.
///
/// `metadata` is a Python dict mapping column names to values. Each entry is
/// converted into a single Arrow field + array, packed into a one-batch
/// `TransportChunk`, encoded as `RecordingMetadata`, and sent to the storage
/// node via the `update_recording_metadata` gRPC call.
///
/// Errors from Python-value extraction, Arrow conversion, metadata encoding,
/// or the gRPC transport are all surfaced as `PyRuntimeError`.
#[pyo3(signature = (
    id,
    metadata
))]
fn update_metadata(&mut self, id: &str, metadata: &Bound<'_, PyDict>) -> PyResult<()> {
    self.runtime.block_on(async {
        // Build one Arrow field + array per dict entry. Any value that fails
        // to convert (unsupported Python type) aborts the whole call.
        let (schema, data): (
            Vec<arrow2::datatypes::Field>,
            Vec<Box<dyn arrow2::array::Array>>,
        ) = metadata
            .iter()
            .map(|(key, value)| {
                let key = key.to_string();
                let value = value.extract::<MetadataLike>()?;
                let value_array = value.to_arrow2()?;
                // Fields are nullable: a metadata column may be absent for
                // other recordings.
                let field =
                    arrow2::datatypes::Field::new(key, value_array.data_type().clone(), true);
                Ok((field, value_array))
            })
            .collect::<PyResult<Vec<_>>>()?
            .into_iter()
            .unzip();

        let schema = arrow2::datatypes::Schema::from(schema);
        let data = arrow2::chunk::Chunk::new(data);

        // `schema` is not referenced after this point, so move it into the
        // chunk instead of cloning (the previous `schema.clone()` was a
        // redundant allocation).
        let metadata_tc = TransportChunk { schema, data };

        let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc)
            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

        let request = UpdateRecordingMetadataRequest {
            // TODO(jleibs): Description should really just be in the metadata
            recording_id: Some(RecordingId { id: id.to_owned() }),
            metadata: Some(metadata),
        };

        self.client
            .update_recording_metadata(request)
            .await
            .map_err(|err| PyRuntimeError::new_err(err.to_string()))?;

        Ok(())
    })
}
}

/// A type alias for metadata.
Expand Down

0 comments on commit 82cf760

Please sign in to comment.