Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions iocore/aio/AIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -504,35 +504,29 @@ DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e)
int
DiskHandler::mainAIOEvent(int event, Event *e)
{
AIOCallback *op = nullptr;
Lagain:
int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, nullptr);
for (int i = 0; i < ret; i++) {
op = (AIOCallback *)events[i].data;
op->aio_result = events[i].res;
AIOCallback *op = static_cast<AIOCallback*>(events[i].data);
op->aio_result = events[i].res;
ink_assert(op->action.continuation);
complete_list.enqueue(op);
}

if (ret == MAX_AIO_EVENTS) {
goto Lagain;
}

if (ret < 0) {
if (errno == EINTR)
goto Lagain;
if (errno == EFAULT || errno == ENOSYS)
Debug("aio", "io_getevents failed: %s (%d)", strerror(-ret), -ret);
}

ink_aiocb *cbs[MAX_AIO_EVENTS];
int num = 0;

for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != nullptr); ++num) {
for (AIOCallback *op; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != nullptr); ++num) {
cbs[num] = &op->aiocb;
ink_assert(op->action.continuation);
}

if (num > 0) {
int ret;
do {
Expand All @@ -547,8 +541,7 @@ DiskHandler::mainAIOEvent(int event, Event *e)
}
}
}

while ((op = complete_list.dequeue()) != nullptr) {
while (AIOCallback *op = complete_list.dequeue()) {
op->mutex = op->action.mutex;
MUTEX_TRY_LOCK(lock, op->mutex, trigger_event->ethread);
if (!lock.is_locked()) {
Expand All @@ -566,11 +559,23 @@ ink_aio_read(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
op->aiocb.aio_lio_opcode = IO_CMD_PREAD;
op->aiocb.data = op;
EThread *t = this_ethread();
#ifdef AIO_MODE_MMAP
ink_assert(MAP_FAILED!=op->map);
memcpy(op->aiocb.aio_buf,static_cast<char *>(op->map)+op->aiocb.aio_offset,op->aiocb.aio_nbytes);
op->aio_result=op->aiocb.aio_nbytes;
if(!op->mutex)return 0;
MUTEX_TRY_LOCK(lock, op->mutex, t);
if (lock.is_locked()) {
op->handleEvent(EVENT_NONE, nullptr);
} else {
t->schedule_imm(op);
}
#else
#ifdef HAVE_EVENTFD
io_set_eventfd(&op->aiocb, t->evfd);
#endif
t->diskHandler->ready_list.enqueue(op);

#endif
return 1;
}

Expand All @@ -580,17 +585,32 @@ ink_aio_write(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
op->aiocb.aio_lio_opcode = IO_CMD_PWRITE;
op->aiocb.data = op;
EThread *t = this_ethread();
#ifdef AIO_MODE_MMAP
ink_assert(MAP_FAILED!=op->map);
memcpy(static_cast<char *>(op->map)+op->aiocb.aio_offset,op->aiocb.aio_buf,op->aiocb.aio_nbytes);
op->aio_result=op->aiocb.aio_nbytes;
if(!op->mutex)return 0;
MUTEX_TRY_LOCK(lock, op->mutex, t);
if (lock.is_locked()) {
op->handleEvent(EVENT_NONE, nullptr);
} else {
t->schedule_imm(op);
}
return 1;
#endif
#ifdef HAVE_EVENTFD
io_set_eventfd(&op->aiocb, t->evfd);
#endif
t->diskHandler->ready_list.enqueue(op);

return 1;
}

/**
 * Submit a chain of read operations linked through AIOCallback::then.
 *
 * @param op Head of the operation chain; each node is one read request.
 * @return Always 1.
 *
 * When AIO_MODE_MMAP is defined every node of the chain is handed to
 * ink_aio_read() in list order; otherwise the chain is queued on the calling
 * thread's DiskHandler.
 */
int
ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
{
#ifdef AIO_MODE_MMAP
// Iterate until the node itself is null, not until `then` is null: the old
// condition stopped one node early, so the tail of the chain was never read
// (and a null `op` was dereferenced).
// NOTE(review): the per-operation result of ink_aio_read() is ignored here, and
// it returns 0 without signalling the continuation when op->mutex is unset -
// confirm callers set a mutex on every node that needs a completion callback.
for (; op != nullptr; op = op->then) {
  ink_aio_read(op);
}
#else
EThread *t = this_ethread();
DiskHandler *dh = t->diskHandler;
AIOCallback *io = op;
Expand All @@ -615,12 +635,16 @@ ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
op = op->then;
}
}
#endif
return 1;
}

/**
 * Submit a chain of write operations linked through AIOCallback::then.
 *
 * @param op Head of the operation chain; each node is one write request.
 * @return Always 1.
 *
 * When AIO_MODE_MMAP is defined every node of the chain is handed to
 * ink_aio_write() in list order; otherwise the chain is queued on the calling
 * thread's DiskHandler.
 */
int
ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
{
#ifdef AIO_MODE_MMAP
// Iterate until the node itself is null, not until `then` is null: the old
// condition stopped one node early, so the final write of the chain was
// silently dropped (and a null `op` was dereferenced).
// NOTE(review): the per-operation result of ink_aio_write() is ignored here, and
// it returns 0 without signalling the continuation when op->mutex is unset -
// confirm callers set a mutex on every node that needs a completion callback.
for (; op != nullptr; op = op->then) {
  ink_aio_write(op);
}
#else
EThread *t = this_ethread();
DiskHandler *dh = t->diskHandler;
AIOCallback *io = op;
Expand All @@ -645,6 +669,7 @@ ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */)
op = op->then;
}
}
#endif
return 1;
}
#endif // AIO_MODE != AIO_MODE_NATIVE
6 changes: 6 additions & 0 deletions iocore/aio/I_AIO.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVer

// Identifiers for the AIO implementations; AIO_MODE is defined to one of these
// (AIO_MODE_NATIVE when TS_USE_LINUX_NATIVE_AIO is set).
#define AIO_MODE_THREAD 0
#define AIO_MODE_NATIVE 1
// mmap-backed I/O path, currently compiled out by the `#if 0` below.
// NOTE(review): the code guarded by this macro tests `#ifdef AIO_MODE_MMAP`
// rather than comparing it with AIO_MODE, so it behaves as an on/off feature
// flag and its numeric value (2) appears to be unused - confirm before enabling.
#if 0
#define AIO_MODE_MMAP 2
#endif

#if TS_USE_LINUX_NATIVE_AIO
#define AIO_MODE AIO_MODE_NATIVE
Expand Down Expand Up @@ -87,6 +90,9 @@ bool ink_aio_thread_num_set(int thread_num);

struct AIOCallback : public Continuation {
// set before calling aio_read/aio_write
#ifdef AIO_MODE_MMAP
void* map = MAP_FAILED;
#endif
ink_aiocb aiocb;
Action action;
EThread *thread = AIO_CALLBACK_THREAD_ANY;
Expand Down
3 changes: 3 additions & 0 deletions iocore/aio/test_AIO.cc
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ AIO_Device::do_fd(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
io->aiocb.aio_offset = seq_read_point;
io->aiocb.aio_nbytes = seq_read_size;
io->aiocb.aio_lio_opcode = LIO_READ;
#ifdef AIO_MODE_MMAP
io->mutex=mutex;
#endif
ink_assert(ink_aio_read(io) >= 0);
seq_read_point += seq_read_size;
if (seq_read_point > max_offset) {
Expand Down
70 changes: 65 additions & 5 deletions iocore/cache/Cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1215,11 +1215,14 @@ vol_dir_clear(Vol *d)
{
size_t dir_len = d->dirlen();
vol_clear_init(d);

#ifdef AIO_MODE_MMAP
memcpy(static_cast<char*>(d->map)+d->skip,d->raw_dir,dir_len);
#else
if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) {
Warning("unable to clear cache directory '%s'", d->hash_text.get());
return -1;
}
#endif
return 0;
}

Expand All @@ -1230,14 +1233,20 @@ Vol::clear_dir()
vol_clear_init(this);

SET_HANDLER(&Vol::handle_dir_clear);

#ifdef AIO_MODE_MMAP
io.map = map;
#else
io.aiocb.aio_fildes = fd;
#endif
io.aiocb.aio_buf = raw_dir;
io.aiocb.aio_nbytes = dir_len;
io.aiocb.aio_offset = skip;
io.action = this;
io.thread = AIO_CALLBACK_THREAD_ANY;
io.then = nullptr;
#ifdef AIO_MODE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_write(&io));
return 0;
}
Expand Down Expand Up @@ -1309,14 +1318,21 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)

for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
AIOCallback *aio = &(init_info->vol_aio[i]);
#ifdef AIO_MODE_MMAP
aio->map = map;
#else
aio->aiocb.aio_fildes = fd;
#endif
aio->aiocb.aio_buf = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
aio->aiocb.aio_nbytes = footerlen;
aio->action = this;
aio->thread = AIO_CALLBACK_THREAD_ANY;
aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
}
#if AIO_MODE == AIO_MODE_NATIVE
#ifdef AIO_MODE_MMAP
init_info->vol_aio->mutex=mutex;
#endif
ink_assert(ink_aio_readv(init_info->vol_aio));
#else
ink_assert(ink_aio_read(init_info->vol_aio));
Expand All @@ -1335,7 +1351,11 @@ Vol::handle_dir_clear(int event, void *data)
if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) {
Warning("unable to clear cache directory '%s'", hash_text.get());
disk->incrErrors(op);
#ifdef AIO_MODE_MMAP
map = MAP_FAILED;
#else
fd = -1;
#endif
}

if (op->aiocb.aio_nbytes == dir_len) {
Expand All @@ -1344,6 +1364,7 @@ Vol::handle_dir_clear(int event, void *data)
skip + len */
op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
op->aiocb.aio_offset = skip + dir_len;
assert(op->mutex);
ink_assert(ink_aio_write(op));
return EVENT_DONE;
}
Expand Down Expand Up @@ -1610,6 +1631,10 @@ Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
}
prev_recover_pos = recover_pos;
io.aiocb.aio_offset = recover_pos;
#ifdef AIO_MODE_MMAP
io.map = map;
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io));
return EVENT_CONT;

Expand Down Expand Up @@ -1656,7 +1681,11 @@ Ldone : {

for (int i = 0; i < 3; i++) {
AIOCallback *aio = &(init_info->vol_aio[i]);
#ifdef AIO_MODE_MMAP
aio->map = map;
#else
aio->aiocb.aio_fildes = fd;
#endif
aio->action = this;
aio->thread = AIO_CALLBACK_THREAD_ANY;
aio->then = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
Expand Down Expand Up @@ -1726,8 +1755,11 @@ Vol::handle_header_read(int event, void *data)
}
op = op->then;
}

#ifdef AIO_MODE_MMAP
io.map = map;
#else
io.aiocb.aio_fildes = fd;
#endif
io.aiocb.aio_nbytes = this->dirlen();
io.aiocb.aio_buf = raw_dir;
io.action = this;
Expand All @@ -1741,6 +1773,9 @@ Vol::handle_header_read(int event, void *data)
Note("using directory A for '%s'", hash_text.get());
}
io.aiocb.aio_offset = skip;
#ifdef AIO_MODE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io));
}
// try B
Expand All @@ -1750,6 +1785,9 @@ Vol::handle_header_read(int event, void *data)
Note("using directory B for '%s'", hash_text.get());
}
io.aiocb.aio_offset = skip + this->dirlen();
#ifdef AIO_MODE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io));
} else {
Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get());
Expand Down Expand Up @@ -1777,7 +1815,11 @@ Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
ink_assert(!gvol[vol_no]);
gvol[vol_no] = this;
SET_HANDLER(&Vol::aggWrite);
#ifdef AIO_MODE_MMAP
cache->vol_initialized(map != MAP_FAILED);
#else
cache->vol_initialized(fd != -1);
#endif
return EVENT_DONE;
}
}
Expand Down Expand Up @@ -1947,7 +1989,11 @@ CacheProcessor::mark_storage_offline(CacheDisk *d, ///< Target disk
}

for (p = 0; p < gnvol; p++) {
#ifdef AIO_MODE_MMAP
if (d->map == gvol[p]->map) {
#else
if (d->fd == gvol[p]->fd) {
#endif
total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH;
used_dir_delete += dir_entries_used(gvol[p]);
total_bytes_delete += gvol[p]->len - gvol[p]->dirlen();
Expand Down Expand Up @@ -2009,8 +2055,11 @@ AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data

for (; disk_no < gndisks; disk_no++) {
CacheDisk *d = gdisks[disk_no];

#ifdef AIO_MODE_MMAP
if (d->map == cb->map) {
#else
if (d->fd == cb->aiocb.aio_fildes) {
#endif
char message[256];
d->incrErrors(cb);

Expand Down Expand Up @@ -2091,7 +2140,11 @@ Cache::open(bool clear, bool /* fix ATS_UNUSED */)
cp->vols[vol_no] = new Vol();
CacheDisk *d = cp->disk_vols[i]->disk;
cp->vols[vol_no]->disk = d;
#ifdef AIO_MODE_MMAP
cp->vols[vol_no]->map = d->map;
#else
cp->vols[vol_no]->fd = d->fd;
#endif
cp->vols[vol_no]->cache = this;
cp->vols[vol_no]->cache_vol = cp;
blocks = q->b->len;
Expand Down Expand Up @@ -2316,14 +2369,18 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos);
char *doc = buf->data();
assert(vol->agg_buffer);
char *agg = vol->agg_buffer + agg_offset;
memcpy(doc, agg, io.aiocb.aio_nbytes);
io.aio_result = io.aiocb.aio_nbytes;
SET_HANDLER(&CacheVC::handleReadDone);
return EVENT_RETURN;
}

#ifdef AIO_MODE_MMAP
io.map = vol->map;
#else
io.aiocb.aio_fildes = vol->fd;
#endif
io.aiocb.aio_offset = vol->vol_offset(&dir);
if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) {
io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset;
Expand All @@ -2333,6 +2390,9 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
io.action = this;
io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
SET_HANDLER(&CacheVC::handleReadDone);
#ifdef AIO_MODE_MMAP
io.mutex = mutex;
#endif
ink_assert(ink_aio_read(&io) >= 0);
CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
return EVENT_CONT;
Expand Down
Loading