Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

homeworks #109

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
Binary file added src/.DS_Store
Binary file not shown.
3 changes: 3 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "storage/SimpleLRU.h"
#include "storage/ThreadSafeSimpleLRU.h"
#include "storage/StripedLRU.h"

using namespace Afina;

Expand Down Expand Up @@ -55,6 +56,8 @@ class Application {
storage = std::make_shared<Afina::Backend::SimpleLRU>();
} else if (storage_type == "mt_lru") {
storage = std::make_shared<Afina::Backend::ThreadSafeSimplLRU>();
} else if (storage_type == "mt_slru") {
storage = Afina::Backend::StripedLRU::CheckParamsBuildStripedLRU();
} else {
throw std::runtime_error("Unknown storage type");
}
Expand Down
196 changes: 184 additions & 12 deletions src/network/mt_blocking/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,33 @@ void ServerImpl::Start(uint16_t port, uint32_t n_accept, uint32_t n_workers) {
}

running.store(true);

{
std::lock_guard<std::mutex> myguard(_mutex_for_set);
_set_of_sockets.insert(_server_socket);
}

_thread = std::thread(&ServerImpl::OnRun, this);
_thread.detach();

}

// See Server.h
void ServerImpl::Stop() {
running.store(false);
shutdown(_server_socket, SHUT_RDWR);
std::lock_guard<std::mutex> myguard(_mutex_for_set);
for (auto s : _set_of_sockets) {
shutdown(s, SHUT_RD);
}
}

// See Server.h
void ServerImpl::Join() {
assert(_thread.joinable());
_thread.join();
close(_server_socket);
std::unique_lock<std::mutex> myguardSet(_mutex_for_set);
while (running.load() || !_set_of_sockets.empty()) {
_cv_amt_connections.wait(myguardSet);
}

}

// See Server.h
Expand All @@ -96,10 +109,7 @@ void ServerImpl::OnRun() {
// - command_to_execute: last command parsed out of stream
// - arg_remains: how many bytes to read from stream to get command argument
// - argument_for_command: buffer stores argument
std::size_t arg_remains;
Protocol::Parser parser;
std::string argument_for_command;
std::unique_ptr<Execute::Command> command_to_execute;

while (running.load()) {
_logger->debug("waiting for connection...");

Expand Down Expand Up @@ -134,18 +144,180 @@ void ServerImpl::OnRun() {

// TODO: Start new thread and process data from/to connection
{
static const std::string msg = "TODO: start new thread and process memcached protocol instead";
if (send(client_socket, msg.data(), msg.size(), 0) <= 0) {
_logger->error("Failed to write response to client: {}", strerror(errno));
// static const std::string msg = "TODO: start new thread and process memcached protocol instead";
// if (send(client_socket, msg.data(), msg.size(), 0) <= 0) {
// _logger->error("Failed to write response to client: {}", strerror(errno));
// }
// close(client_socket);

std::size_t len_of_set;
{
std::lock_guard<std::mutex> myguardSet(_mutex_for_set);
len_of_set = _set_of_sockets.size();
}

if (len_of_set >= _max_connections) {
static const std::string msg = "From server in client console: No place for a new worker_thread\n";
int otv;
if ( (otv=send(client_socket, msg.data(), msg.size(), 0)) <= 0) {
_logger->error("Failed to write response to client: {}", strerror(errno));
}
printf("From server in server console: too many workers, otv=%d!\n", otv);
close(client_socket);
continue;
}
close(client_socket);

{
std::lock_guard<std::mutex> myguardSet(_mutex_for_set);
_set_of_sockets.insert(client_socket);
}

std::thread worker_thread = std::thread(&ServerImpl::OnWorkerRun, this, client_socket);
worker_thread.detach();
}
}

bool notify_require = false;

{
std::lock_guard<std::mutex> myguardSet(_mutex_for_set);
_set_of_sockets.erase(_server_socket);
notify_require = _set_of_sockets.empty();
}

if (notify_require) {
_cv_amt_connections.notify_all();
}

close(_server_socket);


// Cleanup on exit...
_logger->warn("Network stopped");
}


void ServerImpl::OnWorkerRun(int client_socket) {
std::size_t arg_remains;
Protocol::Parser parser;
std::string argument_for_command;
std::unique_ptr<Execute::Command> command_to_execute;



//process connection
try {

int readed_bytes = -1;
char client_buffer[4096];
while ((readed_bytes = read(client_socket, client_buffer, sizeof(client_buffer))) > 0) {
_logger->debug("Got {} bytes from socket", readed_bytes);
sleep(10);


// Single block of data readed from the socket could trigger inside actions a multiple times,
// for example:
// - read#0: [<command1 start>]
// - read#1: [<command1 end> <argument> <command2> <argument for command 2> <command3> ... ]
while (readed_bytes > 0) {
_logger->debug("Process {} bytes", readed_bytes);
// There is no command yet
if (!command_to_execute) {
std::size_t parsed = 0;
if (parser.Parse(client_buffer, readed_bytes, parsed)) {
// There is no command to be launched, continue to parse input stream
// Here we are, current chunk finished some command, process it
_logger->debug("Found new command: {} in {} bytes", parser.Name(), parsed);
command_to_execute = parser.Build(arg_remains);
if (arg_remains > 0) {
arg_remains += 2;
}
}

// Parsed might fails to consume any bytes from input stream. In real life that could happens,
// for example, because we are working with UTF-16 chars and only 1 byte left in stream
if (parsed == 0) {
break;
} else {
std::memmove(client_buffer, client_buffer + parsed, readed_bytes - parsed);
readed_bytes -= parsed;
}
}

// There is command, but we still wait for argument to arrive...
if (command_to_execute && arg_remains > 0) {
_logger->debug("Fill argument: {} bytes of {}", readed_bytes, arg_remains);
// There is some parsed command, and now we are reading argument
std::size_t to_read = std::min(arg_remains, std::size_t(readed_bytes));
argument_for_command.append(client_buffer, to_read);

std::memmove(client_buffer, client_buffer + to_read, readed_bytes - to_read);
arg_remains -= to_read;
readed_bytes -= to_read;
}

// Thre is command & argument - RUN!
if (command_to_execute && arg_remains == 0) {
_logger->debug("Start command execution");

std::string result;
if (argument_for_command.size()) {
argument_for_command.resize(argument_for_command.size() - 2);
}


command_to_execute->Execute(*pStorage, argument_for_command, result);



// Send response
result += "\r\n";
if (send(client_socket, result.data(), result.size(), 0) <= 0) {
throw std::runtime_error("Failed to send response");
}

// Prepare for the next command
command_to_execute.reset();
argument_for_command.resize(0);
parser.Reset();
}
} // while (readed_bytes)
}

if (readed_bytes == 0) {
_logger->debug("Connection closed");
} else {
throw std::runtime_error(std::string(strerror(errno)));
}
}
catch (std::runtime_error &ex) {
_logger->error("Failed to process connection on descriptor {}: {}", client_socket, ex.what());

static const std::string msg = "SERVER_ERROR " + std::string(ex.what()) + "\r\n";
if ( send(client_socket, msg.data(), msg.size(), 0) <= 0) {
_logger->error("Failed to write response to client: {}", strerror(errno));
}

}

// We are done with this connection
close(client_socket);

bool notify_require = false;

{
std::lock_guard<std::mutex> myguardSet(_mutex_for_set);
_set_of_sockets.erase(client_socket);
notify_require = _set_of_sockets.empty();
}


if (notify_require) {
_cv_amt_connections.notify_all();
}

}

} // namespace MTblocking
} // namespace Network
} // namespace Afina
13 changes: 13 additions & 0 deletions src/network/mt_blocking/ServerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include <atomic>
#include <thread>
#include <set>
#include <mutex>
#include <condition_variable>

#include <afina/network/Server.h>

Expand Down Expand Up @@ -37,6 +40,7 @@ class ServerImpl : public Server {
* Method is running in the connection acceptor thread
*/
void OnRun();
void OnWorkerRun(int client_socket);

private:
// Logger instance
Expand All @@ -52,6 +56,15 @@ class ServerImpl : public Server {

// Thread to run network on
std::thread _thread;

int _max_connections = 5;
std::set<int> _set_of_sockets;
std::mutex _mutex_for_set;
std::condition_variable _cv_amt_connections;




};

} // namespace MTblocking
Expand Down
3 changes: 2 additions & 1 deletion src/network/st_blocking/ServerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ void ServerImpl::OnRun() {
} else {
throw std::runtime_error(std::string(strerror(errno)));
}
} catch (std::runtime_error &ex) {
}
catch (std::runtime_error &ex) {
_logger->error("Failed to process connection on descriptor {}: {}", client_socket, ex.what());
}

Expand Down
3 changes: 3 additions & 0 deletions src/network/st_blocking/ServerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class ServerImpl : public Server {

// Thread to run network on
std::thread _thread;



};

} // namespace STblocking
Expand Down
Loading