|
| 1 | +# Dynamo runtime: Transport Agnostic Dynamo Pipelines |
| 2 | + |
| 3 | +## Overview |
| 4 | + |
| 5 | +High level goal is to decouple the NATs transport from the dynamo runtime. |
| 6 | + |
| 7 | +- introduce abstractions for current NATs usages (e.g. KV router, event plane, request plane & object store, etc) which can be used to plug different implementations. |
| 8 | + |
| 9 | +- deprecate NATs object store and reduce dependencies on NATs. |
| 10 | + |
| 11 | +## Requirements |
| 12 | +- deliver messages across dynamo instances with at least once delivery guarantee. |
| 13 | +- switch between transports at runtime. |
| 14 | +- support long term architecure goals for Dynamo GA |
| 15 | + |
| 16 | +### Transport Agnostic API |
| 17 | + |
| 18 | +Dynamo communication primitives needs to support: |
| 19 | +- peer-to-peer (req/reply: request plane) and scoped broadcasts (event plane) |
| 20 | +- communication regimes: single process, single node (multi process) and multi-node |
| 21 | +- transport options: NATs, Raw TCP, ZMQ, HTTP SSE, GRPC, UCX active messaging |
| 22 | + |
| 23 | +### Deprecate NATs Object store usage |
| 24 | + - Router snapshots are stored in NATs object store. |
| 25 | + - Model files are stored in NATs object store. |
| 26 | + |
| 27 | +### Long term architectural goals: |
| 28 | +Support: |
| 29 | + |
| 30 | +- separation of Frontend (3-in-1 across in-process, same node or remote node) |
| 31 | + |
| 32 | +- HTTP based endpoint for one off usage of a component (KV router, etc) |
| 33 | + |
| 34 | +- batching/muxing messages for Req/Responses: |
| 35 | + - we can see a perf benefit by batching multiple requests together over a network round-trip. |
| 36 | + |
| 37 | +- Simplify `dynamo namespace` usage and process heirarchy (namespace, component, etc) |
| 38 | + - `dynamo namespace` is causing unnecessaary cognitive complexity for end-users. |
| 39 | + - Support mapping to more meaningful Grove concepts like PodClique, PodCliqueSet, etc. |
| 40 | + |
| 41 | +## Usage Patterns |
| 42 | + |
| 43 | +More details in [NATs use cases](#nats-use-cases) |
| 44 | + |
| 45 | +| Plane | Purpose / Protocol | Delivery Guarantee / Notes | Current NATs Usage Example | |
| 46 | +|---------------|------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------| |
| 47 | +| Request Plane | Protocol for request context, control messages, and request cancellation | At least once (with idempotent handlers); supports request/reply, cancellation, and timeouts | Service stats collection, direct requests | |
| 48 | +| Event Plane | Delivery of KV events and other system events | At least once (preferred); events are idempotent, redelivered messages are ignored | KV router event publishing/consumption | |
| 49 | +| Metrics | Collection and reporting of runtime/service metrics | At least once (metrics can be aggregated, duplicate-safe) | Service stats, health checks | |
| 50 | +| Object Store | Ephemeral storage for model files and artifacts | At least once (object upload/download, may use JetStream Object Store) | Model file storage, file upload/download | |
| 51 | + |
| 52 | +**Notes:** |
| 53 | +- All planes should support pluggable transports (NATs, ZMQ, HTTP SSE, GRPC, UCX, etc). |
| 54 | +- At least once delivery is preferred for reliability; consumers must be idempotent. |
| 55 | +- Request Plane requires protocol for cancellation and context propagation. |
| 56 | +- Object Store usage via NATs is being deprecated in favor of more direct or persistent solutions. |
| 57 | + |
| 58 | + |
| 59 | +## Proposal |
| 60 | + |
| 61 | + |
| 62 | + |
| 63 | +### Transport Agnostic API |
| 64 | + |
| 65 | +Use unique identifiers to identify the target entity. |
| 66 | + |
| 67 | +`DynamoEntityID`: a unique address for each dynamo process. |
| 68 | + - An Opaque `u128` identifier that uniquely identifies a single instance or collection of instances (component, namespace, etc) |
| 69 | + - each dynamo process has a unique DynamoEntityID and can be used to communicate with other dynamo processes. |
| 70 | + - can be used to identify the target and establish connection by `DynamoNetworkManager`. |
| 71 | + - discovery service api can be used to find participating dynamo entities. |
| 72 | + - dynamo application will use communication primitives: (publish, subscribe, request) and this ID to route messages to the target in transport agnostic manner. |
| 73 | + |
| 74 | +`DynamoNetworkManager`: manages communication between dynamo processes. |
| 75 | + - manages listening ports and client connections to remote peers |
| 76 | + - responsible for serializing and deserializing messages for different transports |
| 77 | + - responsible for handling timeout error, retry and cancellation for request/reply pattern |
| 78 | + - responsible for sending and receiving messages |
| 79 | + - Handles remote peer connections: in-process, local host or remote host |
| 80 | + |
| 81 | +High level`DynamoNetworkManager` API: |
| 82 | + |
| 83 | +``` |
| 84 | +
|
| 85 | +publish(id: DynamoEntityID, topic: str, message: InputType) -> Result<(), Error> |
| 86 | +
|
| 87 | +subscribe(id: DynamoEntityID, topic: str, handler: fn(message: InputType) -> Result<(), Error>) -> Result<(), Error> |
| 88 | +
|
| 89 | +request(id: DynamoEntityID, topic: str, message: InputType) -> Future<Result<OutputType, Error>> |
| 90 | +``` |
| 91 | + |
| 92 | + |
| 93 | + |
| 94 | + |
| 95 | +#### DynamoEntityID |
| 96 | + |
| 97 | +Inspired by inode and ProcessID from Operating System and Grove concepts. |
| 98 | + |
| 99 | +Uniquely identify a single instance or collection of instances (component, namespace, etc) |
| 100 | + |
| 101 | + |
| 102 | +```rust |
| 103 | +pub struct DynamoEntityId { |
| 104 | + pub id: u128, |
| 105 | + pub entity_type: EntityType, |
| 106 | + pub name: Option<Arc<String>>, // Optional human-readable name for debugging and logging |
| 107 | + pub children: Vec<DynamoEntityId>, // Optional children for collection of instances |
| 108 | +} |
| 109 | + |
| 110 | +impl DynamoEntityId { |
| 111 | + /// Create from name with automatic hashing |
| 112 | + pub fn from_name(name: &str, entity_type: EntityType) -> Self { |
| 113 | + Self { |
| 114 | + id: hash_name(name), |
| 115 | + name: Some(Arc::new(name.to_string())), |
| 116 | + entity_type: entity_type, |
| 117 | + children: Vec::new(), |
| 118 | + } |
| 119 | + } |
| 120 | +} |
| 121 | + |
| 122 | +enum EntityType { |
| 123 | + Instance, |
| 124 | + Collection, |
| 125 | +} |
| 126 | +``` |
| 127 | + |
| 128 | +`EntityType`: single instance or collection of instances (currently component, namespace, etc) |
| 129 | + |
| 130 | +`name`: String (optional) |
| 131 | +- Human-readable identifiers for debugging and logging |
| 132 | +- Configuration files (YAML/JSON) |
| 133 | +- Command-line interfaces |
| 134 | +- Logging and observability |
| 135 | + |
| 136 | + |
| 137 | +TODO: |
| 138 | +- map these ids cleanly to grove concepts like PodClique/PodCliqueSet/PodCliqueSet |
| 139 | +- show a diagram of the entity hierarchy |
| 140 | + |
| 141 | + |
| 142 | +### Object Store Interface |
| 143 | + |
| 144 | +// Todo: add clean interface for object store |
| 145 | + |
| 146 | +### Implementation |
| 147 | +- Phase 1 |
| 148 | + * degraded feature set |
| 149 | + * not use KV router if they want. Best effort |
| 150 | + * nats |
| 151 | + * No HA guarantees for router |
| 152 | + * Operate without high availability w/ single router |
| 153 | +- Phase 2 |
| 154 | + * explore transports |
| 155 | + * durability |
| 156 | + * exactly once delivery |
| 157 | + |
| 158 | +## Guiding principles |
| 159 | + |
| 160 | +### Generic Messaging Protocol |
| 161 | +Decouple messaging protocol from the underlying transport like Raw TCP, ZMQ or (HTTP, GRPC, and UCX active message). |
| 162 | + |
| 163 | +Phased approach: start with Nats, ZMQ and HTTP SSE. |
| 164 | +Later, incrementally expand to support more advanced transports, ensuring that the protocol remains adaptable to requirements. |
| 165 | + |
| 166 | +### Handshake and Closure Protocols: |
| 167 | +Robust handshake and closure protocols, using sentinels and message headers to signal the end of stream or cancellation. |
| 168 | +A common semantic for closing requests and handling errors, will be generalized across different transports. |
| 169 | + |
| 170 | +### Multipart Message Structure |
| 171 | +Use a multipart message structure, inspired by ZMQ's native multipart support, to encapsulate headers, body, and control signals (such as closure control signals or error notifications). |
| 172 | + |
| 173 | +### Better Python-Rust Interoperability and Data class generation |
| 174 | + |
| 175 | +Improve Python-Rust interoperability, focusing on auto-generating Python data classes from Rust structs using Pydantic. |
| 176 | +This way message schemas are aligned and we can reduce manual coding and serialization errors. |
| 177 | + |
| 178 | + |
| 179 | +## Additional notes |
| 180 | + |
| 181 | +## NATs use cases |
| 182 | + |
| 183 | +### 1. NatsQueue python binding |
| 184 | +- **Location**: `lib/bindings/python/rust/llm/nats.rs` (`NatsQueue`) |
| 185 | +- **Functionality**: |
| 186 | +- Deprecated: We don't use `NatsQueue` python binding anymore. We use `NatsQueue` rust binding instead. |
| 187 | +- We can remove the python binding and the associated tests to simplify the codebase. |
| 188 | + |
| 189 | +### 2. JetStream-backed Queue/Event Bus |
| 190 | +- **Location**: `lib/runtime/src/transports/nats.rs` (`NatsQueue`) |
| 191 | +- **Functionality**: |
| 192 | + - Stream creation per subject pattern `{stream_name}.*` |
| 193 | + - Publisher-only, worker-group, and broadcast consumer modes |
| 194 | + - Durable consumers with pull-based consumption |
| 195 | + - Administrative operations (purge, consumer management) |
| 196 | + |
| 197 | +### 3. Event Publishing for KV Router |
| 198 | +- **Location**: `lib/llm/src/kv_router/publisher.rs` |
| 199 | +- **Functionality**: |
| 200 | + - Publishes KV cache events from ZMQ or direct sources |
| 201 | + - Uses `EventPublisher` trait to send events |
| 202 | + |
| 203 | +### 4. Event Consumption for KV Router |
| 204 | +- **Location**: `lib/llm/src/kv_router/subscriber.rs` |
| 205 | +- **Functionality**: |
| 206 | + - Consumes `RouterEvent` messages via durable consumers |
| 207 | + - Handles state snapshots and stream purging |
| 208 | + |
| 209 | +### 5. Object Store (JetStream Object Store) |
| 210 | +- **Location**: `lib/runtime/src/transports/nats.rs` |
| 211 | +- **Functionality**: |
| 212 | + - File upload/download operations |
| 213 | + - Typed data serialization with bincode |
| 214 | + - Bucket management and cleanup |
| 215 | + |
| 216 | +### 6. Key-Value Store (JetStream KV) |
| 217 | +- **Location**: `lib/runtime/src/storage/key_value_store/nats.rs` |
| 218 | +- **Functionality**: |
| 219 | + - Implements `KeyValueStore` trait |
| 220 | + - CRUD operations with conflict resolution |
| 221 | + - Watch streams for real-time updates |
| 222 | + |
| 223 | +### 7. Request/Reply Pattern |
| 224 | +- **Location**: `lib/runtime/src/transports/nats.rs` |
| 225 | +- **Functionality**: |
| 226 | + - Service stats collection via broadcast requests |
| 227 | + - Each service responds once to stats queries |
| 228 | + |
| 229 | +### 8 KVBM Nats usage (todo) |
| 230 | + |
| 231 | + |
| 232 | +## Message Delivery Guarantees |
| 233 | + |
| 234 | +### At least once delivery (preferred) |
| 235 | +- No message loss is possible. |
| 236 | +- Message is delivered at least once to the consumers |
| 237 | +- consumers should be idempotent and be able to handle duplicate messages. |
| 238 | + |
| 239 | +### Exactly once delivery |
| 240 | +- needs stateful tracking of messages and ack/nack coordination to ensure exactly once delivery. |
| 241 | + |
| 242 | +### At most once delivery |
| 243 | +- Message loss is possible. |
0 commit comments