diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc index df8e27bea8f..aae86476d39 100644 --- a/iocore/aio/AIO.cc +++ b/iocore/aio/AIO.cc @@ -504,35 +504,29 @@ DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e) int DiskHandler::mainAIOEvent(int event, Event *e) { - AIOCallback *op = nullptr; Lagain: int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, nullptr); for (int i = 0; i < ret; i++) { - op = (AIOCallback *)events[i].data; - op->aio_result = events[i].res; + AIOCallback *op = static_cast(events[i].data); + op->aio_result = events[i].res; ink_assert(op->action.continuation); complete_list.enqueue(op); } - if (ret == MAX_AIO_EVENTS) { goto Lagain; } - if (ret < 0) { if (errno == EINTR) goto Lagain; if (errno == EFAULT || errno == ENOSYS) Debug("aio", "io_getevents failed: %s (%d)", strerror(-ret), -ret); } - ink_aiocb *cbs[MAX_AIO_EVENTS]; int num = 0; - - for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != nullptr); ++num) { + for (AIOCallback *op; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != nullptr); ++num) { cbs[num] = &op->aiocb; ink_assert(op->action.continuation); } - if (num > 0) { int ret; do { @@ -547,8 +541,7 @@ DiskHandler::mainAIOEvent(int event, Event *e) } } } - - while ((op = complete_list.dequeue()) != nullptr) { + while (AIOCallback *op = complete_list.dequeue()) { op->mutex = op->action.mutex; MUTEX_TRY_LOCK(lock, op->mutex, trigger_event->ethread); if (!lock.is_locked()) { @@ -566,11 +559,23 @@ ink_aio_read(AIOCallback *op, int /* fromAPI ATS_UNUSED */) op->aiocb.aio_lio_opcode = IO_CMD_PREAD; op->aiocb.data = op; EThread *t = this_ethread(); +#ifdef AIO_MODE_MMAP + ink_assert(MAP_FAILED!=op->map); + memcpy(op->aiocb.aio_buf,static_cast(op->map)+op->aiocb.aio_offset,op->aiocb.aio_nbytes); + op->aio_result=op->aiocb.aio_nbytes; + if(!op->mutex)return 0; + MUTEX_TRY_LOCK(lock, op->mutex, t); + if (lock.is_locked()) { + op->handleEvent(EVENT_NONE, nullptr); + } else { + t->schedule_imm(op); + } +#else #ifdef HAVE_EVENTFD io_set_eventfd(&op->aiocb, t->evfd); #endif t->diskHandler->ready_list.enqueue(op); - +#endif return 1; } @@ -580,17 +585,32 @@ ink_aio_write(AIOCallback *op, int /* fromAPI ATS_UNUSED */) op->aiocb.aio_lio_opcode = IO_CMD_PWRITE; op->aiocb.data = op; EThread *t = this_ethread(); +#ifdef AIO_MODE_MMAP + ink_assert(MAP_FAILED!=op->map); + memcpy(static_cast(op->map)+op->aiocb.aio_offset,op->aiocb.aio_buf,op->aiocb.aio_nbytes); + op->aio_result=op->aiocb.aio_nbytes; + if(!op->mutex)return 0; + MUTEX_TRY_LOCK(lock, op->mutex, t); + if (lock.is_locked()) { + op->handleEvent(EVENT_NONE, nullptr); + } else { + t->schedule_imm(op); + } + return 1; +#endif #ifdef HAVE_EVENTFD io_set_eventfd(&op->aiocb, t->evfd); #endif t->diskHandler->ready_list.enqueue(op); - return 1; } int ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */) { +#ifdef AIO_MODE_MMAP + for(;op->then;op=op->then)ink_aio_read(op); +#else EThread *t = this_ethread(); DiskHandler *dh = t->diskHandler; AIOCallback *io = op; @@ -615,12 +635,16 @@ ink_aio_readv(AIOCallback *op, int /* fromAPI ATS_UNUSED */) op = op->then; } } +#endif return 1; } int ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */) { +#ifdef AIO_MODE_MMAP + for(;op->then;op=op->then)ink_aio_write(op); +#else EThread *t = this_ethread(); DiskHandler *dh = t->diskHandler; AIOCallback *io = op; @@ -645,6 +669,7 @@ ink_aio_writev(AIOCallback *op, int /* fromAPI ATS_UNUSED */) op = op->then; } } +#endif return 1; } #endif // AIO_MODE != AIO_MODE_NATIVE diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h index 6144e188f6e..15bdfab3d3e 100644 --- a/iocore/aio/I_AIO.h +++ b/iocore/aio/I_AIO.h @@ -40,6 +40,9 @@ static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVer #define AIO_MODE_THREAD 0 #define AIO_MODE_NATIVE 1 +#if 0 +#define AIO_MODE_MMAP 2 +#endif #if TS_USE_LINUX_NATIVE_AIO #define AIO_MODE AIO_MODE_NATIVE @@ -87,6 +90,9 @@ bool ink_aio_thread_num_set(int thread_num); struct AIOCallback : public Continuation { // set before calling aio_read/aio_write +#ifdef AIO_MODE_MMAP + void* map = MAP_FAILED; +#endif ink_aiocb aiocb; Action action; EThread *thread = AIO_CALLBACK_THREAD_ANY; diff --git a/iocore/aio/test_AIO.cc b/iocore/aio/test_AIO.cc index 87e6f06cf7d..c99dabbe67f 100644 --- a/iocore/aio/test_AIO.cc +++ b/iocore/aio/test_AIO.cc @@ -287,6 +287,9 @@ AIO_Device::do_fd(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) io->aiocb.aio_offset = seq_read_point; io->aiocb.aio_nbytes = seq_read_size; io->aiocb.aio_lio_opcode = LIO_READ; +#ifdef AIO_MODE_MMAP + io->mutex=mutex; +#endif ink_assert(ink_aio_read(io) >= 0); seq_read_point += seq_read_size; if (seq_read_point > max_offset) { diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc index edb825a4a82..fa21e44408e 100644 --- a/iocore/cache/Cache.cc +++ b/iocore/cache/Cache.cc @@ -1215,11 +1215,14 @@ vol_dir_clear(Vol *d) { size_t dir_len = d->dirlen(); vol_clear_init(d); - +#ifdef AIO_MODE_MMAP + memcpy(static_cast(d->map)+d->skip,d->raw_dir,dir_len); +#else if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) { Warning("unable to clear cache directory '%s'", d->hash_text.get()); return -1; } +#endif return 0; } @@ -1230,14 +1233,20 @@ Vol::clear_dir() vol_clear_init(this); SET_HANDLER(&Vol::handle_dir_clear); - +#ifdef AIO_MODE_MMAP + io.map = map; +#else io.aiocb.aio_fildes = fd; +#endif io.aiocb.aio_buf = raw_dir; io.aiocb.aio_nbytes = dir_len; io.aiocb.aio_offset = skip; io.action = this; io.thread = AIO_CALLBACK_THREAD_ANY; io.then = nullptr; +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_write(&io)); return 0; } @@ -1309,7 +1318,11 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear) for (unsigned i = 0; i < countof(init_info->vol_aio); i++) { AIOCallback *aio = &(init_info->vol_aio[i]); +#ifdef AIO_MODE_MMAP + aio->map = map; +#else aio->aiocb.aio_fildes = fd; +#endif aio->aiocb.aio_buf = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]); aio->aiocb.aio_nbytes = footerlen; aio->action = this; @@ -1317,6 +1330,9 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear) aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr; } #if AIO_MODE == AIO_MODE_NATIVE +#ifdef AIO_MODE_MMAP + init_info->vol_aio->mutex=mutex; +#endif ink_assert(ink_aio_readv(init_info->vol_aio)); #else ink_assert(ink_aio_read(init_info->vol_aio)); @@ -1335,7 +1351,11 @@ Vol::handle_dir_clear(int event, void *data) if (static_cast(op->aio_result) != op->aiocb.aio_nbytes) { Warning("unable to clear cache directory '%s'", hash_text.get()); disk->incrErrors(op); +#ifdef AIO_MODE_MMAP + map = MAP_FAILED; +#else fd = -1; +#endif } if (op->aiocb.aio_nbytes == dir_len) { @@ -1344,6 +1364,7 @@ Vol::handle_dir_clear(int event, void *data) skip + len */ op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)); op->aiocb.aio_offset = skip + dir_len; + assert(op->mutex); ink_assert(ink_aio_write(op)); return EVENT_DONE; } @@ -1610,6 +1631,10 @@ Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */) } prev_recover_pos = recover_pos; io.aiocb.aio_offset = recover_pos; +#ifdef AIO_MODE_MMAP + io.map = map; + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io)); return EVENT_CONT; @@ -1656,7 +1681,11 @@ Ldone : { for (int i = 0; i < 3; i++) { AIOCallback *aio = &(init_info->vol_aio[i]); +#ifdef AIO_MODE_MMAP + aio->map = map; +#else aio->aiocb.aio_fildes = fd; +#endif aio->action = this; aio->thread = AIO_CALLBACK_THREAD_ANY; aio->then = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr; @@ -1726,8 +1755,11 @@ Vol::handle_header_read(int event, void *data) } op = op->then; } - +#ifdef AIO_MODE_MMAP + io.map = map; +#else io.aiocb.aio_fildes = fd; +#endif io.aiocb.aio_nbytes = this->dirlen(); io.aiocb.aio_buf = raw_dir; io.action = this; @@ -1741,6 +1773,9 @@ Vol::handle_header_read(int event, void *data) Note("using directory A for '%s'", hash_text.get()); } io.aiocb.aio_offset = skip; +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io)); } // try B @@ -1750,6 +1785,9 @@ Vol::handle_header_read(int event, void *data) Note("using directory B for '%s'", hash_text.get()); } io.aiocb.aio_offset = skip + this->dirlen(); +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io)); } else { Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get()); @@ -1777,7 +1815,11 @@ Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */) ink_assert(!gvol[vol_no]); gvol[vol_no] = this; SET_HANDLER(&Vol::aggWrite); +#ifdef AIO_MODE_MMAP + cache->vol_initialized(map != MAP_FAILED); +#else cache->vol_initialized(fd != -1); +#endif return EVENT_DONE; } } @@ -1947,7 +1989,11 @@ CacheProcessor::mark_storage_offline(CacheDisk *d, ///< Target disk } for (p = 0; p < gnvol; p++) { +#ifdef AIO_MODE_MMAP + if (d->map == gvol[p]->map) { +#else if (d->fd == gvol[p]->fd) { +#endif total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH; used_dir_delete += dir_entries_used(gvol[p]); total_bytes_delete += gvol[p]->len - gvol[p]->dirlen(); @@ -2009,8 +2055,11 @@ AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data for (; disk_no < gndisks; disk_no++) { CacheDisk *d = gdisks[disk_no]; - +#ifdef AIO_MODE_MMAP + if (d->map == cb->map) { +#else if (d->fd == cb->aiocb.aio_fildes) { +#endif char message[256]; d->incrErrors(cb); @@ -2091,7 +2140,11 @@ Cache::open(bool clear, bool /* fix ATS_UNUSED */) cp->vols[vol_no] = new Vol(); CacheDisk *d = cp->disk_vols[i]->disk; cp->vols[vol_no]->disk = d; +#ifdef AIO_MODE_MMAP + cp->vols[vol_no]->map = d->map; +#else cp->vols[vol_no]->fd = d->fd; +#endif cp->vols[vol_no]->cache = this; cp->vols[vol_no]->cache_vol = cp; blocks = q->b->len; @@ -2316,14 +2369,18 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED); ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos); char *doc = buf->data(); + assert(vol->agg_buffer); char *agg = vol->agg_buffer + agg_offset; memcpy(doc, agg, io.aiocb.aio_nbytes); io.aio_result = io.aiocb.aio_nbytes; SET_HANDLER(&CacheVC::handleReadDone); return EVENT_RETURN; } - +#ifdef AIO_MODE_MMAP + io.map = vol->map; +#else io.aiocb.aio_fildes = vol->fd; +#endif io.aiocb.aio_offset = vol->vol_offset(&dir); if (static_cast(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast(vol->skip + vol->len)) { io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset; @@ -2333,6 +2390,9 @@ CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) io.action = this; io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding; SET_HANDLER(&CacheVC::handleReadDone); +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io) >= 0); CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat); return EVENT_CONT; diff --git a/iocore/cache/CacheDir.cc b/iocore/cache/CacheDir.cc index 8b2d428645b..961549ac52f 100644 --- a/iocore/cache/CacheDir.cc +++ b/iocore/cache/CacheDir.cc @@ -569,7 +569,11 @@ dir_probe(const CacheKey *key, Vol *d, Dir *result, Dir **last_collision) goto Lcont; } if (dir_valid(d, e)) { +#ifdef AIO_MODE_MMAP + DDebug("dir_probe_hit", "found %X %X vol %p bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1), d->map, b, +#else DDebug("dir_probe_hit", "found %X %X vol %d bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1), d->fd, b, +#endif dir_offset(e)); dir_assign(result, e); *last_collision = e; @@ -594,7 +598,11 @@ dir_probe(const CacheKey *key, Vol *d, Dir *result, Dir **last_collision) collision = nullptr; goto Lagain; } +#ifdef AIO_MODE_MMAP + DDebug("dir_probe_miss", "missed %X %X on vol %p bucket %d at %p", key->slice32(0), key->slice32(1), d->map, b, seg); +#else DDebug("dir_probe_miss", "missed %X %X on vol %d bucket %d at %p", key->slice32(0), key->slice32(1), d->fd, b, seg); +#endif CHECK_DIR(d); return 0; } @@ -645,7 +653,11 @@ dir_insert(const CacheKey *key, Vol *d, Dir *to_part) dir_assign_data(e, to_part); dir_set_tag(e, key->slice32(2)); ink_assert(d->vol_offset(e) < (d->skip + d->len)); +#ifdef AIO_MODE_MMAP + DDebug("dir_insert", "insert %p %X into vol %p bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), d->map, bi, e, +#else DDebug("dir_insert", "insert %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), d->fd, bi, e, +#endif key->slice32(1), dir_tag(e), dir_offset(e)); CHECK_DIR(d); d->header->dirty = 1; @@ -722,7 +734,11 @@ dir_overwrite(const CacheKey *key, Vol *d, Dir *dir, Dir *overwrite, bool must_o dir_assign_data(e, dir); dir_set_tag(e, t); ink_assert(d->vol_offset(e) < d->skip + d->len); +#ifdef AIO_MODE_MMAP + DDebug("dir_overwrite", "overwrite %p %X into vol %p bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), d->map, +#else DDebug("dir_overwrite", "overwrite %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), d->fd, +#endif bi, e, t, dir_tag(e), dir_offset(e)); CHECK_DIR(d); d->header->dirty = 1; @@ -888,9 +904,17 @@ dir_sync_init() } void +#ifdef AIO_MODE_MMAP +CacheSync::aio_write(void* map, char *b, int n, off_t o) +#else CacheSync::aio_write(int fd, char *b, int n, off_t o) +#endif { +#ifdef AIO_MODE_MMAP + io.map = map; +#else io.aiocb.aio_fildes = fd; +#endif io.aiocb.aio_offset = o; io.aiocb.aio_nbytes = n; io.aiocb.aio_buf = b; @@ -970,8 +994,12 @@ sync_cache_dir_on_shutdown() // set write limit d->header->agg_pos = d->header->write_pos + d->agg_buf_pos; - +#ifdef AIO_MODE_MMAP + assert(!d->agg_buffer); + int r = d->agg_buf_pos; d->agg_buffer=static_cast(d->map)+d->header->write_pos; +#else int r = pwrite(d->fd, d->agg_buffer, d->agg_buf_pos, d->header->write_pos); +#endif if (r != d->agg_buf_pos) { ink_assert(!"flushing agg buffer failed"); continue; @@ -1014,7 +1042,11 @@ sync_cache_dir_on_shutdown() memcpy(buf, d->raw_dir, dirlen); size_t B = d->header->sync_serial & 1; off_t start = d->skip + (B ? dirlen : 0); +#ifdef AIO_MODE_MMAP + B = dirlen; memcpy(static_cast(d->map)+start, buf, dirlen); +#else B = pwrite(d->fd, buf, dirlen, start); +#endif ink_assert(B == dirlen); Debug("cache_dir_sync", "done syncing dir for vol %s", d->hash_text.get()); } @@ -1146,7 +1178,11 @@ CacheSync::mainEvent(int event, Event *e) if (!writepos) { // write header +#ifdef AIO_MODE_MMAP + aio_write(vol->map, buf + writepos, headerlen, start + writepos); +#else aio_write(vol->fd, buf + writepos, headerlen, start + writepos); +#endif writepos += headerlen; } else if (writepos < static_cast(dirlen) - headerlen) { // write part of body @@ -1154,12 +1190,20 @@ CacheSync::mainEvent(int event, Event *e) if (writepos + l > static_cast(dirlen) - headerlen) { l = dirlen - headerlen - writepos; } +#ifdef AIO_MODE_MMAP + aio_write(vol->map, buf + writepos, l, start + writepos); +#else aio_write(vol->fd, buf + writepos, l, start + writepos); +#endif writepos += l; } else if (writepos < static_cast(dirlen)) { ink_assert(writepos == (off_t)dirlen - headerlen); // write footer +#ifdef AIO_MODE_MMAP + aio_write(vol->map, buf + writepos, headerlen, start + writepos); +#else aio_write(vol->fd, buf + writepos, headerlen, start + writepos); +#endif writepos += headerlen; } else { vol->dir_sync_in_progress = false; diff --git a/iocore/cache/CacheDisk.cc b/iocore/cache/CacheDisk.cc index c2a3a92b3e6..bdd78ca69b8 100644 --- a/iocore/cache/CacheDisk.cc +++ b/iocore/cache/CacheDisk.cc @@ -57,12 +57,21 @@ CacheDisk::open(char *s, off_t blocks, off_t askip, int ahw_sector_size, int fil { path = ats_strdup(s); hw_sector_size = ahw_sector_size; +#ifdef AIO_MODE_MMAP + map = mmap(0,blocks*STORE_BLOCK_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,fildes,0); + assert(MAP_FAILED!=map); +#else fd = fildes; +#endif skip = askip; start = skip; /* we can't use fractions of store blocks. */ len = blocks; +#ifdef AIO_MODE_MMAP + io.map = map; +#else io.aiocb.aio_fildes = fd; +#endif io.action = this; // determine header size and hence start point by successive approximation uint64_t l; @@ -99,6 +108,9 @@ CacheDisk::open(char *s, off_t blocks, off_t askip, int ahw_sector_size, int fil // SET_HANDLER(&CacheDisk::openStart); +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif io.aiocb.aio_offset = skip; io.aiocb.aio_buf = reinterpret_cast(header); io.aiocb.aio_nbytes = header_len; @@ -138,6 +150,7 @@ CacheDisk::clearDisk() io.aiocb.aio_buf = header; io.aiocb.aio_nbytes = header_len; io.thread = AIO_CALLBACK_THREAD_ANY; + assert(io.mutex); ink_aio_write(&io); return 0; } @@ -224,6 +237,10 @@ CacheDisk::sync() io.aiocb.aio_buf = header; io.aiocb.aio_nbytes = header_len; io.thread = AIO_CALLBACK_THREAD_ANY; +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif + assert(io.mutex); ink_aio_write(&io); return 0; } diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc index bf0a68ed3e1..a303ad0a9ca 100644 --- a/iocore/cache/CacheVol.cc +++ b/iocore/cache/CacheVol.cc @@ -369,11 +369,18 @@ CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) } Lread: +#ifdef AIO_MODE_MMAP + io.map = vol->map; +#else io.aiocb.aio_fildes = vol->fd; +#endif if (static_cast(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast(vol->skip + vol->len)) { io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset; } offset = 0; +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io) >= 0); Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes); return EVENT_CONT; diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc index cf8d0244660..6b0244d512b 100644 --- a/iocore/cache/CacheWrite.cc +++ b/iocore/cache/CacheWrite.cc @@ -383,6 +383,7 @@ Vol::aggWriteDone(int event, Event *e) Dir del_dir; dir_clear(&del_dir); for (int done = 0; done < agg_buf_pos;) { + assert(agg_buffer); Doc *doc = reinterpret_cast(agg_buffer + done); dir_set_offset(&del_dir, header->write_pos + done); dir_delete(&doc->key, this, &del_dir); @@ -752,7 +753,11 @@ Vol::evac_range(off_t low, off_t high, int evac_phase) } if (first) { first->f.done = 1; +#ifdef AIO_MODE_MMAP + io.map = map; +#else io.aiocb.aio_fildes = fd; +#endif io.aiocb.aio_nbytes = dir_approx_size(&first->dir); io.aiocb.aio_offset = this->vol_offset(&first->dir); if (static_cast(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast(skip + len)) { @@ -766,6 +771,9 @@ Vol::evac_range(off_t low, off_t high, int evac_phase) io.thread = AIO_CALLBACK_THREAD_ANY; DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir)); SET_HANDLER(&Vol::evacuateDocReadDone); +#ifdef AIO_MODE_MMAP + io.mutex = mutex; +#endif ink_assert(ink_aio_read(&io) >= 0); return -1; } @@ -1013,6 +1021,12 @@ Vol::agg_wrap() int Vol::aggWrite(int event, void * /* e ATS_UNUSED */) { +#ifdef AIO_MODE_MMAP + if(agg_buffer!=(static_cast(map)+header->write_pos)) { + agg_buffer=static_cast(map)+header->write_pos; + memset(agg_buffer, 0, 0x1000); + } +#endif ink_assert(!is_io_in_progress()); Que(CacheVC, link) tocall; @@ -1030,6 +1044,9 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) break; } DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d", agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0)); +#ifdef AIO_MODE_MMAP + assert(agg_buffer==(static_cast(map)+header->write_pos)); +#endif int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c); ink_assert(writelen == wrotelen); agg_todo_size -= writelen; @@ -1095,6 +1112,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) ink_assert(sync.head); int l = round_to_approx_size(sizeof(Doc)); agg_buf_pos = l; + assert(agg_buffer); Doc *d = reinterpret_cast(agg_buffer); memset(static_cast(d), 0, sizeof(Doc)); d->magic = DOC_MAGIC; @@ -1105,9 +1123,13 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) // set write limit header->agg_pos = header->write_pos + agg_buf_pos; - +#ifdef AIO_MODE_MMAP + io.map = map; +#else io.aiocb.aio_fildes = fd; +#endif io.aiocb.aio_offset = header->write_pos; + assert(agg_buffer); io.aiocb.aio_buf = agg_buffer; io.aiocb.aio_nbytes = agg_buf_pos; io.action = this; @@ -1118,6 +1140,7 @@ Vol::aggWrite(int event, void * /* e ATS_UNUSED */) */ io.thread = AIO_CALLBACK_THREAD_AIO; SET_HANDLER(&Vol::aggWriteDone); + assert(io.mutex); ink_aio_write(&io); Lwait: diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h index 58c1c9ffd3c..121e7b7b067 100644 --- a/iocore/cache/P_CacheDir.h +++ b/iocore/cache/P_CacheDir.h @@ -267,8 +267,11 @@ struct CacheSync : public Continuation { Event *trigger = nullptr; ink_hrtime start_time = 0; int mainEvent(int event, Event *e); +#ifdef AIO_MODE_MMAP + void aio_write(void* fd, char *b, int n, off_t o); +#else void aio_write(int fd, char *b, int n, off_t o); - +#endif CacheSync() : Continuation(new_ProxyMutex()) { SET_HANDLER(&CacheSync::mainEvent); } }; diff --git a/iocore/cache/P_CacheDisk.h b/iocore/cache/P_CacheDisk.h index 18379de9d6e..18ac2f9d70d 100644 --- a/iocore/cache/P_CacheDisk.h +++ b/iocore/cache/P_CacheDisk.h @@ -88,7 +88,11 @@ struct CacheDisk : public Continuation { off_t skip = 0; off_t num_usable_blocks = 0; int hw_sector_size = 0; +#ifdef AIO_MODE_MMAP + void* map = MAP_FAILED; +#else int fd = -1; +#endif off_t free_space = 0; off_t wasted_space = 0; DiskVol **disk_vols = nullptr; diff --git a/iocore/cache/P_CacheVol.h b/iocore/cache/P_CacheVol.h index 95449289a53..748f61c9466 100644 --- a/iocore/cache/P_CacheVol.h +++ b/iocore/cache/P_CacheVol.h @@ -123,8 +123,11 @@ struct Vol : public Continuation { char *path = nullptr; ats_scoped_str hash_text; CryptoHash hash_id; - int fd = -1; - +#ifdef AIO_MODE_MMAP + void* map = MAP_FAILED; +#else + int fd = -1; +#endif char *raw_dir = nullptr; Dir *dir = nullptr; VolHeaderFooter *header = nullptr; @@ -144,7 +147,7 @@ struct Vol : public Continuation { Queue agg; Queue stat_cache_vcs; Queue sync; - char *agg_buffer = nullptr; + char *agg_buffer; int agg_todo_size = 0; int agg_buf_pos = 0; @@ -264,12 +267,17 @@ struct Vol : public Continuation { Vol() : Continuation(new_ProxyMutex()) { open_dir.mutex = mutex; +#ifdef AIO_MODE_MMAP + agg_buffer=nullptr; +#else agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE); memset(agg_buffer, 0, AGG_SIZE); +#endif SET_HANDLER(&Vol::aggWrite); } - +#ifndef AIO_MODE_MMAP ~Vol() override { ats_free(agg_buffer); } +#endif }; struct AIO_Callback_handler : public Continuation { diff --git a/src/traffic_server/InkAPI.cc b/src/traffic_server/InkAPI.cc index 509c9974582..f80ce3b7e47 100644 --- a/src/traffic_server/InkAPI.cc +++ b/src/traffic_server/InkAPI.cc @@ -8300,7 +8300,9 @@ TSAIORead(int fd, off_t offset, char *buf, size_t buffSize, TSCont contp) pAIO->aiocb.aio_buf = buf; pAIO->action = pCont; pAIO->thread = pCont->mutex->thread_holding; - +#ifdef AIO_MODE_MMAP + pAIO->mutex=pCont->mutex; +#endif if (ink_aio_read(pAIO, 1) == 1) { return TS_SUCCESS; } @@ -8339,7 +8341,6 @@ TSAIOWrite(int fd, off_t offset, char *buf, const size_t bufSize, TSCont contp) pAIO->aiocb.aio_nbytes = bufSize; pAIO->action = pCont; pAIO->thread = pCont->mutex->thread_holding; - if (ink_aio_write(pAIO, 1) == 1) { return TS_SUCCESS; } diff --git a/src/traffic_server/Makefile.inc b/src/traffic_server/Makefile.inc index 0e5f43cb0a1..72212e56c28 100644 --- a/src/traffic_server/Makefile.inc +++ b/src/traffic_server/Makefile.inc @@ -20,6 +20,7 @@ bin_PROGRAMS += traffic_server/traffic_server traffic_server_traffic_server_CPPFLAGS = \ + -DGIT="\""$(shell git log -1 --format=%h)"\"" \ $(AM_CPPFLAGS) \ $(iocore_include_dirs) \ -I$(abs_top_srcdir)/include \ diff --git a/src/traffic_server/traffic_server.cc b/src/traffic_server/traffic_server.cc index 7f50493ad79..4b719bf864d 100644 --- a/src/traffic_server/traffic_server.cc +++ b/src/traffic_server/traffic_server.cc @@ -1726,6 +1726,10 @@ bind_outputs(const char *bind_stdout_p, const char *bind_stderr_p) int main(int /* argc ATS_UNUSED */, const char **argv) { +#ifdef GIT +(void*)write(2,"GIT " GIT "\n",sizeof(GIT)+4); +#endif + #if TS_HAS_PROFILER HeapProfilerStart("/tmp/ts.hprof"); ProfilerStart("/tmp/ts.prof");