-
Notifications
You must be signed in to change notification settings - Fork 442
/
Copy pathclient.cpp
601 lines (505 loc) · 21.1 KB
/
client.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
/**
* Copyright(c) 2018, Ouster, Inc.
* All rights reserved.
*/
#include "ouster/client.h"
#include <json/json.h>
#include <algorithm>
#include <cctype>
#include <cerrno>
#include <chrono>
#include <cmath>
#include <cstdio>
#include <cstring>
#include <fstream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "logging.h"
#include "netcompat.h"
#include "ouster/types.h"
#include "ouster/sensor_http.h"
using namespace std::chrono_literals;
namespace chrono = std::chrono;
using ouster::sensor::impl::Logger;
using ouster::sensor::util::SensorHttp;
namespace ouster {
namespace sensor {
/**
 * Connection state for one sensor: the two UDP data sockets, the sensor
 * hostname, and the metadata most recently collected from it.
 *
 * The destructor closes whichever of the two sockets are valid.
 */
struct client {
    // Start out invalid so that a client destroyed before both sockets have
    // been assigned never closes an unrelated descriptor.
    SOCKET lidar_fd = SOCKET_ERROR;
    SOCKET imu_fd = SOCKET_ERROR;
    std::string hostname;
    Json::Value meta;
    ~client() {
        if (impl::socket_valid(lidar_fd)) impl::socket_close(lidar_fd);
        if (impl::socket_valid(imu_fd)) impl::socket_close(imu_fd);
    }
};
// defined in types.cpp
Json::Value to_json(const sensor_config& config);
namespace {
// Receive buffer size in bytes, requested via SO_RCVBUF on the data sockets
// opened in this file. The default udp receive buffer size on windows is very
// low -- use 256K.
const int RCVBUF_SIZE = 256 * 1024;
/**
 * Look up the local port a socket is bound to.
 *
 * @param[in] sock_fd socket to query.
 *
 * @return the port in host byte order, or SOCKET_ERROR when getsockname()
 *         fails or the address family is neither AF_INET nor AF_INET6.
 */
int32_t get_sock_port(SOCKET sock_fd) {
    struct sockaddr_storage bound_addr;
    socklen_t bound_len = sizeof bound_addr;

    const auto rc = getsockname(
        sock_fd, reinterpret_cast<struct sockaddr*>(&bound_addr), &bound_len);
    if (!impl::socket_valid(rc)) {
        logger().error("udp getsockname(): {}", impl::socket_get_error());
        return SOCKET_ERROR;
    }

    // the port lives in a different field depending on the address family
    switch (bound_addr.ss_family) {
        case AF_INET:
            return ntohs(
                reinterpret_cast<struct sockaddr_in*>(&bound_addr)->sin_port);
        case AF_INET6:
            return ntohs(
                reinterpret_cast<struct sockaddr_in6*>(&bound_addr)->sin6_port);
        default:
            return SOCKET_ERROR;
    }
}
/**
 * Open a non-blocking UDP socket bound to the given local port.
 *
 * Candidate addresses come from getaddrinfo() with AI_PASSIVE. IPv6
 * candidates are tried first (as dual-stack sockets); IPv4 candidates are
 * only tried when no IPv6 candidate could be set up.
 *
 * @param[in] port local port, handed to getaddrinfo() as the service string.
 *
 * @return the bound socket, or SOCKET_ERROR on failure.
 */
SOCKET udp_data_socket(int port) {
    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_DGRAM;
    hints.ai_flags = AI_PASSIVE;

    const std::string service = std::to_string(port);
    struct addrinfo* candidates = NULL;
    const int gai_rc = getaddrinfo(NULL, service.c_str(), &hints, &candidates);
    if (gai_rc != 0) {
        logger().error("udp getaddrinfo(): {}", gai_strerror(gai_rc));
        return SOCKET_ERROR;
    }
    if (candidates == NULL) {
        logger().error("udp getaddrinfo(): empty result");
        return SOCKET_ERROR;
    }

    // Create, configure and bind a socket for a single candidate address.
    // Whenever a mandatory step fails, the socket opened so far is closed
    // again and SOCKET_ERROR is reported.
    const auto open_candidate = [](const struct addrinfo* cand) -> SOCKET {
        SOCKET fd =
            socket(cand->ai_family, cand->ai_socktype, cand->ai_protocol);
        if (!impl::socket_valid(fd)) {
            logger().warn("udp socket(): {}", impl::socket_get_error());
            return SOCKET_ERROR;
        }

        // an ipv6 socket is made dual-stack by switching IPV6_V6ONLY off
        if (cand->ai_family == AF_INET6) {
            int v6only = 0;
            if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&v6only,
                           sizeof(v6only))) {
                logger().warn("udp setsockopt(): {}",
                              impl::socket_get_error());
                impl::socket_close(fd);
                return SOCKET_ERROR;
            }
        }

        // failing to enable address reuse is logged but not fatal
        if (impl::socket_set_reuse(fd)) {
            logger().warn("udp socket_set_reuse(): {}",
                          impl::socket_get_error());
        }

        if (::bind(fd, cand->ai_addr, (socklen_t)cand->ai_addrlen)) {
            logger().warn("udp bind(): {}", impl::socket_get_error());
            impl::socket_close(fd);
            return SOCKET_ERROR;
        }

        // bound; the remaining options must also succeed
        if (impl::socket_set_non_blocking(fd)) {
            logger().warn("udp fcntl(): {}", impl::socket_get_error());
            impl::socket_close(fd);
            return SOCKET_ERROR;
        }
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&RCVBUF_SIZE,
                       sizeof(RCVBUF_SIZE))) {
            logger().warn("udp setsockopt(): {}", impl::socket_get_error());
            impl::socket_close(fd);
            return SOCKET_ERROR;
        }
        return fd;
    };

    // Two passes over the result list (ipv6 first, then ipv4) so the outcome
    // does not depend on glibc's addrinfo ordering:
    // https://sourceware.org/bugzilla/show_bug.cgi?id=9981
    // The ipv4 pass covers hosts where ipv6 is disabled via kernel parameters.
    for (auto family : {AF_INET6, AF_INET}) {
        for (struct addrinfo* cand = candidates; cand != NULL;
             cand = cand->ai_next) {
            if (cand->ai_family != family) continue;

            // the first candidate that can be fully set up wins
            const SOCKET fd = open_candidate(cand);
            if (!impl::socket_valid(fd)) continue;

            freeaddrinfo(candidates);
            return fd;
        }
    }

    // no candidate could be bound
    freeaddrinfo(candidates);
    logger().error("failed to bind udp socket");
    return SOCKET_ERROR;
}
/**
 * Open a non-blocking UDP socket bound to the given local port and, when a
 * multicast group is supplied, join that group.
 *
 * Only AF_INET candidates returned by getaddrinfo() are considered.
 *
 * @param[in] port local port, handed to getaddrinfo() as the service string.
 * @param[in] udp_dest_host multicast group address to join; no group is
 *            joined when empty.
 * @param[in] mtp_dest_host address of the local interface to join on;
 *            INADDR_ANY is used when empty.
 *
 * @return the bound socket, or SOCKET_ERROR on failure.
 */
SOCKET mtp_data_socket(int port, const std::string& udp_dest_host = "",
                       const std::string& mtp_dest_host = "") {
    struct addrinfo hints;
    memset(&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;
    hints.ai_socktype = SOCK_DGRAM;
    hints.ai_flags = AI_PASSIVE;

    const std::string service = std::to_string(port);
    struct addrinfo* candidates = NULL;
    const int gai_rc = getaddrinfo(NULL, service.c_str(), &hints, &candidates);
    if (gai_rc != 0) {
        logger().error("mtp getaddrinfo(): {}", gai_strerror(gai_rc));
        return SOCKET_ERROR;
    }
    if (candidates == NULL) {
        logger().error("mtp getaddrinfo(): empty result");
        return SOCKET_ERROR;
    }

    // Create, bind and configure a socket for a single candidate address.
    // Whenever a mandatory step fails, the socket opened so far is closed
    // again and SOCKET_ERROR is reported.
    const auto open_candidate =
        [&udp_dest_host,
         &mtp_dest_host](const struct addrinfo* cand) -> SOCKET {
        SOCKET fd =
            socket(cand->ai_family, cand->ai_socktype, cand->ai_protocol);
        if (!impl::socket_valid(fd)) {
            logger().warn("mtp socket(): {}", impl::socket_get_error());
            return SOCKET_ERROR;
        }

        // failing to enable address reuse is logged but not fatal
        if (impl::socket_set_reuse(fd)) {
            logger().warn("mtp socket_set_reuse(): {}",
                          impl::socket_get_error());
        }

        if (::bind(fd, cand->ai_addr, (socklen_t)cand->ai_addrlen)) {
            logger().warn("mtp bind(): {}", impl::socket_get_error());
            impl::socket_close(fd);
            return SOCKET_ERROR;
        }

        // bound; join the multicast group, but only if one was given
        if (!udp_dest_host.empty()) {
            ip_mreq membership;
            membership.imr_multiaddr.s_addr = inet_addr(udp_dest_host.c_str());
            membership.imr_interface.s_addr =
                mtp_dest_host.empty() ? htonl(INADDR_ANY)
                                      : inet_addr(mtp_dest_host.c_str());
            if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
                           (char*)&membership, sizeof(membership))) {
                logger().warn("mtp setsockopt(): {}",
                              impl::socket_get_error());
                impl::socket_close(fd);
                return SOCKET_ERROR;
            }
        }

        // group joined (or not needed); the remaining options must succeed
        if (impl::socket_set_non_blocking(fd)) {
            logger().warn("mtp fcntl(): {}", impl::socket_get_error());
            impl::socket_close(fd);
            return SOCKET_ERROR;
        }
        if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&RCVBUF_SIZE,
                       sizeof(RCVBUF_SIZE))) {
            logger().warn("mtp setsockopt(): {}", impl::socket_get_error());
            impl::socket_close(fd);
            return SOCKET_ERROR;
        }
        return fd;
    };

    const int wanted_family = AF_INET;  // TODO test with AF_INET6
    for (struct addrinfo* cand = candidates; cand != NULL;
         cand = cand->ai_next) {
        if (cand->ai_family != wanted_family) continue;

        // the first candidate that can be fully set up wins
        const SOCKET fd = open_candidate(cand);
        if (!impl::socket_valid(fd)) continue;

        freeaddrinfo(candidates);
        return fd;
    }

    // no candidate could be bound
    freeaddrinfo(candidates);
    logger().error("failed to bind mtp socket");
    return SOCKET_ERROR;
}
/**
 * Fetch the sensor metadata over HTTP once the sensor has left the
 * INITIALIZING state.
 *
 * The sensor status is polled with a one-second sleep before every poll,
 * including the first one.
 *
 * @param[in] hostname sensor hostname, passed to SensorHttp::create().
 * @param[in] timeout_sec maximum number of seconds to keep polling while the
 *            sensor reports INITIALIZING.
 *
 * @throw std::runtime_error if the timeout expires while the sensor is still
 *        INITIALIZING, or if the sensor settles in any state other than
 *        RUNNING.
 *
 * @return the metadata reported by the sensor with a "client_version" entry
 *         added.
 */
Json::Value collect_metadata(const std::string& hostname, int timeout_sec) {
    auto sensor_http = SensorHttp::create(hostname);
    const auto timeout_time =
        chrono::steady_clock::now() + chrono::seconds{timeout_sec};
    std::string status;

    // TODO: can remove this loop when we drop support for FW 2.4
    do {
        if (chrono::steady_clock::now() >= timeout_time) {
            // Throw instead of returning a placeholder value: callers index
            // the result as a JSON object and handle std::runtime_error.
            throw std::runtime_error(
                "A timeout occurred while waiting for the sensor to "
                "initialize.");
        }
        std::this_thread::sleep_for(1s);
        status = sensor_http->sensor_info()["status"].asString();
    } while (status == "INITIALIZING");

    // not all metadata available when sensor isn't RUNNING
    if (status != "RUNNING") {
        throw std::runtime_error(
            "Cannot obtain full metadata with sensor status: " + status +
            ". Please ensure that sensor is not in a STANDBY, UNCONFIGURED, "
            "WARMUP, or ERROR state");
    }

    auto metadata = sensor_http->metadata();

    // merge extra info into metadata
    metadata["client_version"] = client_version();
    return metadata;
}
} // namespace
/**
 * Read the configuration of the sensor at hostname into config.
 *
 * @param[in] hostname sensor hostname, passed to SensorHttp::create().
 * @param[out] config overwritten with the parsed configuration.
 * @param[in] active forwarded to SensorHttp::get_config_params().
 *
 * @return true; this function has no other return value.
 */
bool get_config(const std::string& hostname, sensor_config& config,
                bool active) {
    const auto http_client = SensorHttp::create(hostname);
    config = parse_config(http_client->get_config_params(active));
    return true;
}
bool set_config(const std::string& hostname, const sensor_config& config,
uint8_t config_flags) {
auto sensor_http = SensorHttp::create(hostname);
// reset staged config to avoid spurious errors
auto config_params = sensor_http->active_config_params();
Json::Value config_params_copy = config_params;
// set all desired config parameters
Json::Value config_json = to_json(config);
for (const auto& key : config_json.getMemberNames()) {
config_params[key] = config_json[key];
}
if (config_json.isMember("operating_mode") &&
config_params.isMember("auto_start_flag")) {
// we're setting operating mode and this sensor has a FW with
// auto_start_flag
config_params["auto_start_flag"] =
config_json["operating_mode"] == "NORMAL" ? 1 : 0;
}
// Signal multiplier changed from int to double for FW 3.0/2.5+, with
// corresponding change to config.signal_multiplier.
// Change values 1, 2, 3 back to ints to support older FWs
if (config_json.isMember("signal_multiplier")) {
check_signal_multiplier(config_params["signal_multiplier"].asDouble());
if (config_params["signal_multiplier"].asDouble() != 0.25 &&
config_params["signal_multiplier"].asDouble() != 0.5) {
config_params["signal_multiplier"] =
config_params["signal_multiplier"].asInt();
}
}
// set automatic udp dest, if flag specified
if (config_flags & CONFIG_UDP_DEST_AUTO) {
if (config.udp_dest)
throw std::invalid_argument(
"UDP_DEST_AUTO flag set but provided config has udp_dest");
sensor_http->set_udp_dest_auto();
auto staged = sensor_http->staged_config_params();
// now we set config_params according to the staged udp_dest from the
// sensor
if (staged.isMember("udp_ip")) { // means the FW version carries udp_ip
config_params["udp_ip"] = staged["udp_ip"];
config_params["udp_dest"] = staged["udp_ip"];
} else { // don't need to worry about udp_ip
config_params["udp_dest"] = staged["udp_dest"];
}
}
// if configuration didn't change then skip applying the params
// note: comparison will fail if config_params contains newer config params
// introduced after the verison of FW the sensor is on
if (config_flags & CONFIG_FORCE_REINIT ||
config_params_copy != config_params) {
Json::StreamWriterBuilder builder;
builder["indentation"] = "";
// send full string -- depends on older FWs not rejecting a blob even
// when it contains unknown keys
auto config_params_str = Json::writeString(builder, config_params);
sensor_http->set_config_param(".", config_params_str);
// reinitialize to make all staged parameters effective
sensor_http->reinitialize();
}
// save if indicated
if (config_flags & CONFIG_PERSIST) {
sensor_http->save_config_params();
}
return true;
}
/**
 * Collect the sensor metadata, store it on the client and return it as a
 * formatted JSON string.
 *
 * Besides fetching the metadata this also queries the sensor config and
 * firmware version in order to emit deprecation warnings.
 *
 * @param[in] cli client whose hostname is queried; cli.meta is overwritten.
 * @param[in] timeout_sec forwarded to collect_metadata().
 * @param[in] legacy_format when true, the string is passed through
 *            convert_to_legacy() before being returned.
 *
 * @throw rethrows any std::exception raised while collecting the metadata,
 *        after logging a warning.
 *
 * @return the metadata string, in legacy or non-legacy format.
 */
std::string get_metadata(client& cli, int timeout_sec, bool legacy_format) {
    try {
        cli.meta = collect_metadata(cli.hostname, timeout_sec);
    } catch (const std::exception& e) {
        // pass the message as an argument rather than splicing it into the
        // format string, so braces inside e.what() are never interpreted as
        // format fields
        logger().warn("Unable to retrieve sensor metadata: {}", e.what());
        throw;
    }

    Json::StreamWriterBuilder builder;
    builder["enableYAMLCompatibility"] = true;
    builder["precision"] = 6;
    builder["indentation"] = "    ";
    auto metadata_string = Json::writeString(builder, cli.meta);
    if (legacy_format) {
        logger().warn(
            "The SDK will soon output the non-legacy metadata format by "
            "default.  If you parse the metadata directly instead of using the "
            "SDK (which will continue to read both legacy and non-legacy "
            "formats), please be advised that on the next release you will "
            "either have to update your parsing or specify legacy_format = "
            "true to the get_metadata function.");
    }

    // We can't insert this logic into the light init_client since its
    // advantage is that it doesn't make network calls, but we need it to run
    // every time there is a valid connection to the sensor. So we insert it
    // here.
    // TODO: remove after release of FW 3.2/3.3 (sufficient warning)
    sensor_config config;
    get_config(cli.hostname, config);
    auto fw_version = SensorHttp::firmware_version(cli.hostname);
    // only warn for people on the latest FW, as people on older FWs may not
    // care
    if (fw_version.major >= 3 &&
        config.udp_profile_lidar == UDPProfileLidar::PROFILE_LIDAR_LEGACY) {
        logger().warn(
            "Please note that the Legacy Lidar Profile will be deprecated "
            "in the sensor FW soon. If you plan to upgrade your FW, we "
            "recommend using the Single Return Profile instead. For users "
            "sticking with older FWs, the Ouster SDK will continue to parse "
            "the legacy lidar profile.");
    }
    return legacy_format ? convert_to_legacy(metadata_string) : metadata_string;
}
/**
 * Configure the logger to write either to stdout or to a file.
 *
 * @param[in] log_level level name forwarded to the sink configuration.
 * @param[in] log_file_path file to log to; stdout is used when empty.
 * @param[in] rotating forwarded to the file sink configuration.
 * @param[in] max_size_in_bytes forwarded to the file sink configuration.
 * @param[in] max_files forwarded to the file sink configuration.
 *
 * @return the result of the sink configuration call.
 */
bool init_logger(const std::string& log_level, const std::string& log_file_path,
                 bool rotating, int max_size_in_bytes, int max_files) {
    // a non-empty path selects the file sink
    if (!log_file_path.empty()) {
        return Logger::instance().configure_file_sink(
            log_level, log_file_path, rotating, max_size_in_bytes, max_files);
    }
    return Logger::instance().configure_stdout_sink(log_level);
}
std::shared_ptr<client> init_client(const std::string& hostname, int lidar_port,
int imu_port) {
logger().info("initializing sensor: {} with lidar port/imu port: {}/{}",
hostname, lidar_port, imu_port);
auto cli = std::make_shared<client>();
cli->hostname = hostname;
cli->lidar_fd = udp_data_socket(lidar_port);
cli->imu_fd = udp_data_socket(imu_port);
if (!impl::socket_valid(cli->lidar_fd) || !impl::socket_valid(cli->imu_fd))
return std::shared_ptr<client>();
return cli;
}
std::shared_ptr<client> init_client(const std::string& hostname,
const std::string& udp_dest_host,
lidar_mode ld_mode, timestamp_mode ts_mode,
int lidar_port, int imu_port,
int timeout_sec) {
auto cli = init_client(hostname, lidar_port, imu_port);
if (!cli) return std::shared_ptr<client>();
// update requested ports to actual bound ports
lidar_port = get_sock_port(cli->lidar_fd);
imu_port = get_sock_port(cli->imu_fd);
if (!impl::socket_valid(lidar_port) || !impl::socket_valid(imu_port))
return std::shared_ptr<client>();
try {
sensor::sensor_config config;
uint8_t config_flags = 0;
if (udp_dest_host.empty())
config_flags |= CONFIG_UDP_DEST_AUTO;
else
config.udp_dest = udp_dest_host;
if (ld_mode) config.ld_mode = ld_mode;
if (ts_mode) config.ts_mode = ts_mode;
if (lidar_port) config.udp_port_lidar = lidar_port;
if (imu_port) config.udp_port_imu = imu_port;
config.operating_mode = OPERATING_NORMAL;
set_config(hostname, config, config_flags);
// will block until no longer INITIALIZING
cli->meta = collect_metadata(hostname, timeout_sec);
// check for sensor error states
auto status = cli->meta["sensor_info"]["status"].asString();
if (status == "ERROR" || status == "UNCONFIGURED")
return std::shared_ptr<client>();
} catch (const std::runtime_error& e) {
// log error message
logger().error("init_client(): {}", e.what());
return std::shared_ptr<client>();
}
return cli;
}
/**
 * Create a client that receives sensor data through a multicast group and,
 * optionally, configure the sensor.
 *
 * config.udp_port_lidar, config.udp_port_imu and config.udp_dest are read
 * with value() unconditionally, so callers are expected to have set them.
 *
 * @param[in] hostname sensor hostname.
 * @param[in] config sensor configuration; udp_dest is used as the multicast
 *            group to join.
 * @param[in] mtp_dest_host address of the local interface used to join the
 *            multicast group.
 * @param[in] main when true, this client also pushes the configuration to
 *            the sensor and collects its metadata; otherwise only the
 *            sockets are opened.
 * @param[in] timeout_sec forwarded to collect_metadata().
 *
 * @return the client, or an empty pointer if the sockets could not be set
 *         up, the bound ports could not be determined, a std::runtime_error
 *         occurred during configuration, or the sensor reports an ERROR or
 *         UNCONFIGURED status.
 */
std::shared_ptr<client> mtp_init_client(const std::string& hostname,
                                        const sensor_config& config,
                                        const std::string& mtp_dest_host,
                                        bool main, int timeout_sec) {
    logger().info(
        "initializing sensor client: {} with ports: {}/{}, multicast group: {}",
        hostname, config.udp_port_lidar.value(), config.udp_port_imu.value(),
        config.udp_dest.value());

    auto cli = std::make_shared<client>();
    cli->hostname = hostname;

    cli->lidar_fd = mtp_data_socket(config.udp_port_lidar.value(),
                                    config.udp_dest.value(), mtp_dest_host);
    cli->imu_fd = mtp_data_socket(
        config.udp_port_imu
            .value());  // no need to join multicast group second time

    if (!impl::socket_valid(cli->lidar_fd) || !impl::socket_valid(cli->imu_fd))
        return std::shared_ptr<client>();

    if (main) {
        auto lidar_port = get_sock_port(cli->lidar_fd);
        auto imu_port = get_sock_port(cli->imu_fd);

        // get_sock_port() reports failure as SOCKET_ERROR, which is non-zero
        // and would otherwise be written into the sensor config below; bail
        // out the same way the unicast init_client() does
        if (!impl::socket_valid(lidar_port) || !impl::socket_valid(imu_port))
            return std::shared_ptr<client>();

        sensor_config config_copy{config};
        try {
            uint8_t config_flags = 0;
            if (lidar_port) config_copy.udp_port_lidar = lidar_port;
            if (imu_port) config_copy.udp_port_imu = imu_port;
            config_copy.operating_mode = OPERATING_NORMAL;
            set_config(hostname, config_copy, config_flags);

            // will block until no longer INITIALIZING
            cli->meta = collect_metadata(hostname, timeout_sec);
            // check for sensor error states
            auto status = cli->meta["sensor_info"]["status"].asString();
            if (status == "ERROR" || status == "UNCONFIGURED")
                return std::shared_ptr<client>();
        } catch (const std::runtime_error& e) {
            // log error message
            logger().error("init_client(): {}", e.what());
            return std::shared_ptr<client>();
        }
    }

    return cli;
}
/**
 * Wait with select() until lidar or imu data is readable, or the timeout
 * expires.
 *
 * @param[in] c client whose sockets are polled.
 * @param[in] timeout_sec seconds to wait in select().
 *
 * @return EXIT if select() failed and impl::socket_exit() holds;
 *         CLIENT_ERROR if select() failed otherwise; a combination of
 *         LIDAR_DATA and IMU_DATA for the sockets that are readable; or
 *         client_state(0) if nothing became readable.
 */
client_state poll_client(const client& c, const int timeout_sec) {
    fd_set readable;
    FD_ZERO(&readable);
    FD_SET(c.lidar_fd, &readable);
    FD_SET(c.imu_fd, &readable);

    timeval wait;
    wait.tv_sec = timeout_sec;
    wait.tv_usec = 0;

    const SOCKET highest_fd = std::max(c.lidar_fd, c.imu_fd);
    const SOCKET n_ready =
        select((int)highest_fd + 1, &readable, NULL, NULL, &wait);

    // select() itself failed: tell an exit request apart from a real error
    if (!impl::socket_valid(n_ready)) {
        if (impl::socket_exit()) {
            return EXIT;
        }
        logger().error("select: {}", impl::socket_get_error());
        return CLIENT_ERROR;
    }

    client_state state = client_state(0);
    if (n_ready) {
        if (FD_ISSET(c.lidar_fd, &readable)) {
            state = client_state(state | LIDAR_DATA);
        }
        if (FD_ISSET(c.imu_fd, &readable)) {
            state = client_state(state | IMU_DATA);
        }
    }
    return state;
}
/**
 * Receive one datagram and accept it only if it is exactly len bytes long.
 *
 * Up to len + 1 bytes are requested from recv() so that an oversized packet
 * can be told apart from a correctly sized one; buf must therefore have room
 * for len + 1 bytes.
 *
 * @param[in] fd socket to read from.
 * @param[out] buf destination buffer.
 * @param[in] len expected packet length in bytes.
 *
 * @return true if exactly len bytes were received, false otherwise.
 */
static bool recv_fixed(SOCKET fd, void* buf, int64_t len) {
    const int64_t received = recv(fd, static_cast<char*>(buf), len + 1, 0);
    if (received == len) {
        return true;
    }

    // distinguish a socket error from a packet of the wrong size
    if (received == -1) {
        logger().error("recvfrom: {}", impl::socket_get_error());
    } else {
        logger().warn("Unexpected udp packet length: {}", received);
    }
    return false;
}
/**
 * Read one lidar packet from the client's lidar socket into buf.
 *
 * @param[in] cli client to read from.
 * @param[out] buf destination buffer; recv_fixed() requests one byte more
 *             than the packet size, so it must be at least that large.
 * @param[in] pf packet format supplying the expected lidar packet size.
 *
 * @return true if a packet of exactly the expected size was read.
 */
bool read_lidar_packet(const client& cli, uint8_t* buf,
                       const packet_format& pf) {
    const int64_t expected_len = pf.lidar_packet_size;
    return recv_fixed(cli.lidar_fd, buf, expected_len);
}
/**
 * Read one imu packet from the client's imu socket into buf.
 *
 * @param[in] cli client to read from.
 * @param[out] buf destination buffer; recv_fixed() requests one byte more
 *             than the packet size, so it must be at least that large.
 * @param[in] pf packet format supplying the expected imu packet size.
 *
 * @return true if a packet of exactly the expected size was read.
 */
bool read_imu_packet(const client& cli, uint8_t* buf, const packet_format& pf) {
    const int64_t expected_len = pf.imu_packet_size;
    return recv_fixed(cli.imu_fd, buf, expected_len);
}
/**
 * Return the local port the client's lidar socket is bound to, as reported
 * by get_sock_port().
 */
int get_lidar_port(client& cli) {
    const int bound_port = get_sock_port(cli.lidar_fd);
    return bound_port;
}
/**
 * Return the local port the client's imu socket is bound to, as reported by
 * get_sock_port().
 */
int get_imu_port(client& cli) {
    const int bound_port = get_sock_port(cli.imu_fd);
    return bound_port;
}
/**
 * Check whether a dotted-quad IPv4 address string is a multicast address.
 *
 * @param[in] addr address string, parsed with inet_addr().
 *
 * @return true if IN_MULTICAST holds for the parsed address.
 */
bool in_multicast(const std::string& addr) {
    // IN_MULTICAST expects the address in host byte order
    const auto host_order_addr = ntohl(inet_addr(addr.c_str()));
    return IN_MULTICAST(host_order_addr);
}
/**
 * Expose the descriptor of the socket on which the client listens for lidar
 * UDP data.
 *
 * @param[in] cli client returned by init_client associated with the connection.
 *
 * @return the lidar socket file descriptor stored on the client.
 */
extern SOCKET get_lidar_socket_fd(client& cli) {
    const SOCKET fd = cli.lidar_fd;
    return fd;
}
/**
 * Expose the descriptor of the socket on which the client listens for imu
 * UDP data.
 *
 * @param[in] cli client returned by init_client associated with the connection.
 *
 * @return the imu socket file descriptor stored on the client.
 */
extern SOCKET get_imu_socket_fd(client& cli) {
    const SOCKET fd = cli.imu_fd;
    return fd;
}
} // namespace sensor
} // namespace ouster