Automated Code Change
PiperOrigin-RevId: 701691089
tensorflower-gardener authored and tensorflow-copybara committed Dec 1, 2024
1 parent 29d8656 commit 84b3e7a
Showing 9 changed files with 149 additions and 141 deletions.
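Every hunk in this change follows the same mechanical pattern: return types spelled with the unqualified Status alias are respelled as absl::Status. A minimal, self-contained sketch of the before/after pattern (the function name is hypothetical; in current TensorFlow, tensorflow::Status is an alias of absl::Status, so the rewrite changes spelling only, not behavior):

#include "absl/status/status.h"

// Before the rewrite, relying on the TensorFlow alias:
//   Status DoWork();
// After the rewrite, spelling the underlying type explicitly:
absl::Status DoWork() {
  return absl::OkStatus();  // the returned value is identical either way
}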
10 changes: 5 additions & 5 deletions tensorflow_serving/batching/batch_scheduler_retrier_test.cc
@@ -49,7 +49,7 @@ class BrokenScheduler : public BatchScheduler<FakeTask> {
BrokenScheduler() = default;
~BrokenScheduler() override = default;

- Status Schedule(std::unique_ptr<FakeTask>* task) override {
+ absl::Status Schedule(std::unique_ptr<FakeTask>* task) override {
++num_submit_calls_;
return errors::Unknown("BrokenScheduler faithfully failing");
}
@@ -76,7 +76,7 @@ class StubbornScheduler : public BatchScheduler<FakeTask> {
: num_attempts_to_succeed_(num_attempts_to_succeed) {}
~StubbornScheduler() override = default;

- Status Schedule(std::unique_ptr<FakeTask>* task) override {
+ absl::Status Schedule(std::unique_ptr<FakeTask>* task) override {
++num_attempts_;
if (num_attempts_ >= num_attempts_to_succeed_) {
std::unique_ptr<FakeTask> consumed_task = std::move(*task);
@@ -124,7 +124,7 @@ TEST(BatchSchedulerRetrierTest, PermanentFailure) {
TF_CHECK_OK(BatchSchedulerRetrier<FakeTask>::Create(
options, std::move(broken_scheduler), &retrier));
auto task = std::unique_ptr<FakeTask>(new FakeTask);
- Status status = retrier->Schedule(&task);
+ absl::Status status = retrier->Schedule(&task);
ASSERT_FALSE(status.ok());
EXPECT_EQ(error::UNKNOWN, status.code());
EXPECT_FALSE(task == nullptr);
@@ -154,7 +154,7 @@ TEST(BatchSchedulerRetrierTest, MaxTime) {
{}, "RunRetrier",
[&retrier, &expect_success, &done]() {
auto task = std::unique_ptr<FakeTask>(new FakeTask);
- Status status = retrier->Schedule(&task);
+ absl::Status status = retrier->Schedule(&task);
EXPECT_EQ(expect_success, status.ok());
if (!status.ok()) {
EXPECT_EQ(error::UNAVAILABLE, status.code());
@@ -196,7 +196,7 @@ TEST(BatchSchedulerRetrierTest, RetryDelay) {
{}, "RunRetrier",
[&retrier, &done]() {
auto task = std::unique_ptr<FakeTask>(new FakeTask);
- Status status = retrier->Schedule(&task);
+ absl::Status status = retrier->Schedule(&task);
TF_EXPECT_OK(status);
done.Notify();
}));
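A note on the assertions kept in the test file above: with absl::Status, status.code() returns absl::StatusCode, while the checks above compare against the tensorflow::error proto constants (error::UNKNOWN, error::UNAVAILABLE), which presumably still compiles through TensorFlow-side compatibility overloads. A hypothetical absl-only version of the same check, shown as a sketch assuming GoogleTest and not part of this commit:

#include "absl/status/status.h"
#include <gtest/gtest.h>

TEST(StatusSpellingSketch, CodeComparison) {
  // absl provides a per-code error factory for each canonical code.
  absl::Status status = absl::UnknownError("BrokenScheduler faithfully failing");
  ASSERT_FALSE(status.ok());
  // code() returns absl::StatusCode, so the absl enum compares directly.
  EXPECT_EQ(absl::StatusCode::kUnknown, status.code());
}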
106 changes: 55 additions & 51 deletions tensorflow_serving/batching/batching_session.cc
@@ -166,7 +166,7 @@ class BatchingSession : public ServingSession {
// signatures, and for each one supplies a lambda to construct a batch
// scheduler given a process-batch callback. See batching_session.h for
// example usage.
- static Status Create(
+ static absl::Status Create(
const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
@@ -176,7 +176,7 @@ class BatchingSession : public ServingSession {
// Same as above but allows for specification of a default scheduler creator
// which enables requests that don't match an exact signature to also
// have batching.
- static Status Create(
+ static absl::Status Create(
const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
@@ -186,10 +186,10 @@ class BatchingSession : public ServingSession {

~BatchingSession() override = default;

- Status Run(const std::vector<std::pair<string, Tensor>>& inputs,
- const std::vector<string>& output_tensor_names,
- const std::vector<string>& target_node_names,
- std::vector<Tensor>* outputs) override;
+ absl::Status Run(const std::vector<std::pair<string, Tensor>>& inputs,
+ const std::vector<string>& output_tensor_names,
+ const std::vector<string>& target_node_names,
+ std::vector<Tensor>* outputs) override;

// RunOptions handling:
// Since multiple of these Run() calls get batched into a single call to the
@@ -206,31 +206,33 @@ class BatchingSession : public ServingSession {
// assuming all individual tasks in a batch have equal cost, which is the
// assumption before splitting is introduced), the rest of the fields in
// `RunMetadata` are copied from the processing result of the first split.
- Status Run(const RunOptions& run_options,
- const std::vector<std::pair<string, Tensor>>& inputs,
- const std::vector<string>& output_tensor_names,
- const std::vector<string>& target_node_names,
- std::vector<Tensor>* outputs, RunMetadata* run_metadata) override;
+ absl::Status Run(const RunOptions& run_options,
+ const std::vector<std::pair<string, Tensor>>& inputs,
+ const std::vector<string>& output_tensor_names,
+ const std::vector<string>& target_node_names,
+ std::vector<Tensor>* outputs,
+ RunMetadata* run_metadata) override;

// Similar to the function above, but takes an additional
// 'thread_pool_options' to pass to the underlying Session's Run(). We select
// an arbitrary 'thread_pool_options' (typically they are the same across
// calls).
- Status Run(const RunOptions& run_options,
- const std::vector<std::pair<string, Tensor>>& inputs,
- const std::vector<string>& output_tensor_names,
- const std::vector<string>& target_node_names,
- std::vector<Tensor>* outputs, RunMetadata* run_metadata,
- const thread::ThreadPoolOptions& thread_pool_options) override;
+ absl::Status Run(
+ const RunOptions& run_options,
+ const std::vector<std::pair<string, Tensor>>& inputs,
+ const std::vector<string>& output_tensor_names,
+ const std::vector<string>& target_node_names,
+ std::vector<Tensor>* outputs, RunMetadata* run_metadata,
+ const thread::ThreadPoolOptions& thread_pool_options) override;

- Status ListDevices(std::vector<DeviceAttributes>* response) override;
+ absl::Status ListDevices(std::vector<DeviceAttributes>* response) override;

private:
explicit BatchingSession(const BatchingSessionOptions& options,
const std::string& thread_pool_name);

// Helper function to run the session.
- Status InternalRun(
+ absl::Status InternalRun(
const RunOptions& run_options,
const std::vector<std::pair<string, Tensor>>& inputs,
const std::vector<string>& output_tensor_names,
@@ -242,28 +244,28 @@ class BatchingSession : public ServingSession {
// analyzing the 0th dimension size of each of the tensors. All tensors in the
// list must have the same 0th dimension size to be batchable. If the sizes
// are not all identical, returns an error.
- Status ComputeInputSize(const std::vector<std::pair<string, Tensor>>& inputs,
- size_t* size) const;
+ absl::Status ComputeInputSize(
+ const std::vector<std::pair<string, Tensor>>& inputs, size_t* size) const;

// Merges the input tensors in a batch, via concatenation of correspondingly-
// named tensors. Puts the merged inputs in the order they appear in the
// signature. Assumes 'batch' is non-empty. Returns an error if there are any
// mismatches among the tasks in the batch that violate the constraints for
// batchability.
- Status MergeInputTensors(
+ absl::Status MergeInputTensors(
const TensorSignature& signature, const Batch<BatchingSessionTask>& batch,
std::vector<std::pair<string, Tensor>>* merged_inputs);

// Splits the output of a batched call to 'wrapped_->Run()' into individual
// task outputs. Assumes the output tensor order matches the signature.
- Status SplitOutputTensors(const TensorSignature& signature,
- const std::vector<Tensor>& combined_outputs,
- Batch<BatchingSessionTask>* batch);
+ absl::Status SplitOutputTensors(const TensorSignature& signature,
+ const std::vector<Tensor>& combined_outputs,
+ Batch<BatchingSessionTask>* batch);

// Splits RunMetadata parts (e.g. costgraph attribution) into individual task
// outputs.
- Status SplitRunMetadata(RunMetadata* batch_metadata,
- Batch<BatchingSessionTask>* batch);
+ absl::Status SplitRunMetadata(RunMetadata* batch_metadata,
+ Batch<BatchingSessionTask>* batch);

// Processes one batch of Run() calls with 'signature'. Called by
// 'batch_scheduler_' in a batch thread.
@@ -295,7 +297,7 @@ class BatchingSession : public ServingSession {
TF_DISALLOW_COPY_AND_ASSIGN(BatchingSession);
};

- Status BatchingSession::Create(
+ absl::Status BatchingSession::Create(
const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
@@ -309,7 +311,7 @@ Status BatchingSession::Create(
return status;
}

- Status BatchingSession::Create(
+ absl::Status BatchingSession::Create(
const BatchingSessionOptions& options, std::unique_ptr<Session> wrapped,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
@@ -339,7 +341,7 @@ Status BatchingSession::Create(
return absl::OkStatus();
}

- Status BatchingSession::Run(
+ absl::Status BatchingSession::Run(
const std::vector<std::pair<string, Tensor>>& inputs,
const std::vector<string>& output_tensor_names,
const std::vector<string>& target_node_names,
@@ -349,7 +351,7 @@ Status BatchingSession::Run(
outputs, &run_metadata);
}

- Status BatchingSession::Run(
+ absl::Status BatchingSession::Run(
const RunOptions& run_options,
const std::vector<std::pair<string, Tensor>>& inputs,
const std::vector<string>& output_tensor_names,
@@ -359,7 +361,7 @@ Status BatchingSession::Run(
target_node_names, outputs, run_metadata, absl::nullopt);
}

- Status BatchingSession::Run(
+ absl::Status BatchingSession::Run(
const RunOptions& run_options,
const std::vector<std::pair<string, Tensor>>& inputs,
const std::vector<string>& output_tensor_names,
@@ -371,7 +373,7 @@ Status BatchingSession::Run(
thread_pool_options);
}

- Status BatchingSession::InternalRun(
+ absl::Status BatchingSession::InternalRun(
const RunOptions& run_options,
const std::vector<std::pair<string, Tensor>>& inputs,
const std::vector<string>& output_tensor_names,
@@ -434,7 +436,7 @@ Status BatchingSession::InternalRun(
outputs->clear();

Notification done;
- Status status;
+ absl::Status status;
auto task = std::unique_ptr<BatchingSessionTask>(new BatchingSessionTask);
task->enqueue_time_micros = EnvTime::NowMicros();
task->run_options = run_options;
@@ -455,15 +457,16 @@ Status BatchingSession::InternalRun(
return status;
}

- Status BatchingSession::ListDevices(std::vector<DeviceAttributes>* response) {
+ absl::Status BatchingSession::ListDevices(
+ std::vector<DeviceAttributes>* response) {
return wrapped_->ListDevices(response);
}

BatchingSession::BatchingSession(const BatchingSessionOptions& options,
const std::string& thread_pool_name)
: options_(options), thread_pool_name_(thread_pool_name) {}

- Status BatchingSession::ComputeInputSize(
+ absl::Status BatchingSession::ComputeInputSize(
const std::vector<std::pair<string, Tensor>>& inputs, size_t* size) const {
TF_RETURN_IF_ERROR(::tensorflow::serving::ComputeTensorBatchSize(
inputs, size,
@@ -480,7 +483,7 @@ Status BatchingSession::ComputeInputSize(
return absl::OkStatus();
}

- Status BatchingSession::MergeInputTensors(
+ absl::Status BatchingSession::MergeInputTensors(
const TensorSignature& signature, const Batch<BatchingSessionTask>& batch,
std::vector<std::pair<string, Tensor>>* merged_inputs) {
DCHECK_GE(batch.num_tasks(), 1);
@@ -574,7 +577,8 @@ Status BatchingSession::MergeInputTensors(
"One or more tasks does not conform to batch signature");
}
Tensor concated;
- const Status concat_status = tensor::Concat(tensors->second, &concated);
+ const absl::Status concat_status =
+ tensor::Concat(tensors->second, &concated);
DCHECK(concat_status.ok()) << concat_status.ToString();
if (!concat_status.ok()) {
return errors::Internal("Tensor concat operation failed: ",
@@ -586,7 +590,7 @@ Status BatchingSession::MergeInputTensors(
return absl::OkStatus();
}

- Status BatchingSession::SplitOutputTensors(
+ absl::Status BatchingSession::SplitOutputTensors(
const TensorSignature& signature,
const std::vector<Tensor>& combined_outputs,
Batch<BatchingSessionTask>* batch) {
@@ -633,7 +637,7 @@ Status BatchingSession::SplitOutputTensors(
}

std::vector<Tensor> split_tensor;
- const Status split_status =
+ const absl::Status split_status =
tensor::Split(tensor, task_sizes_plus_optional_padding, &split_tensor);
DCHECK(split_status.ok()) << split_status.ToString();
if (!split_status.ok()) {
@@ -673,8 +677,8 @@ Status BatchingSession::SplitOutputTensors(
return absl::OkStatus();
}

- Status BatchingSession::SplitRunMetadata(RunMetadata* batch_metadata,
- Batch<BatchingSessionTask>* batch) {
+ absl::Status BatchingSession::SplitRunMetadata(
+ RunMetadata* batch_metadata, Batch<BatchingSessionTask>* batch) {
if (batch->num_tasks() > 0) {
if (batch_metadata->has_cost_graph()) {
// Scale the batch aggregated to reflect the cost of an individual request
@@ -725,7 +729,7 @@ void BatchingSession::ProcessBatch(
// Regardless of the outcome, we need to propagate the status to the
// individual tasks and signal that they are done. We use MakeCleanup() to
// ensure that this happens no matter how we exit the method below.
- Status status;
+ absl::Status status;
auto finally = gtl::MakeCleanup([&status, &batch] {
for (int i = 0; i < batch->num_tasks(); ++i) {
BatchingSessionTask* task = batch->mutable_task(i);
@@ -764,9 +768,9 @@ void BatchingSession::ProcessBatch(
->Add(dequeue_time_micros - task.enqueue_time_micros);
}
if (all_tasks_timeout_exceeded) {
- status = Status(static_cast<tensorflow::errors::Code>(
- absl::StatusCode::kResourceExhausted),
- "Run() timeout exceeded while waiting in batching queue");
+ status = absl::Status(
+ static_cast<absl::StatusCode>(absl::StatusCode::kResourceExhausted),
+ "Run() timeout exceeded while waiting in batching queue");
return;
}

@@ -817,7 +821,7 @@ void BatchingSession::ProcessBatch(
// Share implementation between `SplitInputTask` here and
// `BatchResource::SplitInputTask` by refactoring and unifying the naming or
// type differences of data members.
- Status SplitInputTask(
+ absl::Status SplitInputTask(
std::unique_ptr<BatchingSessionTask>* input_task_ptr,
int open_batch_remaining_slot, int max_batch_size,
std::vector<std::unique_ptr<BatchingSessionTask>>* output_tasks) {
@@ -947,7 +951,7 @@ Status SplitInputTask(
// TODO(b/158393551):
// Figure out the optimal implementation of Split, by using
// 'Tensor::Slice' and eliminating unnecessary memcpy as much as possible.
- const Status split_status =
+ const absl::Status split_status =
tensor::Split(input_tensor, output_task_sizes, &split_tensors);
if (!split_status.ok()) {
return errors::Internal(
@@ -969,7 +973,7 @@ Status SplitInputTask(
return absl::OkStatus();
}

- Status CreateBatchingSession(
+ absl::Status CreateBatchingSession(
const BatchingSessionOptions& options,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
@@ -984,7 +988,7 @@ Status CreateBatchingSession(
return absl::OkStatus();
}

- Status CreateBatchingSession(
+ absl::Status CreateBatchingSession(
const BatchingSessionOptions& options,
const std::vector<SignatureWithBatchingSessionSchedulerCreator>&
signatures_with_scheduler_creators,
@@ -998,7 +1002,7 @@ Status CreateBatchingSession(
return absl::OkStatus();
}

- Status CreateBasicBatchingSession(
+ absl::Status CreateBasicBatchingSession(
const BasicBatchScheduler<BatchingSessionTask>::Options& schedule_options,
const BatchingSessionOptions& batching_session_options,
const TensorSignature& signature, std::unique_ptr<Session> session,
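One hunk above deserves a second look: the timeout path in ProcessBatch() becomes status = absl::Status(static_cast<absl::StatusCode>(absl::StatusCode::kResourceExhausted), ...), where the static_cast is now a no-op left over from the automated rewrite (the old code cast to tensorflow::errors::Code). A hand-written equivalent, shown here only as a sketch and not part of this commit, would use the canonical absl factory helper and drop the cast:

#include "absl/status/status.h"

// Sketch: absl::ResourceExhaustedError builds the same status without a cast.
absl::Status TimeoutStatus() {
  return absl::ResourceExhaustedError(
      "Run() timeout exceeded while waiting in batching queue");
}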