Skip to content

Commit

Permalink
box/lua: introduce box.watch and box.broadcast
Browse files Browse the repository at this point in the history
Part of #6257

@TarantoolBot document
Title: Document box.watch and box.broadcast

`box.watch(key, func)` registers a watcher for the given key and returns
a watcher handle, which can be used to unregister the watcher (by
calling the `unregister` method). A key is an arbitrary string. It's
possible to register more than one watcher for the same key. Note,
garbage collection of a watcher handle doesnt result in unregistering
the watcher so it's okay to discard the result of `box.watch` if the
watcher is never going to be unregistered.

`box.broadcast(key, value)` updates the value of the given key and
signals all watchers registered for it.

A watcher callback is first invoked unconditionally after the watcher
registration. Subsequent invocations are triggered by `box.broadcast()`
called on the local host. A watcher callback is passed the name of the
key the watcher was subscribed to and the current key value. A watcher
callback is always executed in a new fiber so it's okay to yield inside
it. A watcher callback never runs in parallel with itself: if the key
to which a watcher is subscribed is updated while the watcher callback
is running, the callback will be invoked again with the new value as
soon as it returns.

`box.watch` and `box.broadcast` may be used before `box.cfg`.

Example usage:

```lua
-- Broadcast value 123 for key 'foo'.
box.broadcast('foo', 123)
-- Subscribe to updates of key 'foo'.
w = box.watch('foo', function(key, value)
    assert(key == 'foo')
    -- do something with value
end)
-- Unregister the watcher when it's no longer needed.
w:unregister()
```
  • Loading branch information
locker committed Nov 2, 2021
1 parent e8b9dff commit 11f2d99
Show file tree
Hide file tree
Showing 9 changed files with 971 additions and 1 deletion.
4 changes: 4 additions & 0 deletions changelogs/unreleased/gh-6257-box-watcher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## feature/core

* Introduced `box.broadcast` and `box.watch` functions to signal/watch
user-defined state changes (gh-6257).
1 change: 1 addition & 0 deletions src/box/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ add_library(box STATIC
lua/execute.c
lua/key_def.c
lua/merger.c
lua/watcher.c
${bin_sources})

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
Expand Down
2 changes: 2 additions & 0 deletions src/box/lua/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
#include "box/lua/execute.h"
#include "box/lua/key_def.h"
#include "box/lua/merger.h"
#include "box/lua/watcher.h"

#include "mpstream/mpstream.h"

Expand Down Expand Up @@ -478,6 +479,7 @@ box_lua_init(struct lua_State *L)
box_lua_session_init(L);
box_lua_xlog_init(L);
box_lua_sql_init(L);
box_lua_watcher_init(L);
luaopen_net_box(L);
lua_pop(L, 1);
tarantool_lua_console_init(L);
Expand Down
4 changes: 3 additions & 1 deletion src/box/lua/load_cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,9 @@ local box_cfg_guard_whitelist = {
session = true;
tuple = true;
runtime = true;
ctl = true,
ctl = true;
watch = true;
broadcast = true;
NULL = true;
};

Expand Down
216 changes: 216 additions & 0 deletions src/box/lua/watcher.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*
* SPDX-License-Identifier: BSD-2-Clause
*
* Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
*/
#include "box/lua/watcher.h"

#include <assert.h>
#include <lua.h>
#include <lauxlib.h>
#include <stddef.h>

#include "box/watcher.h"
#include "diag.h"
#include "fiber.h"
#include "core/cord_buf.h"
#include "lua/msgpack.h"
#include "lua/utils.h"
#include "mpstream/mpstream.h"
#include "small/ibuf.h"
#include "trivia/util.h"

struct lbox_watcher {
struct watcher base;
/** Lua function reference. */
int func_ref;
};

/**
* Watcher handle pushed as userdata to Lua so that a watcher can be
* unregistered from Lua. Garbage collection of a handle doesn't lead to
* watcher destruction.
*/
struct lbox_watcher_handle {
struct lbox_watcher *watcher;
};

static const char lbox_watcher_typename[] = "box.watcher";

/**
* We keep a reference to each C function that is often called with lua_pcall
* so as not to create a new Lua object each time we call it.
*/
static int lbox_watcher_run_lua_ref = LUA_NOREF;

/** Passed to pcall by lbox_watcher_run_f. */
static int
lbox_watcher_run_lua(struct lua_State *L)
{
struct lbox_watcher *watcher = lua_touserdata(L, 1);
size_t key_len;
const char *key = watcher_key(&watcher->base, &key_len);
const char *data_end;
const char *data = watcher_data(&watcher->base, &data_end);
lua_rawgeti(L, LUA_REGISTRYINDEX, watcher->func_ref);
lua_pushlstring(L, key, key_len);
if (data != NULL) {
luamp_decode(L, luaL_msgpack_default, &data);
assert(data == data_end);
(void)data_end;
}
lua_call(L, data != NULL ? 2 : 1, 0);
return 0;
}

/**
* The callback runs a user-defined Lua function. Since the callback is invoked
* in a newly created fiber, which doesn't have a Lua stack, we need to create
* a temporary Lua stack for the call.
*
* A user-defined watcher function may throw. Even pushing arguments to the
* stack may throw. So we wrap the callback in pcall to properly handle a Lua
* exception.
*/
static void
lbox_watcher_run_f(struct watcher *watcher)
{
/*
* Create a new coro and reference it. Remove it
* from tarantool_L stack, which is a) scarce
* b) can be used by other triggers while this
* trigger yields, so when it's time to clean
* up the coro, we wouldn't know which stack position
* it is on.
*/
void *L = luaT_newthread(tarantool_L);
if (L == NULL) {
diag_log();
return;
}
int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
lua_rawgeti(L, LUA_REGISTRYINDEX, lbox_watcher_run_lua_ref);
lua_pushlightuserdata(L, watcher);
if (luaT_call(L, 1, 0) != 0)
diag_log();
luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
}

/**
* Releases the Lua function reference and frees the watcher.
*/
static void
lbox_watcher_destroy_f(struct watcher *base)
{
struct lbox_watcher *watcher = (struct lbox_watcher *)base;
luaL_unref(tarantool_L, LUA_REGISTRYINDEX, watcher->func_ref);
free(watcher);
}

static inline struct lbox_watcher_handle *
lbox_check_watcher(struct lua_State *L, int idx)
{
return luaL_checkudata(L, idx, lbox_watcher_typename);
}

static int
lbox_watcher_tostring(struct lua_State *L)
{
lua_pushstring(L, lbox_watcher_typename);
return 1;
}

/**
* Lua wrapper around box_watcher_unregister().
*/
static int
lbox_watcher_unregister(struct lua_State *L)
{
struct lbox_watcher_handle *handle = lbox_check_watcher(L, 1);
if (handle->watcher == NULL)
return luaL_error(L, "Watcher is already unregistered");
watcher_unregister(&handle->watcher->base);
handle->watcher = NULL;
return 0;
}

/**
* Lua wrapper around box_watcher_register().
*/
static int
lbox_watch(struct lua_State *L)
{
/* Check arguments. */
if (lua_gettop(L) != 2)
return luaL_error(L, "Usage: box.watch(key, function)");
size_t key_len;
const char *key = luaL_checklstring(L, 1, &key_len);
luaL_checktype(L, 2, LUA_TFUNCTION);

/* Create a watcher handle. */
struct lbox_watcher_handle *handle = lua_newuserdata(
L, sizeof(*handle));
luaL_getmetatable(L, lbox_watcher_typename);
lua_setmetatable(L, -2);
lua_replace(L, 1);

/* Allocate and register a watcher. */
struct lbox_watcher *watcher = xmalloc(sizeof(*watcher));
watcher->func_ref = luaL_ref(L, LUA_REGISTRYINDEX);
box_register_watcher(key, key_len, lbox_watcher_run_f,
lbox_watcher_destroy_f, WATCHER_RUN_ASYNC,
&watcher->base);
handle->watcher = watcher;
return 1;
}

/**
* Lua wrapper around box_broadcast().
*/
static int
lbox_broadcast(struct lua_State *L)
{
int top = lua_gettop(L);
if (top != 1 && top != 2)
return luaL_error(L, "Usage: box.broadcast(key[, value])");
size_t key_len;
const char *key = luaL_checklstring(L, 1, &key_len);
struct ibuf *ibuf = cord_ibuf_take();
const char *data = NULL;
const char *data_end = NULL;
if (!lua_isnoneornil(L, 2)) {
struct mpstream stream;
mpstream_init(&stream, ibuf, ibuf_reserve_cb, ibuf_alloc_cb,
luamp_error, L);
luamp_encode(L, luaL_msgpack_default, &stream, 2);
mpstream_flush(&stream);
data = ibuf->rpos;
data_end = data + ibuf_used(ibuf);
}
box_broadcast(key, key_len, data, data_end);
cord_ibuf_put(ibuf);
return 0;
}

void
box_lua_watcher_init(struct lua_State *L)
{
lua_pushcfunction(L, lbox_watcher_run_lua);
lbox_watcher_run_lua_ref = luaL_ref(L, LUA_REGISTRYINDEX);

static const struct luaL_Reg lbox_watcher_meta[] = {
{"__tostring", lbox_watcher_tostring},
{"unregister", lbox_watcher_unregister},
{NULL, NULL},
};
luaL_register_type(L, lbox_watcher_typename, lbox_watcher_meta);

lua_getfield(L, LUA_GLOBALSINDEX, "box");
lua_pushstring(L, "watch");
lua_pushcfunction(L, lbox_watch);
lua_settable(L, -3);
lua_pushstring(L, "broadcast");
lua_pushcfunction(L, lbox_broadcast);
lua_settable(L, -3);
lua_pop(L, 1);
}
19 changes: 19 additions & 0 deletions src/box/lua/watcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* SPDX-License-Identifier: BSD-2-Clause
*
* Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file.
*/
#pragma once

#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */

struct lua_State;

void
box_lua_watcher_init(struct lua_State *L);

#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
Loading

0 comments on commit 11f2d99

Please sign in to comment.