Skip to content

Commit e783fac

Browse files
committed
proto: support queue
This patch adds the `push` field to `ExecuteRequest` and introduces `aeon_queue.proto` module. Needed for tarantool/aeon#488
1 parent 2d0e279 commit e783fac

File tree

2 files changed

+169
-0
lines changed

2 files changed

+169
-0
lines changed

aeon_crud.proto

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,63 @@ service CRUDService {
4343
// Transactionally executes a set of read and write operations.
4444

4545
message ExecuteRequest {
46+
// Message queue push section in a transaction request.
47+
message Push {
48+
// Definition of messages pushed to a shard-local queue.
49+
message Message {
50+
// Topic of the message.
51+
string topic = 1;
52+
// Data of the message.
53+
Value data = 2;
54+
// Time to live of the message.
55+
double ttl = 3;
56+
}
57+
// The source code of the Lua function that will be used to generate
58+
// messages. It's optional: if omitted, messages from the `messages`
59+
// field will be sent instead.
60+
//
61+
// The function is passed three arguments: the resulting read set,
62+
// the resulting write set, and an optional additional argument specified
63+
// as `func_arg`. If the function raises an error, the error will be
64+
// returned in the `push_err` field. The transaction will not be affected.
65+
//
66+
// The function must return an array of messages. Message is an array
67+
// of `{topic, data, ttl}`, where `topic` is required and is a string
68+
// value, `data` is required and is of any supported value type,
69+
// and `ttl` is optional and represents a number of seconds.
70+
//
71+
// A read/write operation is passed in an array: {space, key, tuple}.
72+
// (without string key names).
73+
//
74+
// Below is an example of a Lua function that returns messages
75+
// with `read_topic` as topic and the first fields of the read tuples
76+
// as data if the optional argument is `true`, and `write_topic`
77+
// as topic and the first fields of the written tuples as data otherwise.
78+
//
79+
// function(rs, ws, arg)
80+
// local messages = {}
81+
// if arg == true then
82+
// for _, r in ipairs(rs) do
83+
// if r.tuple ~= nil then
84+
// table.insert(messages, {'read_topic', r.tuple[1]})
85+
// end
86+
// end
87+
// else
88+
// for _, w in ipairs(ws) do
89+
// if w.tuple ~= nil then
90+
// table.insert(messages, {'write_topic', w.tuple[1]})
91+
// end
92+
// end
93+
// end
94+
// return messages
95+
// end
96+
//
97+
string func = 1;
98+
// Additional argument to the push function. Optional.
99+
Value func_arg = 2;
100+
// Messages to send if the push function is not provided.
101+
repeated Message messages = 3;
102+
}
46103
// Array of read operations.
47104
repeated Operation read_set = 1;
48105
// Array of write operations.
@@ -89,6 +146,8 @@ message ExecuteRequest {
89146
// Map : space name -> tuple format.
90147
// Contains formats of all provided tuples. Optional.
91148
map<string, TupleFormat> tuple_formats = 6;
149+
// Description of messages to push when executing a transaction.
150+
Push push = 7;
92151
}
93152

94153
message ExecuteResponse {
@@ -103,6 +162,9 @@ message ExecuteResponse {
103162
// Map : space name -> tuple format.
104163
// Contains formats of all returned tuples.
105164
map<string, TupleFormat> tuple_formats = 5;
165+
// PushErr is the error returned by the push function, or nil
166+
// if no error occurred.
167+
Error push_err = 6;
106168
}
107169

108170
// Transactionally inserts tuples into a space.

aeon_queue.proto

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
syntax = "proto3";
2+
3+
import "aeon_error.proto";
4+
import "aeon_value.proto";
5+
6+
package aeon;
7+
8+
// Queue API to Aeon - a distributed database based on Tarantool.
9+
service QueueService {
10+
// Takes messages from a shard-local queue. The messages should be
11+
// released after processing.
12+
rpc TakeMessages(TakeMessagesRequest) returns (TakeMessagesResponse) {}
13+
14+
// Releases messages. The consumer cannot guarantee to take new messages
15+
// until the old ones are released. Moreover, if messages are taken
16+
// exclusively, they will not be available to other consumers until
17+
// they are released.
18+
rpc ReleaseMessages(ReleaseMessagesRequest)
19+
returns (ReleaseMessagesResponse) {}
20+
21+
// Returns the oldest message for all storages, or the oldest message
22+
// for each storage.
23+
rpc GetOldestMessages(GetOldestMessagesRequest)
24+
returns (GetOldestMessagesResponse) {}
25+
}
26+
27+
// Description of returned messages.
28+
message Message {
29+
// Shard name.
30+
string shard = 1;
31+
// The serial number of the message on the shard.
32+
uint64 lsn = 2;
33+
// Data of the message.
34+
Value data = 3;
35+
}
36+
37+
// Consumer description.
38+
message ConsumerRef {
39+
// Consumer shard.
40+
string shard = 1;
41+
// Consumer topic.
42+
string topic = 2;
43+
// Consumer name.
44+
string consumer = 3;
45+
}
46+
47+
message TakeMessagesRequest {
48+
// Topic name.
49+
string topic = 1;
50+
// A unique consumer name. The consumer takes messages for processing.
51+
// Messages should then be released. This consumer name will be used
52+
// to return a reference to the consumer, which should be used
53+
// in `ReleaseMessages` to release taken messages.
54+
string consumer = 2;
55+
// Max number of returned messages.
56+
uint64 limit = 3;
57+
// Time for the consumer to process the messages. After this time,
58+
// messages will be released with the status `undone`.
59+
double ttl = 4;
60+
// Exclusive mode flag. If set to true, messages cannot be received
61+
// by other consumers until this consumer releases them.
62+
bool exclusive = 5;
63+
// Time to wait for messages.
64+
double timeout = 6;
65+
}
66+
67+
message TakeMessagesResponse {
68+
// Error information. Set only on failure.
69+
Error error = 1;
70+
// Returned messages.
71+
repeated Message messages = 2;
72+
// Consumer reference used to release messages.
73+
ConsumerRef ref = 3;
74+
// True if these messages have already been taken by the same consumer,
75+
// false otherwise.
76+
bool taken_earlier = 4;
77+
}
78+
79+
message ReleaseMessagesRequest {
80+
// Consumer reference. Should be the same as returned by `TakeMessages`.
81+
ConsumerRef ref = 1;
82+
// If true, released messages can no longer be taken by consumers.
83+
// Otherwise, the released messages may be taken again.
84+
bool done = 2;
85+
}
86+
87+
message ReleaseMessagesResponse {
88+
// Error information. Set only on failure.
89+
Error error = 1;
90+
// True if messages were already released, false otherwise.
91+
bool released_earlier = 2;
92+
}
93+
94+
message GetOldestMessagesRequest {
95+
// Topic name.
96+
string topic = 1;
97+
// True if the oldest messages for each shard should be returned,
98+
// false if the oldest messages for all shards should be returned.
99+
bool for_each_shard = 2;
100+
}
101+
102+
message GetOldestMessagesResponse {
103+
// Error information. Set only on failure.
104+
Error error = 1;
105+
// Returned messages.
106+
repeated Message messages = 2;
107+
}

0 commit comments

Comments
 (0)