Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions proxy/http2/Http2ClientSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,14 @@ struct Http2UpgradeContext {
class Http2Frame
{
public:
// Input frame constructor
Http2Frame(const Http2FrameHeader &h, IOBufferReader *r) : hdr(h), ioreader(r) {}
Http2Frame(Http2FrameType type, Http2StreamId streamid, uint8_t flags) : hdr({0, (uint8_t)type, flags, streamid}) {}
// Output frame contstructor
Http2Frame(Http2FrameType type, Http2StreamId streamid, uint8_t flags, int index) : hdr({0, (uint8_t)type, flags, streamid})
{
alloc(index);
}
~Http2Frame() {}

IOBufferReader *
reader() const
Expand All @@ -101,6 +107,7 @@ class Http2Frame
{
this->ioblock = new_IOBufferBlock();
this->ioblock->alloc(index);
this->ioblock->fill(HTTP2_FRAME_HEADER_LEN);
}

// Return the writeable buffer space for frame payload
Expand All @@ -110,41 +117,53 @@ class Http2Frame
return make_iovec(this->ioblock->end(), this->ioblock->write_avail());
}

// Once the frame has been serialized, update the payload length of frame header.
void
finalize(size_t nbytes)
{
this->set_payload_size(nbytes);
if (this->ioblock) {
ink_assert((int64_t)nbytes <= this->ioblock->write_avail());
this->ioblock->fill(nbytes);

this->hdr.length = this->ioblock->size();
}
}

void
xmit(MIOBuffer *iobuffer)
xmit(MIOBuffer *out_iobuffer)
{
// Write frame header
uint8_t buf[HTTP2_FRAME_HEADER_LEN];
http2_write_frame_header(hdr, make_iovec(buf));
iobuffer->write(buf, sizeof(buf));

// Write frame payload
// It could be empty (e.g. SETTINGS frame with ACK flag)
if (ioblock && ioblock->read_avail() > 0) {
iobuffer->append_block(this->ioblock.get());
// Write frame header to the frame_buffer
http2_write_frame_header(hdr, make_iovec(this->ioblock->start(), HTTP2_FRAME_HEADER_LEN));

// Write the whole block to the output buffer
if (ioblock) {
int block_size = ioblock->read_avail();
if (block_size > 0) {
out_iobuffer->append_block(this->ioblock.get());

// payload should already have been written unless it doesn't all
// fit in the single block
if (out_reader) {
out_iobuffer->write(out_reader, hdr.length + HTTP2_FRAME_HEADER_LEN - block_size);
out_reader->consume(hdr.length + HTTP2_FRAME_HEADER_LEN - block_size);
}
}
}
}

int64_t
size()
{
if (ioblock) {
return HTTP2_FRAME_HEADER_LEN + ioblock->size();
} else {
return HTTP2_FRAME_HEADER_LEN;
}
return HTTP2_FRAME_HEADER_LEN + hdr.length;
}

void
set_payload_size(size_t length)
{
hdr.length = length;
}

void
add_reader(IOBufferReader *reader)
{
out_reader = reader;
}

// noncopyable
Expand All @@ -154,7 +173,8 @@ class Http2Frame
private:
Http2FrameHeader hdr; // frame header
Ptr<IOBufferBlock> ioblock; // frame payload
IOBufferReader *ioreader = nullptr;
IOBufferReader *out_reader = nullptr;
IOBufferReader *ioreader = nullptr;
};

class Http2ClientSession : public ProxySession
Expand Down
84 changes: 40 additions & 44 deletions proxy/http2/Http2ConnectionState.cc
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ rcv_settings_frame(Http2ConnectionState &cstate, const Http2Frame &frame)

// [RFC 7540] 6.5. Once all values have been applied, the recipient MUST
// immediately emit a SETTINGS frame with the ACK flag set.
Http2Frame ackFrame(HTTP2_FRAME_TYPE_SETTINGS, 0, HTTP2_FLAGS_SETTINGS_ACK);
Http2Frame ackFrame(HTTP2_FRAME_TYPE_SETTINGS, 0, HTTP2_FLAGS_SETTINGS_ACK, buffer_size_index[HTTP2_FRAME_TYPE_WINDOW_UPDATE]);
cstate.ua_session->handleEvent(HTTP2_SESSION_EVENT_XMIT, &ackFrame);

return Http2Error(Http2ErrorClass::HTTP2_ERROR_CLASS_NONE);
Expand Down Expand Up @@ -1449,8 +1449,7 @@ Http2ConnectionState::send_a_data_frame(Http2Stream *stream, size_t &payload_len
const size_t write_available_size = std::min(buf_len, static_cast<size_t>(window_size));
payload_length = 0;

uint8_t flags = 0x00;
uint8_t payload_buffer[buf_len];
uint8_t flags = 0x00;
IOBufferReader *_sm = stream->response_get_data_reader();

SCOPED_MUTEX_LOCK(stream_lock, stream->mutex, this_ethread());
Expand All @@ -1460,45 +1459,50 @@ Http2ConnectionState::send_a_data_frame(Http2Stream *stream, size_t &payload_len
return Http2SendDataFrameResult::ERROR;
}

// Select appropriate payload length
if (_sm->is_read_avail_more_than(0)) {
// Are we at the end?
// If we return here, we never send the END_STREAM in the case of a early terminating OS.
// OK if there is no body yet. Otherwise continue on to send a DATA frame and delete the stream
if (!stream->is_body_done() && !_sm->is_read_avail_more_than(0)) {
Http2StreamDebug(this->ua_session, stream->get_id(), "No payload");
return Http2SendDataFrameResult::NO_PAYLOAD;
} else if (_sm->is_read_avail_more_than(0)) {
// We only need to check for window size when there is a payload
if (window_size <= 0) {
Http2StreamDebug(this->ua_session, stream->get_id(), "No window");
return Http2SendDataFrameResult::NO_WINDOW;
}
// Copy into the payload buffer. Seems like we should be able to skip this copy step
payload_length = write_available_size;
payload_length = _sm->read(payload_buffer, static_cast<int64_t>(write_available_size));
} else {
payload_length = 0;
}

// Are we at the end?
// If we return here, we never send the END_STREAM in the case of a early terminating OS.
// OK if there is no body yet. Otherwise continue on to send a DATA frame and delete the stream
if (!stream->is_body_done() && payload_length == 0) {
Http2StreamDebug(this->ua_session, stream->get_id(), "No payload");
return Http2SendDataFrameResult::NO_PAYLOAD;
if (stream->is_body_done() && !_sm->is_read_avail_more_than(write_available_size)) {
flags |= HTTP2_FLAGS_DATA_END_STREAM;
}

if (stream->is_body_done() && !_sm->is_read_avail_more_than(0)) {
flags |= HTTP2_FLAGS_DATA_END_STREAM;
Http2Frame data(HTTP2_FRAME_TYPE_DATA, stream->get_id(), flags, BUFFER_SIZE_INDEX_16K);

IOVec iovec = data.write();
payload_length = std::min(static_cast<size_t>(_sm->read_avail()), write_available_size);

// If the amount we have allocated is just a bit less than the data available,
// leave the rest of the data on the table
size_t write_len = std::min(payload_length, iovec.iov_len - HTTP2_FRAME_HEADER_LEN);
if ((payload_length - write_len) < 1024) {
payload_length = write_len;
}

_sm->read(iovec.iov_base, write_len);
data.finalize(write_len);
data.set_payload_size(payload_length);
if (write_len < payload_length) {
data.add_reader(_sm);
}

// Update window size
this->decrement_client_rwnd(payload_length);
stream->decrement_client_rwnd(payload_length);

// Create frame
Http2StreamDebug(ua_session, stream->get_id(), "Send a DATA frame - client window con: %5zd stream: %5zd payload: %5zd",
_client_rwnd, stream->client_rwnd(), payload_length);

Http2Frame data(HTTP2_FRAME_TYPE_DATA, stream->get_id(), flags);
data.alloc(buffer_size_index[HTTP2_FRAME_TYPE_DATA]);
http2_write_data(payload_buffer, payload_length, data.write());
data.finalize(payload_length);

stream->update_sent_count(payload_length);

// xmit event
Expand Down Expand Up @@ -1592,8 +1596,7 @@ Http2ConnectionState::send_headers_frame(Http2Stream *stream)
} else {
payload_length = BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_HEADERS]);
}
Http2Frame headers(HTTP2_FRAME_TYPE_HEADERS, stream->get_id(), flags);
headers.alloc(buffer_size_index[HTTP2_FRAME_TYPE_HEADERS]);
Http2Frame headers(HTTP2_FRAME_TYPE_HEADERS, stream->get_id(), flags, buffer_size_index[HTTP2_FRAME_TYPE_HEADERS]);
http2_write_headers(buf, payload_length, headers.write());
headers.finalize(payload_length);

Expand Down Expand Up @@ -1624,8 +1627,8 @@ Http2ConnectionState::send_headers_frame(Http2Stream *stream)
if (sent + payload_length == header_blocks_size) {
flags |= HTTP2_FLAGS_CONTINUATION_END_HEADERS;
}
Http2Frame continuation_frame(HTTP2_FRAME_TYPE_CONTINUATION, stream->get_id(), flags);
continuation_frame.alloc(buffer_size_index[HTTP2_FRAME_TYPE_CONTINUATION]);
Http2Frame continuation_frame(HTTP2_FRAME_TYPE_CONTINUATION, stream->get_id(), flags,
buffer_size_index[HTTP2_FRAME_TYPE_CONTINUATION]);
http2_write_headers(buf + sent, payload_length, continuation_frame.write());
continuation_frame.finalize(payload_length);
stream->change_state(continuation_frame.header().type, continuation_frame.header().flags);
Expand Down Expand Up @@ -1701,8 +1704,8 @@ Http2ConnectionState::send_push_promise_frame(Http2Stream *stream, URL &url, con
payload_length =
BUFFER_SIZE_FOR_INDEX(buffer_size_index[HTTP2_FRAME_TYPE_PUSH_PROMISE]) - sizeof(push_promise.promised_streamid);
}
Http2Frame push_promise_frame(HTTP2_FRAME_TYPE_PUSH_PROMISE, stream->get_id(), flags);
push_promise_frame.alloc(buffer_size_index[HTTP2_FRAME_TYPE_PUSH_PROMISE]);
Http2Frame push_promise_frame(HTTP2_FRAME_TYPE_PUSH_PROMISE, stream->get_id(), flags,
buffer_size_index[HTTP2_FRAME_TYPE_PUSH_PROMISE]);
Http2StreamId id = this->get_latest_stream_id_out() + 2;
push_promise.promised_streamid = id;
http2_write_push_promise(push_promise, buf, payload_length, push_promise_frame.write());
Expand All @@ -1721,8 +1724,8 @@ Http2ConnectionState::send_push_promise_frame(Http2Stream *stream, URL &url, con
if (sent + payload_length == header_blocks_size) {
flags |= HTTP2_FLAGS_CONTINUATION_END_HEADERS;
}
Http2Frame continuation_frame(HTTP2_FRAME_TYPE_CONTINUATION, stream->get_id(), flags);
continuation_frame.alloc(buffer_size_index[HTTP2_FRAME_TYPE_CONTINUATION]);
Http2Frame continuation_frame(HTTP2_FRAME_TYPE_CONTINUATION, stream->get_id(), flags,
buffer_size_index[HTTP2_FRAME_TYPE_CONTINUATION]);
http2_write_headers(buf + sent, payload_length, continuation_frame.write());
continuation_frame.finalize(payload_length);
// xmit event
Expand Down Expand Up @@ -1771,9 +1774,7 @@ Http2ConnectionState::send_rst_stream_frame(Http2StreamId id, Http2ErrorCode ec)
++stream_error_count;
}

Http2Frame rst_stream(HTTP2_FRAME_TYPE_RST_STREAM, id, 0);

rst_stream.alloc(buffer_size_index[HTTP2_FRAME_TYPE_RST_STREAM]);
Http2Frame rst_stream(HTTP2_FRAME_TYPE_RST_STREAM, id, 0, buffer_size_index[HTTP2_FRAME_TYPE_RST_STREAM]);
http2_write_rst_stream(static_cast<uint32_t>(ec), rst_stream.write());
rst_stream.finalize(HTTP2_RST_STREAM_LEN);

Expand Down Expand Up @@ -1804,8 +1805,7 @@ Http2ConnectionState::send_settings_frame(const Http2ConnectionSettings &new_set

Http2StreamDebug(ua_session, stream_id, "Send SETTINGS frame");

Http2Frame settings(HTTP2_FRAME_TYPE_SETTINGS, stream_id, 0);
settings.alloc(buffer_size_index[HTTP2_FRAME_TYPE_SETTINGS]);
Http2Frame settings(HTTP2_FRAME_TYPE_SETTINGS, stream_id, 0, buffer_size_index[HTTP2_FRAME_TYPE_SETTINGS]);

IOVec iov = settings.write();
uint32_t settings_length = 0;
Expand Down Expand Up @@ -1849,9 +1849,7 @@ Http2ConnectionState::send_ping_frame(Http2StreamId id, uint8_t flag, const uint
{
Http2StreamDebug(ua_session, id, "Send PING frame");

Http2Frame ping(HTTP2_FRAME_TYPE_PING, id, flag);

ping.alloc(buffer_size_index[HTTP2_FRAME_TYPE_PING]);
Http2Frame ping(HTTP2_FRAME_TYPE_PING, id, flag, buffer_size_index[HTTP2_FRAME_TYPE_PING]);
http2_write_ping(opaque_data, ping.write());
ping.finalize(HTTP2_PING_LEN);

Expand All @@ -1873,13 +1871,12 @@ Http2ConnectionState::send_goaway_frame(Http2StreamId id, Http2ErrorCode ec)
HTTP2_INCREMENT_THREAD_DYN_STAT(HTTP2_STAT_CONNECTION_ERRORS_COUNT, this_ethread());
}

Http2Frame frame(HTTP2_FRAME_TYPE_GOAWAY, 0, 0);
Http2Frame frame(HTTP2_FRAME_TYPE_GOAWAY, 0, 0, buffer_size_index[HTTP2_FRAME_TYPE_GOAWAY]);
Http2Goaway goaway;

goaway.last_streamid = id;
goaway.error_code = ec;

frame.alloc(buffer_size_index[HTTP2_FRAME_TYPE_GOAWAY]);
http2_write_goaway(goaway, frame.write());
frame.finalize(HTTP2_GOAWAY_LEN);

Expand All @@ -1896,8 +1893,7 @@ Http2ConnectionState::send_window_update_frame(Http2StreamId id, uint32_t size)
Http2StreamDebug(ua_session, id, "Send WINDOW_UPDATE frame");

// Create WINDOW_UPDATE frame
Http2Frame window_update(HTTP2_FRAME_TYPE_WINDOW_UPDATE, id, 0x0);
window_update.alloc(buffer_size_index[HTTP2_FRAME_TYPE_WINDOW_UPDATE]);
Http2Frame window_update(HTTP2_FRAME_TYPE_WINDOW_UPDATE, id, 0x0, buffer_size_index[HTTP2_FRAME_TYPE_WINDOW_UPDATE]);
http2_write_window_update(static_cast<uint32_t>(size), window_update.write());
window_update.finalize(sizeof(uint32_t));

Expand Down