Skip to content

Commit

Permalink
Make CPU affinity used for CPU pinning configurable via json file (#1136
Browse files Browse the repository at this point in the history
)

* changes to make cpu core for different processes configurable via json file

* make default value as -1 for cpu cores

* define variables for different core as extern in .h file

* modified the logic and renamed the struct

* fixed naming

* fixed variabe name

* set default value of cores as -1

* fixed variable name

* renaming

* renamed struct in localhost_config.json

* initialized struct with -1 as default value

* resolved Alex's comments. Updated PR description

* added configuration for stream_read and stream_read_write

* cleanup

* addressed rohan's comments

---------

Co-authored-by: Raghav Rawat <raghavrawat@ni.com>
  • Loading branch information
Raghav-NI and Raghav Rawat authored Dec 18, 2024
1 parent b1a0df4 commit a3d5698
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 10 deletions.
4 changes: 4 additions & 0 deletions source/server/core_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "logging.h"
#include "server_configuration_parser.h"
#include "server_security_configuration.h"
#include "moniker_stream_processor.h"

#if defined(__GNUC__)
#include <sys/mman.h>
Expand Down Expand Up @@ -37,6 +38,7 @@ struct ServerConfiguration {
int max_message_size;
int sideband_port;
nidevice_grpc::FeatureToggles feature_toggles;
MonikerStreamProcessor stream_processor;
};

static ServerConfiguration GetConfiguration(const std::string& config_file_path)
Expand All @@ -51,6 +53,7 @@ static ServerConfiguration GetConfiguration(const std::string& config_file_path)
config.server_address = server_config_parser.parse_address();
config.sideband_address = server_config_parser.parse_sideband_address();
config.sideband_port = server_config_parser.parse_sideband_port();
config.stream_processor = server_config_parser.parse_moniker_stream_processor();
config.server_cert = server_config_parser.parse_server_cert();
config.server_key = server_config_parser.parse_server_key();
config.root_cert = server_config_parser.parse_root_cert();
Expand Down Expand Up @@ -113,6 +116,7 @@ static void RunServer(const ServerConfiguration& config)
server = builder.BuildAndStart();
if (ni::data_monikers::is_sideband_streaming_enabled(config.feature_toggles)) {
auto sideband_socket_thread = new std::thread(RunSidebandSocketsAccept, config.sideband_address.c_str(), config.sideband_port);
ni::data_monikers::configure_moniker_stream_processor(config.stream_processor);
// auto sideband_rdma_send_thread = new std::thread(AcceptSidebandRdmaSendRequests);
// auto sideband_rdma_recv_thread = new std::thread(AcceptSidebandRdmaReceiveRequests);
}
Expand Down
42 changes: 33 additions & 9 deletions source/server/data_moniker_service.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//---------------------------------------------------------------------
//---------------------------------------------------------------------
#include "data_moniker_service.h"
#include "moniker_stream_processor.h"

#include <sideband_data.h>
#include <sideband_grpc.h>
Expand Down Expand Up @@ -45,6 +46,13 @@ static void SysFsWrite(const std::string& fileName, const std::string& value)

namespace ni::data_monikers {

static MonikerStreamProcessor s_StreamProcessor;

void configure_moniker_stream_processor(const MonikerStreamProcessor& stream_processor)
{
s_StreamProcessor = stream_processor;
}

bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles)
{
return feature_toggles.is_feature_enabled("sideband_streaming", nidevice_grpc::FeatureToggles::CodeReadiness::kNextRelease);
Expand Down Expand Up @@ -103,6 +111,20 @@ void DataMonikerService::InitiateMonikerList(const MonikerList& monikers, Endpoi
}
}

//---------------------------------------------------------------------
//---------------------------------------------------------------------
#ifndef _WIN32
void set_cpu_affinity(int cpu)
{
if (cpu >= 0) {
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(cpu, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
}
}
#endif

//---------------------------------------------------------------------
//---------------------------------------------------------------------
void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::SidebandStrategy strategy, EndpointList* readers, EndpointList* writers)
Expand All @@ -113,10 +135,7 @@ void DataMonikerService::RunSidebandReadWriteLoop(string sidebandIdentifier, ::S
pid_t threadId = syscall(SYS_gettid);
::SysFsWrite("/dev/cgroup/cpuset/LabVIEW_tl_set/tasks", std::to_string(threadId));

cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(8, &cpuSet);
sched_setaffinity(threadId, sizeof(cpu_set_t), &cpuSet);
set_cpu_affinity(s_StreamProcessor.moniker_sideband_stream_read_write);
}
#endif

Expand Down Expand Up @@ -169,7 +188,7 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg
char identifier[32] = {};
InitOwnerSidebandData(strategy, bufferSize, identifier);
std::string identifierString(identifier);

response->set_strategy(request->strategy());
response->set_sideband_identifier(identifier);
response->set_connection_url(GetConnectionAddress(strategy));
Expand All @@ -190,6 +209,10 @@ Status DataMonikerService::BeginSidebandStream(ServerContext* context, const Beg
//---------------------------------------------------------------------
Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderWriter<MonikerReadResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
set_cpu_affinity(s_StreamProcessor.moniker_stream_read_write);
#endif

EndpointList writers;
EndpointList readers;
MonikerWriteRequest writeRequest;
Expand Down Expand Up @@ -218,6 +241,10 @@ Status DataMonikerService::StreamReadWrite(ServerContext* context, ServerReaderW
//---------------------------------------------------------------------
Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList* request, ServerWriter<MonikerReadResponse>* writer)
{
#ifndef _WIN32
set_cpu_affinity(s_StreamProcessor.moniker_stream_read);
#endif

EndpointList writers;
EndpointList readers;
InitiateMonikerList(*request, &readers, &writers);
Expand All @@ -240,10 +267,7 @@ Status DataMonikerService::StreamRead(ServerContext* context, const MonikerList*
Status DataMonikerService::StreamWrite(ServerContext* context, ServerReaderWriter<StreamWriteResponse, MonikerWriteRequest>* stream)
{
#ifndef _WIN32
cpu_set_t cpuSet;
CPU_ZERO(&cpuSet);
CPU_SET(1, &cpuSet);
sched_setaffinity(0, sizeof(cpu_set_t), &cpuSet);
set_cpu_affinity(s_StreamProcessor.moniker_stream_write);
#endif

EndpointList writers;
Expand Down
5 changes: 4 additions & 1 deletion source/server/data_moniker_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@
#include <map>
#include <sideband_data.h>
#include "feature_toggles.h"
#include "moniker_stream_processor.h"

//---------------------------------------------------------------------
//---------------------------------------------------------------------
namespace ni::data_monikers
{
void configure_moniker_stream_processor(const MonikerStreamProcessor& stream_processor);
bool is_sideband_streaming_enabled(const nidevice_grpc::FeatureToggles& feature_toggles);
void set_cpu_affinity(int cpu);
//---------------------------------------------------------------------
//---------------------------------------------------------------------
using MonikerEndpointPtr = std::add_pointer<::grpc::Status(void*, google::protobuf::Arena& arena, google::protobuf::Any&)>::type;
Expand All @@ -37,7 +40,7 @@ namespace ni::data_monikers
public:
static void RegisterMonikerEndpoint(std::string endpointName, MonikerEndpointPtr endpoint);
static void RegisterMonikerInstance(std::string endpointName, void* instanceData, Moniker& moniker);

private:
static DataMonikerService* s_Server;
std::map<std::string, MonikerEndpointPtr> _endpoints;
Expand Down
11 changes: 11 additions & 0 deletions source/server/moniker_stream_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#ifndef MONIKER_STREAM_PROCESSOR_H
#define MONIKER_STREAM_PROCESSOR_H

struct MonikerStreamProcessor {
int moniker_sideband_stream_read_write = -1;
int moniker_stream_write = -1;
int moniker_stream_read = -1;
int moniker_stream_read_write = -1;
};

#endif // Moniker_Stream_Processor_H
51 changes: 51 additions & 0 deletions source/server/server_configuration_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <sstream>

#include "feature_toggles.h"
#include "moniker_stream_processor.h"

#if defined(_MSC_VER)
#include <windows.h>
Expand All @@ -19,6 +20,11 @@ static const char* kAddressJsonKey = "address";
static const char* kPortJsonKey = "port";
static const char* kSidebandAddressJsonKey = "sideband_address";
static const char* kSidebandPortJsonKey = "sideband_port";
static const char* kMonikerStreamProcessorKey = "moniker_stream_processor_configuration";
static const char* kMonikerSidebandStreamReadWriteKey = "moniker_sideband_stream_read_write";
static const char* kMonikerStreamWriteKey = "moniker_stream_write";
static const char* kMonikerStreamReadKey = "moniker_stream_read";
static const char* kMonikerStreamReadWriteKey = "moniker_stream_read_write";
static const char* kServerCertJsonKey = "server_cert";
static const char* kServerKeyJsonKey = "server_key";
static const char* kRootCertJsonKey = "root_cert";
Expand Down Expand Up @@ -292,6 +298,41 @@ int ServerConfigurationParser::parse_port_with_key(const std::string& key) const
return parsed_port;
}

MonikerStreamProcessor ServerConfigurationParser::parse_moniker_stream_processor() const
{
MonikerStreamProcessor stream_processor;

auto core_config_it = config_file_.find(kMonikerStreamProcessorKey);
if (core_config_it != config_file_.end()) {
stream_processor.moniker_sideband_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerSidebandStreamReadWriteKey);
stream_processor.moniker_stream_write = parse_moniker_stream_processor_with_key(kMonikerStreamWriteKey);
stream_processor.moniker_stream_read = parse_moniker_stream_processor_with_key(kMonikerStreamReadKey);
stream_processor.moniker_stream_read_write = parse_moniker_stream_processor_with_key(kMonikerStreamReadWriteKey);
}
return stream_processor;
}

int ServerConfigurationParser::parse_moniker_stream_processor_with_key(const std::string& key) const
{
int parsed_core = -1;

auto it = config_file_.find(key);
if (it != config_file_.end()) {
try {
parsed_core = it->get<int>();
}
catch (const nlohmann::json::type_error& ex) {
throw WrongMonikerStreamProcessorTypeException(ex.what());
}
}

if (parsed_core < -1) {
throw InvalidMonikerStreamProcessorException();
}

return parsed_core;
}

ServerConfigurationParser::ConfigFileNotFoundException::ConfigFileNotFoundException(const std::string& config_file_path)
: std::runtime_error(kConfigFileNotFoundMessage + config_file_path)
{
Expand All @@ -312,6 +353,11 @@ ServerConfigurationParser::InvalidPortException::InvalidPortException()
{
}

ServerConfigurationParser::InvalidMonikerStreamProcessorException::InvalidMonikerStreamProcessorException()
: std::runtime_error(kInvalidMonikerStreamProcessorMessage)
{
}

ServerConfigurationParser::MalformedJsonException::MalformedJsonException(const std::string& parse_error_details)
: std::runtime_error(kMalformedJsonMessage + parse_error_details)
{
Expand All @@ -322,6 +368,11 @@ ServerConfigurationParser::WrongPortTypeException::WrongPortTypeException(const
{
}

ServerConfigurationParser::WrongMonikerStreamProcessorTypeException::WrongMonikerStreamProcessorTypeException(const std::string& type_error_details)
: std::runtime_error(kWrongMonikerStreamProcessorTypeMessage + type_error_details)
{
}

ServerConfigurationParser::UnspecifiedPortException::UnspecifiedPortException()
: std::runtime_error(kUnspecifiedPortMessage)
{
Expand Down
13 changes: 13 additions & 0 deletions source/server/server_configuration_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@
#include <nlohmann/json.hpp>

#include "feature_toggles.h"
#include "moniker_stream_processor.h"

namespace nidevice_grpc {

static const char* kConfigFileNotFoundMessage = "The server configuration file was not found at: ";
static const char* kInvalidAddressMessage = "The specified address is not valid.\n Use a valid IPv4 or IPv6 address. Valid values include localhost, 192.168.1.1, [::], [::1], etc.";
static const char* kWrongAddressTypeMessage = "The server address must be specified in the server's configuration file as a string: \n\n";
static const char* kInvalidPortMessage = "The specified port number must between 0 and 65535.";
static const char* kInvalidMonikerStreamProcessorMessage = "The specified moniker stream processor must be -1 or greater. -1 indicates that any available cpu core can be used.";
static const char* kMalformedJsonMessage = "The JSON in the server configuration file is malformed: \n\n";
static const char* kWrongPortTypeMessage = "The server port must be specified in the server's configuration file as an integer: \n\n";
static const char* kWrongMonikerStreamProcessorTypeMessage = "The moniker stream processor must be specified in the server's configuration file as an integer: \n\n";
static const char* kUnspecifiedPortMessage = "The server port must be specified in the server's configuration file.";
static const char* kValueTypeNotStringMessage = "The following key must be specified in the server's configuration file as a string enclosed with double quotes: ";
static const char* kFileNotFoundMessage = "The following certificate file was not found: ";
Expand Down Expand Up @@ -43,6 +46,7 @@ class ServerConfigurationParser {
std::string parse_root_cert() const;
int parse_max_message_size() const;
int parse_sideband_port() const;
MonikerStreamProcessor parse_moniker_stream_processor() const;
FeatureToggles parse_feature_toggles() const;
FeatureToggles::CodeReadiness parse_code_readiness() const;

Expand All @@ -62,6 +66,10 @@ class ServerConfigurationParser {
InvalidPortException();
};

struct InvalidMonikerStreamProcessorException : public std::runtime_error {
InvalidMonikerStreamProcessorException();
};

struct MalformedJsonException : public std::runtime_error {
MalformedJsonException(const std::string& parse_error_details);
};
Expand All @@ -70,6 +78,10 @@ class ServerConfigurationParser {
WrongPortTypeException(const std::string& type_error_details);
};

struct WrongMonikerStreamProcessorTypeException : public std::runtime_error {
WrongMonikerStreamProcessorTypeException(const std::string& type_error_details);
};

struct UnspecifiedPortException : public std::runtime_error {
UnspecifiedPortException();
};
Expand Down Expand Up @@ -106,6 +118,7 @@ class ServerConfigurationParser {
std::string parse_bind_address() const;
int parse_port() const;
int parse_port_with_key(const std::string& key) const;
int parse_moniker_stream_processor_with_key(const std::string& key) const;

std::string config_file_path_;
nlohmann::json config_file_;
Expand Down

0 comments on commit a3d5698

Please sign in to comment.