Support inplace for lazy consistent (#7112)
* Support inplace for lazy consistent

* fix single client sbp hint

* refine

Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
hjchen2 and oneflow-ci-bot authored Dec 31, 2021
1 parent 32ab3e3 commit f12f8f7
Showing 7 changed files with 138 additions and 43 deletions.
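For context, the sketch below shows the kind of program this commit enables: an in-place add inside nn.Graph.build() on consistent (global) tensors. It is a minimal sketch adapted from the new test python/oneflow/test/graph/test_graph_inplace_add.py added in this commit; the 1-node / 2-GPU placement and sbp values mirror that test and are assumptions about the runtime setup rather than part of the diff itself.

# A minimal sketch, adapted from the test added in this commit
# (python/oneflow/test/graph/test_graph_inplace_add.py).
# Assumes a 1-node, 2-GPU environment; placement/sbp values follow the test.
import numpy as np
import oneflow as flow


class LazyInplaceAdd(flow.nn.Graph):
    def __init__(self):
        super().__init__()

    def build(self, x, y):
        x += y  # in-place add on a lazy consistent tensor
        return x


placement = flow.placement("cuda", {0: [0, 1]})
x = flow.randn(10, 10, placement=placement, sbp=flow.sbp.split(1))
y = flow.ones(10, placement=placement, sbp=flow.sbp.broadcast)
z = LazyInplaceAdd()(x, y)
assert np.allclose(z.numpy(), (x + y).numpy(), 1e-05, 1e-05)

In the diff, the relevant behavior is in lazy_op_interpreter.cpp: when an output tensor is already an input (i.e. the op is in-place), the interpreter looks up the output's existing tensor name and calls DisableBoxing on that logical blob name, so the inferred nd_sbp is constrained to the producer's sbp (see InitConstraitNdSbpSignature in job_build_and_infer_ctx.cpp below).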
2 changes: 1 addition & 1 deletion oneflow/api/python/job_build/job_build_and_infer.cpp
@@ -47,7 +47,7 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {
m.def("JobBuildAndInferCtx_GetDataType", &JobBuildAndInferCtx_GetDataType);
m.def("JobBuildAndInferCtx_IsDynamic", &JobBuildAndInferCtx_IsDynamic);

m.def("JobBuildAndInferCtx_DisableBoxing", &JobBuildAndInferCtx_DisableBoxing);
m.def("JobBuildAndInferCtx_IsDisableBoxing", &JobBuildAndInferCtx_IsDisableBoxing);

m.def("JobBuildAndInferCtx_GetSplitAxisFromProducerView",
&JobBuildAndInferCtx_GetSplitAxisFromProducerView);
6 changes: 3 additions & 3 deletions oneflow/api/python/job_build/job_build_and_infer.h
@@ -114,10 +114,10 @@ inline Maybe<bool> JobBuildAndInferCtx_IsDynamic(const std::string& job_name,
return ctx->IsDynamic(lbn);
}

inline Maybe<bool> JobBuildAndInferCtx_DisableBoxing(const std::string& job_name,
const std::string& lbn) {
inline Maybe<bool> JobBuildAndInferCtx_IsDisableBoxing(const std::string& job_name,
const std::string& lbn) {
auto* ctx = JUST(GetJobBuildAndInferCtx(job_name));
return ctx->DisableBoxing(lbn);
return ctx->IsDisableBoxing(lbn);
}

inline Maybe<std::string> JobBuildAndInferCtx_GetSplitAxisFromProducerView(
5 changes: 3 additions & 2 deletions oneflow/api/python/job_build/job_build_and_infer_api.h
@@ -90,8 +90,9 @@ inline bool JobBuildAndInferCtx_IsDynamic(const std::string& job_name, const std
return oneflow::JobBuildAndInferCtx_IsDynamic(job_name, lbn).GetOrThrow();
}

inline bool JobBuildAndInferCtx_DisableBoxing(const std::string& job_name, const std::string& lbn) {
return oneflow::JobBuildAndInferCtx_DisableBoxing(job_name, lbn).GetOrThrow();
inline bool JobBuildAndInferCtx_IsDisableBoxing(const std::string& job_name,
const std::string& lbn) {
return oneflow::JobBuildAndInferCtx_IsDisableBoxing(job_name, lbn).GetOrThrow();
}

inline std::string JobBuildAndInferCtx_GetSplitAxisFromProducerView(const std::string& job_name,
30 changes: 19 additions & 11 deletions oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
@@ -75,12 +75,9 @@ Maybe<Tensor> BuildTensor(const OpAttribute& op_attribute, const std::string& bn
Maybe<void> CheckTensorMatchAttr(const std::shared_ptr<Tensor>& tensor,
const OpAttribute& op_attribute, const std::string& bn_in_op,
const std::shared_ptr<ParallelDesc>& parallel_desc,
const bool is_lazy, const bool is_local, const bool requires_grad,
const bool is_leaf) {
const bool is_lazy, const bool is_local) {
CHECK_EQ_OR_RETURN(tensor->is_lazy(), is_lazy);
CHECK_EQ_OR_RETURN(tensor->is_local(), is_local);
CHECK_EQ_OR_RETURN(tensor->requires_grad(), requires_grad);
CHECK_EQ_OR_RETURN(tensor->is_leaf(), is_leaf);

CHECK_OR_RETURN(op_attribute.has_logical_blob_desc_signature());
const auto& blob_desc_sign_map = op_attribute.logical_blob_desc_signature().bn_in_op2blob_desc();
@@ -102,7 +99,8 @@ Maybe<void> CheckTensorMatchAttr(const std::shared_ptr<Tensor>& tensor,
CHECK_OR_RETURN(nd_sbp_it != nd_sbp_sign_map.end())
<< "nd_sbp of " << bn_in_op << " not found in op " << op_attribute.op_conf().name();
cfg::NdSbp nd_sbp(nd_sbp_it->second);
CHECK_OR_RETURN(JUST(tensor->nd_sbp()) == SymbolOf(nd_sbp));
CHECK_OR_RETURN(JUST(tensor->nd_sbp()) == SymbolOf(nd_sbp))
<< "The input sbp is not valid for an inplace operation, please try to use non-inplace.";
CHECK_OR_RETURN(JUST(tensor->parallel_desc()) == SymbolOf(*parallel_desc));
}
return Maybe<void>::Ok();
@@ -655,6 +653,21 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
}
}

// Check outputs num and setup output tensor properties.
CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());

// Disable boxing if the computation is inplace.
for (int i = 0; i < op_expr.output_size(); ++i) {
const auto& output = outputs->at(i);
if (output) {
const std::string& lbn = TensorNameScope::Global()->Lookup(output);
CHECK_OR_RETURN(!lbn.empty()) << "The output which index is " << i
<< " has no tensor name, please check whether the inplaced "
"output is also an input of the operation "
<< new_op_name;
JUST(infer_ctx->DisableBoxing(lbn));
}
}
VLOG(2) << "Lazy nn.Graph name " << graph_name << " try to add op: \n"
<< op_conf->DebugString() << std::endl;
OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(*op_conf));
@@ -665,9 +678,6 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu

int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(*op_conf));
auto blob_parallel_desc = JUST(GetSymbol<cfg::ParallelConf, ParallelDesc>(parallel_desc_sym_id));

// Check outputs num and setup output tensor properties.
CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());
for (int i = 0; i < op_expr.output_size(); ++i) {
const std::string& obn = op_expr.indexed_obns().at(i);
if (!(*outputs)[i]) {
@@ -676,9 +686,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
} else {
const std::shared_ptr<Tensor>& inplace_out = (*outputs)[i];
JUST(CheckTensorMatchAttr(inplace_out, op_attr, obn, blob_parallel_desc, /* is_lazy= */ true,
is_local,
/* requires_grad */ false,
/* is_leaf */ true));
is_local));
}
TensorNameScope::Global()->Record((*outputs)[i], GenLogicalBlobName(new_op_name, obn));
}
52 changes: 32 additions & 20 deletions oneflow/core/job/job_build_and_infer_ctx.cpp
@@ -153,16 +153,13 @@ Maybe<void> JobBuildAndInferCtx::AddLbiParallelConf2BlobPlacement(
}

Maybe<OperatorConf> JobBuildAndInferCtx::DecodeLbiHintAndReturnNewOpConf(
const Operator& op, cfg::SbpSignature* sbp_sig_conf,
HashMap<std::string, bool>* ibn2disable_boxing) const {
const Operator& op, cfg::SbpSignature* sbp_sig_conf) const {
auto op_conf_without_split_hint = std::make_shared<OperatorConf>(op.op_conf());
for (const std::string& ibn : op.input_bns()) {
std::string lbn_may_with_hint = GetInputLbnInOpCustomizedConf(op.op_conf(), ibn);
cfg::SbpParallel sbp_parallel;
bool has_sbp_hint = JUST(GetSbpParallelInLbnOrNothing(lbn_may_with_hint, &sbp_parallel));
bool has_disable_boxing_hint =
JUST(ParseDisableBoxingFlag(lbn_may_with_hint, &(*ibn2disable_boxing)[ibn]));
if (has_sbp_hint || has_disable_boxing_hint) {
if (has_sbp_hint) {
(*(sbp_sig_conf->mutable_bn_in_op2sbp_parallel()))[ibn] = sbp_parallel;
const LogicalBlobId& lbi = op.BnInOp2Lbi(ibn);
std::string lbn = GenLogicalBlobName(lbi);
@@ -372,18 +369,24 @@ void JobBuildAndInferCtx::InitIbn2DisableBoxing(const Operator& op,
}
}

void JobBuildAndInferCtx::UpdateLbi2DisableBoxing(
const Operator& op, const HashMap<std::string, bool>& ibn2disable_boxing) {
bool disable_boxing = false;
for (const auto& ibn : op.input_bns()) {
if (ibn2disable_boxing.at(ibn)) {
disable_boxing = true;
break;
Maybe<cfg::NdSbpSignature> JobBuildAndInferCtx::InitConstraitNdSbpSignature(
const Operator& op, const HashMap<std::string, bool>& ibn2disable_boxing) const {
auto nd_sbp_sig = std::make_shared<cfg::NdSbpSignature>();
for (const auto& it : ibn2disable_boxing) {
if (it.second) {
const auto& ibn = it.first;
const LogicalBlobId& lbi = op.BnInOp2Lbi(ibn);
const auto& nd_sbp_iter = lbi2nd_sbp_from_producer_view_.find(lbi);
if (nd_sbp_iter == lbi2nd_sbp_from_producer_view_.end()) {
return Error::RuntimeError()
<< "The nd_sbp of input " << ibn << " (tensor name is " << GenLogicalBlobName(lbi)
<< ") is not found for operation " << op.op_name()
<< ". It maybe caused by an invalid inplace operation.";
}
(*(nd_sbp_sig->mutable_bn_in_op2nd_sbp()))[ibn] = lbi2nd_sbp_from_producer_view_.at(lbi);
}
}
for (const auto& obn : op.output_bns()) {
lbi2disable_boxing_[op.BnInOp2Lbi(obn)] = disable_boxing;
}
return nd_sbp_sig;
}

bool JobBuildAndInferCtx::HasAnyMirroredBlobInput(const Operator& op) const {
@@ -572,12 +575,11 @@ Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferOp(const OperatorConf& op_con
cfg::SbpSignature sbp_sig_conf;
HashMap<std::string, bool> ibn2disable_boxing;
InitIbn2DisableBoxing(*op, &ibn2disable_boxing);
auto new_op_conf = JUST(DecodeLbiHintAndReturnNewOpConf(*op, &sbp_sig_conf, &ibn2disable_boxing));
auto new_op_conf = JUST(DecodeLbiHintAndReturnNewOpConf(*op, &sbp_sig_conf));
auto parallel_conf = JUST(InferOpParallelConf(*op, origin_parallel_conf, ibn2disable_boxing));
ParallelDesc parallel_desc(*parallel_conf);
JUST(op->FillOpParallelDesc(parallel_desc));
JUST(AddOpNameParallelConf2Placement(op_name, *parallel_conf));
UpdateLbi2DisableBoxing(*op, ibn2disable_boxing);

auto GetBlobDesc4BnInOp = [&](const std::string& bn) -> BlobDesc* {
const LogicalBlobId& lbi = op->BnInOp2Lbi(bn);
@@ -592,8 +594,11 @@ Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferOp(const OperatorConf& op_con
JUST(InferMirroredSignature(op, is_mirrored_parallel_view, parallel_desc));

// infer nd_sbp signature
cfg::NdSbpSignature nd_sbp_sig_conf;
SbpSignatureToNdSbpSignature(sbp_sig_conf, &nd_sbp_sig_conf);
cfg::NdSbpSignature nd_sbp_sig_conf = *JUST(InitConstraitNdSbpSignature(*op, ibn2disable_boxing));
// Override constrait nd_sbp if sbp hint is given
if (!sbp_sig_conf.bn_in_op2sbp_parallel().empty()) {
SbpSignatureToNdSbpSignature(sbp_sig_conf, &nd_sbp_sig_conf);
}
AddOpAndUpdateJobParallelViewConf(*new_op_conf, parallel_desc, nd_sbp_sig_conf,
is_mirrored_parallel_view);
JUST(InferOpOutNdSbp(op, nd_sbp_sig_conf, parallel_desc));
@@ -663,14 +668,21 @@ Maybe<bool> JobBuildAndInferCtx::IsDynamic(const std::string& lbn) const {
return lbi2logical_blob_desc_.at(GenLogicalBlobId(lbn))->is_dynamic();
}

Maybe<bool> JobBuildAndInferCtx::DisableBoxing(const std::string& lbn) const {
Maybe<bool> JobBuildAndInferCtx::IsDisableBoxing(const std::string& lbn) const {
JUST(CheckLbnValidAndExist(lbn));
LogicalBlobId lbi(GenLogicalBlobId(lbn));
const auto& iter = lbi2disable_boxing_.find(lbi);
CHECK_OR_RETURN(iter != lbi2disable_boxing_.end());
return iter->second;
}

Maybe<void> JobBuildAndInferCtx::DisableBoxing(const std::string& lbn) {
JUST(CheckLbnValidAndExist(lbn));
LogicalBlobId lbi(GenLogicalBlobId(lbn));
lbi2disable_boxing_[lbi] = true;
return Maybe<void>::Ok();
}

Maybe<Operator*> JobBuildAndInferCtx::Op4OpName(const std::string& op_name) const {
const auto& op_iter = op_name2op_.find(op_name);
CHECK_OR_RETURN(op_iter != op_name2op_.end());
12 changes: 6 additions & 6 deletions oneflow/core/job/job_build_and_infer_ctx.h
@@ -44,7 +44,8 @@ class JobBuildAndInferCtx {
Maybe<Shape> GetStaticShape(const std::string& lbn) const;
Maybe<DataType> GetDataType(const std::string& lbn) const;
Maybe<bool> IsDynamic(const std::string& lbn) const;
Maybe<bool> DisableBoxing(const std::string& lbn) const;
Maybe<bool> IsDisableBoxing(const std::string& lbn) const;
Maybe<void> DisableBoxing(const std::string& lbn);
Maybe<OptInt64> GetSplitAxisFromProducerView(const std::string& lbn) const;
Maybe<const ParallelDesc*> GetParallelDescFromProducerView(const std::string& lbn) const;

@@ -107,13 +108,12 @@ class JobBuildAndInferCtx {
Maybe<void> AddOpNameParallelConf2Placement(const std::string& op_name,
const ParallelConf& parallel_conf);
void InitIbn2DisableBoxing(const Operator& op, HashMap<std::string, bool>* ibn2disable_boxing);
void UpdateLbi2DisableBoxing(const Operator& op,
const HashMap<std::string, bool>& ibn2disable_boxing);
Maybe<cfg::NdSbpSignature> InitConstraitNdSbpSignature(
const Operator& op, const HashMap<std::string, bool>& ibn2disable_boxing) const;
Maybe<OperatorConf> DecodeLbiHintAndReturnNewOpConf(const Operator& op,
cfg::SbpSignature* sbp_sig_conf) const;
Maybe<void> AddLbiParallelConf2BlobPlacement(
const Operator* op, std::function<ParallelDesc*(const std::string&)> ParallelDesc4Obn);
Maybe<OperatorConf> DecodeLbiHintAndReturnNewOpConf(
const Operator& op, cfg::SbpSignature* sbp_sig_conf,
HashMap<std::string, bool>* ibn2disable_boxing) const;
void AddOpAndUpdateJobParallelViewConf(const OperatorConf& operator_conf,
const ParallelDesc& parallel_desc,
const cfg::NdSbpSignature& nd_sbp_signature,
74 changes: 74 additions & 0 deletions python/oneflow/test/graph/test_graph_inplace_add.py
@@ -0,0 +1,74 @@
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import unittest
import numpy as np

import oneflow as flow
import oneflow.unittest


def _test_graph_lazy_inplace(test_case, x, y):
class LazyInplaceAdd(flow.nn.Graph):
def __init__(self):
super().__init__()

def build(self, x, y):
x += y
return x

z = LazyInplaceAdd()(x, y)
test_case.assertTrue(np.allclose(z.numpy(), (x + y).numpy(), 1e-05, 1e-05))


@unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
@flow.unittest.skip_unless_1n1d()
class TestLocalInplace(oneflow.unittest.TestCase):
def test_graph_inplace_gpu(test_case):
x = flow.randn(10, 10, device=flow.device("cuda"))
y = flow.ones(10, device=flow.device("cuda"))
_test_graph_lazy_inplace(test_case, x, y)

def test_graph_inplace_cpu(test_case):
x = flow.randn(10, 10, device=flow.device("cpu"))
y = flow.ones(10, device=flow.device("cpu"))
_test_graph_lazy_inplace(test_case, x, y)


@unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
@flow.unittest.skip_unless_1n2d()
class TestConsistentInplace(oneflow.unittest.TestCase):
def test_graph_inplace_gpu(test_case):
x = flow.randn(
10, 10, placement=flow.placement("cuda", {0: [0, 1]}), sbp=flow.sbp.split(1)
)
y = flow.ones(
10, placement=flow.placement("cuda", {0: [0, 1]}), sbp=flow.sbp.broadcast
)
_test_graph_lazy_inplace(test_case, x, y)

def test_graph_inplace_cpu(test_case):
x = flow.randn(
10, 10, placement=flow.placement("cpu", {0: [0, 1]}), sbp=flow.sbp.split(1)
)
y = flow.ones(
10, placement=flow.placement("cpu", {0: [0, 1]}), sbp=flow.sbp.broadcast
)
_test_graph_lazy_inplace(test_case, x, y)


if __name__ == "__main__":
unittest.main()
