diff --git a/tensorboard/uploader/proto/BUILD b/tensorboard/uploader/proto/BUILD index f96ce1264a..cdc7212e1a 100644 --- a/tensorboard/uploader/proto/BUILD +++ b/tensorboard/uploader/proto/BUILD @@ -10,6 +10,7 @@ exports_files(["LICENSE"]) tb_proto_library( name = "protos_all", srcs = [ + "blob.proto", "export_service.proto", "scalar.proto", "server_info.proto", diff --git a/tensorboard/uploader/proto/blob.proto b/tensorboard/uploader/proto/blob.proto new file mode 100644 index 0000000000..f218306f74 --- /dev/null +++ b/tensorboard/uploader/proto/blob.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package tensorboard.service; + +enum BlobState { + // Object state is unknown. This value should never be used; it is present + // only as a proto3 best practice. + // See https://developers.google.com/protocol-buffers/docs/proto3#enum + UNKNOWN = 0; + // Object is being written and not yet finalized. + UNFINALIZED = 1; + // Object is finalized. + CURRENT = 2; +} \ No newline at end of file diff --git a/tensorboard/uploader/proto/write_service.proto b/tensorboard/uploader/proto/write_service.proto index b5dd3b0bce..6f168e62f6 100644 --- a/tensorboard/uploader/proto/write_service.proto +++ b/tensorboard/uploader/proto/write_service.proto @@ -8,6 +8,7 @@ package tensorboard.service; import "tensorboard/uploader/proto/export_service.proto"; import "tensorboard/uploader/proto/scalar.proto"; import "tensorboard/uploader/proto/tensor.proto"; +import "tensorboard/uploader/proto/blob.proto"; import "tensorboard/compat/proto/summary.proto"; // Service for writing data to TensorBoard.dev. @@ -30,6 +31,16 @@ service TensorBoardWriterService { rpc WriteScalar(WriteScalarRequest) returns (WriteScalarResponse) {} // Request additional tensor data be stored in TensorBoard.dev. rpc WriteTensor(WriteTensorRequest) returns (WriteTensorResponse) {} + // Request to obtain a specific BlobSequence entry, creating it if needed, + // to be subsequently populated with blobs. + rpc GetOrCreateBlobSequence(GetOrCreateBlobSequenceRequest) + returns (GetOrCreateBlobSequenceResponse) {} + // Request the current status of blob data being stored in TensorBoard.dev, + // to support resumable uploads. + rpc GetBlobMetadata(GetBlobMetadataRequest) + returns (GetBlobMetadataResponse) {} + // Request additional blob data be stored in TensorBoard.dev. + rpc WriteBlob(stream WriteBlobRequest) returns (stream WriteBlobResponse) {} // Request that the calling user and all their data be permanently deleted. // Used for testing purposes. rpc DeleteOwnUser(DeleteOwnUserRequest) returns (DeleteOwnUserResponse) {} @@ -190,6 +201,148 @@ message WriteTensorResponse { // This is empty on purpose. } +// Writing a blob sequence +// ======================= +// * The caller requests creation of a new BlobSequence, via the +// GetOrCreateBlobSequence RPC. The response provides a new BlobSequence ID. +// * The caller writes each blob in the sequence separately via the WriteBlob +// RPC as described below, providing the blob sequence ID and sequence index +// for each blob. +// * Multiple WriteBlob streams may operate in parallel (for different blobs), +// with no constraint on their ordering. + +// Resuming a partially written blob sequence +// ========================================== +// * The client starts the upload process with a new GetOrCreateBlobSequence +// RPC. +// * The blob sequences are uniquely keyed by experiment, run, tag, and step. +// Since this is a second (or later) attempt regarding an existing sequence, +// a new sequence is not created. Instead the existing sequence ID is +// returned. +// * The client attempts to upload each blob in the sequence. These uploads +// resume or may be skipped entirely, as needed, according to the below +// procedure. + +// Writing a blob +// ============== +// * The caller instantiates a WriteBlob RPC, which is a bidirectional stream. +// * The caller sends one or more WriteBlobRequests, providing the blob data +// in an ordered sequence of chunks. +// * The caller receives a sequence of WriteBlobResponses in return. Each +// WriteBlobResponse corresponds to one WriteBlobRequest in order, reporting +// the state of the write operation. +// * The caller should not wait to verify that each response reports the +// expected status and size before sending the next request. Doing so reduces +// throughput, with little benefit since any mismatch on the server side will +// cause the RPC to throw an error anyway. +// * Nonetheless the caller may wish to validate these values, if provided, +// whenever the response does arrive. +// * The request size should be less than 4 MiB (2^20 = 4,194,304), because +// that is the default request limit for gRPC. Accounting for the other +// fields of the request, then, and leaving some padding for safety, the data +// chunk should be somewhat smaller-- perhaps simply 4e6 bytes. +// * The last of these requests must set finalize_object=True, and the prior +// ones must not. +// * For a blob <= 4MB, a single request suffices, so it is the last chunk. + +// Resuming a partial blob upload +// ============================== +// * When the client begins an upload by the usual method above, it may turn +// out that the data chunk is redundant with a previous partial upload. +// The serve will raise an error in this case, closing the stream. +// * At this point the client should issue a GetBlobMetadataRequest. The +// GetBlobMetadataResponse reports the state, size, and crc32c of the object +// so far, i.e. taking into account any previously uploaded data. +// * The client checks whether any data remains to upload, and optionally +// validates the cumulative crc32c against the local file (up to the given +// offset). +// * If needed, the client starts a new WriteBlob stream, starting from the +// specified offset. +// * The client may choose to call GetBlobMetadata before starting any +// WriteBlob stream, to avoid the potential for wasted transfer of a +// redundant chunk, at the cost of the initial roundtrip to obtain the +// status. It is up to the client to weigh this tradeoff. + +// Obtain a unique ID for a blob sequence, given the composite key +// (experiment_id, run, tag, step). If such a blob sequence already exists, +// return its ID. If not, create it first, and return the new ID. +message GetOrCreateBlobSequenceRequest { + // Service-wide unique identifier of an uploaded log dir. + // eg: "1r9d0kQkh2laODSZcQXWP" + string experiment_id = 1; + // The name of the run to which the blob sequence belongs, for example + // "/some/path/mnist_experiments/run1/". + string run = 2; + // The name of the tag to which the blob sequence belongs, for example "loss". + string tag = 3; + // Step index within the run. + int64 step = 4; + // The total number of elements expected in the sequence. + // This effectively delares a number of initially empty 'upload slots', + // to be filled with subsequent WriteBlob RPCs. + int64 final_sequence_length = 5; + // Note that metadata.plugin_data.content does not carry the payload. + .tensorboard.SummaryMetadata metadata = 6; +} + +message GetOrCreateBlobSequenceResponse { + // A unique ID for the the requested blob sequence. + string blob_sequence_id = 1; +} + +message GetBlobMetadataRequest { + // The ID of the BlobSequence of which this blob is a member. + string blob_sequence_id = 1; + // The position of this Blob within the BlobSequence + int64 index = 2; +} + +message GetBlobMetadataResponse { + // State of the object (still appending vs. complete). + BlobState blob_state = 1; + // Size of the object in bytes. In the case of a partial upload, this + // reflects only the data actually received so far. + int64 size = 2; + // crc32c of the blob data stored so far, i.e. over the byte range [0, size). + fixed32 crc32c = 3; +} + +// A single chunk of the blob write stream. +// +// Note that the WriteBlobRequest does not mirror the nested structure of +// WriteScalarRequest, because we only ever send one blob at a time. +message WriteBlobRequest { + // The ID of the BlobSequence of which this blob is a member. + string blob_sequence_id = 1; + // The position of this Blob within the BlobSequence + int64 index = 2; + // The bytes in this chunk. + bytes data = 3; + // The position in the blob where this chunk begins. + // This must equal the sum of the sizes of the chunks sent so far. Ignored + // if no data is provided. + int64 offset = 4; + // CRC32C of current data buffer. Clients must include the crc32c for every + // data buffer, to protect against data corruption. Note that for multi-shot + // writes, specifying the crc32c for every data buffer provides stronger + // protection than just providing the final_crc32c at the end of the upload. + fixed32 crc32c = 5; + // Indicates that this is the last chunk of the stream. + bool finalize_object = 6; + // CRC32C of the entire blob. Required, to protect against data corruption. + // This should be set only when finalize_object=True. + fixed32 final_crc32c = 7; +} + +message WriteBlobResponse { + // State of the object (still appending vs. complete). + BlobState blob_state = 1; + // Size of the object in bytes. This is the sum of the chunk sizes + // received from the stream so far. In the response to the final chunk, + // this size should equal the total size of the blob. + int64 size = 2; +} + // Requests that the calling user and all their data be permanently deleted. message DeleteOwnUserRequest { // This is empty on purpose.