diff --git a/src/roma/byob/dispatcher/dispatcher.cc b/src/roma/byob/dispatcher/dispatcher.cc index bfd5aa47..b6a8e59b 100644 --- a/src/roma/byob/dispatcher/dispatcher.cc +++ b/src/roma/byob/dispatcher/dispatcher.cc @@ -84,12 +84,16 @@ absl::Status Dispatcher::Init(const int listen_fd) { absl::StatusOr Dispatcher::LoadBinary( std::filesystem::path binary_path, const int n_workers) { + if (n_workers <= 0) { + return absl::InvalidArgumentError( + absl::StrCat("`n_workers=", n_workers, "` must be positive")); + } std::string code_token = ToString(Uuid::GenerateUuid()); LoadRequest payload; { std::ifstream ifs(std::move(binary_path), std::ios::binary); if (!ifs.is_open()) { - return absl::InvalidArgumentError( + return absl::UnavailableError( absl::StrCat("Cannot open ", binary_path.native())); } payload.set_binary_content(std::string(std::istreambuf_iterator(ifs), diff --git a/src/roma/byob/dispatcher/dispatcher_test.cc b/src/roma/byob/dispatcher/dispatcher_test.cc index da5a0944..5b20d167 100644 --- a/src/roma/byob/dispatcher/dispatcher_test.cc +++ b/src/roma/byob/dispatcher/dispatcher_test.cc @@ -111,6 +111,28 @@ TEST(DispatcherTest, ShutdownDispatcherThenWorker) { worker.join(); } +TEST(DispatcherTest, LoadErrorsWhenNWorkersNonPositive) { + const int fd = ::socket(AF_UNIX, SOCK_STREAM, 0); + ASSERT_NE(fd, -1); + BindAndListenOnPath(fd, "abcd.sock"); + absl::Cleanup cleanup = [] { EXPECT_EQ(::unlink("abcd.sock"), 0); }; + absl::Notification done; + std::thread worker([&done] { + const int fd = ::socket(AF_UNIX, SOCK_STREAM, 0); + ASSERT_NE(fd, -1); + ConnectToPath(fd, "abcd.sock"); + done.WaitForNotification(); + EXPECT_EQ(::close(fd), 0); + }); + Dispatcher dispatcher; + ASSERT_TRUE(dispatcher.Init(fd).ok()); + const absl::StatusOr code_token = + dispatcher.LoadBinary("src/roma/byob/udf/new_udf", /*n_workers=*/0); + EXPECT_FALSE(code_token.ok()); + done.Notify(); + worker.join(); +} + TEST(DispatcherTest, LoadErrorsWhenFileDoesntExist) { const int fd = ::socket(AF_UNIX, SOCK_STREAM, 0); ASSERT_NE(fd, -1);