Skip to content

Commit abbe0c2

Browse files
committed
client: return requested future with result argument of wait
Currently, the argument returns any decoded future - that is inconvenient and completely unusable. Let's return only the requested future, or nothing, if it's not ready. Along the way, reformat argument list of modified functions to make them conform clang-format. Closes #112
1 parent e646501 commit abbe0c2

File tree

3 files changed

+37
-14
lines changed

3 files changed

+37
-14
lines changed

src/Client/Connection.hpp

+3-5
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,7 @@ class Connection
230230

231231
template<class B, class N>
232232
friend
233-
enum DecodeStatus processResponse(Connection<B, N> &conn,
234-
Response<B> *result);
233+
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
235234

236235
template<class B, class N>
237236
friend
@@ -530,8 +529,7 @@ inputBufGC(Connection<BUFFER, NetProvider> &conn)
530529

531530
template<class BUFFER, class NetProvider>
532531
DecodeStatus
533-
processResponse(Connection<BUFFER, NetProvider> &conn,
534-
Response<BUFFER> *result)
532+
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
535533
{
536534
//Decode response. In case of success - fill in feature map
537535
//and adjust end-of-decoded data pointer. Call GC if needed.
@@ -563,7 +561,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn,
563561
}
564562
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
565563
response.header.code, ", schema=", response.header.schema_id);
566-
if (result != nullptr) {
564+
if (result != nullptr && response.header.sync == req_sync) {
567565
*result = std::move(response);
568566
} else {
569567
conn.impl->futures.insert({response.header.sync,

src/Client/Connector.hpp

+13-9
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,17 @@ class Connector
8989
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
9090
void close(Connection<BUFFER, NetProvider> &conn);
9191
void close(ConnectionImpl<BUFFER, NetProvider> &conn);
92+
9293
private:
9394
/**
9495
* A helper to decode responses of a connection.
9596
* Can be called when the connection is not ready to decode - it's just no-op.
97+
* If `result` is not `nullptr`, it is used to return response for a request with
98+
* `req_sync` sync. If `result` is `nullptr` - `req_sync` is ignored.
9699
* Returns -1 in the case of any error, 0 on success.
97100
*/
98-
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
99-
Response<BUFFER> *result);
101+
int connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result);
102+
100103
private:
101104
NetProvider m_NetProvider;
102105
/**
@@ -170,7 +173,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> &conn)
170173

171174
template<class BUFFER, class NetProvider>
172175
int
173-
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn,
176+
Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, NetProvider> &conn, int req_sync,
174177
Response<BUFFER> *result)
175178
{
176179
if (!hasDataToDecode(conn))
@@ -181,7 +184,7 @@ Connector<BUFFER, NetProvider>::connectionDecodeResponses(Connection<BUFFER, Net
181184

182185
int rc = 0;
183186
while (hasDataToDecode(conn)) {
184-
DecodeStatus status = processResponse(conn, result);
187+
DecodeStatus status = processResponse(conn, req_sync, result);
185188
if (status == DECODE_ERR) {
186189
rc = -1;
187190
break;
@@ -211,9 +214,10 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
211214
Timer timer{timeout};
212215
timer.start();
213216
static constexpr int INVALID_SYNC = -1;
217+
int req_sync = static_cast<int>(future);
214218
if (result != NULL)
215219
result->header.sync = INVALID_SYNC;
216-
if (connectionDecodeResponses(conn, result) != 0)
220+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
217221
return -1;
218222
if (result != NULL && result->header.sync != INVALID_SYNC) {
219223
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -225,7 +229,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
225229
strerror(errno), errno);
226230
return -1;
227231
}
228-
if (connectionDecodeResponses(conn, result) != 0)
232+
if (connectionDecodeResponses(conn, req_sync, result) != 0)
229233
return -1;
230234
if (result != NULL && result->header.sync != INVALID_SYNC) {
231235
LOG_DEBUG("Future ", future, " is ready and decoded");
@@ -264,7 +268,7 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
264268
strerror(errno), errno);
265269
return -1;
266270
}
267-
if (connectionDecodeResponses(conn, nullptr) != 0)
271+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
268272
return -1;
269273
bool finish = true;
270274
for (size_t i = last_not_ready; i < futures.size(); ++i) {
@@ -304,7 +308,7 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
304308
}
305309
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
306310
assert(hasDataToDecode(conn));
307-
if (connectionDecodeResponses(conn, nullptr) != 0)
311+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
308312
return std::nullopt;
309313
return conn;
310314
}
@@ -323,7 +327,7 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
323327
strerror(errno), errno);
324328
return -1;
325329
}
326-
if (connectionDecodeResponses(conn, nullptr) != 0)
330+
if (connectionDecodeResponses(conn, 0, nullptr) != 0)
327331
return -1;
328332
if ((conn.getFutureCount() - ready_futures) >= future_count)
329333
return 0;

test/ClientTest.cpp

+21
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,27 @@ test_wait(Connector<BUFFER, NetProvider> &client)
13031303
fail_unless(result.header.sync == static_cast<int>(f));
13041304
fail_unless(result.header.code == 0);
13051305

1306+
TEST_CASE("wait with argument result - several requests");
1307+
/* Obtain in direct order. */
1308+
f1 = conn.ping();
1309+
f2 = conn.ping();
1310+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1311+
fail_unless(result.header.sync == static_cast<int>(f1));
1312+
fail_unless(result.header.code == 0);
1313+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1314+
fail_unless(result.header.sync == static_cast<int>(f2));
1315+
fail_unless(result.header.code == 0);
1316+
1317+
/* Obtain in reversed order. */
1318+
f1 = conn.ping();
1319+
f2 = conn.ping();
1320+
fail_unless(client.wait(conn, f2, WAIT_TIMEOUT, &result) == 0);
1321+
fail_unless(result.header.sync == static_cast<int>(f2));
1322+
fail_unless(result.header.code == 0);
1323+
fail_unless(client.wait(conn, f1, WAIT_TIMEOUT, &result) == 0);
1324+
fail_unless(result.header.sync == static_cast<int>(f1));
1325+
fail_unless(result.header.code == 0);
1326+
13061327
client.close(conn);
13071328
}
13081329

0 commit comments

Comments
 (0)