Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ntcore,wpinet] Enhance debug logging across NT and WS layers #6267

Merged
merged 7 commits into from
Jan 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion glass/src/app/native/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ static void NtInitialize() {
auto inst = nt::GetDefaultInstance();
auto poller = nt::CreateListenerPoller(inst);
nt::AddPolledListener(poller, inst, NT_EVENT_CONNECTION | NT_EVENT_IMMEDIATE);
nt::AddPolledLogger(poller, 0, 100);
nt::AddPolledLogger(poller, NT_LOG_INFO, 100);
gui::AddEarlyExecute([inst, poller] {
auto win = gui::GetSystemWindow();
if (!win) {
Expand Down
5 changes: 3 additions & 2 deletions ntcore/src/main/native/cpp/NetworkClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ void NetworkClient::HandleLocal() {
}

void NetworkClient::TcpConnected(uv::Tcp& tcp) {
tcp.SetLogger(&m_logger);
tcp.SetNoDelay(true);
// Start the WS client
if (m_logger.min_level() >= wpi::WPI_LOG_DEBUG4) {
Expand Down Expand Up @@ -401,8 +402,8 @@ void NetworkClient::WsConnected(wpi::WebSocket& ws, uv::Tcp& tcp,
INFO("CONNECTED NT4 to {} port {}", connInfo.remote_ip, connInfo.remote_port);
m_connHandle = m_connList.AddConnection(connInfo);

m_wire =
std::make_shared<net::WebSocketConnection>(ws, connInfo.protocol_version);
m_wire = std::make_shared<net::WebSocketConnection>(
ws, connInfo.protocol_version, m_logger);
m_clientImpl = std::make_unique<net::ClientImpl>(
m_loop.Now().count(), m_inst, *m_wire, m_logger, m_timeSyncUpdated,
[this](uint32_t repeatMs) {
Expand Down
3 changes: 2 additions & 1 deletion ntcore/src/main/native/cpp/NetworkServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void NetworkServer::ServerConnection4::ProcessWsUpgrade() {
m_info.protocol_version =
protocol == "v4.1.networktables.first.wpi.edu" ? 0x0401 : 0x0400;
m_wire = std::make_shared<net::WebSocketConnection>(
*m_websocket, m_info.protocol_version);
*m_websocket, m_info.protocol_version, m_logger);

if (protocol == "rtt.networktables.first.wpi.edu") {
INFO("CONNECTED RTT client (from {})", m_connInfo);
Expand Down Expand Up @@ -499,6 +499,7 @@ void NetworkServer::Init() {
if (!tcp) {
return;
}
tcp->SetLogger(&m_logger);
tcp->error.connect([logger = &m_logger](uv::Error err) {
WPI_INFO(*logger, "NT4 socket error: {}", err.str());
});
Expand Down
14 changes: 12 additions & 2 deletions ntcore/src/main/native/cpp/net/WebSocketConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <span>

#include <wpi/Endian.h>
#include <wpi/Logger.h>
#include <wpi/SpanExtras.h>
#include <wpi/raw_ostream.h>
#include <wpi/timestamp.h>
Expand Down Expand Up @@ -53,6 +54,7 @@ void WebSocketConnection::Stream::write_impl(const char* data, size_t len) {
if (data == m_conn.m_bufs.back().base) {
// flush_nonempty() case
size_t amt = len - m_conn.m_bufs.back().len;
WPI_DEBUG4(m_conn.m_logger, "conn: writing {} bytes (nonempty)", amt);
m_conn.m_bufs.back().len = len;
m_conn.m_framePos += amt;
m_conn.m_written += amt;
Expand All @@ -77,6 +79,7 @@ void WebSocketConnection::Stream::write_impl(const char* data, size_t len) {
size_t amt = (std::min)(static_cast<int>(kAllocSize - buf.len),
static_cast<int>(len));
if (amt > 0) {
WPI_DEBUG4(m_conn.m_logger, "conn: writing {} bytes", amt);
std::memcpy(buf.base + buf.len, data, amt);
buf.len += amt;
m_conn.m_framePos += amt;
Expand Down Expand Up @@ -104,8 +107,9 @@ void WebSocketConnection::Stream::write_impl(const char* data, size_t len) {
}

WebSocketConnection::WebSocketConnection(wpi::WebSocket& ws,
unsigned int version)
: m_ws{ws}, m_version{version} {}
unsigned int version,
wpi::Logger& logger)
: m_ws{ws}, m_logger{logger}, m_version{version} {}

WebSocketConnection::~WebSocketConnection() {
for (auto&& buf : m_bufs) {
Expand All @@ -117,6 +121,7 @@ WebSocketConnection::~WebSocketConnection() {
}

void WebSocketConnection::SendPing(uint64_t time) {
WPI_DEBUG4(m_logger, "conn: sending ping {}", time);
auto buf = AllocBuf();
buf.len = 8;
wpi::support::endian::write64<wpi::support::native>(buf.base, time);
Expand All @@ -133,6 +138,8 @@ void WebSocketConnection::SendPing(uint64_t time) {
}

void WebSocketConnection::StartFrame(uint8_t opcode) {
WPI_DEBUG4(m_logger, "conn: starting frame {}",
static_cast<unsigned int>(opcode));
m_frames.emplace_back(opcode, m_bufs.size(), m_bufs.size() + 1);
m_bufs.emplace_back(AllocBuf());
m_bufs.back().len = 0;
Expand Down Expand Up @@ -168,6 +175,7 @@ int WebSocketConnection::Write(
if (kind == kText) {
os << (first ? '[' : ',');
}
WPI_DEBUG4(m_logger, "writing");
writer(os);
}
++m_frames.back().count;
Expand All @@ -179,6 +187,7 @@ int WebSocketConnection::Write(
}

int WebSocketConnection::Flush() {
WPI_DEBUG4(m_logger, "conn: flushing");
m_lastFlushTime = wpi::Now();
if (m_state == kEmpty) {
return 0;
Expand Down Expand Up @@ -243,6 +252,7 @@ void WebSocketConnection::Send(
os << ']';
}
wpi::WebSocket::Frame frame{opcode, os.bufs()};
WPI_DEBUG4(m_logger, "Send({})", static_cast<uint8_t>(opcode));
m_ws.SendFrames({{frame}}, [selfweak = weak_from_this()](auto bufs, auto) {
if (auto self = selfweak.lock()) {
self->ReleaseBufs(bufs);
Expand Down
8 changes: 7 additions & 1 deletion ntcore/src/main/native/cpp/net/WebSocketConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@

#include "WireConnection.h"

namespace wpi {
class Logger;
} // namespace wpi

namespace nt::net {

class WebSocketConnection final
: public WireConnection,
public std::enable_shared_from_this<WebSocketConnection> {
public:
WebSocketConnection(wpi::WebSocket& ws, unsigned int version);
WebSocketConnection(wpi::WebSocket& ws, unsigned int version,
wpi::Logger& logger);
~WebSocketConnection() override;
WebSocketConnection(const WebSocketConnection&) = delete;
WebSocketConnection& operator=(const WebSocketConnection&) = delete;
Expand Down Expand Up @@ -70,6 +75,7 @@ class WebSocketConnection final
void ReleaseBufs(std::span<wpi::uv::Buffer> bufs);

wpi::WebSocket& m_ws;
wpi::Logger& m_logger;

class Stream;

Expand Down
2 changes: 1 addition & 1 deletion outlineviewer/src/main/native/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static void NtInitialize() {
auto inst = nt::GetDefaultInstance();
auto poller = nt::CreateListenerPoller(inst);
nt::AddPolledListener(poller, inst, NT_EVENT_CONNECTION | NT_EVENT_IMMEDIATE);
nt::AddPolledLogger(poller, 0, 100);
nt::AddPolledLogger(poller, NT_LOG_INFO, 100);
gui::AddEarlyExecute([inst, poller] {
auto win = gui::GetSystemWindow();
if (!win) {
Expand Down
47 changes: 27 additions & 20 deletions wpinet/src/main/native/cpp/WebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,21 @@

using namespace wpi;

#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
static std::string DebugBinary(std::span<const uint8_t> val) {
#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG_CONTENT
std::string str;
wpi::raw_string_ostream stros{str};
size_t limited = 0;
if (val.size() > 30) {
limited = val.size();
val = val.subspan(0, 30);
}
for (auto ch : val) {
stros << fmt::format("{:02x},", static_cast<unsigned int>(ch) & 0xff);
}
if (limited != 0) {
stros << fmt::format("... (total {})", limited);
}
return str;
#else
return "";
Expand All @@ -46,7 +53,6 @@ static inline std::string_view DebugText(std::string_view val) {
return "";
#endif
}
#endif // WPINET_WEBSOCKET_VERBOSE_DEBUG

class WebSocket::WriteReq : public uv::WriteReq,
public detail::WebSocketWriteReqBase {
Expand All @@ -61,7 +67,7 @@ class WebSocket::WriteReq : public uv::WriteReq,
void Send(uv::Error err) {
auto ws = m_ws.lock();
if (!ws || err) {
WS_DEBUG("no WS or error, calling callback\n");
// WS_DEBUG("no WS or error, calling callback\n");
m_frames.ReleaseBufs();
m_callback(m_userBufs, err);
return;
Expand All @@ -71,18 +77,18 @@ class WebSocket::WriteReq : public uv::WriteReq,
if (m_controlCont) {
// We have a control frame; switch to it. We will come back here via
// the control frame's m_cont when it's done.
WS_DEBUG("Continuing with a control write\n");
WS_DEBUG(ws->GetStream(), "Continuing with a control write");
auto controlCont = std::move(m_controlCont);
m_controlCont.reset();
return controlCont->Send({});
}
int result = Continue(ws->m_stream, shared_from_this());
WS_DEBUG("Continue() -> {}\n", result);
WS_DEBUG(ws->GetStream(), "Continue() -> {}", result);
if (result <= 0) {
m_frames.ReleaseBufs();
m_callback(m_userBufs, uv::Error{result});
if (result == 0 && m_cont) {
WS_DEBUG("Continuing with another write\n");
WS_DEBUG(ws->GetStream(), "Continuing with another write");
ws->m_curWriteReq = m_cont;
return m_cont->Send({});
} else {
Expand Down Expand Up @@ -557,23 +563,23 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
uint8_t opcode = m_header[0] & kOpMask;
switch (opcode) {
case kOpCont:
WS_DEBUG("WS Fragment {} [{}]\n", m_payload.size(),
WS_DEBUG(m_stream, "WS Fragment {} [{}]", m_payload.size(),
DebugBinary(m_payload));
switch (m_fragmentOpcode) {
case kOpText:
if (!m_combineFragments || fin) {
std::string_view content{
reinterpret_cast<char*>(m_payload.data()),
m_payload.size()};
WS_DEBUG("WS RecvText(Defrag) {} ({})\n", m_payload.size(),
DebugText(content));
WS_DEBUG(m_stream, "WS RecvText(Defrag) {} ({})",
m_payload.size(), DebugText(content));
text(content, fin);
}
break;
case kOpBinary:
if (!m_combineFragments || fin) {
WS_DEBUG("WS RecvBinary(Defrag) {} ({})\n", m_payload.size(),
DebugBinary(m_payload));
WS_DEBUG(m_stream, "WS RecvBinary(Defrag) {} ({})",
m_payload.size(), DebugBinary(m_payload));
binary(m_payload, fin);
}
break;
Expand All @@ -589,34 +595,35 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
std::string_view content{reinterpret_cast<char*>(m_payload.data()),
m_payload.size()};
if (m_fragmentOpcode != 0) {
WS_DEBUG("WS RecvText {} ({}) -> INCOMPLETE FRAGMENT\n",
WS_DEBUG(m_stream, "WS RecvText {} ({}) -> INCOMPLETE FRAGMENT",
m_payload.size(), DebugText(content));
return Fail(1002, "incomplete fragment");
}
if (!m_combineFragments || fin) {
WS_DEBUG("WS RecvText {} ({})\n", m_payload.size(),
WS_DEBUG(m_stream, "WS RecvText {} ({})", m_payload.size(),
DebugText(content));
text(content, fin);
}
if (!fin) {
WS_DEBUG("WS RecvText {} StartFrag\n", m_payload.size());
WS_DEBUG(m_stream, "WS RecvText {} StartFrag", m_payload.size());
m_fragmentOpcode = opcode;
}
break;
}
case kOpBinary:
if (m_fragmentOpcode != 0) {
WS_DEBUG("WS RecvBinary {} ({}) -> INCOMPLETE FRAGMENT\n",
WS_DEBUG(m_stream, "WS RecvBinary {} ({}) -> INCOMPLETE FRAGMENT",
m_payload.size(), DebugBinary(m_payload));
return Fail(1002, "incomplete fragment");
}
if (!m_combineFragments || fin) {
WS_DEBUG("WS RecvBinary {} ({})\n", m_payload.size(),
WS_DEBUG(m_stream, "WS RecvBinary {} ({})", m_payload.size(),
DebugBinary(m_payload));
binary(m_payload, fin);
}
if (!fin) {
WS_DEBUG("WS RecvBinary {} StartFrag\n", m_payload.size());
WS_DEBUG(m_stream, "WS RecvBinary {} StartFrag",
m_payload.size());
m_fragmentOpcode = opcode;
}
break;
Expand Down Expand Up @@ -664,15 +671,15 @@ void WebSocket::HandleIncoming(uv::Buffer& buf, size_t size) {
}
});
}
WS_DEBUG("WS RecvPing() {} ({})\n", m_controlPayload.size(),
WS_DEBUG(m_stream, "WS RecvPing() {} ({})", m_controlPayload.size(),
DebugBinary(m_controlPayload));
ping(m_controlPayload);
break;
case kOpPong:
if (!fin) {
return Fail(1002, "cannot fragment control frames");
}
WS_DEBUG("WS RecvPong() {} ({})\n", m_controlPayload.size(),
WS_DEBUG(m_stream, "WS RecvPong() {} ({})", m_controlPayload.size(),
DebugBinary(m_controlPayload));
pong(m_controlPayload);
break;
Expand Down Expand Up @@ -738,7 +745,7 @@ void WebSocket::SendFrames(
std::span<const Frame> frames,
std::function<void(std::span<uv::Buffer>, uv::Error)> callback) {
// If we're not open, emit an error and don't send the data
WS_DEBUG("SendFrames({})\n", frames.size());
WS_DEBUG(m_stream, "SendFrames({})", frames.size());
if (m_state != OPEN) {
SendError(frames, callback);
return;
Expand Down
15 changes: 6 additions & 9 deletions wpinet/src/main/native/cpp/WebSocketDebug.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,15 @@

#pragma once

#include <fmt/format.h>
#include <wpi/Logger.h>

// #define WPINET_WEBSOCKET_VERBOSE_DEBUG
// #define WPINET_WEBSOCKET_VERBOSE_DEBUG_CONTENT
#define WPINET_WEBSOCKET_VERBOSE_DEBUG_CONTENT

#ifdef __clang__
#pragma clang diagnostic ignored "-Wgnu-zero-variadic-macro-arguments"
#endif

#ifdef WPINET_WEBSOCKET_VERBOSE_DEBUG
#define WS_DEBUG(format, ...) \
::fmt::print(FMT_STRING(format) __VA_OPT__(, ) __VA_ARGS__)
#else
#define WS_DEBUG(fmt, ...)
#endif
#define WS_DEBUG(stream, format, ...) \
if (auto logger_ = stream.GetLogger()) { \
WPI_DEBUG4(*logger_, "WS: " format __VA_OPT__(, ) __VA_ARGS__) \
}
14 changes: 7 additions & 7 deletions wpinet/src/main/native/cpp/WebSocketSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ int WebSocketWriteReqBase::Continue(Stream& stream, std::shared_ptr<Req> req) {
}

int sentBytes = stream.TryWrite(bufs);
WS_DEBUG("TryWrite({}) -> {} (expected {})\n", bufs.size(), sentBytes,
WS_DEBUG(stream, "TryWrite({}) -> {} (expected {})", bufs.size(), sentBytes,
numBytes);
if (sentBytes < 0) {
return sentBytes; // error
Expand Down Expand Up @@ -133,10 +133,10 @@ int WebSocketWriteReqBase::Continue(Stream& stream, std::shared_ptr<Req> req) {
m_continueBufPos += bufIt - bufs.begin();

if (writeBufs.empty()) {
WS_DEBUG("Write Done\n");
WS_DEBUG(stream, "Write Done");
return 0;
}
WS_DEBUG("Write({})\n", writeBufs.size());
WS_DEBUG(stream, "Write({})", writeBufs.size());
stream.Write(writeBufs, req);
return 1;
}
Expand All @@ -146,7 +146,7 @@ std::span<const WebSocket::Frame> TrySendFrames(
bool server, Stream& stream, std::span<const WebSocket::Frame> frames,
MakeReq&& makeReq,
std::function<void(std::span<uv::Buffer>, uv::Error)> callback) {
WS_DEBUG("TrySendFrames({})\n", frames.size());
WS_DEBUG(stream, "TrySendFrames({})", frames.size());
auto frameIt = frames.begin();
auto frameEnd = frames.end();
while (frameIt != frameEnd) {
Expand All @@ -168,8 +168,8 @@ std::span<const WebSocket::Frame> TrySendFrames(

// try to send
int sentBytes = stream.TryWrite(sendFrames.m_bufs);
WS_DEBUG("TryWrite({}) -> {} (expected {})\n", sendFrames.m_bufs.size(),
sentBytes, numBytes);
WS_DEBUG(stream, "TryWrite({}) -> {} (expected {})",
sendFrames.m_bufs.size(), sentBytes, numBytes);

if (sentBytes == 0) {
// we haven't started a frame yet; clean up any bufs that have actually
Expand Down Expand Up @@ -281,7 +281,7 @@ std::span<const WebSocket::Frame> TrySendFrames(
req->m_userBufs.append(it->data.begin(), it->data.end());
}

WS_DEBUG("Write({})\n", writeBufs.size());
WS_DEBUG(stream, "Write({})", writeBufs.size());
stream.Write(writeBufs, req);
#ifdef __clang__
// work around clang bug
Expand Down
Loading
Loading