Skip to content

Commit

Permalink
DAOS-16686 dfuse: fix hang for chunk read
Browse files Browse the repository at this point in the history
fix hang during chunk read

Run-GHA: true
Allow-unstable-test: true
Required-githooks: true

Signed-off-by: Di Wang <ddiwang@google.com>
  • Loading branch information
wangdi1 committed Dec 24, 2024
1 parent 8840964 commit 81f5ff4
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
4 changes: 4 additions & 0 deletions src/client/dfuse/file.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ active_oh_decref(struct dfuse_info *dfuse_info, struct dfuse_obj_hdl *oh)
if (read_chunk_close(oh->doh_ie))
oh->doh_linear_read = true;

/* Invalidate the readahead cache */
if (oh->doh_ie->ie_active->readahead)
oh->doh_ie->ie_active->readahead->dra_ev = NULL;

/* Do not set linear read in the case where there's no reads or writes, this could be
* simple open/close calls but it could also be cache use so leave the setting unchanged
* in this case.
Expand Down
41 changes: 29 additions & 12 deletions src/client/dfuse/ops/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,25 +48,28 @@ cb_read_helper(struct dfuse_event *ev, void *buff)

/* Complete the slave reads queued on a finished read event */
static void
dfuse_cb_slave_list_read_complete(struct dfuse_event *ev)
dfuse_cb_slave_list_read_complete(struct dfuse_event *ev, struct dfuse_inode_entry *ie)
{
struct dfuse_event *evs, *evn;
d_list_t cblist;
char *buf = ev->de_iov.iov_buf;
struct active_inode *ia = ie->ie_active;

D_INIT_LIST_HEAD(&cblist);
D_SPIN_LOCK(&ev->de_oh->doh_ie->ie_active->lock);
D_SPIN_LOCK(&ia->lock);
d_list_del(&ev->de_read_list);
d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list)
d_list_for_each_entry_safe(evs, evn, &ev->de_read_slaves, de_read_list) {
d_list_move(&evs->de_read_list, &cblist);
D_SPIN_UNLOCK(&ev->de_oh->doh_ie->ie_active->lock);
}
D_SPIN_UNLOCK(&ia->lock);

d_list_for_each_entry(evs, &cblist, de_read_list) {
DFUSE_TRA_DEBUG(ev->de_oh, "concurrent network read %p", evs->de_oh);
d_list_del(&evs->de_read_list);
evs->de_len = min(ev->de_len, evs->de_req_len);
evs->de_ev.ev_error = ev->de_ev.ev_error;
DFUSE_IE_STAT_ADD(ev->de_oh->doh_ie, DS_PRE_READ);
if (ia->readahead && ia->readahead->complete)
DFUSE_IE_STAT_ADD(ie, DS_PRE_READ);
D_ASSERT(evs->de_req_position >= ev->de_req_position);
memcpy(evs->de_iov.iov_buf, buf + (evs->de_req_position - ev->de_req_position),
evs->de_len);
Expand All @@ -78,7 +81,7 @@ static void
dfuse_cb_read_complete(struct dfuse_event *ev)
{
/* check slave list first */
dfuse_cb_slave_list_read_complete(ev);
dfuse_cb_slave_list_read_complete(ev, ev->de_oh->doh_ie);

/* Then complete itself */
cb_read_helper(ev, ev->de_iov.iov_buf);
Expand Down Expand Up @@ -233,7 +236,7 @@ check_inflight_fetch(struct active_inode *active, struct dfuse_event *ev)

struct read_chunk_data {
struct dfuse_event *ev;
struct active_inode *ia;
struct dfuse_inode_entry *ie;
fuse_req_t reqs[8];
struct dfuse_obj_hdl *ohs[8];
d_list_t list;
Expand Down Expand Up @@ -288,7 +291,8 @@ static void
chunk_cb(struct dfuse_event *ev)
{
struct read_chunk_data *cd = ev->de_cd;
struct active_inode *ia = cd->ia;
struct dfuse_inode_entry *ie = cd->ie;
struct active_inode *ia = ie->ie_active;
fuse_req_t req;
bool done = false;

Expand All @@ -300,6 +304,7 @@ chunk_cb(struct dfuse_event *ev)
cd->bucket, cd->bucket * CHUNK_SIZE, CHUNK_SIZE, ev->de_len);
}

dfuse_cb_slave_list_read_complete(ev, ie);
daos_event_fini(&ev->de_ev);

do {
Expand Down Expand Up @@ -455,7 +460,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
if (cd == NULL)
goto err;

cd->ia = ie->ie_active;
cd->ie = ie;
cd->bucket = bucket;
submit = true;

Expand Down Expand Up @@ -497,6 +502,7 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
*/
rcb = false;
} else {
DFUSE_IE_STAT_ADD(ie, DS_PRE_READ);
DFUSE_TRA_DEBUG(oh, "%#zx-%#zx read", position,
position + K128 - 1);
DFUSE_REPLY_BUFQ(oh, req, ev->de_iov.iov_buf + (slot * K128), K128);
Expand All @@ -505,6 +511,17 @@ chunk_read(fuse_req_t req, size_t len, off_t position, struct dfuse_obj_hdl *oh)
d_slab_release(cd->eqt->de_read_slab, cd->ev);
D_FREE(cd);
}
} else {
struct dfuse_info *dfuse_info = fuse_req_userdata(req);
struct dfuse_eq *eqt;

eqt = pick_eqt(dfuse_info);

/* Send a message to the async thread to wake it up and poll for events */
sem_post(&eqt->de_sem);

/* Now ensure there are more descriptors for the next request */
d_slab_restock(eqt->de_read_slab);
}
}

Expand Down Expand Up @@ -534,9 +551,9 @@ dfuse_cb_read(fuse_req_t req, fuse_ino_t ino, size_t len, off_t position, struct
DFUSE_TRA_DEBUG(oh, "Returning EOF early without round trip %#zx", position);
oh->doh_linear_read_eof = false;

if (active->readahead) {
if (active->readahead)
DFUSE_IE_STAT_ADD(oh->doh_ie, DS_PRE_READ);
}

DFUSE_REPLY_BUFQ(oh, req, NULL, 0);
return;
}
Expand Down Expand Up @@ -648,7 +665,7 @@ dfuse_cb_pre_read_complete(struct dfuse_event *ev)
}
pre_read_mark_done(active);

dfuse_cb_slave_list_read_complete(ev);
dfuse_cb_slave_list_read_complete(ev, ie);
/* Drop the extra ref on active, the file could be closed before this read completes */
active_ie_decref(dfuse_info, ie);
ev->de_oh->doh_readahead_inflight = 0;
Expand Down

0 comments on commit 81f5ff4

Please sign in to comment.