diff --git a/doc/admin-guide/plugins/slice.en.rst b/doc/admin-guide/plugins/slice.en.rst index dca442d557b..b131178eba6 100644 --- a/doc/admin-guide/plugins/slice.en.rst +++ b/doc/admin-guide/plugins/slice.en.rst @@ -138,9 +138,9 @@ Under normal logging these slice block errors tend to show up as:: By default more detailed stitching errors are written to ``diags.log``. Examples are as follows:: - ERROR: [slice.cc: 288] logSliceError(): 1555705573.639 reason="Non 206 internal block response" uri="http://ats_ep/someasset.mp4" uas="curl" req_range="bytes=1000000-" norm_range="bytes 1000000-52428799/52428800" etag_exp="%221603934496%22" lm_exp="Fri, 19 Apr 2019 18:53:20 GMT" blk_range="21000000-21999999" status_got="206" cr_got="" etag_got="%221603934496%22" lm_got="" cc="no-store" via="" +ERROR: [slice.cc: 288] logSliceError(): 1555705573.639 reason="Non 206 internal block response" uri="http://ats_ep/someasset.mp4" uas="curl" req_range="bytes=1000000-" norm_range="bytes 1000000-52428799/52428800" etag_exp="%221603934496%22" lm_exp="Fri, 19 Apr 2019 18:53:20 GMT" blk_range="21000000-21999999" status_got="206" cr_got="" etag_got="%221603934496%22" lm_got="" cc="no-store" via="" - ERROR: [server.cc: 288] logSliceError(): 1572370000.219 reason="Mismatch block Etag" uri="http://ats_ep/someasset.mp4" uas="curl" req_range="bytes=1092779033-1096299354" norm_range="bytes 1092779033-1096299354/2147483648" etag_exp="%223719843648%22" lm_exp="Tue, 29 Oct 2019 14:40:00 GMT" blk_range="1095000000-1095999999" status_got="206" cr_got="bytes 1095000000-1095999999/2147483648" etag_got="%223719853648%22" lm_got="Tue, 29 Oct 2019 17:26:40 GMT" cc="max-age=10000" via="" +ERROR: [server.cc: 288] logSliceError(): 1572370000.219 reason="Mismatch block Etag" uri="http://ats_ep/someasset.mp4" uas="curl" req_range="bytes=1092779033-1096299354" norm_range="bytes 1092779033-1096299354/2147483648" etag_exp="%223719843648%22" lm_exp="Tue, 29 Oct 2019 14:40:00 GMT" blk_range="1095000000-1095999999" status_got="206" cr_got="bytes 1095000000-1095999999/2147483648" etag_got="%223719853648%22" lm_got="Tue, 29 Oct 2019 17:26:40 GMT" cc="max-age=10000" via="" Whether or how often these detailed log entries are written are configurable plugin options. diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc index 6d27089dbe4..fb04ebb27b8 100644 --- a/plugins/experimental/slice/Config.cc +++ b/plugins/experimental/slice/Config.cc @@ -98,13 +98,14 @@ Config::fromArgs(int const argc, char const *const argv[]) {const_cast("remap-host"), required_argument, nullptr, 'r'}, {const_cast("pace-errorlog"), required_argument, nullptr, 'p'}, {const_cast("disable-errorlog"), no_argument, nullptr, 'd'}, + {const_cast("throttle"), no_argument, nullptr, 'o'}, {nullptr, 0, nullptr, 0}, }; // getopt assumes args start at '1' so this hack is needed char *const *argvp = (const_cast(argv) - 1); for (;;) { - int const opt = getopt_long(argc + 1, argvp, "b:t:r:p:d", longopts, nullptr); + int const opt = getopt_long(argc + 1, argvp, "b:t:r:p:do", longopts, nullptr); if (-1 == opt) { break; } @@ -149,6 +150,10 @@ Config::fromArgs(int const argc, char const *const argv[]) case 'd': m_paceerrsecs = -1; break; + case 'o': + m_throttle = true; + DEBUG_LOG("Enabling internal block throttling"); + break; default: break; } diff --git a/plugins/experimental/slice/Config.h b/plugins/experimental/slice/Config.h index 8c191b06c0c..42de33d1bef 100644 --- a/plugins/experimental/slice/Config.h +++ b/plugins/experimental/slice/Config.h @@ -30,6 +30,7 @@ struct Config { int64_t m_blockbytes{blockbytesdefault}; std::string m_remaphost; // remap host to use for loopback slice GET + bool m_throttle{false}; // internal block throttling int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s // Convert optarg to bytes diff --git a/plugins/experimental/slice/Data.h b/plugins/experimental/slice/Data.h index 30996438c31..3150bd299b8 100644 --- a/plugins/experimental/slice/Data.h +++ b/plugins/experimental/slice/Data.h @@ -20,16 +20,13 @@ #include "ts/ts.h" -#include "Config.h" #include "HttpHeader.h" #include "Range.h" #include "Stage.h" #include -void incrData(); - -void decrData(); +struct Config; struct Data { Data(Data const &) = delete; @@ -61,7 +58,9 @@ struct Data { int64_t m_blockexpected; // body bytes expected int64_t m_blockskip; // number of bytes to skip in this block int64_t m_blockconsumed; // body bytes consumed - bool m_iseos; // server in EOS state + + enum BlockState { Pending, Active, Done, Fail }; + BlockState m_blockstate; // is there an active slice block int64_t m_bytestosend; // header + content bytes to send int64_t m_bytessent; // number of bytes written to the client @@ -86,33 +85,32 @@ struct Data { m_etaglen(0), m_lastmodifiedlen(0), m_statustype(TS_HTTP_STATUS_NONE), - m_bail(false), m_req_range(-1, -1), - m_contentlen(-1) - - , + m_contentlen(-1), m_blocknum(-1), m_blockexpected(0), m_blockskip(0), m_blockconsumed(0), - m_iseos(false) - - , + m_blockstate(Pending), m_bytestosend(0), m_bytessent(0), m_server_block_header_parsed(false), m_server_first_header_parsed(false), m_http_parser(nullptr) { - // incrData(); m_hostname[0] = '\0'; m_lastmodified[0] = '\0'; m_etag[0] = '\0'; +#if defined(COLLECT_STATS) + TSStatIntIncrement(stats::DataCreate, 1); +#endif } ~Data() { - // decrData(); +#if defined(COLLECT_STATS) + TSStatIntIncrement(stats::DataDestroy, 1); +#endif if (nullptr != m_urlbuf) { if (nullptr != m_urlloc) { TSHandleMLocRelease(m_urlbuf, TS_NULL_MLOC, m_urlloc); diff --git a/plugins/experimental/slice/HttpHeader.cc b/plugins/experimental/slice/HttpHeader.cc index 332370576c1..99137018066 100644 --- a/plugins/experimental/slice/HttpHeader.cc +++ b/plugins/experimental/slice/HttpHeader.cc @@ -311,7 +311,8 @@ HttpHeader::toString() const /////// HdrMgr TSParseResult -HdrMgr::populateFrom(TSHttpParser const http_parser, TSIOBufferReader const reader, HeaderParseFunc const parsefunc) +HdrMgr::populateFrom(TSHttpParser const http_parser, TSIOBufferReader const reader, HeaderParseFunc const parsefunc, + int64_t *const bytes) { TSParseResult parse_res = TS_PARSE_CONT; @@ -322,14 +323,14 @@ HdrMgr::populateFrom(TSHttpParser const http_parser, TSIOBufferReader const read m_lochdr = TSHttpHdrCreate(m_buffer); } - int64_t read_avail = TSIOBufferReaderAvail(reader); - if (0 < read_avail) { + int64_t avail = TSIOBufferReaderAvail(reader); + if (0 < avail) { TSIOBufferBlock block = TSIOBufferReaderStart(reader); int64_t consumed = 0; parse_res = TS_PARSE_CONT; - while (nullptr != block && 0 < read_avail) { + while (nullptr != block && 0 < avail) { int64_t blockbytes = 0; char const *const bstart = TSIOBufferBlockReadStart(block, reader, &blockbytes); @@ -341,7 +342,7 @@ HdrMgr::populateFrom(TSHttpParser const http_parser, TSIOBufferReader const read int64_t const bytes_parsed(ptr - bstart); consumed += bytes_parsed; - read_avail -= bytes_parsed; + avail -= bytes_parsed; if (TS_PARSE_CONT == parse_res) { block = TSIOBufferBlockNext(block); @@ -350,6 +351,12 @@ HdrMgr::populateFrom(TSHttpParser const http_parser, TSIOBufferReader const read } } TSIOBufferReaderConsume(reader, consumed); + + if (nullptr != bytes) { + *bytes = consumed; + } + } else if (nullptr != bytes) { + *bytes = 0; } return parse_res; diff --git a/plugins/experimental/slice/HttpHeader.h b/plugins/experimental/slice/HttpHeader.h index c72f776adad..001e5f6480a 100644 --- a/plugins/experimental/slice/HttpHeader.h +++ b/plugins/experimental/slice/HttpHeader.h @@ -53,6 +53,16 @@ struct HttpHeader { return nullptr != m_buffer && nullptr != m_lochdr; } + int + byteSize() const + { + if (isValid()) { + return TSHttpHdrLengthGet(m_buffer, m_lochdr); + } else { + return 0; + } + } + // TS_HTTP_TYPE_UNKNOWN, TS_HTTP_TYPE_REQUEST, TS_HTTP_TYPE_RESPONSE TSHttpType type() const; @@ -205,7 +215,8 @@ struct HdrMgr { TSHttpHdrParseResp Call this multiple times if necessary. */ - TSParseResult populateFrom(TSHttpParser const http_parser, TSIOBufferReader const reader, HeaderParseFunc const parsefunc); + TSParseResult populateFrom(TSHttpParser const http_parser, TSIOBufferReader const reader, HeaderParseFunc const parsefunc, + int64_t *const consumed); bool isValid() const diff --git a/plugins/experimental/slice/Makefile.inc b/plugins/experimental/slice/Makefile.inc index dbac02b4940..21f6166f5aa 100644 --- a/plugins/experimental/slice/Makefile.inc +++ b/plugins/experimental/slice/Makefile.inc @@ -23,7 +23,6 @@ experimental_slice_slice_la_SOURCES = \ experimental/slice/Config.h \ experimental/slice/ContentRange.cc \ experimental/slice/ContentRange.h \ - experimental/slice/Data.cc \ experimental/slice/Data.h \ experimental/slice/HttpHeader.cc \ experimental/slice/HttpHeader.h \ @@ -39,7 +38,9 @@ experimental_slice_slice_la_SOURCES = \ experimental/slice/slice.h \ experimental/slice/Stage.h \ experimental/slice/transfer.cc \ - experimental/slice/transfer.h + experimental/slice/transfer.h \ + experimental/slice/util.cc \ + experimental/slice/util.h check_PROGRAMS += experimental/slice/test_content_range diff --git a/plugins/experimental/slice/Makefile.tsxs b/plugins/experimental/slice/Makefile.tsxs index 8e9449c3e75..f8022a1714e 100644 --- a/plugins/experimental/slice/Makefile.tsxs +++ b/plugins/experimental/slice/Makefile.tsxs @@ -22,7 +22,6 @@ all: $(PLUGIN).so SOURCES = \ Config.cc \ ContentRange.cc \ - Data.cc \ HttpHeader.cc \ Range.cc \ client.cc \ diff --git a/plugins/experimental/slice/Range.cc b/plugins/experimental/slice/Range.cc index ad62c34ae65..c0dfb046d9b 100644 --- a/plugins/experimental/slice/Range.cc +++ b/plugins/experimental/slice/Range.cc @@ -152,6 +152,16 @@ Range::firstBlockFor(int64_t const blocksize) const } } +int64_t +Range::lastBlockFor(int64_t const blocksize) const +{ + if (0 < blocksize && isValid()) { + return std::max((int64_t)0, (m_end - 1) / blocksize); + } else { + return -1; + } +} + Range Range::intersectedWith(Range const &other) const { @@ -162,7 +172,6 @@ bool Range::blockIsInside(int64_t const blocksize, int64_t const blocknum) const { Range const blockrange(blocksize * blocknum, blocksize * (blocknum + 1)); - Range const isec(blockrange.intersectedWith(*this)); return isec.isValid(); diff --git a/plugins/experimental/slice/Range.h b/plugins/experimental/slice/Range.h index 0fb3145ddae..8c8826d2091 100644 --- a/plugins/experimental/slice/Range.h +++ b/plugins/experimental/slice/Range.h @@ -56,6 +56,10 @@ struct Range { */ int64_t firstBlockFor(int64_t const blockbytes) const; + /** block number of last (inclusive) range block + */ + int64_t lastBlockFor(int64_t const blockbytes) const; + /** block intersection */ Range intersectedWith(Range const &other) const; diff --git a/plugins/experimental/slice/Stage.h b/plugins/experimental/slice/Stage.h index 4166071e5eb..f677d649614 100644 --- a/plugins/experimental/slice/Stage.h +++ b/plugins/experimental/slice/Stage.h @@ -20,6 +20,11 @@ #include "ts/ts.h" +#include "slice.h" +#include "util.h" + +#include + struct Channel { TSVIO m_vio{nullptr}; TSIOBuffer m_iobuf{nullptr}; @@ -29,45 +34,65 @@ struct Channel { { if (nullptr != m_reader) { TSIOBufferReaderFree(m_reader); +#if defined(COLLECT_STATS) + TSStatIntDecrement(stats::Reader, 1); +#endif } if (nullptr != m_iobuf) { TSIOBufferDestroy(m_iobuf); } } - void + int64_t drainReader() { - TSAssert(nullptr != m_reader); - int64_t const bytes_avail = TSIOBufferReaderAvail(m_reader); - TSIOBufferReaderConsume(m_reader, bytes_avail); + int64_t consumed = 0; + + if (nullptr != m_reader && reader_avail_more_than(m_reader, 0)) { + int64_t const avail = TSIOBufferReaderAvail(m_reader); + TSIOBufferReaderConsume(m_reader, avail); + consumed = avail; + TSVIONDoneSet(m_vio, TSVIONDoneGet(m_vio) + consumed); + } + + return consumed; } bool - setForRead(TSVConn vc, TSCont contp, int64_t const bytesin //=INT64_MAX - ) + setForRead(TSVConn vc, TSCont contp, int64_t const bytesin) { TSAssert(nullptr != vc); if (nullptr == m_iobuf) { m_iobuf = TSIOBufferCreate(); m_reader = TSIOBufferReaderAlloc(m_iobuf); +#if defined(COLLECT_STATS) + TSStatIntIncrement(stats::Reader, 1); +#endif } else { - drainReader(); + int64_t const drained = drainReader(); + if (0 < drained) { + DEBUG_LOG("Drained from reader: %" PRId64, drained); + } } m_vio = TSVConnRead(vc, contp, m_iobuf, bytesin); return nullptr != m_vio; } bool - setForWrite(TSVConn vc, TSCont contp, int64_t const bytesout //=INT64_MAX - ) + setForWrite(TSVConn vc, TSCont contp, int64_t const bytesout) { TSAssert(nullptr != vc); if (nullptr == m_iobuf) { m_iobuf = TSIOBufferCreate(); m_reader = TSIOBufferReaderAlloc(m_iobuf); +#if defined(COLLECT_STATS) + TSStatIntIncrement(stats::Reader, 1); +#endif } else { - drainReader(); + int64_t const drained = drainReader(); + if (0 < drained) { + DEBUG_LOG("Drained from reader: %" PRId64, drained); + } } m_vio = TSVConnWrite(vc, contp, m_reader, bytesout); return nullptr != m_vio; @@ -85,7 +110,13 @@ struct Channel { bool isOpen() const { - return nullptr != m_iobuf && nullptr != m_reader && nullptr != m_vio; + return nullptr != m_vio; + } + + bool + isDrained() const + { + return nullptr == m_reader || !reader_avail_more_than(m_reader, 0); } }; @@ -112,38 +143,48 @@ struct Stage // upstream or downstream (server or client) if (nullptr != m_vc) { TSVConnClose(m_vc); } - m_vc = vc; - m_read.m_vio = nullptr; - m_write.m_vio = nullptr; + m_read.close(); + m_write.close(); + m_vc = vc; } void - setupVioRead(TSCont contp, int64_t const bytesin = INT64_MAX) + setupVioRead(TSCont contp, int64_t const bytesin) { m_read.setForRead(m_vc, contp, bytesin); } void - setupVioWrite(TSCont contp, int64_t const bytesout = INT64_MAX) + setupVioWrite(TSCont contp, int64_t const bytesout) { m_write.setForWrite(m_vc, contp, bytesout); } void - close() + abort() { + if (nullptr != m_vc) { + TSVConnAbort(m_vc, TS_VC_CLOSE_ABORT); + m_vc = nullptr; + } m_read.close(); m_write.close(); + } + void + close() + { if (nullptr != m_vc) { TSVConnClose(m_vc); m_vc = nullptr; } + m_read.close(); + m_write.close(); } bool isOpen() const { - return nullptr != m_vc && m_read.isOpen() && m_write.isOpen(); + return nullptr != m_vc && (m_read.isOpen() || m_write.isOpen()); } }; diff --git a/plugins/experimental/slice/client.cc b/plugins/experimental/slice/client.cc index adf0a4a0fa6..5d4bde5e690 100644 --- a/plugins/experimental/slice/client.cc +++ b/plugins/experimental/slice/client.cc @@ -18,90 +18,33 @@ #include "client.h" -#include "transfer.h" +#include "Config.h" +#include "util.h" -namespace -{ -void -shutdown(TSCont const contp, Data *const data) -{ - DEBUG_LOG("shutting down transaction"); - delete data; - TSContDestroy(contp); -} - -// create and issue a block request -bool -requestBlock(TSCont contp, Data *const data) -{ - int64_t const blockbeg = (data->m_config->m_blockbytes * data->m_blocknum); - Range blockbe(blockbeg, blockbeg + data->m_config->m_blockbytes); - - char rangestr[1024]; - int rangelen = sizeof(rangestr); - bool const rpstat = blockbe.toStringClosed(rangestr, &rangelen); - TSAssert(rpstat); - - DEBUG_LOG("requestBlock: %s", rangestr); - - // reuse the incoming client header, just change the range - HttpHeader header(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr); - - // add/set sub range key and add slicer tag - bool const rangestat = header.setKeyVal(TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE, rangestr, rangelen); - - if (!rangestat) { - ERROR_LOG("Error trying to set range request header %s", rangestr); - return false; - } - - // create virtual connection back into ATS - TSVConn const upvc = TSHttpConnect(reinterpret_cast(&data->m_client_ip)); - - // set up connection with the HttpConnect server, maybe clear old one - data->m_upstream.setupConnection(upvc); - data->m_upstream.setupVioWrite(contp); - - TSHttpHdrPrint(header.m_buffer, header.m_lochdr, data->m_upstream.m_write.m_iobuf); - TSVIOReenable(data->m_upstream.m_write.m_vio); - - /* - std::string const headerstr(header.toString()); - DEBUG_LOG("Headers\n%s", headerstr.c_str()); - */ - - // get ready for data back from the server - data->m_upstream.setupVioRead(contp); - - // anticipate the next server response header - TSHttpParserClear(data->m_http_parser); - data->m_resp_hdrmgr.resetHeader(); - - data->m_blockexpected = 0; - data->m_blockconsumed = 0; - data->m_iseos = false; - data->m_server_block_header_parsed = false; - - return true; -} - -} // namespace +#include // this is called once per transaction when the client sends a req header bool handle_client_req(TSCont contp, TSEvent event, Data *const data) { - if (TS_EVENT_VCONN_READ_READY == event || TS_EVENT_VCONN_READ_COMPLETE == event) { + switch (event) { + case TS_EVENT_VCONN_READ_READY: + case TS_EVENT_VCONN_READ_COMPLETE: { if (nullptr == data->m_http_parser) { data->m_http_parser = TSHttpParserCreate(); } - // the client request header didn't fit into the input buffer: + // Read the header from the buffer + int64_t consumed = 0; if (TS_PARSE_DONE != - data->m_req_hdrmgr.populateFrom(data->m_http_parser, data->m_dnstream.m_read.m_reader, TSHttpHdrParseReq)) { + data->m_req_hdrmgr.populateFrom(data->m_http_parser, data->m_dnstream.m_read.m_reader, TSHttpHdrParseReq, &consumed)) { return false; } + // update the VIO + TSVIO const input_vio = data->m_dnstream.m_read.m_vio; + TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed); + // make the header manipulator HttpHeader header(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr); @@ -123,18 +66,18 @@ handle_client_req(TSCont contp, TSEvent event, Data *const data) bool const isRangeGood = rangebe.fromStringClosed(rangestr); if (isRangeGood) { - DEBUG_LOG("Partial content request"); + DEBUG_LOG("%p Partial content request", data); data->m_statustype = TS_HTTP_STATUS_PARTIAL_CONTENT; } else // signal a 416 needs to be formed and sent { - DEBUG_LOG("Ill formed/unhandled range: %s", rangestr); + DEBUG_LOG("%p Ill formed/unhandled range: %s", data, rangestr); data->m_statustype = TS_HTTP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE; // First block will give Content-Length rangebe = Range(0, data->m_config->m_blockbytes); } } else { - DEBUG_LOG("Full content request"); + DEBUG_LOG("%p Full content request", data); static char const *const valstr = "-"; static size_t const vallen = strlen(valstr); header.setKeyVal(SLICER_MIME_FIELD_INFO, strlen(SLICER_MIME_FIELD_INFO), valstr, vallen); @@ -151,8 +94,8 @@ handle_client_req(TSCont contp, TSEvent event, Data *const data) header.removeKey(TS_MIME_FIELD_X_FORWARDED_FOR, TS_MIME_LEN_X_FORWARDED_FOR); // send the first block request to server - if (!requestBlock(contp, data)) { - shutdown(contp, data); + if (!request_block(contp, data)) { + abort(contp, data); return false; } @@ -163,6 +106,10 @@ handle_client_req(TSCont contp, TSEvent event, Data *const data) header.removeKey(TS_MIME_FIELD_IF_NONE_MATCH, TS_MIME_LEN_IF_NONE_MATCH); header.removeKey(TS_MIME_FIELD_IF_RANGE, TS_MIME_LEN_IF_RANGE); header.removeKey(TS_MIME_FIELD_IF_UNMODIFIED_SINCE, TS_MIME_LEN_IF_UNMODIFIED_SINCE); + } break; + default: { + DEBUG_LOG("%p handle_client_req unhandled event %d %s", data, event, TSHttpEventNameLookup(event)); + } break; } return true; @@ -172,61 +119,47 @@ handle_client_req(TSCont contp, TSEvent event, Data *const data) void handle_client_resp(TSCont contp, TSEvent event, Data *const data) { - if (TS_EVENT_VCONN_WRITE_READY == event || TS_EVENT_VCONN_WRITE_COMPLETE == event) { - transfer_content_bytes(data); - - // done transferring from server to client buffer? - if (data->m_bytestosend <= data->m_bytessent) { - // real amount transferred to client - int64_t const bytessent(TSVIONDoneGet(data->m_dnstream.m_write.m_vio)); - - // is the output buffer drained? - if (data->m_bytestosend <= bytessent) { - data->m_dnstream.close(); - if (!data->m_upstream.m_read.isOpen()) { - shutdown(contp, data); - return; + DEBUG_LOG("%p handle_client_resp %s", data, TSHttpEventNameLookup(event)); + +#if defined(COLLECT_STATS) + TSStatIntIncrement(stats::Client, 1); +#endif + + switch (event) { + case TS_EVENT_VCONN_WRITE_READY: { + if (Data::BlockState::Pending == data->m_blockstate) { + bool start_next_block = true; + + if (data->m_config->m_throttle) { + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; + int64_t const output_done = TSVIONDoneGet(output_vio); + int64_t const output_sent = data->m_bytessent; + int64_t const threshout = data->m_config->m_blockbytes; + + if (threshout < (output_done - output_sent)) { + start_next_block = false; + DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, (output_done - output_sent)); } } - // continue allowing the downstream to drain - return; - } - - // error condition from the server side - if (data->m_bail) { - shutdown(contp, data); - return; - } - - // check for upstream eos, maybe request next block - if (data->m_iseos) { - // still need to drain the server side - if (0 < TSIOBufferReaderAvail(data->m_upstream.m_read.m_reader)) { - TSVIOReenable(data->m_dnstream.m_write.m_vio); - return; - } - - // if done or partial block - if (data->m_blocknum < 0 || data->m_blockconsumed < data->m_blockexpected) { - shutdown(contp, data); - return; + if (start_next_block) { + request_block(contp, data); } - - // ready for next block - requestBlock(contp, data); } - } - // client closed connection - else if (TS_EVENT_ERROR == event) { - DEBUG_LOG("got a TS_EVENT_ERROR from the client"); + } break; + case TS_EVENT_VCONN_WRITE_COMPLETE: { + if (TSIsDebugTagSet(PLUGIN_NAME) && reader_avail_more_than(data->m_upstream.m_read.m_reader, 0)) { + int64_t const left = TSIOBufferReaderAvail(data->m_upstream.m_read.m_reader); + DEBUG_LOG("%p WRITE_COMPLETE called with %" PRId64 " bytes left", data, left); + } - // allow the upstream server to drain data->m_dnstream.close(); if (!data->m_upstream.m_read.isOpen()) { shutdown(contp, data); } - } else { - DEBUG_LOG("Unhandled event: %d", event); + } break; + default: { + DEBUG_LOG("%p handle_client_resp unhandled event %d %s", data, event, TSHttpEventNameLookup(event)); + } break; } } diff --git a/plugins/experimental/slice/client.h b/plugins/experimental/slice/client.h index cc27bee3b1d..b91516165d8 100644 --- a/plugins/experimental/slice/client.h +++ b/plugins/experimental/slice/client.h @@ -27,6 +27,8 @@ * New block requests are also initiated by the client. */ +bool requestBlock(TSCont contp, Data *const data); + /** returns true if the incoming vio can be turned off */ bool handle_client_req(TSCont contp, TSEvent event, Data *const data); diff --git a/plugins/experimental/slice/intercept.cc b/plugins/experimental/slice/intercept.cc index e3a325b3d4b..2b6ef3bdfcd 100644 --- a/plugins/experimental/slice/intercept.cc +++ b/plugins/experimental/slice/intercept.cc @@ -26,11 +26,10 @@ int intercept_hook(TSCont contp, TSEvent event, void *edata) { - // DEBUG_LOG("intercept_hook: %d", event); - Data *const data = static_cast(TSContDataGet(contp)); + if (nullptr == data) { - DEBUG_LOG("Events handled after data already torn down"); + ERROR_LOG("intercept_hook called without data"); TSContDestroy(contp); return TS_EVENT_ERROR; } @@ -42,15 +41,15 @@ intercept_hook(TSCont contp, TSEvent event, void *edata) // set up reader from client TSVConn const downvc = static_cast(edata); data->m_dnstream.setupConnection(downvc); - data->m_dnstream.setupVioRead(contp); + data->m_dnstream.setupVioRead(contp, INT64_MAX); } break; + case TS_EVENT_NET_ACCEPT_FAILED: case TS_EVENT_VCONN_INACTIVITY_TIMEOUT: case TS_EVENT_VCONN_ACTIVE_TIMEOUT: - case TS_EVENT_HTTP_TXN_CLOSE: - delete data; - TSContDestroy(contp); - break; + case TS_EVENT_ERROR: { + abort(contp, data); + } break; default: { // data from client -- only the initial header @@ -66,7 +65,7 @@ intercept_hook(TSCont contp, TSEvent event, void *edata) // DEBUG_LOG("shutting down send to server pipe"); TSVConnShutdown(data->m_upstream.m_vc, 0, 1); } - // server has data for us, typically handle just the header + // server has data for us else if (data->m_upstream.m_read.isOpen() && edata == data->m_upstream.m_read.m_vio) { handle_server_resp(contp, event, data); } @@ -75,13 +74,8 @@ intercept_hook(TSCont contp, TSEvent event, void *edata) handle_client_resp(contp, event, data); } else { ERROR_LOG("Unhandled event: %d", event); - /* - std::cerr << __func__ - << ": events received after intercept state torn down" - << std::endl; - */ } - } + } break; } return TS_EVENT_CONTINUE; diff --git a/plugins/experimental/slice/server.cc b/plugins/experimental/slice/server.cc index 8c9c7c6fa28..5356806a52f 100644 --- a/plugins/experimental/slice/server.cc +++ b/plugins/experimental/slice/server.cc @@ -18,9 +18,11 @@ #include "server.h" +#include "Config.h" #include "ContentRange.h" #include "response.h" #include "transfer.h" +#include "util.h" #include "ts/experimental.h" @@ -28,14 +30,6 @@ namespace { -void -shutdown(TSCont const contp, Data *const data) -{ - DEBUG_LOG("shutting down transaction"); - delete data; - TSContDestroy(contp); -} - ContentRange contentRangeFrom(HttpHeader const &header) { @@ -66,15 +60,20 @@ handleFirstServerHeader(Data *const data, TSCont const contp) // DEBUG_LOG("First header\n%s", header.toString().c_str()); - data->m_dnstream.setupVioWrite(contp); + data->m_dnstream.setupVioWrite(contp, INT64_MAX); + + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; + TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf; - // only process a 206, everything else gets a pass through + // only process a 206, everything else gets a (possibly incomplete) + // pass through if (TS_HTTP_STATUS_PARTIAL_CONTENT != header.status()) { DEBUG_LOG("Initial response other than 206: %d", header.status()); - data->m_bail = true; - - TSHttpHdrPrint(header.m_buffer, header.m_lochdr, data->m_dnstream.m_write.m_iobuf); + // Should run TSVIONSetBytes(output_io, hlen + bodybytes); + // int const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr); + // TSVIONBytesSet(output_vio, hlen); + TSHttpHdrPrint(header.m_buffer, header.m_lochdr, output_buf); transfer_all_bytes(data); return false; @@ -83,13 +82,10 @@ handleFirstServerHeader(Data *const data, TSCont const contp) ContentRange const blockcr = contentRangeFrom(header); // 206 with bad content range? if (!blockcr.isValid()) { - data->m_bail = true; - static std::string const &msg502 = string502(); - - TSIOBufferWrite(data->m_dnstream.m_write.m_iobuf, msg502.data(), msg502.size()); - TSVIOReenable(data->m_dnstream.m_write.m_vio); - + TSVIONBytesSet(output_vio, msg502.size()); + TSIOBufferWrite(output_buf, msg502.data(), msg502.size()); + TSVIOReenable(output_vio); return false; } @@ -108,18 +104,21 @@ handleFirstServerHeader(Data *const data, TSCont const contp) int64_t const bodybytes = data->m_req_range.size(); - // range past end of data, assume 416 needs to be sent + // range begins past end of data but inside last block, send 416 bool const send416 = (bodybytes <= 0 || TS_HTTP_STATUS_REQUESTED_RANGE_NOT_SATISFIABLE == data->m_statustype); if (send416) { - data->m_bail = true; std::string const &bodystr = bodyString416(); form416HeaderAndBody(header, data->m_contentlen, bodystr); - TSHttpHdrPrint(header.m_buffer, header.m_lochdr, data->m_dnstream.m_write.m_iobuf); + int const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr); + int64_t const blen = bodystr.size(); - TSIOBufferWrite(data->m_dnstream.m_write.m_iobuf, bodystr.data(), bodystr.size()); + TSVIONBytesSet(output_vio, int64_t(hlen) + blen); + TSHttpHdrPrint(header.m_buffer, header.m_lochdr, output_buf); + TSIOBufferWrite(output_buf, bodystr.data(), bodystr.size()); + TSVIOReenable(output_vio); - TSVIOReenable(data->m_dnstream.m_write.m_vio); + data->m_upstream.m_read.close(); return false; } @@ -146,8 +145,6 @@ handleFirstServerHeader(Data *const data, TSCont const contp) // corner case, return 500 ?? if (!crstat) { - data->m_bail = true; - data->m_upstream.close(); data->m_dnstream.close(); @@ -156,9 +153,7 @@ handleFirstServerHeader(Data *const data, TSCont const contp) } header.setKeyVal(TS_MIME_FIELD_CONTENT_RANGE, TS_MIME_LEN_CONTENT_RANGE, rangestr, rangelen); - } - // fix up for 200 response - else if (TS_HTTP_STATUS_OK == data->m_statustype) { + } else if (TS_HTTP_STATUS_OK == data->m_statustype) { header.setStatus(TS_HTTP_STATUS_OK); static char const *const reason = TSHttpHdrReasonLookup(TS_HTTP_STATUS_OK); header.setReason(reason, strlen(reason)); @@ -170,15 +165,13 @@ handleFirstServerHeader(Data *const data, TSCont const contp) header.setKeyVal(TS_MIME_FIELD_CONTENT_LENGTH, TS_MIME_LEN_CONTENT_LENGTH, bufstr, buflen); // add the response header length to the total bytes to send - int64_t const headerbytes = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr); + int const hbytes = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr); - data->m_bytestosend = headerbytes + bodybytes; - - TSHttpHdrPrint(header.m_buffer, header.m_lochdr, data->m_dnstream.m_write.m_iobuf); - - data->m_bytessent = headerbytes; - - TSVIOReenable(data->m_dnstream.m_write.m_vio); + TSVIONBytesSet(output_vio, hbytes + bodybytes); + data->m_bytestosend = hbytes + bodybytes; + TSHttpHdrPrint(header.m_buffer, header.m_lochdr, output_buf); + data->m_bytessent = hbytes; + TSVIOReenable(output_vio); return true; } @@ -304,7 +297,6 @@ handleNextServerHeader(Data *const data, TSCont const contp) // only process a 206, everything else just aborts if (TS_HTTP_STATUS_PARTIAL_CONTENT != header.status()) { logSliceError("Non 206 internal block response", data, header); - data->m_bail = true; return false; } @@ -312,7 +304,6 @@ handleNextServerHeader(Data *const data, TSCont const contp) ContentRange const blockcr = contentRangeFrom(header); if (!blockcr.isValid() || blockcr.m_length != data->m_contentlen) { logSliceError("Mismatch/Bad block Content-Range", data, header); - data->m_bail = true; return false; } @@ -341,7 +332,7 @@ handleNextServerHeader(Data *const data, TSCont const contp) } if (!same) { - data->m_bail = true; + data->m_upstream.close(); return false; } @@ -356,30 +347,46 @@ handleNextServerHeader(Data *const data, TSCont const contp) void handle_server_resp(TSCont contp, TSEvent event, Data *const data) { - if (TS_EVENT_VCONN_READ_READY == event || TS_EVENT_VCONN_READ_COMPLETE == event) { + DEBUG_LOG("%p handle_server_resp: %s", data, TSHttpEventNameLookup(event)); + +#if defined(COLLECT_STATS) + TSStatIntIncrement(stats::Server, 1); +#endif + + switch (event) { + case TS_EVENT_VCONN_READ_READY: { // has block response header been parsed?? if (!data->m_server_block_header_parsed) { - // the server response header didn't fit into the input buffer?? - if (TS_PARSE_DONE != - data->m_resp_hdrmgr.populateFrom(data->m_http_parser, data->m_upstream.m_read.m_reader, TSHttpHdrParseResp)) { + int64_t consumed = 0; + TSIOBufferReader const reader = data->m_upstream.m_read.m_reader; + TSVIO const input_vio = data->m_upstream.m_read.m_vio; + TSParseResult const res = data->m_resp_hdrmgr.populateFrom(data->m_http_parser, reader, TSHttpHdrParseResp, &consumed); + + TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed); + + // the server response header didn't fit into the input buffer. + if (TS_PARSE_CONT == res) { return; } // very first server response header bool headerStat = false; - if (!data->m_server_first_header_parsed) { - headerStat = handleFirstServerHeader(data, contp); - data->m_server_first_header_parsed = true; - } else { - headerStat = handleNextServerHeader(data, contp); - } - data->m_server_block_header_parsed = true; + if (TS_PARSE_DONE == res) { + if (!data->m_server_first_header_parsed) { + headerStat = handleFirstServerHeader(data, contp); + data->m_server_first_header_parsed = true; + } else { + headerStat = handleNextServerHeader(data, contp); + } + + data->m_server_block_header_parsed = true; + } // kill the upstream and allow dnstream to clean up if (!headerStat) { - data->m_upstream.close(); - data->m_bail = true; + data->m_upstream.abort(); + data->m_blockstate = Data::BlockState::Fail; if (data->m_dnstream.m_write.isOpen()) { TSVIOReenable(data->m_dnstream.m_write.m_vio); } else { @@ -393,41 +400,22 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data) } transfer_content_bytes(data); - } else if (TS_EVENT_VCONN_EOS == event) { - // from testing as far as I can tell, if the sub transaction returns - // a valid header TS_EVENT_VCONN_READ_READY event is always called first. - // this event being called means the input stream is null. - // An upstream transaction that aborts immediately (or a few bytes) - // after it sends a header may end up here with nothing in the upstream - // buffer. - - // this is called when the upstream connection is done. - // make sure to drain all the bytes out before - // issuing the next block request - data->m_iseos = true; - - // corner condition, good source header + 0 length aborted content - // results in no header being read, just an EOS. - // trying to delete the upstream will crash ATS (??) - if (0 == data->m_blockexpected) { - shutdown(contp, data); // this will crash if first block + } break; + case TS_EVENT_VCONN_READ_COMPLETE: { + // fprintf(stderr, "%p: TS_EVENT_VCONN_READ_COMPLETE\n", data); + } break; + case TS_EVENT_VCONN_EOS: { + data->m_blockstate = Data::BlockState::Pending; + data->m_upstream.close(); + + // check for block truncation + if (data->m_blockconsumed < data->m_blockexpected) { + DEBUG_LOG("%p handle_server_resp truncation: %" PRId64 "\n", data, data->m_blockexpected - data->m_blockconsumed); + data->m_blockstate = Data::BlockState::Fail; + // shutdown(contp, data); return; } - transfer_content_bytes(data); - - if (!data->m_dnstream.m_write.isOpen()) // server drain condition - { - shutdown(contp, data); - return; - } - - // all bytes left transferred to client buffer - if (0 == TSIOBufferReaderAvail(data->m_upstream.m_read.m_reader)) { - data->m_upstream.close(); - TSVIOReenable(data->m_dnstream.m_write.m_vio); - } - // prepare for the next request block ++data->m_blocknum; @@ -435,16 +423,45 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data) // issues a speculative request for the first block // in that case fast forward to the real first in range block // Btw this isn't implemented yet, to be handled - int64_t const firstblock(data->m_req_range.firstBlockFor(data->m_config->m_blockbytes)); + int64_t const firstblock = data->m_req_range.firstBlockFor(data->m_config->m_blockbytes); if (data->m_blocknum < firstblock) { data->m_blocknum = firstblock; } - // done processing blocks? - if (!data->m_req_range.blockIsInside(data->m_config->m_blockbytes, data->m_blocknum)) { - data->m_blocknum = -1; // signal value no more blocks + // continue processing blocks? + if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, data->m_blocknum)) { + // Don't immediately request the next slice if the client + // isn't keeping up + + bool start_next_block = true; + + // throttle condition + if (data->m_config->m_throttle && data->m_dnstream.m_read.isOpen()) { + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; + int64_t const output_done = TSVIONDoneGet(output_vio); + int64_t const output_sent = data->m_bytessent; + int64_t const threshout = data->m_config->m_blockbytes; + + if (threshout < (output_done - output_sent)) { + start_next_block = false; + DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, (output_done - output_sent)); + } + } + + if (start_next_block) { + request_block(contp, data); + } + + } else { + data->m_upstream.close(); + data->m_blockstate = Data::BlockState::Done; + if (!data->m_dnstream.m_read.isOpen()) { + shutdown(contp, data); + } } - } else { - DEBUG_LOG("Unhandled event: %d", event); + } break; + default: { + DEBUG_LOG("%p handle_server_resp uhandled event: %s", data, TSHttpEventNameLookup(event)); + } break; } } diff --git a/plugins/experimental/slice/slice.cc b/plugins/experimental/slice/slice.cc index 432e119a966..d6279f75752 100644 --- a/plugins/experimental/slice/slice.cc +++ b/plugins/experimental/slice/slice.cc @@ -27,6 +27,19 @@ #include "ts/ts.h" #include +#include +#include + +#if defined(COLLECT_STATS) +namespace stats +{ +int DataCreate = -1; +int DataDestroy = -1; +int Reader = -1; +int Server = -1; +int Client = -1; +} // namespace stats +#endif // COLLECT_STATS namespace { @@ -143,7 +156,9 @@ read_request(TSHttpTxn txnp, Config *const config) } // we'll intercept this GET and do it ourselves - TSCont const icontp(TSContCreate(intercept_hook, TSMutexCreate())); + TSMutex const mutex = TSContMutexGet(reinterpret_cast(txnp)); + // TSMutex const mutex = TSMutexCreate(); + TSCont const icontp(TSContCreate(intercept_hook, mutex)); TSContDataSet(icontp, (void *)data); TSHttpTxnIntercept(icontp, txnp); return true; @@ -218,7 +233,44 @@ SLICE_EXPORT TSReturnCode TSRemapInit(TSRemapInterface *api_info, char *errbug, int errbuf_size) { - DEBUG_LOG("slice remap is successfully initialized."); + DEBUG_LOG("slice remap initializing."); + +#if defined(COLLECT_STATS) + static bool init = false; + static std::mutex mutex; + + std::lock_guard lock(mutex); + + if (!init) { + init = true; + + std::string const namedatacreate = std::string(PLUGIN_NAME) + ".DataCreate"; + stats::DataCreate = TSStatCreate(namedatacreate.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); + + assert(0 <= stats::DataCreate); + + std::string const namedatadestroy = std::string(PLUGIN_NAME) + ".DataDestroy"; + stats::DataDestroy = TSStatCreate(namedatadestroy.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); + + assert(0 <= stats::DataDestroy); + + std::string const namereader = std::string(PLUGIN_NAME) + ".Reader"; + stats::Reader = TSStatCreate(namereader.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); + + assert(0 <= stats::Reader); + + std::string const nameserver = std::string(PLUGIN_NAME) + ".Server"; + stats::Server = TSStatCreate(nameserver.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); + + assert(0 <= stats::Server); + + std::string const nameclient = std::string(PLUGIN_NAME) + ".Client"; + stats::Client = TSStatCreate(nameclient.c_str(), TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_SUM); + + assert(0 <= stats::Client); + } +#endif // COLLECT_STATS + return TS_SUCCESS; } diff --git a/plugins/experimental/slice/slice.h b/plugins/experimental/slice/slice.h index 682c0168c5a..d4e040b32f2 100644 --- a/plugins/experimental/slice/slice.h +++ b/plugins/experimental/slice/slice.h @@ -52,3 +52,14 @@ #define ERROR_LOG(fmt, ...) #endif + +#if defined(COLLECT_STATS) +namespace stats +{ +extern int DataCreate; +extern int DataDestroy; +extern int Reader; +extern int Server; +extern int Client; +} // namespace stats +#endif // COLLECT_STATS diff --git a/plugins/experimental/slice/transfer.cc b/plugins/experimental/slice/transfer.cc index c94842e4fae..f83b0882c0b 100644 --- a/plugins/experimental/slice/transfer.cc +++ b/plugins/experimental/slice/transfer.cc @@ -19,64 +19,80 @@ #include "transfer.h" int64_t -transfer_content_bytes(Data *const data) // , char const * const fstr) +transfer_content_bytes(Data *const data) { - int64_t consumed(0); - - // is the downstream is fulfilled or closed - if (!data->m_dnstream.m_write.isOpen()) { - // drain the upstream - if (data->m_upstream.m_read.isOpen()) { - int64_t const avail = TSIOBufferReaderAvail(data->m_upstream.m_read.m_reader); - TSIOBufferReaderConsume(data->m_upstream.m_read.m_reader, avail); - consumed += avail; - } - } else // if (data->m_dnstream.m_write.isOpen()) - { - if (data->m_upstream.m_read.isOpen()) { - int64_t avail = TSIOBufferReaderAvail(data->m_upstream.m_read.m_reader); - if (0 < avail) { - int64_t const toskip = std::min(data->m_blockskip, avail); - - // consume any up front (first block) padding - if (0 < toskip) { - TSIOBufferReaderConsume(data->m_upstream.m_read.m_reader, toskip); - data->m_blockskip -= toskip; - avail -= toskip; - consumed += toskip; - } + // nothing to transfer if there's no source. + if (nullptr == data->m_upstream.m_read.m_reader) { + return 0; + } - if (0 < avail) { - int64_t const bytesleft = (data->m_bytestosend - data->m_bytessent); - int64_t const tocopy = std::min(avail, bytesleft); + TSIOBufferReader const reader = data->m_upstream.m_read.m_reader; + TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf; + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; - if (0 < tocopy) { - int64_t const copied(TSIOBufferCopy(data->m_dnstream.m_write.m_iobuf, data->m_upstream.m_read.m_reader, tocopy, 0)); + int64_t consumed = 0; // input vio bytes visited + int64_t copied = 0; // output bytes transferred - data->m_bytessent += copied; + bool const canWrite = data->m_dnstream.m_write.isOpen(); + bool done = false; - TSIOBufferReaderConsume(data->m_upstream.m_read.m_reader, copied); + TSIOBufferBlock block = TSIOBufferReaderStart(reader); - avail -= copied; - consumed += copied; - } - } + while (!done && nullptr != block) { + int64_t bavail = TSIOBufferBlockReadAvail(block, reader); - // if hit fulfillment start bulk consuming - if (0 < avail && data->m_bytestosend <= data->m_bytessent) { - TSIOBufferReaderConsume(data->m_upstream.m_read.m_reader, avail); - consumed += avail; + if (0 == bavail) { + block = TSIOBufferBlockNext(block); + } else { + int64_t toconsume = 0; + + if (canWrite) { + int64_t const toskip = std::min(data->m_blockskip, bavail); + if (0 < toskip) { // before bytes + toconsume = toskip; + data->m_blockskip -= toskip; + } else { + int64_t const bytesleft = data->m_bytestosend - data->m_bytessent; + if (0 < bytesleft) { // transfer bytes + int64_t const tocopy = std::min(bavail, bytesleft); + int64_t const nbytes = TSIOBufferCopy(output_buf, reader, tocopy, 0); + + done = (nbytes < tocopy); // output buffer stuffed + + copied += nbytes; + data->m_bytessent += nbytes; + + toconsume = nbytes; + } else { // after bytes + toconsume = bavail; + } } + } else { // drain + toconsume = bavail; } - if (0 < consumed) { - TSVIOReenable(data->m_dnstream.m_write.m_vio); + if (0 < toconsume) { + if (bavail == toconsume) { + block = TSIOBufferBlockNext(block); + } + TSIOBufferReaderConsume(reader, toconsume); + consumed += toconsume; } } } + // tell output more data is available + if (0 < copied) { + TSVIOReenable(output_vio); + } + if (0 < consumed) { data->m_blockconsumed += consumed; + + TSVIO const input_vio = data->m_upstream.m_read.m_vio; + if (nullptr != input_vio) { + TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed); + } } return consumed; @@ -86,21 +102,50 @@ transfer_content_bytes(Data *const data) // , char const * const fstr) int64_t transfer_all_bytes(Data *const data) { - DEBUG_LOG("transfer_all_bytes"); - int64_t consumed = 0; + // nothing to transfer if there's no source. + if (nullptr == data->m_upstream.m_read.m_reader || !data->m_dnstream.m_write.isOpen()) { + return 0; + } + + int64_t consumed = 0; // input vio bytes visited + + TSIOBufferReader const reader = data->m_upstream.m_read.m_reader; + TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf; + + bool done = false; + + TSIOBufferBlock block = TSIOBufferReaderStart(reader); - if (data->m_dnstream.m_write.isOpen()) { - int64_t const read_avail = TSIOBufferReaderAvail(data->m_upstream.m_read.m_reader); + while (!done && nullptr != block) { + int64_t bavail = TSIOBufferBlockReadAvail(block, reader); - if (0 < read_avail) { - int64_t const copied(TSIOBufferCopy(data->m_dnstream.m_write.m_iobuf, data->m_upstream.m_read.m_reader, read_avail, 0)); + if (0 == bavail) { + block = TSIOBufferBlockNext(block); + } else { + int64_t const nbytes = TSIOBufferCopy(output_buf, reader, bavail, 0); + done = nbytes < bavail; // output buffer is full - if (0 < copied) { - TSIOBufferReaderConsume(data->m_upstream.m_read.m_reader, copied); - consumed = copied; + if (0 < nbytes) { + if (bavail == nbytes) { + block = TSIOBufferBlockNext(block); + } + TSIOBufferReaderConsume(reader, nbytes); + consumed += nbytes; } } } + if (0 < consumed) { + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; + if (nullptr != output_vio) { + TSVIOReenable(output_vio); + } + + TSVIO const input_vio = data->m_upstream.m_read.m_vio; + if (nullptr != input_vio) { + TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed); + } + } + return consumed; } diff --git a/plugins/experimental/slice/transfer.h b/plugins/experimental/slice/transfer.h index de3e1766d04..a63bb243a2d 100644 --- a/plugins/experimental/slice/transfer.h +++ b/plugins/experimental/slice/transfer.h @@ -32,3 +32,6 @@ int64_t transfer_content_bytes(Data *const data); // , char const * const fstr); // transfer all bytes from the server (error condition) int64_t transfer_all_bytes(Data *const data); + +// Signal the input about write state +void signal_input(TSVIO const input_vio, int64_t const consumed); diff --git a/plugins/experimental/slice/util.cc b/plugins/experimental/slice/util.cc new file mode 100644 index 00000000000..f3ae7d2bcf1 --- /dev/null +++ b/plugins/experimental/slice/util.cc @@ -0,0 +1,134 @@ +/** @file + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "util.h" + +#include "Config.h" +#include "Data.h" + +void +shutdown(TSCont const contp, Data *const data) +{ + DEBUG_LOG("shutting down transaction"); + TSContDataSet(contp, nullptr); + delete data; + TSContDestroy(contp); +} + +void +abort(TSCont const contp, Data *const data) +{ + DEBUG_LOG("aborting transaction"); + TSContDataSet(contp, nullptr); + data->m_upstream.abort(); + data->m_dnstream.abort(); + delete data; + TSContDestroy(contp); +} + +// create and issue a block request +bool +request_block(TSCont contp, Data *const data) +{ + // ensure no upstream connection + if (data->m_upstream.m_read.isOpen()) { + ERROR_LOG("Block request already in flight!"); + return false; + } + + if (Data::BlockState::Pending != data->m_blockstate) { + ERROR_LOG("request_block called with non Pending state!"); + return false; + } + + int64_t const blockbeg = (data->m_config->m_blockbytes * data->m_blocknum); + Range blockbe(blockbeg, blockbeg + data->m_config->m_blockbytes); + + char rangestr[1024]; + int rangelen = sizeof(rangestr); + bool const rpstat = blockbe.toStringClosed(rangestr, &rangelen); + TSAssert(rpstat); + + DEBUG_LOG("requestBlock: %s", rangestr); + + // reuse the incoming client header, just change the range + HttpHeader header(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr); + + // add/set sub range key and add slicer tag + bool const rangestat = header.setKeyVal(TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE, rangestr, rangelen); + + if (!rangestat) { + ERROR_LOG("Error trying to set range request header %s", rangestr); + return false; + } + + // create virtual connection back into ATS + TSVConn const upvc = TSHttpConnectWithPluginId((sockaddr *)&data->m_client_ip, PLUGIN_NAME, 0); + + int const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr); + + // set up connection with the HttpConnect server + data->m_upstream.setupConnection(upvc); + data->m_upstream.setupVioWrite(contp, hlen); + + // Send full request + TSHttpHdrPrint(header.m_buffer, header.m_lochdr, data->m_upstream.m_write.m_iobuf); + TSVIOReenable(data->m_upstream.m_write.m_vio); + + /* + std::string const headerstr(header.toString()); + DEBUG_LOG("Headers\n%s", headerstr.c_str()); + */ + + // get ready for data back from the server + data->m_upstream.setupVioRead(contp, INT64_MAX); + + // anticipate the next server response header + TSHttpParserClear(data->m_http_parser); + data->m_resp_hdrmgr.resetHeader(); + + data->m_blockexpected = 0; + data->m_blockconsumed = 0; + data->m_blockstate = Data::BlockState::Active; + data->m_server_block_header_parsed = false; + + return true; +} + +bool +reader_avail_more_than(TSIOBufferReader const reader, int64_t bytes) +{ + TSIOBufferBlock block = TSIOBufferReaderStart(reader); + + if (nullptr == block) { + return false; + } + + while (nullptr != block) { + int64_t const blockbytes = TSIOBufferBlockReadAvail(block, reader); + if (bytes < blockbytes) { + return true; + } else { + bytes -= blockbytes; + } + + block = TSIOBufferBlockNext(block); + } + + return false; +} diff --git a/plugins/experimental/slice/util.h b/plugins/experimental/slice/util.h new file mode 100644 index 00000000000..9da6f368a3b --- /dev/null +++ b/plugins/experimental/slice/util.h @@ -0,0 +1,36 @@ +/** @file + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#pragma once + +#include "ts/ts.h" + +struct Data; + +/** Functions to deal with the connection to the client. + * Body content transfers are handled by the client. + * New block requests are also initiated by the client. + */ + +void shutdown(TSCont const contp, Data *const data); + +void abort(TSCont const contp, Data *const data); + +bool request_block(TSCont contp, Data *const data); + +bool reader_avail_more_than(TSIOBufferReader const reader, int64_t bytes);