Skip to content

Commit

Permalink
[#185] Remove socket descriptor in sub-processes for killed connectio…
Browse files Browse the repository at this point in the history
…ns v2
  • Loading branch information
jesperpedersen committed Nov 8, 2021
1 parent 1740b19 commit 59bfa39
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 295 deletions.
160 changes: 29 additions & 131 deletions src/libpgagroal/pipeline_perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,6 @@ struct pipeline performance_pipeline(void)
return pipeline;
}

static int unix_socket = -1;
static struct ev_io io_mgt;

static int fds[MAX_NUMBER_OF_CONNECTIONS];
static bool news[MAX_NUMBER_OF_CONNECTIONS];

static void start_mgt(struct ev_loop *loop);
static void shutdown_mgt(struct ev_loop *loop);
static void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents);

static int
performance_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shmem_size)
{
Expand All @@ -86,42 +76,24 @@ performance_initialize(void* shmem, void** pipeline_shmem, size_t* pipeline_shme
static void
performance_start(struct ev_loop *loop, struct worker_io* w)
{
char p[MISC_LENGTH];
struct configuration* config;

config = (struct configuration*)shmem;

for (int i = 0; i < config->max_connections; i++)
{
fds[i] = config->connections[i].fd;
news[i] = config->connections[i].new;
}

memset(&p, 0, sizeof(p));
snprintf(&p[0], sizeof(p), ".s.%d", getpid());

if (pgagroal_bind_unix_socket(config->unix_socket_dir, &p[0], &unix_socket))
{
pgagroal_log_fatal("pgagroal: Could not bind to %s/%s", config->unix_socket_dir, &p[0]);
goto error;
if (i != w->slot && !config->connections[i].new && config->connections[i].fd > 0)
{
pgagroal_disconnect(config->connections[i].fd);
}
}

start_mgt(loop);

return;

error:

exit_code = WORKER_FAILURE;
running = 0;
ev_break(loop, EVBREAK_ALL);
return;
}

static void
performance_stop(struct ev_loop *loop, struct worker_io* w)
{
shutdown_mgt(loop);
}

static void
Expand Down Expand Up @@ -163,17 +135,7 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
}
else if (status == MESSAGE_STATUS_ZERO)
{
/* Retry */
if (!pgagroal_socket_isvalid(wi->client_fd))
{
goto client_error;
}
else if (!pgagroal_socket_isvalid(wi->server_fd))
{
goto server_error;
}

errno = 0;
goto client_done;
}
else
{
Expand All @@ -183,6 +145,18 @@ performance_client(struct ev_loop *loop, struct ev_io *watcher, int revents)
ev_break (loop, EVBREAK_ONE);
return;

client_done:
config = (struct configuration*)shmem;
pgagroal_log_debug("[C] Client done (slot %d database %s user %s): %s (socket %d status %d)",
wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username,
strerror(errno), wi->client_fd, status);
errno = 0;

exit_code = WORKER_CLIENT_FAILURE;
running = 0;
ev_break(loop, EVBREAK_ALL);
return;

client_error:
config = (struct configuration*)shmem;
pgagroal_log_warn("[C] Client error (slot %d database %s user %s): %s (socket %d status %d)",
Expand Down Expand Up @@ -246,17 +220,7 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
}
else if (status == MESSAGE_STATUS_ZERO)
{
/* Retry */
if (!pgagroal_socket_isvalid(wi->client_fd))
{
goto client_error;
}
else if (!pgagroal_socket_isvalid(wi->server_fd))
{
goto server_error;
}

errno = 0;
goto server_done;
}
else
{
Expand All @@ -279,6 +243,17 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
ev_break(loop, EVBREAK_ALL);
return;

server_done:
config = (struct configuration*)shmem;
pgagroal_log_debug("[S] Server done (slot %d database %s user %s): %s (socket %d status %d)",
wi->slot, config->connections[wi->slot].database, config->connections[wi->slot].username,
strerror(errno), wi->server_fd, status);
errno = 0;

running = 0;
ev_break(loop, EVBREAK_ALL);
return;

server_error:
config = (struct configuration*)shmem;
pgagroal_log_warn("[S] Server error (slot %d database %s user %s): %s (socket %d status %d)",
Expand All @@ -292,80 +267,3 @@ performance_server(struct ev_loop *loop, struct ev_io *watcher, int revents)
ev_break(loop, EVBREAK_ALL);
return;
}

static void
start_mgt(struct ev_loop *loop)
{
memset(&io_mgt, 0, sizeof(struct ev_io));
ev_io_init(&io_mgt, accept_cb, unix_socket, EV_READ);
ev_io_start(loop, &io_mgt);
}

static void
shutdown_mgt(struct ev_loop* loop)
{
char p[MISC_LENGTH];
struct configuration* config = NULL;

config = (struct configuration*)shmem;

memset(&p, 0, sizeof(p));
snprintf(&p[0], sizeof(p), ".s.%d", getpid());

ev_io_stop(loop, &io_mgt);
pgagroal_disconnect(unix_socket);
errno = 0;
pgagroal_remove_unix_socket(config->unix_socket_dir, &p[0]);
errno = 0;
}

static void
accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents)
{
struct sockaddr_in client_addr;
socklen_t client_addr_length;
int client_fd;
signed char id;
int32_t slot;
int payload_i;
char* payload_s = NULL;

pgagroal_log_trace("accept_cb: sockfd ready (%d)", revents);

if (EV_ERROR & revents)
{
pgagroal_log_debug("accept_cb: invalid event: %s", strerror(errno));
errno = 0;
return;
}

client_addr_length = sizeof(client_addr);
client_fd = accept(watcher->fd, (struct sockaddr *)&client_addr, &client_addr_length);
if (client_fd == -1)
{
pgagroal_log_debug("accept: %s (%d)", strerror(errno), watcher->fd);
errno = 0;
return;
}

/* Process internal management request -- f.ex. returning a file descriptor to the pool */
pgagroal_management_read_header(client_fd, &id, &slot);
pgagroal_management_read_payload(client_fd, id, &payload_i, &payload_s);

switch (id)
{
case MANAGEMENT_REMOVE_FD:
pgagroal_log_debug("pgagroal: Management remove file descriptor: Slot %d FD %d", slot, payload_i);
if (fds[slot] == payload_i && !news[slot])
{
pgagroal_disconnect(payload_i);
fds[slot] = 0;
}
break;
default:
pgagroal_log_debug("pgagroal: Unsupported management id: %d", id);
break;
}

pgagroal_disconnect(client_fd);
}
Loading

0 comments on commit 59bfa39

Please sign in to comment.