Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable various C++ build-time warnings and treat them as errors #340

Merged
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ set(UCXX_BUILD_TESTS ${BUILD_TESTS})
set(UCXX_BUILD_BENCHMARKS ${BUILD_BENCHMARKS})
set(UCXX_BUILD_EXAMPLES ${BUILD_EXAMPLES})

set(UCXX_CXX_FLAGS "")
set(UCXX_CXX_FLAGS -Wall -Wattributes -Werror -Wextra -Wsign-conversion -Wno-missing-field-initializers)
set(UCXX_CXX_DEFINITIONS "")

# Set RMM logging level
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/ucxx/delayed_submission.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class BaseDelayedSubmissionCollection {
toProcess = _collection.size();
}

for (auto i = 0; i < toProcess; ++i) {
for (size_t i = 0; i < toProcess; ++i) {
std::pair<ItemIdType, T> item;
{
std::lock_guard<std::mutex> lock(_mutex);
Expand Down
14 changes: 7 additions & 7 deletions cpp/include/ucxx/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ namespace ucxx {
*/
class Request : public Component {
protected:
ucs_status_t _status{UCS_INPROGRESS}; ///< Requests status
std::string _status_msg{}; ///< Human-readable status message
void* _request{nullptr}; ///< Pointer to UCP request
std::shared_ptr<Future> _future{nullptr}; ///< Future to notify upon completion
RequestCallbackUserFunction _callback{nullptr}; ///< Completion callback
RequestCallbackUserData _callbackData{nullptr}; ///< Completion callback data
ucs_status_t _status{UCS_INPROGRESS}; ///< Requests status
std::string _status_msg{}; ///< Human-readable status message
void* _request{nullptr}; ///< Pointer to UCP request
std::shared_ptr<Future> _future{nullptr}; ///< Future to notify upon completion
std::shared_ptr<Worker> _worker{
nullptr}; ///< Worker that generated request (if not from endpoint)
std::shared_ptr<Endpoint> _endpoint{
nullptr}; ///< Endpoint that generated request (if not from worker)
std::string _ownerString{
"undetermined owner"}; ///< String to print owner (endpoint or worker) when logging
std::recursive_mutex _mutex{}; ///< Mutex to prevent checking status while it's being set
data::RequestData _requestData{}; ///< The operation-specific data to be used in the request
std::string _operationName{
"request_undefined"}; ///< Human-readable operation name, mostly used for log messages
std::recursive_mutex _mutex{}; ///< Mutex to prevent checking status while it's being set
bool _enablePythonFuture{true}; ///< Whether Python future is enabled for this request
RequestCallbackUserFunction _callback{nullptr}; ///< Completion callback
RequestCallbackUserData _callbackData{nullptr}; ///< Completion callback data

/**
* @brief Protected constructor of an abstract `ucxx::Request`.
Expand Down
3 changes: 1 addition & 2 deletions cpp/include/ucxx/request_tag_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,8 @@ class RequestTagMulti : public Request {
* may still verify each of the underlying requests individually.
*
* @param[in] status the status of the request being completed.
* @param[in] request the `ucxx::BufferRequest` object containing a single tag .
*/
void markCompleted(ucs_status_t status, RequestCallbackUserData request);
void markCompleted(ucs_status_t status);

/**
* @brief Callback to submit request to receive new header or frames.
Expand Down
7 changes: 3 additions & 4 deletions cpp/src/delayed_submission.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ GenericDelayedSubmissionCollection::GenericDelayedSubmissionCollection(const std
{
}

void GenericDelayedSubmissionCollection::scheduleLog(ItemIdType id,
DelayedSubmissionCallbackType item)
void GenericDelayedSubmissionCollection::scheduleLog(ItemIdType id, DelayedSubmissionCallbackType)
{
ucxx_trace_req("Registered %s [%lu]", _name.c_str(), id);
}
Expand All @@ -58,8 +57,8 @@ void GenericDelayedSubmissionCollection::processItem(ItemIdType id,
}

DelayedSubmissionCollection::DelayedSubmissionCollection(bool enableDelayedRequestSubmission)
: _enableDelayedRequestSubmission(enableDelayedRequestSubmission),
_requests(RequestDelayedSubmissionCollection{"request", enableDelayedRequestSubmission})
: _requests(RequestDelayedSubmissionCollection{"request", enableDelayedRequestSubmission}),
_enableDelayedRequestSubmission(enableDelayedRequestSubmission)
{
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ std::shared_ptr<Request> Endpoint::close(const bool enablePythonFuture,
bool force = _endpointErrorHandling;

auto combineCallbacksFunction = [this, &callbackFunction, &callbackData](
ucs_status_t status, EndpointCloseCallbackUserData unused) {
ucs_status_t status,
EndpointCloseCallbackUserData /* callbackData */) {
_status = status;
if (callbackFunction) callbackFunction(status, callbackData);
{
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/internal/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void RecvAmMessage::setUcpRequest(void* request) { _request->_request = request;
void RecvAmMessage::callback(void* request, ucs_status_t status)
{
std::visit(data::dispatch{
[this, request, status](data::AmReceive amReceive) {
[this, request, status](data::AmReceive) {
_request->callback(request, status);
{
std::lock_guard<std::mutex> lock(_amData->_mutex);
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/memory_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ MemoryHandle::MemoryHandle(std::shared_ptr<Context> context,
_size = attr.length;
_memoryType = attr.mem_type;

ucxx_trace("MemoryHandle created: %p, UCP handle: %p, base address: 0x%lx, size: %lu, type: %lu",
ucxx_trace("MemoryHandle created: %p, UCP handle: %p, base address: 0x%lx, size: %lu, type: %d",
this,
_handle,
_baseAddress,
Expand All @@ -61,7 +61,7 @@ MemoryHandle::~MemoryHandle()
{
ucp_mem_unmap(std::dynamic_pointer_cast<Context>(getParent())->getHandle(), _handle);
ucxx_trace(
"ucxx::MemoryHandle destroyed: %p, UCP handle: %p, base address: 0x%lx, size: %lu, type: %lu",
"ucxx::MemoryHandle destroyed: %p, UCP handle: %p, base address: 0x%lx, size: %lu, type: %d",
this,
_handle,
_baseAddress,
Expand Down
17 changes: 14 additions & 3 deletions cpp/src/remote_key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* SPDX-License-Identifier: BSD-3-Clause
*/
#include <cstdio>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -67,7 +68,7 @@ RemoteKey::~RemoteKey()
// Only destroy remote key if this was created from a `SerializedRemoteKey`, i.e., the
// buffer is remote.
ucp_rkey_destroy(_remoteKey);
ucxx_trace("ucxx::RemoteKey destroyed (deserialized): %p, UCP handle: %p", _remoteKey);
ucxx_trace("ucxx::RemoteKey destroyed (deserialized): %p, UCP handle: %p", this, _remoteKey);
}
}

Expand All @@ -93,8 +94,13 @@ SerializedRemoteKey RemoteKey::serialize() const
{
std::stringstream ss;

if (_packedRemoteKeySize > std::numeric_limits<std::streamsize>::max())
// We should never have a remote key this big, but just in case.
throw std::overflow_error("Remote key is too large to deserialize");

ss.write(reinterpret_cast<char const*>(&_packedRemoteKeySize), sizeof(_packedRemoteKeySize));
ss.write(reinterpret_cast<char const*>(_packedRemoteKey), _packedRemoteKeySize);
ss.write(reinterpret_cast<char const*>(_packedRemoteKey),
static_cast<std::streamsize>(_packedRemoteKeySize));
ss.write(reinterpret_cast<char const*>(&_memoryBaseAddress), sizeof(_memoryBaseAddress));
ss.write(reinterpret_cast<char const*>(&_memorySize), sizeof(_memorySize));

Expand Down Expand Up @@ -134,7 +140,12 @@ void RemoteKey::deserialize(const SerializedRemoteKey& serializedRemoteKey)
_packedRemoteKeyVector = std::vector<char>(_packedRemoteKeySize);
_packedRemoteKey = _packedRemoteKeyVector.data();

ss.read(reinterpret_cast<char*>(_packedRemoteKey), _packedRemoteKeySize);
if (_packedRemoteKeySize > std::numeric_limits<std::streamsize>::max())
// We should never have a remote key this big, but just in case.
throw std::overflow_error("Remote key is too large to deserialize");

ss.read(reinterpret_cast<char*>(_packedRemoteKey),
static_cast<std::streamsize>(_packedRemoteKeySize));
ss.read(reinterpret_cast<char*>(&_memoryBaseAddress), sizeof(_memoryBaseAddress));
ss.read(reinterpret_cast<char*>(&_memorySize), sizeof(_memorySize));
}
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/request_am.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ RequestAm::RequestAm(std::shared_ptr<Component> endpointOrWorker,
callbackData)
{
std::visit(data::dispatch{
[this](data::AmSend amSend) {
[this](data::AmSend) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there are multiple cases where you dispatch to an empty functor, maybe that should be added as a method for simplification?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm misunderstanding your suggestion, but the empty functors are not (necessarily) exactly the same, their signature changes with the argument type which is used to match the dispatcher, so I don't think this is possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, never mind. What was confusing me is that you're using a visitor pattern where none of the arms actually use the input object (both here and in other parts of this PR), so it looks like you don't need the input object at all, but in fact you do need it for the dispatch itself to be done so you can't drop the function parameter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You nailed it, that's exactly why we need it regardless of whether we use it or not, the type carries at a minimum the meaning of the kind of transfer the request will perform.

if (_endpoint == nullptr)
throw ucxx::Error("An endpoint is required to send active messages");
},
[](data::AmReceive amReceive) {},
[](data::AmReceive) {},
},
requestData);
}
Expand Down Expand Up @@ -185,7 +185,7 @@ static void _amSendCallback(void* request, ucs_status_t status, void* user_data)

static void _recvCompletedCallback(void* request,
ucs_status_t status,
size_t length,
size_t /* length */,
void* user_data)
{
internal::RecvAmMessage* recvAmMessage = static_cast<internal::RecvAmMessage*>(user_data);
Expand Down Expand Up @@ -225,7 +225,7 @@ ucs_status_t RequestAm::recvCallback(void* arg,
try {
return amData->_receiverCallbacks.at(amHeader.receiverCallbackInfo->owner)
.at(amHeader.receiverCallbackInfo->id);
} catch (std::out_of_range) {
} catch (const std::out_of_range& e) {
ucxx_error("No AM receiver callback registered for owner '%s' with id %lu",
std::string(amHeader.receiverCallbackInfo->owner).data(),
amHeader.receiverCallbackInfo->id);
Expand Down Expand Up @@ -417,7 +417,7 @@ void RequestAm::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::AmSend amSend) {
[this](data::AmSend) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/request_endpoint_close.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@ void RequestEndpointClose::request()
_request = request;
}

static void logPopulateDelayedSubmission() {}

void RequestEndpointClose::populateDelayedSubmission()
{
if (_endpoint != nullptr && _endpoint->getHandle() == nullptr) {
Expand Down
2 changes: 0 additions & 2 deletions cpp/src/request_flush.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ void RequestFlush::request()
_request = request;
}

static void logPopulateDelayedSubmission() {}

void RequestFlush::populateDelayedSubmission()
{
if (_endpoint != nullptr && _endpoint->getHandle() == nullptr) {
Expand Down
10 changes: 4 additions & 6 deletions cpp/src/request_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ RequestMem::RequestMem(std::shared_ptr<Endpoint> endpoint,
callbackData)
{
std::visit(data::dispatch{
[this](data::MemPut memPut) {
[this](data::MemPut) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to send memory messages.");
},
[this](data::MemGet memGet) {
[this](data::MemGet) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to receive memory messages.");
},
Expand Down Expand Up @@ -122,21 +122,19 @@ void RequestMem::request()
_request = request;
}

static void logPopulateDelayedSubmission() {}

void RequestMem::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::MemPut memPut) {
[this](data::MemPut) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
return true;
}
return false;
},
[this](data::MemGet memGet) {
[this](data::MemGet) {
if (_worker->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be received");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down
19 changes: 11 additions & 8 deletions cpp/src/request_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ RequestStream::RequestStream(std::shared_ptr<Endpoint> endpoint,
: Request(endpoint, data::getRequestData(requestData), operationName, enablePythonFuture)
{
std::visit(data::dispatch{
[this](data::StreamSend streamSend) {
[this](data::StreamSend) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to send stream messages.");
},
[this](data::StreamReceive streamReceive) {
[this](data::StreamReceive) {
if (_endpoint == nullptr)
throw ucxx::Error("A valid endpoint is required to receive stream messages.");
},
Expand Down Expand Up @@ -97,15 +97,15 @@ void RequestStream::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::StreamSend streamSend) {
[this](data::StreamSend) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
return true;
}
return false;
},
[this](data::StreamReceive streamReceive) {
[this](data::StreamReceive) {
if (_worker->getHandle() == nullptr) {
ucxx_warn("Worker was closed before message could be received");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down Expand Up @@ -163,10 +163,13 @@ void RequestStream::callback(void* request, ucs_status_t status, size_t length)

if (status == UCS_ERR_MESSAGE_TRUNCATED) {
const char* fmt = "length mismatch: %llu (got) != %llu (expected)";
size_t len = std::snprintf(nullptr, 0, fmt, length, streamReceive._length);
_status_msg = std::string(len + 1, '\0'); // +1 for null terminator
std::snprintf(
_status_msg.data(), _status_msg.size(), fmt, length, streamReceive._length);
int charsLen = std::snprintf(nullptr, 0, fmt, length, streamReceive._length);
if (charsLen > 0) {
_status_msg = std::string(static_cast<size_t>(charsLen) + 1,
'\0'); // +1 for null terminator
std::snprintf(
_status_msg.data(), _status_msg.size(), fmt, length, streamReceive._length);
}
}

Request::callback(request, status);
Expand Down
10 changes: 5 additions & 5 deletions cpp/src/request_tag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,16 @@ RequestTag::RequestTag(std::shared_ptr<Component> endpointOrWorker,
callbackData)
{
std::visit(data::dispatch{
[this](data::TagSend tagSend) {
[this](data::TagSend) {
if (_endpoint == nullptr)
throw ucxx::Error("An endpoint is required to send tag messages");
},
[](data::TagReceive tagReceive) {},
[](data::TagReceive) {},
},
requestData);
}

void RequestTag::callback(void* request, ucs_status_t status, const ucp_tag_recv_info_t* info)
void RequestTag::callback(void* request, ucs_status_t status, const ucp_tag_recv_info_t* /* info */)
{
// TODO: Decide on behavior. See https://github.com/rapidsai/ucxx/issues/104 .
// if (status != UCS_ERR_CANCELED && info->length != _length) {
Expand Down Expand Up @@ -143,15 +143,15 @@ void RequestTag::populateDelayedSubmission()
{
bool terminate =
std::visit(data::dispatch{
[this](data::TagSend tagSend) {
[this](data::TagSend) {
if (_endpoint->getHandle() == nullptr) {
ucxx_warn("Endpoint was closed before message could be sent");
Request::callback(this, UCS_ERR_CANCELED);
return true;
}
return false;
},
[this](data::TagReceive tagReceive) {
[this](data::TagReceive) {
if (_worker->getHandle() == nullptr) {
ucxx_warn("Worker was closed before message could be received");
Request::callback(this, UCS_ERR_CANCELED);
Expand Down
Loading
Loading