Skip to content

Commit

Permalink
pw_transfer: Protocol versioning and chunk refactor
Browse files Browse the repository at this point in the history
This adds the concept of versioning to the C++ pw_transfer
implementation. The version of a given transfer is deduced from the
chunks sent and received based on the fields which are present within
them. Notably, there is no explicit version number encoded anywhere on
the wire.

The addition of versioning is done in such a way that transfer
implementations can remain backwards compatible, supporting
communication with users of older protocol versions.

Initially, only a single "legacy" version is supported and used, with
its handling behavior matching the current uses of pw_transfer. An
improved protocol (tentatively "v2") will be rolled out over a series
of subsequent CLs.

To support these goals, the transfer chunk and code using it is
refactored into an encapsulating class, with internal logic for encoding
and decoding different fields based on its configured or inferred
version. A common public API is presented to the rest of the transfer
code, allowing it to be written in a (mostly) version-agnostic manner.

Change-Id: I8584e12ca5b44f1a973c837389e1e00f8c6112f6
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/92640
Reviewed-by: Wyatt Hepler <hepler@google.com>
Commit-Queue: Alexei Frolov <frolv@google.com>
  • Loading branch information
frolv authored and CQ Bot Account committed May 10, 2022
1 parent 56e055a commit 42efd50
Show file tree
Hide file tree
Showing 14 changed files with 1,478 additions and 1,190 deletions.
1 change: 1 addition & 0 deletions pw_transfer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pw_cc_library(
"public/pw_transfer/internal/client_context.h",
"public/pw_transfer/internal/context.h",
"public/pw_transfer/internal/event.h",
"public/pw_transfer/internal/protocol.h",
"public/pw_transfer/internal/server_context.h",
"rate_estimate.cc",
"server_context.cc",
Expand Down
1 change: 1 addition & 0 deletions pw_transfer/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pw_source_set("core") {
"public/pw_transfer/internal/client_context.h",
"public/pw_transfer/internal/context.h",
"public/pw_transfer/internal/event.h",
"public/pw_transfer/internal/protocol.h",
"public/pw_transfer/internal/server_context.h",
"rate_estimate.cc",
"server_context.cc",
Expand Down
122 changes: 79 additions & 43 deletions pw_transfer/chunk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "pw_transfer/internal/chunk.h"

#include "pw_assert/check.h"
#include "pw_protobuf/decoder.h"
#include "pw_status/try.h"
#include "pw_transfer/transfer.pwpb.h"
Expand All @@ -22,7 +23,7 @@ namespace pw::transfer::internal {

namespace ProtoChunk = transfer::Chunk;

Result<uint32_t> ExtractSessionId(ConstByteSpan message) {
Result<uint32_t> Chunk::ExtractSessionId(ConstByteSpan message) {
protobuf::Decoder decoder(message);

while (decoder.Next().ok()) {
Expand All @@ -44,124 +45,159 @@ Result<uint32_t> ExtractSessionId(ConstByteSpan message) {
return Status::DataLoss();
}

Status DecodeChunk(ConstByteSpan message, Chunk& chunk) {
Result<Chunk> Chunk::Parse(ConstByteSpan message) {
protobuf::Decoder decoder(message);
Status status;
uint32_t value;

chunk = {};
Chunk chunk;

// Assume the legacy protocol by default. Field presence in the serialized
// message may change this.
chunk.protocol_version_ = ProtocolVersion::kLegacy;

// Some older versions of the protocol set the deprecated pending_bytes field
// in their chunks. The newer transfer handling code does not process this
// field, instead working only in terms of window_end_offset. If pending_bytes
// is encountered in the serialized message, save its value, then calculate
// window_end_offset from it once parsing is complete.
uint32_t pending_bytes = 0;

while ((status = decoder.Next()).ok()) {
ProtoChunk::Fields field =
static_cast<ProtoChunk::Fields>(decoder.FieldNumber());

switch (field) {
case ProtoChunk::Fields::SESSION_ID:
PW_TRY(decoder.ReadUint32(&chunk.session_id));
PW_TRY(decoder.ReadUint32(&chunk.session_id_));
break;

case ProtoChunk::Fields::PENDING_BYTES:
PW_TRY(decoder.ReadUint32(&value));
chunk.pending_bytes = value;
PW_TRY(decoder.ReadUint32(&pending_bytes));
break;

case ProtoChunk::Fields::MAX_CHUNK_SIZE_BYTES:
PW_TRY(decoder.ReadUint32(&value));
chunk.max_chunk_size_bytes = value;
chunk.set_max_chunk_size_bytes(value);
break;

case ProtoChunk::Fields::MIN_DELAY_MICROSECONDS:
PW_TRY(decoder.ReadUint32(&value));
chunk.min_delay_microseconds = value;
chunk.set_min_delay_microseconds(value);
break;

case ProtoChunk::Fields::OFFSET:
PW_TRY(decoder.ReadUint32(&chunk.offset));
PW_TRY(decoder.ReadUint32(&chunk.offset_));
break;

case ProtoChunk::Fields::DATA:
PW_TRY(decoder.ReadBytes(&chunk.data));
PW_TRY(decoder.ReadBytes(&chunk.payload_));
break;

case ProtoChunk::Fields::REMAINING_BYTES: {
uint64_t remaining;
PW_TRY(decoder.ReadUint64(&remaining));
chunk.remaining_bytes = remaining;
uint64_t remaining_bytes;
PW_TRY(decoder.ReadUint64(&remaining_bytes));
chunk.set_remaining_bytes(remaining_bytes);
break;
}

case ProtoChunk::Fields::STATUS:
PW_TRY(decoder.ReadUint32(&value));
chunk.status = static_cast<Status::Code>(value);
chunk.set_status(static_cast<Status::Code>(value));
break;

case ProtoChunk::Fields::WINDOW_END_OFFSET:
PW_TRY(decoder.ReadUint32(&chunk.window_end_offset));
PW_TRY(decoder.ReadUint32(&chunk.window_end_offset_));
break;

case ProtoChunk::Fields::TYPE: {
uint32_t type;
PW_TRY(decoder.ReadUint32(&type));
chunk.type = static_cast<Chunk::Type>(type);
chunk.type_ = static_cast<Chunk::Type>(type);
break;
}

case ProtoChunk::Fields::RESOURCE_ID:
PW_TRY(decoder.ReadUint32(&value));
chunk.resource_id = value;
chunk.set_resource_id(value);

// The existence of a resource_id field indicates that a newer protocol
// is running.
chunk.protocol_version_ = ProtocolVersion::kVersionTwo;
break;

// Silently ignore any unrecognized fields.
}
}

return status.IsOutOfRange() ? OkStatus() : status;
if (pending_bytes != 0) {
// Compute window_end_offset if it isn't explicitly provided (in older
// protocol versions).
chunk.set_window_end_offset(chunk.offset() + pending_bytes);
}

if (status.ok() || status.IsOutOfRange()) {
return chunk;
}

return status;
}

Result<ConstByteSpan> EncodeChunk(const Chunk& chunk, ByteSpan buffer) {
Result<ConstByteSpan> Chunk::Encode(ByteSpan buffer) const {
PW_CHECK(protocol_version_ != ProtocolVersion::kUnknown,
"Cannot encode a transfer chunk with an unknown protocol version");

ProtoChunk::MemoryEncoder encoder(buffer);

encoder.WriteSessionId(chunk.session_id).IgnoreError();
encoder.WriteSessionId(session_id_).IgnoreError();

if (type_.has_value()) {
encoder.WriteType(static_cast<ProtoChunk::Type>(type_.value()))
.IgnoreError();
}

if (chunk.window_end_offset != 0) {
encoder.WriteWindowEndOffset(chunk.window_end_offset).IgnoreError();
if (window_end_offset_ != 0) {
encoder.WriteWindowEndOffset(window_end_offset_).IgnoreError();
}

if (chunk.pending_bytes.has_value()) {
encoder.WritePendingBytes(chunk.pending_bytes.value()).IgnoreError();
if (protocol_version_ == ProtocolVersion::kLegacy) {
// In the legacy protocol, the pending_bytes field must be set alongside
// window_end_offset, as some transfer implementations require it.
encoder.WritePendingBytes(window_end_offset_ - offset_).IgnoreError();
}
if (chunk.max_chunk_size_bytes.has_value()) {
encoder.WriteMaxChunkSizeBytes(chunk.max_chunk_size_bytes.value())
.IgnoreError();

if (max_chunk_size_bytes_.has_value()) {
encoder.WriteMaxChunkSizeBytes(max_chunk_size_bytes_.value()).IgnoreError();
}
if (chunk.min_delay_microseconds.has_value()) {
encoder.WriteMinDelayMicroseconds(chunk.min_delay_microseconds.value())
if (min_delay_microseconds_.has_value()) {
encoder.WriteMinDelayMicroseconds(min_delay_microseconds_.value())
.IgnoreError();
}
if (chunk.offset != 0) {
encoder.WriteOffset(chunk.offset).IgnoreError();
}
if (!chunk.data.empty()) {
encoder.WriteData(chunk.data).IgnoreError();

if (offset_ != 0) {
encoder.WriteOffset(offset_).IgnoreError();
}
if (chunk.remaining_bytes.has_value()) {
encoder.WriteRemainingBytes(chunk.remaining_bytes.value()).IgnoreError();

if (has_payload()) {
encoder.WriteData(payload_).IgnoreError();
}
if (chunk.status.has_value()) {
encoder.WriteStatus(chunk.status.value().code()).IgnoreError();

if (remaining_bytes_.has_value()) {
encoder.WriteRemainingBytes(remaining_bytes_.value()).IgnoreError();
}

if (chunk.type.has_value()) {
encoder.WriteType(static_cast<ProtoChunk::Type>(chunk.type.value()))
.IgnoreError();
if (status_.has_value()) {
encoder.WriteStatus(status_.value().code()).IgnoreError();
}

if (chunk.resource_id != 0) {
encoder.WriteResourceId(chunk.resource_id).IgnoreError();
if (resource_id_.has_value()) {
encoder.WriteResourceId(resource_id_.value()).IgnoreError();
}

PW_TRY(encoder.status());
return ConstByteSpan(encoder);
}

Status DecodeChunk(ConstByteSpan, Chunk&) { return Status::Unimplemented(); }

} // namespace pw::transfer::internal
Loading

0 comments on commit 42efd50

Please sign in to comment.