Handlers

Andy Gayton edited this page Dec 20, 2024 · 10 revisions

cross.stream Handlers

Cross.stream (xs) handlers use Nushell closures to process and act on incoming frames as they are appended to the store.

Handler Basics

A handler is a Nushell closure that takes a frame as input and optionally returns a value or performs actions:

{ |frame|
    if $frame.topic == "ping" {
        "pong"  # Will be appended to handler.out
    }
}

The handler closure receives each new frame and can:

  • Process the frame's content
  • Return a value (which gets automatically appended to <handler-name>.out)
  • Explicitly append new frames using the .append command
  • Filter which frames to process using conditionals
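
Because a handler is an ordinary Nushell closure, its logic can be tried outside the store by calling it with a hand-built record. The sketch below assumes only that a frame exposes a topic field, as in the example above. The handler variable and the sample records are illustrative stand-ins, not something xs provides.

```nushell
# A handler closure, bound to a variable so it can be called directly.
let handler = {|frame|
    if $frame.topic == "ping" {
        "pong"
    }
}

# Call the closure with stand-in frames (plain records, not real frames).
let hit = do $handler {topic: "ping"}    # matches the conditional
let miss = do $handler {topic: "other"}  # falls through, so the result is null

print $hit
print ($miss == null)
```

A null result is the case in which nothing is appended to the handler's output topic.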

Registering Handlers

To register a handler, append a frame with the topic <handler-name>.register. The frame should contain:

  • The handler closure as text (stored in the CAS)
  • Optional metadata to configure the handler's behavior

Example registration:

r#'
{ |frame|
    if $frame.topic == "ping" {
        "pong"
    }
}
'# | .append echo.register --meta {
    start: "root",  # Process all frames from the beginning
    return_options: {
        suffix: ".response",  # Append responses to echo.response instead of echo.out
        ttl: "head:1"  # Only keep the most recent response
    }
}

Handler Configuration Options

When registering a handler, you can include metadata to configure its behavior:

Field           Description
start           Where to begin processing in the stream (see Start Options below)
pulse           Interval, in milliseconds, at which synthetic xs.pulse events are sent
return_options  Controls how return values are published (see Return Options below)
modules         Map of module name to frame ID, for loading Nushell modules
with_env        Map of environment variable name to frame ID, for setting Nushell environment variables

Start Options

The start field determines where the handler begins processing frames:

  • "tail": Only process new frames after registration (default)
  • "root": Process all frames from the beginning
  • {"cursor": "topic"}: Start from the frame whose ID is stored as frame_id in the metadata of the most recent frame on that topic
  • {"after": "topic"}: Begin after the most recent frame of the specified topic
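
As a sketch of how these values appear in registration metadata, the following registers the same handler body under three hypothetical names (audit, resume, follow) with different start settings. The topic names resume.state and jobs.done are placeholders chosen for illustration.

```nushell
# Hypothetical handler body, stored as text in a raw string.
let body = r#'{|frame| if $frame.topic == "ping" { "pong" } }'#

# Replay the whole stream from the first frame.
$body | .append audit.register --meta {start: "root"}

# Resume from the frame referenced by frame_id in the resume.state metadata.
$body | .append resume.register --meta {start: {cursor: "resume.state"}}

# Begin after the most recent jobs.done frame.
$body | .append follow.register --meta {start: {after: "jobs.done"}}
```

Omitting start altogether gives the default "tail" behavior.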

Return Options

The return_options field controls how return values are handled:

  • suffix: String appended to handler's name for output topic (default: ".out")
  • ttl: Time-to-live for output frames
    • "forever": Never expire
    • "ephemeral": Remove after reading
    • "time:<milliseconds>": Expire after duration
    • "head:<n>": Keep only N most recent frames
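
Since the time-based TTL is written in milliseconds, it can be convenient to derive the string from a Nushell duration instead of counting zeros by hand. A minimal sketch: ttl-from-duration is a hypothetical helper, not an xs command, and the resulting record is only the value one would place under return_options.

```nushell
# Hypothetical helper: render a Nushell duration as a "time:<milliseconds>" TTL string.
def ttl-from-duration [d: duration] {
    # `into int` yields nanoseconds; floor-divide down to milliseconds.
    let ms = ($d | into int) // 1000000
    $"time:($ms)"
}

# Build the return_options record for a handler registration.
let opts = {
    suffix: ".response"
    ttl: (ttl-from-duration 5min)
}

print $opts.ttl
```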

Using Modules

The modules option allows handlers to use Nushell modules stored as frames. Each module must be stored as a frame containing valid Nushell module code. Example:

# First create a module
r#'
    export def double [x] { $x * 2 }
'# | .append my-math.nu

# Then register a handler that uses it
r#'
    {|frame|
        my-math double 8
    }
'# | .append processor.register --meta {modules: {"my-math": (.head "my-math.nu").id}}

Module exports are available within the handler's closure using the specified module name as prefix.
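
The prefixing behaves the same way as importing a module by name in plain Nushell. The sketch below uses an inline module as a stand-in for the text stored in the my-math.nu frame; how xs loads the frame content internally is not shown here.

```nushell
# Inline stand-in for the module text stored in the my-math.nu frame.
module my-math {
    export def double [x] { $x * 2 }
}

# Importing by module name makes exports callable as "<module> <command>".
use my-math
let result = my-math double 8

print $result
```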

Setting Environment Variables

The with_env option allows handlers to access frame content through Nushell environment variables. Each frame's content is loaded into the specified environment variable. Example:

# First create a frame with content
"secret-key-123" | .append secrets.api-key

# Then register a handler that uses it
r#"
{|frame|
    $env.API_KEY  # Will contain "secret-key-123"
}
"# | .append processor.register --meta {
    with_env: {
        "API_KEY": (.head "secrets.api-key").id
    }
}
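
To try a handler body that reads from $env without a running store, the variable can be supplied locally with Nushell's with-env. This is only a local analog of what with_env arranges: the value is hard-coded here rather than loaded from a frame, and the handler variable is illustrative.

```nushell
# Stand-in for the handler body; under xs, API_KEY would come from the referenced frame.
let handler = {|frame| $env.API_KEY }

# with-env scopes the variable to the block, so nothing leaks into the session.
let seen = with-env {API_KEY: "secret-key-123"} {
    do $handler {topic: "any"}
}

print $seen
```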

Handler Output

Handlers can produce output in two ways:

  1. Return Values: Any non-null return value is automatically appended to the handler's output topic (<handler-name>.out by default unless modified by return_options.suffix)
{ |frame|
    if $frame.topic == "ping" {
        "pong"  # Automatically appended to handler.out
    }
}
  2. Explicit Appends: Use the .append command to create frames on any topic
{ |frame|
    if $frame.topic == "ping" {
        "pong" | .append response.topic --meta {"type": "response"}
        "logged" | .append audit.topic
    }
}

All output frames automatically include:

  • handler_id: ID of the handler that created the frame
  • frame_id: ID of the frame that triggered the handler

Handler Lifecycle

stateDiagram-v2
    [*] --> Registering: .register event
    Registering --> Unregistered: nushell parse error
    Registering --> Registered : parse OK
    Unregistered --> [*]

    state Registered {
        direction LR
        [*] --> events.recv()
        events.recv() --> should_run: event received

        should_run --> events.recv(): skip
        should_run --> process_event: yep
        should_run --> [*]: .unregister event

        process_event --> [*]: error encountered
        process_event --> events.recv(): OK
    }

    Registered --> Unregistered

Unregistering

A handler can be unregistered by:

  • Appending <handler-name>.unregister
  • Registering a new handler with the same name
  • Runtime errors in the handler closure

When a handler is unregistered, a confirmation frame is appended with the topic <handler-name>.unregistered. If the handler was unregistered because of an error, the frame includes an error field in its metadata.

Error Handling

If a handler encounters an error during execution:

  1. The handler is automatically unregistered
  2. A frame is appended to <handler-name>.unregistered with:
    • The error message in metadata
    • Reference to the triggering frame
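
Because any uncaught error unregisters the handler, one defensive pattern is to catch expected failures inside the closure so that it still returns normally. A minimal sketch, assuming frames carry a hypothetical meta.value field; the closure is called directly with stand-in records here rather than by xs.

```nushell
let handler = {|frame|
    try {
        # `into int` raises an error on non-numeric text.
        {ok: ($frame.meta.value | into int)}
    } catch {|err|
        # Returning a value instead of failing means the closure itself does not error.
        {error: $err.msg}
    }
}

# Stand-in frames: one parses cleanly, one does not.
let good = do $handler {topic: "reading", meta: {value: "42"}}
let bad = do $handler {topic: "reading", meta: {value: "abc"}}

print $good.ok
print ("error" in ($bad | columns))
```

Whether to catch an error or let the handler unregister is a design choice: catching keeps the handler running, while unregistering makes the failure visible on <handler-name>.unregistered.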

Stateful Handlers

A handler becomes stateful by accepting a second state parameter. The state persists across frames and can be updated by the handler.

Example stateful handler:

{ |frame, state|
    mut state = $state  # Create mutable state variable
    
    if $frame.topic == "count.me" {
        $state.count += 1
        $state | .append counter.state  # Save updated state
    }
}

Configuring Stateful Handlers

When registering a stateful handler, you can provide additional configuration:

{ |frame, state| ... } | .append counter.register --meta {
    initial_state: { count: 0 },  # Optional initial state value
    start: { cursor: "counter.state" },  # Resume from the frame that triggered the last state update
}

The state is:

  • Initialized from the initial_state metadata, if provided
  • Persisted when the handler appends to <handler-name>.state
  • Automatically restored when the handler restarts
  • Referenced in output frame metadata via state_id
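
The way state threads from one frame to the next can be modelled locally as a fold over a list of frames. This is a sketch of the idea only, not how xs drives a handler: the step closure returns the next state instead of appending it to counter.state, and the frames are plain records with just a topic field.

```nushell
# Same shape as a stateful handler, but returning the next state so it can be folded locally.
let step = {|frame, state|
    if $frame.topic == "count.me" {
        $state | update count ($state.count + 1)
    } else {
        $state
    }
}

# Stand-in frames; only `topic` is modelled.
let frames = [{topic: "count.me"} {topic: "other"} {topic: "count.me"}]

# Start from the value that would be supplied as initial_state.
let final = $frames | reduce --fold {count: 0} {|frame, state| do $step $frame $state }

print $final.count
```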