Skip to content

Commit

Permalink
update the handling of EOF
Browse files Browse the repository at this point in the history
one BIO can tell fr_bio_eof() that it's at EOF.  That function
will take care of calling the various BIO internal EOF functions
until such time as it's at the first BIO.  At which point it will
call the application EOF function.
  • Loading branch information
alandekok committed Nov 19, 2024
1 parent 44b2425 commit c6ec648
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 29 deletions.
48 changes: 34 additions & 14 deletions src/lib/bio/base.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,28 +231,48 @@ void fr_bio_cb_set(fr_bio_t *bio, fr_bio_cb_funcs_t const *cb)

/** Internal BIO function to run EOF callbacks.
*
* The EOF callbacks are _internal_, and tell the various BIOs that there is nothing more to read from the
* BIO.
*
* @todo - do we need to have separate _write_ EOF? Likely not.
* When a BIO hits EOF, it MUST call this function. This function will take care of changing the read()
* function to return nothing. It will also take care of walking back up the hierarchy, and calling any
*/
void fr_bio_eof(fr_bio_t *bio)
{
fr_bio_t *x = bio;
fr_bio_common_t *this = (fr_bio_common_t *) bio;

/*
* Start from the first BIO.
* This BIO is at EOF. So we can't call read() any more.
*/
while (fr_bio_prev(x) != NULL) x = fr_bio_prev(x);
this->bio.read = fr_bio_null_read;

/*
* Shut each one down, including the one which called us.
*/
while (x) {
fr_bio_common_t *this = (fr_bio_common_t *) x;
while (true) {
fr_bio_common_t *prev = (fr_bio_common_t *) fr_bio_prev(&this->bio);

/*
* There are no more BIOs. Tell the application that the entire BIO chain is at EOF.
*/
if (!prev) {
if (this->cb.eof) {
this->cb.eof(&this->bio);
this->cb.eof = NULL;
}
break;
}

if (this->priv_cb.eof) this->priv_cb.eof((fr_bio_t *) this);
/*
* Go to the previous BIO. If it doesn't have an EOF handler, then keep going back up
* the chain until we're at the top.
*/
this = prev;
if (!this->priv_cb.eof) continue;

x = fr_bio_next(x);
/*
* The EOF handler said it's NOT at EOF, so we stop processing here.
*/
if (this->priv_cb.eof((fr_bio_t *) this) == 0) break;

/*
* Don't run the EOF callback multiple times.
*/
this->priv_cb.eof = NULL;
break;
}
}
2 changes: 1 addition & 1 deletion src/lib/bio/bio_priv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ typedef struct fr_bio_common_s fr_bio_common_t;
typedef struct {
fr_bio_io_t connected;
fr_bio_callback_t shutdown;
fr_bio_callback_t eof;
fr_bio_io_t eof;
fr_bio_callback_t failed;

fr_bio_io_t read_blocked;
Expand Down
13 changes: 6 additions & 7 deletions src/lib/bio/fd.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,6 @@ static int fr_bio_fd_destructor(fr_bio_fd_t *my)
fr_assert(!fr_bio_prev(&my->bio));
fr_assert(!fr_bio_next(&my->bio));

if (!my->info.eof && my->cb.eof) my->cb.eof(&my->bio);

if (my->connect.ev) {
talloc_const_free(my->connect.ev);
my->connect.ev = NULL;
Expand All @@ -117,13 +115,15 @@ static int fr_bio_fd_destructor(fr_bio_fd_t *my)
return fr_bio_fd_close(&my->bio);
}

static void fr_bio_fd_eof(fr_bio_t *bio)
static int fr_bio_fd_eof(fr_bio_t *bio)
{
fr_bio_fd_t *my = talloc_get_type_abort(bio, fr_bio_fd_t);

bio->read = fr_bio_null_read;
bio->write = fr_bio_null_write;
my->info.eof = true;

/*
* Nothing more for us to do, tell fr_bio_eof() that it can continue with poking other BIOs.
*/
return 1;
}

static int fr_bio_fd_write_resume(fr_bio_t *bio)
Expand All @@ -150,7 +150,6 @@ static ssize_t fr_bio_fd_read_stream(fr_bio_t *bio, UNUSED void *packet_ctx, voi
rcode = read(my->info.socket.fd, buffer, size);
if (rcode == 0) {
fr_bio_eof(bio);
if (my->cb.eof) my->cb.eof(&my->bio); /* inform the application that we're at EOF */
return 0;
}

Expand Down
1 change: 0 additions & 1 deletion src/lib/bio/fd_errno.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ case EPIPE:
* and set EOF on the BIO.
*/
fr_bio_eof(&my->bio);
if (my->cb.eof) my->cb.eof(&my->bio); /* inform the application that we're at EOF */
return 0;

default:
Expand Down
18 changes: 16 additions & 2 deletions src/lib/bio/mem.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,12 @@ static ssize_t fr_bio_mem_read_eof(fr_bio_t *bio, UNUSED void *packet_ctx, void
* No more data: return EOF from now on.
*/
if (fr_bio_buf_used(&my->read_buffer) == 0) {
my->bio.read = fr_bio_null_read;

/*
* Don't call our EOF function. But do tell the other BIOs that we're at EOF.
*/
my->priv_cb.eof = NULL;
fr_bio_eof(&my->bio);
return 0;
}

Expand All @@ -69,11 +74,20 @@ static ssize_t fr_bio_mem_read_eof(fr_bio_t *bio, UNUSED void *packet_ctx, void
return fr_bio_buf_read(&my->read_buffer, buffer, size);
}

static void fr_bio_mem_eof(fr_bio_t *bio)
static int fr_bio_mem_eof(fr_bio_t *bio)
{
fr_bio_mem_t *my = talloc_get_type_abort(bio, fr_bio_mem_t);

/*
* Nothing more for us to read, tell fr_bio_eof() that it can continue with poking other BIOs.
*/
if (fr_bio_buf_used(&my->read_buffer) == 0) {
return 1;
}

my->bio.read = fr_bio_mem_read_eof;

return 0;
}

/** Read from a memory BIO
Expand Down
19 changes: 15 additions & 4 deletions src/lib/bio/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,14 @@ static ssize_t fr_bio_pipe_read(fr_bio_t *bio, void *packet_ctx, void *buffer, s
pthread_mutex_lock(&my->mutex);
rcode = my->next->read(my->next, packet_ctx, buffer, size);
if ((rcode == 0) && my->eof) {
rcode = 0;
my->bio.read = fr_bio_null_read;
pthread_mutex_unlock(&my->mutex);

/*
* Don't call our EOF function. But do tell the other BIOs that we're at EOF.
*/
my->priv_cb.eof = NULL;
fr_bio_eof(bio);
return 0;

} else if (rcode > 0) {
/*
Expand Down Expand Up @@ -134,14 +140,19 @@ static void fr_bio_pipe_shutdown(fr_bio_t *bio)
* Either side can set EOF, in which case pending reads are still processed. Writes return EOF immediately.
* Readers return pending data, and then EOF.
*/
static void fr_bio_pipe_eof(fr_bio_t *bio)
static int fr_bio_pipe_eof(fr_bio_t *bio)
{
fr_bio_pipe_t *my = talloc_get_type_abort(bio, fr_bio_pipe_t);

pthread_mutex_lock(&my->mutex);
my->eof = true;
fr_bio_eof(my->next);
pthread_mutex_unlock(&my->mutex);

/*
* We don't know if the other end is at EOF, we have to do a read. So we tell fr_bio_eof() to
* stop processing.
*/
return 0;
}

/** Allocate a thread-safe pipe which can be used for both reads and writes.
Expand Down

0 comments on commit c6ec648

Please sign in to comment.