-
Notifications
You must be signed in to change notification settings - Fork 1
Handlers
Handlers in cross.stream (xs
) allow you to process incoming frames as they are appended to the store. Each handler is defined by a Nushell closure that executes against incoming frames.
A handler is a Nushell closure that takes a frame as input and optionally returns a value or performs actions:
{ |frame|
if $frame.topic == "ping" {
"pong" # Will be appended to handler.out
}
}
The handler closure receives each new frame and can:
- Process the frame's content
- Return a value (which gets automatically appended to
<handler-name>.out
) - Explicitly append new frames using the
.append
command - Filter which frames to process using conditionals
To register a handler, append a frame with the topic <handler-name>.register
. The frame should contain:
- The handler closure as text (stored in the CAS)
- Optional metadata to configure the handler's behavior
Example registration:
{ |frame|
if $frame.topic == "ping" {
"pong"
}
} | .append --topic echo.register --meta {
start: "root", # Process all frames from beginning
return_options: {
postfix: ".response", # Append responses to echo.response instead of echo.out
ttl: "head:1" # Only keep most recent response
}
}
When registering a handler, you can include metadata to configure its behavior:
Field | Description |
---|---|
start |
Where to begin processing in the stream (see below) |
pulse |
Interval in milliseconds to send synthetic xs.pulse events |
return_options |
Controls how return values are published |
The start
field determines where the handler begins processing frames:
-
"tail"
: Only process new frames after registration (default) -
"root"
: Process all frames from the beginning -
{"cursor": "topic"}
: Start from the frame referenced by frame_id in the topic's metadata -
{"after": "topic"}
: Begin after the most recent frame of the specified topic
The return_options
field controls how return values are handled:
-
postfix
: String appended to handler's name for output topic (default: ".out") -
ttl
: Time-to-live for output frames-
"forever"
: Never expire -
"ephemeral"
: Remove after reading -
"time:<milliseconds>"
: Expire after duration -
"head:<n>"
: Keep only N most recent frames
-
Handlers can produce output in two ways:
-
Return Values: Any non-null return value is automatically appended to the handler's output topic (
<handler-name>.out
by default unless modified by return_options.postfix)
{ |frame|
if $frame.topic == "ping" {
"pong" # Automatically appended to handler.out
}
}
-
Explicit Appends: Use the
.append
command to create frames on any topic
{ |frame|
if $frame.topic == "ping" {
"pong" | .append response.topic --meta {"type": "response"}
"logged" | .append audit.topic
}
}
All output frames automatically include:
-
handler_id
: ID of the handler that created the frame -
frame_id
: ID of the frame that triggered the handler
A handler can be unregistered by:
- Appending
<handler-name>.unregister
- Registering a new handler with the same name
- Runtime errors in the handler closure
When unregistered, the handler appends a confirmation frame <handler-name>.unregistered
. If unregistered due to an error, the frame includes an error
field in its metadata.
If a handler encounters an error during execution:
- The handler is automatically unregistered
- A frame is appended to
<handler-name>.unregistered
with:- The error message in metadata
- Reference to the triggering frame
A handler becomes stateful by accepting a second state
parameter. The state persists across frames and can be updated by the handler.
Example stateful handler:
{ |frame, state|
mut state = $state # Create mutable state variable
if $frame.topic == "count.me" {
$state.count += 1
$state | .append counter.state # Save updated state
}
}
When registering a stateful handler, you can provide additional configuration:
{ |frame, state| ... } | .append counter.register --meta {
initial_state: { count: 0 }, # Optional initial state value
start: { cursor: "counter.state" }, # Resume from frame that triggered last state update
}
The state is:
- Initialized from
initial_state
metadata if provided - Persisted when handler appends to
<handler-name>.state
- Automatically restored when handler restarts
- Referenced in output frame metadata via
state_id