Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WorkerSettings: Add disableLiburing option (enable_liburing in Rust) #1442

Merged
merged 15 commits into from
Aug 12, 2024
10 changes: 10 additions & 0 deletions node/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ export type WorkerSettings<WorkerAppData extends AppData = AppData> = {
*/
libwebrtcFieldTrials?: string;

/**
* Disable liburing (io_uring) despite it's supported in current host.
*/
disableLiburing?: boolean;

/**
* Custom application data.
*/
Expand Down Expand Up @@ -287,6 +292,7 @@ export class Worker<
dtlsCertificateFile,
dtlsPrivateKeyFile,
libwebrtcFieldTrials,
disableLiburing,
appData,
}: WorkerSettings<WorkerAppData>) {
super();
Expand Down Expand Up @@ -338,6 +344,10 @@ export class Worker<
spawnArgs.push(`--libwebrtcFieldTrials=${libwebrtcFieldTrials}`);
}

if (disableLiburing) {
spawnArgs.push(`--disableLiburing`);
}

logger.debug(
'spawning worker process: %s %s',
spawnBin,
Expand Down
1 change: 1 addition & 0 deletions node/src/test/test-Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ test('createWorker() succeeds', async () => {
dtlsCertificateFile: path.join(__dirname, 'data', 'dtls-cert.pem'),
dtlsPrivateKeyFile: path.join(__dirname, 'data', 'dtls-key.pem'),
libwebrtcFieldTrials: 'WebRTC-Bwe-AlrLimitedBackoff/Disabled/',
disableLiburing: true,
appData: { foo: 456 },
});

Expand Down
2 changes: 1 addition & 1 deletion node/src/test/test-node-sctp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ beforeEach(async () => {
// Set node-sctp default PMTU to 1200.
sctp.defaults({ PMTU: 1200 });

ctx.worker = await mediasoup.createWorker();
ctx.worker = await mediasoup.createWorker({ disableLiburing: true });
ctx.router = await ctx.worker.createRouter();
ctx.plainTransport = await ctx.router.createPlainTransport({
// https://github.com/nodejs/node/issues/14900.
Expand Down
7 changes: 6 additions & 1 deletion rust/src/router/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ async fn init() -> Worker {
let worker_manager = WorkerManager::new();

worker_manager
.create_worker(WorkerSettings::default())
.create_worker({
let mut settings = WorkerSettings::default();
settings.enable_liburing = false;

settings
})
.await
.expect("Failed to create worker")
}
Expand Down
13 changes: 13 additions & 0 deletions rust/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ pub struct WorkerSettings {
/// "WebRTC-Bwe-AlrLimitedBackoff/Enabled/".
#[doc(hidden)]
pub libwebrtc_field_trials: Option<String>,
/// Enable liburing This option is ignored if io_uring is not supported by
/// current host.
///
/// Default `true`.
pub enable_liburing: bool,
/// Function that will be called under worker thread before worker starts, can be used for
/// pinning worker threads to CPU cores.
pub thread_initializer: Option<Arc<dyn Fn() + Send + Sync>>,
Expand Down Expand Up @@ -221,6 +226,7 @@ impl Default for WorkerSettings {
rtc_port_range: 10000..=59999,
dtls_files: None,
libwebrtc_field_trials: None,
enable_liburing: true,
thread_initializer: None,
app_data: AppData::default(),
}
Expand All @@ -235,6 +241,7 @@ impl fmt::Debug for WorkerSettings {
rtc_port_range,
dtls_files,
libwebrtc_field_trials,
enable_liburing,
thread_initializer,
app_data,
} = self;
Expand All @@ -245,6 +252,7 @@ impl fmt::Debug for WorkerSettings {
.field("rtc_port_range", &rtc_port_range)
.field("dtls_files", &dtls_files)
.field("libwebrtc_field_trials", &libwebrtc_field_trials)
.field("enable_liburing", &enable_liburing)
.field(
"thread_initializer",
&thread_initializer.as_ref().map(|_| "ThreadInitializer"),
Expand Down Expand Up @@ -356,6 +364,7 @@ impl Inner {
rtc_port_range,
dtls_files,
libwebrtc_field_trials,
enable_liburing,
thread_initializer,
app_data,
}: WorkerSettings,
Expand Down Expand Up @@ -404,6 +413,10 @@ impl Inner {
));
}

if !enable_liburing {
spawn_args.push("--disableLiburing".to_string());
}

let id = WorkerId::new();
debug!(
"spawning worker with arguments [id:{}]: {}",
Expand Down
1 change: 1 addition & 0 deletions worker/include/Settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Settings
std::string dtlsCertificateFile;
std::string dtlsPrivateKeyFile;
std::string libwebrtcFieldTrials{ "WebRTC-Bwe-AlrLimitedBackoff/Enabled/" };
bool liburingDisabled{ false };
};

public:
Expand Down
12 changes: 10 additions & 2 deletions worker/src/DepLibUring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
#include "DepLibUring.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Settings.hpp"
#include "Utils.hpp"
#include <sys/eventfd.h>
#include <sys/resource.h>
#include <sys/utsname.h>

/* Static variables. */
bool DepLibUring::enabled{ false };
/* liburing instance per thread. */
// liburing instance per thread.
thread_local DepLibUring::LibUring* DepLibUring::liburing{ nullptr };
/* Completion queue entry array used to retrieve processes tasks. */
// Completion queue entry array used to retrieve processes tasks.
thread_local struct io_uring_cqe* cqes[DepLibUring::QueueDepth];

/* Static methods for UV callbacks. */
Expand Down Expand Up @@ -121,6 +122,13 @@ void DepLibUring::ClassInit()

MS_DEBUG_TAG(info, "liburing version: \"%i.%i\"", mayor, minor);

if (Settings::configuration.liburingDisabled)
{
MS_DEBUG_TAG(info, "liburing disabled by user settings");

return;
}

// This must be called first.
DepLibUring::CheckRuntimeSupport();

Expand Down
16 changes: 10 additions & 6 deletions worker/src/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ void Settings::SetConfiguration(int argc, char* argv[])
{ "dtlsCertificateFile", optional_argument, nullptr, 'c' },
{ "dtlsPrivateKeyFile", optional_argument, nullptr, 'p' },
{ "libwebrtcFieldTrials", optional_argument, nullptr, 'W' },
{ nullptr, 0, nullptr, 0 }
{ "disableLiburing", no_argument, nullptr, 'd' },
{ nullptr, 0, nullptr, 0 }
};
// clang-format on
std::string stringValue;
Expand All @@ -73,13 +74,9 @@ void Settings::SetConfiguration(int argc, char* argv[])

optind = 1; // Set explicitly, otherwise subsequent runs will fail.
opterr = 0; // Don't allow getopt to print error messages.

while ((c = getopt_long_only(argc, argv, "", options, &optionIdx)) != -1)
{
if (!optarg)
{
MS_THROW_TYPE_ERROR("unknown configuration parameter: %s", optarg);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this deleted? It seems like an important safety check that caller provided something meaningful as an input.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the new --disableLiburing command line argument doesn't have any value so such a check throws if present. I can check that optargs exist for all the other arguments but didn't consider it necessary.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, but other options do require a value. Maybe we have a test that checks that and it causes memory corruption because you suddenly create a value out of null pointer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made this change:

db2252c

Is it enough? What do you mean with "you suddenly create a value out of null pointer"? Command line arguments are created by Node and Rust layers in their Worker classes. Tests can not trigger wrong arguments passed to the worker.

Just wondering about this: In worker/utils.rs:

pub(super) fn run_worker_with_channels<OE>(
    id: WorkerId,
    thread_initializer: Option<Arc<dyn Fn() + Send + Sync>>,
    args: Vec<String>,
    worker_closed: Arc<AtomicBool>,
    on_exit: OE,
) -> WorkerRunResult
where
    OE: FnOnce(Result<(), ExitError>) + Send + 'static,
{
    let (channel, prepared_channel_read, prepared_channel_write) =
        Channel::new(Arc::clone(&worker_closed));
    let buffer_worker_messages_guard =
        channel.buffer_messages_for(SubscriptionTarget::String(std::process::id().to_string()));

    std::thread::Builder::new()
        .name(format!("mediasoup-worker-{id}"))
        .spawn(move || {
            if let Some(thread_initializer) = thread_initializer {
                thread_initializer();
            }
            let argc = args.len() as c_int;
            let args_cstring = args
                .into_iter()
                .map(|s| -> CString { CString::new(s).unwrap() })
                .collect::<Vec<_>>();
            let argv = args_cstring
                .iter()
                .map(|arg| arg.as_ptr().cast::<c_char>())
                .collect::<Vec<_>>();
            let version = CString::new(env!("CARGO_PKG_VERSION")).unwrap();

            let status_code = unsafe {
                let (channel_read_fn, channel_read_ctx, _channel_write_callback) =
                    prepared_channel_read.deconstruct();
                let (channel_write_fn, channel_write_ctx, _channel_read_callback) =
                    prepared_channel_write.deconstruct();

                mediasoup_sys::mediasoup_worker_run(
                    argc,
                    argv.as_ptr(),
                    version.as_ptr(),
                    0,
                    0,
                    channel_read_fn,
                    channel_read_ctx,
                    channel_write_fn,
                    channel_write_ctx,
                )
            };

Here args is a command line arguments string, something like:

"--logLevel=warn --disableLiburing"

Maybe something dangerous when doing this?:

let args_cstring = args
                .into_iter()
                .map(|s| -> CString { CString::new(s).unwrap() })
                .collect::<Vec<_>>();
            let argv = args_cstring
                .iter()
                .map(|arg| arg.as_ptr().cast::<c_char>())
                .collect::<Vec<_>>();

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, no problem there IMHO. It just splits the string into these strings:

  • "--logLevel=warn"
  • "--disableLiburing"

It doesn't do anything like assuming/expecting a "=" symbol, so no danger here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see why commit db2252c should fix this problem. It probably won't and, instead of wasting more time on this, I will change the new command line argument and add a value to it. No time to deal with ancient command line args stuff.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we're not calling it incorrectly right now, but we could. And that would blow up instead of crashing with a nice message. Do not trust input, at least not to the degree that impacts memory safety.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it's safe and we don't assume anything. Arg values are now mandatory. See latest changes.


switch (c)
{
case 'l':
Expand Down Expand Up @@ -158,6 +155,13 @@ void Settings::SetConfiguration(int argc, char* argv[])
break;
}

case 'd':
{
Settings::configuration.liburingDisabled = true;

break;
}

// Invalid option.
case '?':
{
Expand Down
Loading