Skip to content

Commit

Permalink
box: support space and index names in IPROTO requests
Browse files Browse the repository at this point in the history
Add support for accepting IPROTO requests with space or index name instead
of identifier (name is preferred over identifier to disambiguate missing
identifiers from zero identifiers): mark space identifier request
key as present upon encountering space name, and delay resolution of
identifier until request gets to transaction thread.

Add support for sending DML requests from net.box connection objects with
disabled schema fetching by manually specifying space or index name or
identifier: when schema fetching is disabled, the space and index tables of
connections return wrapper tables that store necessary context (space or
index name or identifier, determined by type, connection object and space
for indexes) for performing requests. The space and index tables cache the
wrapper table they return.

Closes #8146

@TarantoolBot document
Title: Space and index name in IPROTO requests

Refer to design document for details:
https://www.notion.so/tarantool/Schemafull-IPROTO-cc315ad6bdd641dea66ad854992d8cbf?pvs=4#f4d4b3fa2b3646f1949319866428b6c0
  • Loading branch information
CuriousGeorgiy authored and locker committed May 22, 2023
1 parent bf086dc commit b9550f1
Show file tree
Hide file tree
Showing 17 changed files with 560 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## feature/box

* Added support for accepting IPROTO requests with a space or index name instead
of an identifier (gh-8146).
44 changes: 42 additions & 2 deletions src/box/iproto.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1666,8 +1666,8 @@ iproto_msg_decode(struct iproto_msg *msg, struct cmsg_hop **route)
assert(type < sizeof(iproto_thread->dml_route) /
sizeof(*iproto_thread->dml_route));
*route = iproto_thread->dml_route[type];
if (xrow_decode_dml(&msg->header, &msg->dml,
dml_request_key_map(type)))
if (xrow_decode_dml_iproto(&msg->header, &msg->dml,
dml_request_key_map(type)) != 0)
return -1;
/*
* In contrast to replication requests, for a client request
Expand Down Expand Up @@ -2143,6 +2143,42 @@ tx_process_rollback(struct cmsg *m)
tx_end_msg(msg, &header);
}

/*
* In case the request does not contain a space or identifier but contains a
* corresponding name, tries to resolve the name.
*/
static int
tx_resolve_space_and_index_name(struct request *dml)
{
struct space *space = NULL;
if (dml->space_name != NULL) {
space = space_by_name(dml->space_name, dml->space_name_len);
if (space == NULL) {
diag_set(ClientError, ER_NO_SUCH_SPACE,
tt_cstr(dml->space_name, dml->space_name_len));
return -1;
}
dml->space_id = space->def->id;
}
if ((dml->type == IPROTO_SELECT || dml->type == IPROTO_UPDATE ||
dml->type == IPROTO_DELETE) && dml->index_name != NULL) {
if (space == NULL)
space = space_cache_find(dml->space_id);
if (space == NULL)
return -1;
struct index *idx = space_index_by_name(space, dml->index_name,
dml->index_name_len);
if (idx == NULL) {
diag_set(ClientError, ER_NO_SUCH_INDEX_NAME,
tt_cstr(dml->index_name, dml->index_name_len),
space->def->name);
return -1;
}
dml->index_id = idx->dense_id;
}
return 0;
}

static void
tx_process1(struct cmsg *m)
{
Expand All @@ -2154,6 +2190,8 @@ tx_process1(struct cmsg *m)
struct obuf_svp svp;
struct obuf *out;
tx_inject_delay();
if (tx_resolve_space_and_index_name(&msg->dml) != 0)
goto error;
if (box_process1(&msg->dml, &tuple) != 0)
goto error;
out = msg->connection->tx.p_obuf;
Expand Down Expand Up @@ -2190,6 +2228,8 @@ tx_process_select(struct cmsg *m)
goto error;

tx_inject_delay();
if (tx_resolve_space_and_index_name(&msg->dml) != 0)
goto error;
packed_pos = req->after_position;
packed_pos_end = req->after_position_end;
if (packed_pos != NULL) {
Expand Down
2 changes: 2 additions & 0 deletions src/box/iproto_constants.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ const unsigned char iproto_key_type[iproto_key_MAX] =
/* 0x5b */ MP_STR, /* IPROTO_AUTH_TYPE */
/* 0x5c */ MP_STR, /* IPROTO_REPLICASET_NAME */
/* 0x5d */ MP_STR, /* IPROTO_INSTANCE_NAME */
/* 0x5e */ MP_STR, /* IPROTO_SPACE_NAME */
/* 0x5f */ MP_STR, /* IPROTO_INDEX_NAME */
/* }}} */
};

Expand Down
12 changes: 12 additions & 0 deletions src/box/iproto_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,18 @@ extern const size_t iproto_flag_constants_size;
_(IPROTO_AUTH_TYPE, 0x5b) \
_(IPROTO_REPLICASET_NAME, 0x5c) \
_(IPROTO_INSTANCE_NAME, 0x5d) \
/**
* Space name used instead of identifier (IPROTO_SPACE_ID) in DML
* requests. Preferred when identifier is present (i.e., the identifier
* is ignored).
*/ \
_(IPROTO_SPACE_NAME, 0x5e) \
/**
* Index name used instead of identifier (IPROTO_INDEX_ID) in
* IPROTO_SELECT, IPROTO_UPDATE, and IPROTO_DELETE requests. Preferred
* when identifier is present (i.e., the identifier is ignored).
*/ \
_(IPROTO_INDEX_NAME, 0x5f) \

ENUM(iproto_key, IPROTO_KEYS);
/**
Expand Down
2 changes: 2 additions & 0 deletions src/box/iproto_features.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,6 @@ iproto_features_init(void)
IPROTO_FEATURE_WATCHERS);
iproto_features_set(&IPROTO_CURRENT_FEATURES,
IPROTO_FEATURE_PAGINATION);
iproto_features_set(&IPROTO_CURRENT_FEATURES,
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES);
}
10 changes: 9 additions & 1 deletion src/box/iproto_features.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ extern "C" {
* request fields and IPROTO_POSITION response field.
*/ \
_(IPROTO_FEATURE_PAGINATION, 4) \
/**
* Using space [index] names instead of identifiers support:
* IPROTO_SPACE_NAME and IPROTO_INDEX_NAME fields in IPROTO_SELECT,
* IPROTO_UPDATE and IPROTO_DELETE request body;
* IPROTO_SPACE_NAME field in IPROTO_INSERT, IPROTO_REPLACE,
* IPROTO_UPDATE and IPROTO_UPSERT request body.
*/ \
_(IPROTO_FEATURE_SPACE_AND_INDEX_NAMES, 5) \

ENUM(iproto_feature_id, IPROTO_FEATURES);

Expand All @@ -72,7 +80,7 @@ struct iproto_features {
* `box.iproto.protocol_version` needs to be updated correspondingly.
*/
enum {
IPROTO_CURRENT_VERSION = 4,
IPROTO_CURRENT_VERSION = 5,
};

/**
Expand Down
82 changes: 49 additions & 33 deletions src/box/lua/net_box.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum {
/**
* IPROTO protocol version supported by the netbox connector.
*/
NETBOX_IPROTO_VERSION = 4,
NETBOX_IPROTO_VERSION = 5,
};

/**
Expand Down Expand Up @@ -781,6 +781,44 @@ netbox_encode_eval(lua_State *L, int idx, struct mpstream *stream,
netbox_end_encode(stream, svp);
}

/*
* Depending on the type of the argument (see also net.box space metatable)
* encode either a space identifier or a space name.
*/
static void
netbox_encode_space_id_or_name(lua_State *L, int idx, struct mpstream *stream)
{
if (lua_type(L, idx) == LUA_TNUMBER) {
uint32_t space_id = lua_tonumber(L, idx);
mpstream_encode_uint(stream, IPROTO_SPACE_ID);
mpstream_encode_uint(stream, space_id);
} else {
size_t len;
const char *space_name = lua_tolstring(L, idx, &len);
mpstream_encode_uint(stream, IPROTO_SPACE_NAME);
mpstream_encode_strn(stream, space_name, len);
}
}

/*
* Depending on the type of the argument (see also net.box index metatable)
* encode either a index identifier or an index name.
*/
static void
netbox_encode_index_id_or_name(lua_State *L, int idx, struct mpstream *stream)
{
if (lua_type(L, idx) == LUA_TNUMBER) {
uint32_t space_id = lua_tonumber(L, idx);
mpstream_encode_uint(stream, IPROTO_INDEX_ID);
mpstream_encode_uint(stream, space_id);
} else {
size_t len;
const char *space_name = lua_tolstring(L, idx, &len);
mpstream_encode_uint(stream, IPROTO_INDEX_NAME);
mpstream_encode_strn(stream, space_name, len);
}
}

/* Encode select request. */
static void
netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
Expand All @@ -801,19 +839,13 @@ netbox_encode_select(lua_State *L, int idx, struct mpstream *stream,
if (fetch_pos)
map_size++;
mpstream_encode_map(stream, map_size);
uint32_t space_id = lua_tonumber(L, idx);
uint32_t index_id = lua_tonumber(L, idx + 1);
int iterator = lua_tointeger(L, idx + 2);
uint32_t offset = lua_tonumber(L, idx + 3);
uint32_t limit = lua_tonumber(L, idx + 4);

/* encode space_id */
mpstream_encode_uint(stream, IPROTO_SPACE_ID);
mpstream_encode_uint(stream, space_id);
netbox_encode_space_id_or_name(L, idx, stream);

/* encode index_id */
mpstream_encode_uint(stream, IPROTO_INDEX_ID);
mpstream_encode_uint(stream, index_id);
netbox_encode_index_id_or_name(L, idx + 1, stream);

/* encode iterator */
mpstream_encode_uint(stream, IPROTO_ITERATOR);
Expand Down Expand Up @@ -865,10 +897,7 @@ netbox_encode_insert_or_replace(lua_State *L, int idx, struct mpstream *stream,

mpstream_encode_map(stream, 2);

/* encode space_id */
uint32_t space_id = lua_tonumber(L, idx);
mpstream_encode_uint(stream, IPROTO_SPACE_ID);
mpstream_encode_uint(stream, space_id);
netbox_encode_space_id_or_name(L, idx, stream);

/* encode args */
mpstream_encode_uint(stream, IPROTO_TUPLE);
Expand Down Expand Up @@ -903,15 +932,9 @@ netbox_encode_delete(lua_State *L, int idx, struct mpstream *stream,

mpstream_encode_map(stream, 3);

/* encode space_id */
uint32_t space_id = lua_tonumber(L, idx);
mpstream_encode_uint(stream, IPROTO_SPACE_ID);
mpstream_encode_uint(stream, space_id);
netbox_encode_space_id_or_name(L, idx, stream);

/* encode space_id */
uint32_t index_id = lua_tonumber(L, idx + 1);
mpstream_encode_uint(stream, IPROTO_INDEX_ID);
mpstream_encode_uint(stream, index_id);
netbox_encode_index_id_or_name(L, idx + 1, stream);

/* encode key */
mpstream_encode_uint(stream, IPROTO_KEY);
Expand All @@ -930,15 +953,9 @@ netbox_encode_update(lua_State *L, int idx, struct mpstream *stream,

mpstream_encode_map(stream, 5);

/* encode space_id */
uint32_t space_id = lua_tonumber(L, idx);
mpstream_encode_uint(stream, IPROTO_SPACE_ID);
mpstream_encode_uint(stream, space_id);
netbox_encode_space_id_or_name(L, idx, stream);

/* encode index_id */
uint32_t index_id = lua_tonumber(L, idx + 1);
mpstream_encode_uint(stream, IPROTO_INDEX_ID);
mpstream_encode_uint(stream, index_id);
netbox_encode_index_id_or_name(L, idx + 1, stream);

/* encode index_id */
mpstream_encode_uint(stream, IPROTO_INDEX_BASE);
Expand All @@ -965,10 +982,7 @@ netbox_encode_upsert(lua_State *L, int idx, struct mpstream *stream,

mpstream_encode_map(stream, 4);

/* encode space_id */
uint32_t space_id = lua_tonumber(L, idx);
mpstream_encode_uint(stream, IPROTO_SPACE_ID);
mpstream_encode_uint(stream, space_id);
netbox_encode_space_id_or_name(L, idx, stream);

/* encode index_base */
mpstream_encode_uint(stream, IPROTO_INDEX_BASE);
Expand Down Expand Up @@ -2994,6 +3008,8 @@ luaopen_net_box(struct lua_State *L)
IPROTO_FEATURE_WATCHERS);
iproto_features_set(&NETBOX_IPROTO_FEATURES,
IPROTO_FEATURE_PAGINATION);
iproto_features_set(&NETBOX_IPROTO_FEATURES,
IPROTO_FEATURE_SPACE_AND_INDEX_NAMES);

lua_pushcfunction(L, luaT_netbox_request_iterator_next);
luaT_netbox_request_iterator_next_ref = luaL_ref(L, LUA_REGISTRYINDEX);
Expand Down
Loading

0 comments on commit b9550f1

Please sign in to comment.