Skip to content

Commit

Permalink
API extentions.
Browse files Browse the repository at this point in the history
1. Cursor API to scan the key space
2. Stream API
   * Read range from streams
   * Trim stream
3. Server Events API to register to the following events:
   * Flush started/ended
   * Loading started/ended for AOF/RDB/Replication
   * Role changed (become primary/replica)
4. Is primary API was added to Context object
  • Loading branch information
MeirShpilraien committed May 26, 2022
1 parent 517eabf commit 325f4ca
Show file tree
Hide file tree
Showing 13 changed files with 664 additions and 5 deletions.
15 changes: 15 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ name = "info"
crate-type = ["cdylib"]
required-features = []

[[example]]
name = "stream"
crate-type = ["cdylib"]
required-features = ["experimental-api"]

[[example]]
name = "server_events"
crate-type = ["cdylib"]
required-features = ["experimental-api"]

[[example]]
name = "scan"
crate-type = ["cdylib"]
required-features = ["experimental-api"]

[dependencies]
bitflags = "1.2"
libc = "0.2"
Expand Down
26 changes: 26 additions & 0 deletions examples/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#[macro_use]
extern crate redis_module;

use redis_module::{Context, RedisResult, RedisString, RedisValue, context::keys_cursor::KeysCursor};

fn scan_keys(ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
let mut keys = Vec::new();
let cursor = KeysCursor::new();
while cursor.scan(ctx, &|_ctx, key_name, _key|{
keys.push(RedisValue::BulkString(key_name.try_as_str().unwrap().to_string()));
}){

}
Ok(RedisValue::Array(keys))
}

//////////////////////////////////////////////////////

redis_module! {
name: "scan",
version: 1,
data_types: [],
commands: [
["SCAN_KEYS", scan_keys, "fast deny-oom", 0, 0, 0],
],
}
56 changes: 56 additions & 0 deletions examples/server_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#[macro_use]
extern crate redis_module;

use redis_module::{
Context, RedisResult, RedisString, RedisValue,
context::server_events::ServerEventData
};

static mut NUM_FLUSHES: usize = 0;
static mut NUM_ROLED_CHANGED: usize = 0;
static mut NUM_LOADINGS: usize = 0;

fn num_flushed(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Integer(unsafe{NUM_FLUSHES} as i64))
}

fn num_roled_changed(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Integer(unsafe{NUM_ROLED_CHANGED} as i64))
}

fn num_loading(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Integer(unsafe{NUM_LOADINGS} as i64))
}

fn on_role_changed(_ctx: &Context, _event_data: ServerEventData) {
let num_roled_changed = unsafe{&mut NUM_ROLED_CHANGED};
*num_roled_changed = *num_roled_changed + 1;
}

fn on_loading_event(_ctx: &Context, _event_data: ServerEventData) {
let num_loading = unsafe{&mut NUM_LOADINGS};
*num_loading = *num_loading + 1;
}

fn on_flush_event(_ctx: &Context, _event_data: ServerEventData) {
let num_flushed = unsafe{&mut NUM_FLUSHES};
*num_flushed = *num_flushed + 1;
}

//////////////////////////////////////////////////////

redis_module! {
name: "server_evemts",
version: 1,
data_types: [],
commands: [
["NUM_FLUSHED", num_flushed, "fast deny-oom", 0, 0, 0],
["NUM_ROLED_CHANGED", num_roled_changed, "fast deny-oom", 0, 0, 0],
["NUM_LOADING", num_loading, "fast deny-oom", 0, 0, 0],
],
server_events: [
[@RuleChanged: on_role_changed],
[@Loading: on_loading_event],
[@Flush: on_flush_event],
]
}
42 changes: 42 additions & 0 deletions examples/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#[macro_use]
extern crate redis_module;

use redis_module::raw::{KeyType, RedisModuleStreamID};
use redis_module::{Context, NextArg, RedisError, RedisResult, RedisString, RedisValue};

fn stream_read_from(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
let mut args = args.into_iter().skip(1);

let stream_key = args.next_arg()?;

let stream = ctx.open_key(&stream_key);

let key_type = stream.key_type();

if key_type != KeyType::Stream {
return Err(RedisError::WrongType);
}

let mut iter = stream.get_stream_iterator()?;
let element = iter.next();
let id_to_keep = iter.next().as_ref().map_or(RedisModuleStreamID{ms:u64::MAX, seq:u64::MAX}, |e| e.id);

let stream = ctx.open_key_writable(&stream_key);
stream.trim_stream_by_id(id_to_keep, false)?;

Ok(match element {
Some(e) => RedisValue::BulkString(format!("{}-{}", e.id.ms, e.id.seq)),
None => RedisValue::Null,
})
}

//////////////////////////////////////////////////////

redis_module! {
name: "stream",
version: 1,
data_types: [],
commands: [
["STREAM_POP", stream_read_from, "fast deny-oom", 1, 1, 1],
],
}
45 changes: 45 additions & 0 deletions src/context/keys_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::raw;
use crate::context::Context;
use crate::RedisString;
use crate::key::RedisKey;
use std::os::raw::c_void;

pub struct KeysCursor {
inner_cursor: *mut raw::RedisModuleScanCursor,
}

extern "C" fn scan_callback<C: FnMut(&Context, RedisString, Option<&RedisKey>)>(ctx: *mut raw::RedisModuleCtx, keyname: *mut raw::RedisModuleString, key: *mut raw::RedisModuleKey, privdata: *mut ::std::os::raw::c_void) {
let context = Context::new(ctx);
let key_name = RedisString::new(ctx, keyname);
let mut redis_key = if !key.is_null() {
Some(RedisKey{ctx: ctx, key_inner: key})
} else {
None
};
let callback = unsafe{&mut *(privdata as *mut C)};
callback(&context, key_name, redis_key.as_ref());


if redis_key.is_some() {
// we are not the owner of the key so we must not keep it.
redis_key.as_mut().unwrap().key_inner = std::ptr::null_mut();
}
}

impl KeysCursor {
pub fn new() -> KeysCursor {
let inner_cursor = unsafe{ raw::RedisModule_ScanCursorCreate.unwrap()() };
KeysCursor { inner_cursor }
}

pub fn scan<C: FnMut(&Context, RedisString, Option<&RedisKey>)>(&self, ctx: &Context, callback: &C) -> bool {
let res = unsafe{ raw::RedisModule_Scan.unwrap()(ctx.ctx, self.inner_cursor, Some(scan_callback::<C>), callback as *const C as *mut c_void) };
if res != 0 {true} else {false}
}
}

impl Drop for KeysCursor {
fn drop(&mut self) {
unsafe{raw::RedisModule_ScanCursorDestroy.unwrap()(self.inner_cursor)};
}
}
34 changes: 34 additions & 0 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ pub mod blocked;

pub mod info;

pub mod keys_cursor;

pub mod server_events;

/// `Context` is a structure that's designed to give us a high-level interface to
/// the Redis module API by abstracting away the raw C FFI calls.
pub struct Context {
Expand Down Expand Up @@ -161,6 +165,26 @@ impl Context {
unsafe { raw::RedisModule_ReplyWithSimpleString.unwrap()(self.ctx, msg.as_ptr()).into() }
}

#[allow(clippy::must_use_candidate)]
pub fn reply_bulk_string(&self, s: &str) -> raw::Status {
unsafe { raw::RedisModule_ReplyWithStringBuffer.unwrap()(self.ctx, s.as_ptr() as *mut c_char, s.len()).into() }
}

#[allow(clippy::must_use_candidate)]
pub fn reply_array(&self, size: usize) -> raw::Status {
unsafe { raw::RedisModule_ReplyWithArray.unwrap()(self.ctx, size as c_long).into() }
}

#[allow(clippy::must_use_candidate)]
pub fn reply_long(&self, l: i64) -> raw::Status {
unsafe { raw::RedisModule_ReplyWithLongLong.unwrap()(self.ctx, l as c_long).into() }
}

#[allow(clippy::must_use_candidate)]
pub fn reply_double(&self, d: f64) -> raw::Status {
unsafe { raw::RedisModule_ReplyWithDouble.unwrap()(self.ctx, d).into() }
}

#[allow(clippy::must_use_candidate)]
pub fn reply_error_string(&self, s: &str) -> raw::Status {
let msg = Self::str_as_legal_resp_string(s);
Expand Down Expand Up @@ -354,9 +378,19 @@ impl Context {
}
}
}

pub fn set_module_options(&self, options: ModuleOptions) {
unsafe { raw::RedisModule_SetModuleOptions.unwrap()(self.ctx, options.bits()) };
}

pub fn is_primary(&self) -> bool {
let flags = unsafe { raw::RedisModule_GetContextFlags.unwrap()(self.ctx) };
if flags as u32 & raw::REDISMODULE_CTX_FLAGS_MASTER != 0 {
return true
} else {
return false
}
}
}

pub struct InfoContext {
Expand Down
Loading

0 comments on commit 325f4ca

Please sign in to comment.