Skip to content

Commit 19dc705

Browse files
committed
feat(ubus): add support publish/subscriber.
Signed-off-by: Jianhui Zhao <zhaojh329@gmail.com>
1 parent 756b063 commit 19dc705

File tree

3 files changed

+142
-3
lines changed

3 files changed

+142
-3
lines changed

examples/ubus/subscribe.lua

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#!/usr/bin/env eco
2+
3+
local ubus = require 'eco.ubus'
4+
local time = require 'eco.time'
5+
6+
eco.run(function()
7+
local con, err = ubus.connect()
8+
if not con then
9+
error(err)
10+
end
11+
12+
local obj = con:add('eco', {})
13+
14+
while true do
15+
time.sleep(1)
16+
local ts = time.now()
17+
print('notify...', ts)
18+
con:notify(obj, 'time', { ts = ts })
19+
end
20+
end)
21+
22+
eco.run(function()
23+
local con, err = ubus.connect()
24+
if not con then
25+
error(err)
26+
end
27+
28+
con:subscribe('eco', function(method, msg)
29+
if method == 'time' then
30+
print('recv:', msg.ts)
31+
end
32+
end)
33+
34+
while true do
35+
time.sleep(1000)
36+
end
37+
end)

ubus.c

+96
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,100 @@ static int lua_ubus_complete_deferred_request(lua_State *L)
665665
return 0;
666666
}
667667

668+
static int ubus_subscriber_cb(struct ubus_context *ctx, struct ubus_object *obj,
669+
struct ubus_request_data *req,const char *method, struct blob_attr *msg)
670+
{
671+
struct eco_ubus_context *c = container_of(ctx, struct eco_ubus_context, ctx);
672+
struct ubus_subscriber *s = container_of(obj, struct ubus_subscriber, obj);
673+
lua_State *L = c->eco->L;
674+
675+
lua_pushnil(L);
676+
677+
lua_push_ubus_ctx(L, c);
678+
lua_getuservalue(L, -1);
679+
680+
lua_pushlightuserdata(L, s);
681+
lua_rawget(L, -2);
682+
683+
lua_getuservalue(L, -1);
684+
685+
lua_rawgeti(L, -1, 1);
686+
687+
lua_replace(L, -6);
688+
689+
lua_settop(L, -5);
690+
691+
lua_pushstring(L, method);
692+
693+
blob_to_lua_table(L, blob_data(msg), blob_len(msg), false);
694+
695+
lua_call(L, 2, 0);
696+
697+
return 0;
698+
}
699+
700+
static int lua_ubus_subscribe(lua_State *L)
701+
{
702+
struct eco_ubus_context *ctx = luaL_checkudata(L, 1, ECO_UBUS_CTX_MT);
703+
const char *path = luaL_checkstring(L, 2);
704+
struct ubus_subscriber *sub;
705+
uint32_t id;
706+
707+
sub = lua_newuserdata(L, sizeof(struct ubus_subscriber));
708+
lua_newtable(L);
709+
lua_pushvalue(L, 3);
710+
lua_rawseti(L, -2, 1);
711+
lua_setuservalue(L, -2);
712+
713+
memset(sub, 0, sizeof(struct ubus_event_handler));
714+
715+
sub->cb = ubus_subscriber_cb;
716+
717+
ubus_register_subscriber(&ctx->ctx, sub);
718+
719+
ubus_lookup_id(&ctx->ctx, path, &id);
720+
721+
ubus_subscribe(&ctx->ctx, sub, id);
722+
723+
lua_push_ubus_ctx(L, ctx);
724+
lua_getuservalue(L, -1);
725+
726+
lua_pushlightuserdata(L, sub);
727+
lua_pushvalue(L, -4);
728+
lua_rawset(L, -3);
729+
730+
lua_settop(L, -3);
731+
732+
return 0;
733+
}
734+
735+
static int lua_ubus_notify(lua_State *L)
736+
{
737+
struct eco_ubus_context *ctx = luaL_checkudata(L, 1, ECO_UBUS_CTX_MT);
738+
struct blob_buf buf = {};
739+
struct eco_ubus_object *obj;
740+
const char *method;
741+
742+
if(!lua_isuserdata(L, 2))
743+
return luaL_error(L, "Invald 2nd parameter, expected ubus obj ref");
744+
745+
obj = lua_touserdata(L, 2);
746+
747+
method = luaL_checkstring(L, 3);
748+
749+
luaL_checktype(L, 4, LUA_TTABLE);
750+
751+
blob_buf_init(&buf, 0);
752+
753+
lua_table_to_blob(L, 4, &buf, false);
754+
755+
ubus_notify(&ctx->ctx, &obj->object, method, buf.head, -1);
756+
757+
blob_buf_free(&buf);
758+
759+
return 0;
760+
}
761+
668762
static int lua_ubus_connect(lua_State *L)
669763
{
670764
const char *path = luaL_optstring(L, 1, NULL);
@@ -785,6 +879,8 @@ static const struct luaL_Reg ubus_methods[] = {
785879
{"add", lua_ubus_add},
786880
{"reply", lua_ubus_reply},
787881
{"complete_deferred_request", lua_ubus_complete_deferred_request},
882+
{"subscribe", lua_ubus_subscribe},
883+
{"notify", lua_ubus_notify},
788884
{"objects", lua_ubus_objects},
789885
{"signatures", lua_ubus_signatures},
790886
{"close", lua_ubus_close},

ubus.lua

+9-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ local global_timeout = 30.0
99

1010
local methods = {}
1111

12-
for _, method in ipairs({'close', 'call', 'reply', 'send', 'objects', 'signatures', 'settimeout', 'auto_reconnect'}) do
12+
for _, method in ipairs({'close', 'call', 'reply', 'send', 'notify', 'objects', 'signatures', 'settimeout', 'auto_reconnect'}) do
1313
methods[method] = function(self, ...)
1414
local con = self.con
1515
return con[method](con, ...)
@@ -33,8 +33,14 @@ function methods:add(object, ms)
3333
end
3434
end
3535

36-
local o, err = con:add(object, ms)
37-
if not o then
36+
return con:add(object, ms)
37+
end
38+
39+
function methods:subscribe(path, cb)
40+
local s, err = self.con:subscribe(path, function(...)
41+
eco.run(cb, ...)
42+
end)
43+
if not s then
3844
return false, err
3945
end
4046

0 commit comments

Comments
 (0)