diff --git a/src/iocore/cache/CMakeLists.txt b/src/iocore/cache/CMakeLists.txt index a77b47b38d3..3e0e83b9b17 100644 --- a/src/iocore/cache/CMakeLists.txt +++ b/src/iocore/cache/CMakeLists.txt @@ -21,6 +21,7 @@ add_library( Cache.cc CacheDir.cc CacheDisk.cc + CacheDoc.cc CacheEvacuateDocVC.cc CacheHosting.cc CacheHttp.cc diff --git a/src/iocore/cache/CacheDoc.cc b/src/iocore/cache/CacheDoc.cc new file mode 100644 index 00000000000..0a79651f240 --- /dev/null +++ b/src/iocore/cache/CacheDoc.cc @@ -0,0 +1,90 @@ +/** @file + + Operations on cache documents (may also be called fragments). + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "P_CacheDoc.h" + +#include "iocore/eventsystem/IOBuffer.h" + +#include "tscore/ink_hrtime.h" + +#include + +namespace +{ + +char * +iobufferblock_memcpy(char *p, int len, IOBufferBlock const *ab, int offset) +{ + IOBufferBlock const *b = ab; + while (b && len >= 0) { + char *start = b->_start; + char *end = b->_end; + int max_bytes = end - start; + max_bytes -= offset; + if (max_bytes <= 0) { + offset = -max_bytes; + b = b->next.get(); + continue; + } + int bytes = len; + if (bytes >= max_bytes) { + bytes = max_bytes; + } + ::memcpy(p, start + offset, bytes); + p += bytes; + len -= bytes; + b = b->next.get(); + offset = 0; + } + return p; +} + +} // namespace + +void +Doc::set_data(int const len, IOBufferBlock const *block, int const offset) +{ + iobufferblock_memcpy(this->data(), len, block, offset); +} + +void +Doc::calculate_checksum() +{ + this->checksum = 0; + for (char *b = this->hdr(); b < reinterpret_cast(this) + this->len; b++) { + this->checksum += *b; + } +} + +void +Doc::pin(std::uint32_t const pin_in_cache) +{ + // coverity[Y2K38_SAFETY:FALSE] + this->pinned = static_cast(ink_get_hrtime() / HRTIME_SECOND) + pin_in_cache; +} + +void +Doc::unpin() +{ + this->pinned = 0; +} diff --git a/src/iocore/cache/CacheWrite.cc b/src/iocore/cache/CacheWrite.cc index bfd60326b33..f947bacbfde 100644 --- a/src/iocore/cache/CacheWrite.cc +++ b/src/iocore/cache/CacheWrite.cc @@ -273,33 +273,6 @@ CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) return EVENT_CONT; } -static char * -iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset) -{ - IOBufferBlock *b = ab; - while (b && len >= 0) { - char *start = b->_start; - char *end = b->_end; - int max_bytes = end - start; - max_bytes -= offset; - if (max_bytes <= 0) { - offset = -max_bytes; - b = b->next.get(); - continue; - } - int bytes = len; - if (bytes >= max_bytes) { - bytes = max_bytes; - } - ::memcpy(p, start + offset, bytes); - p += bytes; - len -= bytes; - b = b->next.get(); - offset = 0; - } - return p; -} - EvacuationBlock * Stripe::force_evacuate_head(Dir const *evac_dir, int pinned) { @@ -641,140 +614,10 @@ Stripe::evac_range(off_t low, off_t high, int evac_phase) int Stripe::_agg_copy(CacheVC *vc) { - off_t o = this->header->write_pos + this->get_agg_buf_pos(); - - if (!vc->f.evacuator) { - uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc); - Doc *doc = this->_write_buffer.emplace(this->round_to_approx_size(len)); - IOBufferBlock *res_alt_blk = nullptr; - - ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc)); - ink_assert(this->round_to_approx_size(len) == vc->agg_len); - // update copy of directory entry for this document - dir_set_approx_size(&vc->dir, vc->agg_len); - dir_set_offset(&vc->dir, this->offset_to_vol_offset(o)); - ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len)); - dir_set_phase(&vc->dir, this->header->phase); - - // fill in document header - doc->magic = DOC_MAGIC; - doc->len = len; - doc->hlen = vc->header_len; - doc->doc_type = vc->frag_type; - doc->v_major = CACHE_DB_MAJOR_VERSION; - doc->v_minor = CACHE_DB_MINOR_VERSION; - doc->unused = 0; // force this for forward compatibility. - doc->total_len = vc->total_len; - doc->first_key = vc->first_key; - doc->sync_serial = this->header->sync_serial; - vc->write_serial = doc->write_serial = this->header->write_serial; - doc->checksum = DOC_NO_CHECKSUM; - if (vc->pin_in_cache) { - dir_set_pinned(&vc->dir, 1); - // coverity[Y2K38_SAFETY:FALSE] - doc->pinned = static_cast(ink_get_hrtime() / HRTIME_SECOND) + vc->pin_in_cache; - } else { - dir_set_pinned(&vc->dir, 0); - doc->pinned = 0; - } - - if (vc->f.use_first_key) { - if (doc->data_len() || vc->f.allow_empty_doc) { - doc->key = vc->earliest_key; - } else { // the vector is being written by itself - if (vc->earliest_key.is_zero()) { - do { - rand_CacheKey(&doc->key); - } while (DIR_MASK_TAG(doc->key.slice32(2)) == DIR_MASK_TAG(vc->first_key.slice32(2))); - } else { - prev_CacheKey(&doc->key, &vc->earliest_key); - } - } - dir_set_head(&vc->dir, true); - } else { - doc->key = vc->key; - dir_set_head(&vc->dir, !vc->fragment); - } - - if (vc->f.rewrite_resident_alt) { - ink_assert(vc->f.use_first_key); - Doc *res_doc = reinterpret_cast(vc->first_buf->data()); - res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen); - doc->key = res_doc->key; - doc->total_len = res_doc->data_len(); - } - // update the new_info object_key, and total_len and dirinfo - if (vc->header_len) { - ink_assert(vc->f.use_first_key); - if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) { - ink_assert(vc->write_vector->count() > 0); - if (!vc->f.update && !vc->f.evac_vector) { - ink_assert(!(vc->first_key.is_zero())); - CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index); - http_info->object_size_set(vc->total_len); - } - // update + data_written => Update case (b) - // need to change the old alternate's object length - if (vc->f.update && vc->total_len) { - CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index); - http_info->object_size_set(vc->total_len); - } - ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK)); - ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len)); - } else { - memcpy(doc->hdr(), vc->header_to_write, vc->header_len); - } - // the single fragment flag is not used in the write call. - // putting it in for completeness. - vc->f.single_fragment = doc->single_fragment(); - } - // move data - if (vc->write_len) { - { - ProxyMutex *mutex ATS_UNUSED = this->mutex.get(); - ink_assert(mutex->thread_holding == this_ethread()); - - Metrics::Counter::increment(cache_rsb.write_bytes, vc->write_len); - Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes, vc->write_len); - } - if (vc->f.rewrite_resident_alt) { - iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0); - } else { - iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), vc->offset); - } - } - if (cache_config_enable_checksum) { - doc->checksum = 0; - for (char *b = doc->hdr(); b < reinterpret_cast(doc) + doc->len; b++) { - doc->checksum += *b; - } - } - if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) { - ink_assert(doc->hlen); - } - - if (res_alt_blk) { - res_alt_blk->free(); - } - - return vc->agg_len; + if (vc->f.evacuator) { + return this->_copy_evacuator_to_aggregation(vc); } else { - // for evacuated documents, copy the data, and update directory - Doc *doc = reinterpret_cast(vc->buf->data()); - int l = this->round_to_approx_size(doc->len); - - Metrics::Counter::increment(cache_rsb.gc_frags_evacuated); - Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated); - - doc->sync_serial = this->header->sync_serial; - doc->write_serial = this->header->write_serial; - - this->_write_buffer.add(doc, l); - - vc->dir = vc->overwrite_dir; - dir_set_offset(&vc->dir, this->offset_to_vol_offset(o)); - dir_set_phase(&vc->dir, this->header->phase); - return l; + return this->_copy_writer_to_aggregation(vc); } } diff --git a/src/iocore/cache/P_CacheDoc.h b/src/iocore/cache/P_CacheDoc.h index d1c35063a1f..d501ca5308b 100644 --- a/src/iocore/cache/P_CacheDoc.h +++ b/src/iocore/cache/P_CacheDoc.h @@ -23,6 +23,8 @@ #pragma once +#include "iocore/eventsystem/IOBuffer.h" + #include "tscore/CryptoHash.h" #include @@ -63,6 +65,10 @@ struct Doc { int single_fragment() const; char *hdr(); char *data(); + void set_data(int len, IOBufferBlock const *block, int offset); + void calculate_checksum(); + void pin(std::uint32_t const pin_in_cache); + void unpin(); using self_type = Doc; }; diff --git a/src/iocore/cache/P_CacheVol.h b/src/iocore/cache/P_CacheVol.h index aa3a5a86148..2f1e8377a8f 100644 --- a/src/iocore/cache/P_CacheVol.h +++ b/src/iocore/cache/P_CacheVol.h @@ -330,6 +330,8 @@ class Stripe : public Continuation void _init_data_internal(); void _init_data(); int _agg_copy(CacheVC *vc); + int _copy_writer_to_aggregation(CacheVC *vc); + int _copy_evacuator_to_aggregation(CacheVC *vc); bool flush_aggregate_write_buffer(); AggregateWriteBuffer _write_buffer; diff --git a/src/iocore/cache/Stripe.cc b/src/iocore/cache/Stripe.cc index e9feed52b3c..5c35c0d82a8 100644 --- a/src/iocore/cache/Stripe.cc +++ b/src/iocore/cache/Stripe.cc @@ -21,13 +21,17 @@ limitations under the License. */ -#include "iocore/cache/Cache.h" #include "P_CacheDisk.h" #include "P_CacheDoc.h" #include "P_CacheInternal.h" #include "P_CacheVol.h" +#include "proxy/hdrs/HTTP.h" + +#include "tsutil/Metrics.h" + #include "iocore/eventsystem/EThread.h" +#include "iocore/eventsystem/IOBuffer.h" #include "iocore/eventsystem/Lock.h" #include "tsutil/DbgCtl.h" @@ -38,6 +42,8 @@ #include +using CacheHTTPInfo = HTTPInfo; + namespace { @@ -1010,6 +1016,159 @@ Stripe::shutdown(EThread *shutdown_thread) Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", this->hash_text.get()); } +static void +init_document(CacheVC const *vc, Doc *doc, int const len) +{ + doc->magic = DOC_MAGIC; + doc->len = len; + doc->hlen = vc->header_len; + doc->doc_type = vc->frag_type; + doc->v_major = CACHE_DB_MAJOR_VERSION; + doc->v_minor = CACHE_DB_MINOR_VERSION; + doc->unused = 0; // force this for forward compatibility. + doc->total_len = vc->total_len; + doc->first_key = vc->first_key; + doc->checksum = DOC_NO_CHECKSUM; +} + +static void +update_header_info(CacheVC *vc, Doc *doc) +{ + if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) { + ink_assert(vc->write_vector->count() > 0); + if (!vc->f.update && !vc->f.evac_vector) { + ink_assert(!(vc->first_key.is_zero())); + CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index); + http_info->object_size_set(vc->total_len); + } + // update + data_written => Update case (b) + // need to change the old alternate's object length + if (vc->f.update && vc->total_len) { + CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index); + http_info->object_size_set(vc->total_len); + } + ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK)); + ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len)); + } else { + memcpy(doc->hdr(), vc->header_to_write, vc->header_len); + } +} + +static void +update_document_key(CacheVC *vc, Doc *doc) +{ + if (vc->f.use_first_key) { + if (doc->data_len() || vc->f.allow_empty_doc) { + doc->key = vc->earliest_key; + } else { // the vector is being written by itself + if (vc->earliest_key.is_zero()) { + do { + rand_CacheKey(&doc->key); + } while (DIR_MASK_TAG(doc->key.slice32(2)) == DIR_MASK_TAG(vc->first_key.slice32(2))); + } else { + prev_CacheKey(&doc->key, &vc->earliest_key); + } + } + dir_set_head(&vc->dir, true); + } else { + doc->key = vc->key; + dir_set_head(&vc->dir, !vc->fragment); + } +} + +int +Stripe::_copy_writer_to_aggregation(CacheVC *vc) +{ + off_t doc_offset{this->header->write_pos + this->get_agg_buf_pos()}; + uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc); + Doc *doc = this->_write_buffer.emplace(this->round_to_approx_size(len)); + IOBufferBlock *res_alt_blk = nullptr; + + ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc)); + ink_assert(this->round_to_approx_size(len) == vc->agg_len); + // update copy of directory entry for this document + dir_set_approx_size(&vc->dir, vc->agg_len); + dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset)); + ink_assert(this->vol_offset(&vc->dir) < (this->skip + this->len)); + dir_set_phase(&vc->dir, this->header->phase); + + // fill in document header + init_document(vc, doc, len); + doc->sync_serial = this->header->sync_serial; + vc->write_serial = doc->write_serial = this->header->write_serial; + if (vc->get_pin_in_cache()) { + dir_set_pinned(&vc->dir, 1); + doc->pin(vc->get_pin_in_cache()); + } else { + dir_set_pinned(&vc->dir, 0); + doc->unpin(); + } + + update_document_key(vc, doc); + + if (vc->f.rewrite_resident_alt) { + ink_assert(vc->f.use_first_key); + Doc *res_doc = reinterpret_cast(vc->first_buf->data()); + res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen); + doc->key = res_doc->key; + doc->total_len = res_doc->data_len(); + } + // update the new_info object_key, and total_len and dirinfo + if (vc->header_len) { + ink_assert(vc->f.use_first_key); + update_header_info(vc, doc); + // the single fragment flag is not used in the write call. + // putting it in for completeness. + vc->f.single_fragment = doc->single_fragment(); + } + // move data + if (vc->write_len) { + ink_assert(this->mutex.get()->thread_holding == this_ethread()); + + Metrics::Counter::increment(cache_rsb.write_bytes); + Metrics::Counter::increment(this->cache_vol->vol_rsb.write_bytes); + + if (vc->f.rewrite_resident_alt) { + doc->set_data(vc->write_len, res_alt_blk, 0); + } else { + doc->set_data(vc->write_len, vc->blocks.get(), vc->offset); + } + } + if (cache_config_enable_checksum) { + doc->calculate_checksum(); + } + if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) { + ink_assert(doc->hlen); + } + + if (res_alt_blk) { + res_alt_blk->free(); + } + + return vc->agg_len; +} + +int +Stripe::_copy_evacuator_to_aggregation(CacheVC *vc) +{ + Doc *doc = reinterpret_cast(vc->buf->data()); + int approx_size = this->round_to_approx_size(doc->len); + + Metrics::Counter::increment(cache_rsb.gc_frags_evacuated); + Metrics::Counter::increment(this->cache_vol->vol_rsb.gc_frags_evacuated); + + doc->sync_serial = this->header->sync_serial; + doc->write_serial = this->header->write_serial; + + off_t doc_offset{this->header->write_pos + this->_write_buffer.get_buffer_pos()}; + this->_write_buffer.add(doc, approx_size); + + vc->dir = vc->overwrite_dir; + dir_set_offset(&vc->dir, this->offset_to_vol_offset(doc_offset)); + dir_set_phase(&vc->dir, this->header->phase); + return approx_size; +} + bool Stripe::flush_aggregate_write_buffer() {