diff --git a/oneflow/api/python/job_build/job_build_and_infer.cpp b/oneflow/api/python/job_build/job_build_and_infer.cpp
index fd25d18f1b3..39d38199b9d 100644
--- a/oneflow/api/python/job_build/job_build_and_infer.cpp
+++ b/oneflow/api/python/job_build/job_build_and_infer.cpp
@@ -47,7 +47,7 @@ ONEFLOW_API_PYBIND11_MODULE("", m) {
   m.def("JobBuildAndInferCtx_GetDataType", &JobBuildAndInferCtx_GetDataType);
   m.def("JobBuildAndInferCtx_IsDynamic", &JobBuildAndInferCtx_IsDynamic);
-  m.def("JobBuildAndInferCtx_DisableBoxing", &JobBuildAndInferCtx_DisableBoxing);
+  m.def("JobBuildAndInferCtx_IsDisableBoxing", &JobBuildAndInferCtx_IsDisableBoxing);
 
   m.def("JobBuildAndInferCtx_GetSplitAxisFromProducerView",
         &JobBuildAndInferCtx_GetSplitAxisFromProducerView);
diff --git a/oneflow/api/python/job_build/job_build_and_infer.h b/oneflow/api/python/job_build/job_build_and_infer.h
index 3d88c27d109..c05d3c33a57 100644
--- a/oneflow/api/python/job_build/job_build_and_infer.h
+++ b/oneflow/api/python/job_build/job_build_and_infer.h
@@ -114,10 +114,10 @@ inline Maybe<bool> JobBuildAndInferCtx_IsDynamic(const std::string& job_name,
   return ctx->IsDynamic(lbn);
 }
 
-inline Maybe<bool> JobBuildAndInferCtx_DisableBoxing(const std::string& job_name,
-                                                     const std::string& lbn) {
+inline Maybe<bool> JobBuildAndInferCtx_IsDisableBoxing(const std::string& job_name,
+                                                       const std::string& lbn) {
   auto* ctx = JUST(GetJobBuildAndInferCtx(job_name));
-  return ctx->DisableBoxing(lbn);
+  return ctx->IsDisableBoxing(lbn);
 }
 
 inline Maybe<std::string> JobBuildAndInferCtx_GetSplitAxisFromProducerView(
diff --git a/oneflow/api/python/job_build/job_build_and_infer_api.h b/oneflow/api/python/job_build/job_build_and_infer_api.h
index ac9399a18a0..f092ea01d54 100644
--- a/oneflow/api/python/job_build/job_build_and_infer_api.h
+++ b/oneflow/api/python/job_build/job_build_and_infer_api.h
@@ -90,8 +90,9 @@ inline bool JobBuildAndInferCtx_IsDynamic(const std::string& job_name, const std
   return oneflow::JobBuildAndInferCtx_IsDynamic(job_name, lbn).GetOrThrow();
 }
 
-inline bool JobBuildAndInferCtx_DisableBoxing(const std::string& job_name, const std::string& lbn) {
-  return oneflow::JobBuildAndInferCtx_DisableBoxing(job_name, lbn).GetOrThrow();
+inline bool JobBuildAndInferCtx_IsDisableBoxing(const std::string& job_name,
+                                                const std::string& lbn) {
+  return oneflow::JobBuildAndInferCtx_IsDisableBoxing(job_name, lbn).GetOrThrow();
 }
 
 inline std::string JobBuildAndInferCtx_GetSplitAxisFromProducerView(const std::string& job_name,
diff --git a/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp b/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
index 0856bdeec88..57bc58e2a0c 100644
--- a/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
+++ b/oneflow/core/framework/op_interpreter/lazy_op_interpreter.cpp
@@ -75,12 +75,9 @@ Maybe<Tensor> BuildTensor(const OpAttribute& op_attribute, const std::string& bn
 Maybe<void> CheckTensorMatchAttr(const std::shared_ptr<Tensor>& tensor,
                                  const OpAttribute& op_attribute, const std::string& bn_in_op,
                                  const std::shared_ptr<ParallelDesc>& parallel_desc,
-                                 const bool is_lazy, const bool is_local, const bool requires_grad,
-                                 const bool is_leaf) {
+                                 const bool is_lazy, const bool is_local) {
   CHECK_EQ_OR_RETURN(tensor->is_lazy(), is_lazy);
   CHECK_EQ_OR_RETURN(tensor->is_local(), is_local);
-  CHECK_EQ_OR_RETURN(tensor->requires_grad(), requires_grad);
-  CHECK_EQ_OR_RETURN(tensor->is_leaf(), is_leaf);
   CHECK_OR_RETURN(op_attribute.has_logical_blob_desc_signature());
   const auto& blob_desc_sign_map =
       op_attribute.logical_blob_desc_signature().bn_in_op2blob_desc();
@@ -102,7 +99,8 @@ Maybe<void> CheckTensorMatchAttr(const std::shared_ptr<Tensor>& tensor,
     CHECK_OR_RETURN(nd_sbp_it != nd_sbp_sign_map.end())
         << "nd_sbp of " << bn_in_op << " not found in op " << op_attribute.op_conf().name();
     cfg::NdSbp nd_sbp(nd_sbp_it->second);
-    CHECK_OR_RETURN(JUST(tensor->nd_sbp()) == SymbolOf(nd_sbp));
+    CHECK_OR_RETURN(JUST(tensor->nd_sbp()) == SymbolOf(nd_sbp))
+        << "The input sbp is not valid for an inplace operation; please try a non-inplace version.";
     CHECK_OR_RETURN(JUST(tensor->parallel_desc()) == SymbolOf(*parallel_desc));
   }
   return Maybe<void>::Ok();
 }
@@ -655,6 +653,21 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
     }
   }
 
+  // Check that the number of outputs matches the op definition.
+  CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());
+
+  // Disable boxing if the computation is inplace.
+  for (int i = 0; i < op_expr.output_size(); ++i) {
+    const auto& output = outputs->at(i);
+    if (output) {
+      const std::string& lbn = TensorNameScope::Global()->Lookup(output);
+      CHECK_OR_RETURN(!lbn.empty()) << "The output at index " << i
+                                    << " has no tensor name; please check whether the inplace "
+                                       "output is also an input of the operation "
+                                    << new_op_name;
+      JUST(infer_ctx->DisableBoxing(lbn));
+    }
+  }
   VLOG(2) << "Lazy nn.Graph name " << graph_name << " try to add op: \n"
           << op_conf->DebugString() << std::endl;
   OpAttribute op_attr = *JUST(infer_ctx->AddAndInferConsistentOp(*op_conf));
@@ -665,9 +678,6 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
 
   int64_t parallel_desc_sym_id = JUST(scope->GetParallelDescSymbolId(*op_conf));
   auto blob_parallel_desc = JUST(GetSymbol<cfg::ParallelConf, ParallelDesc>(parallel_desc_sym_id));
-
-  // Check outputs num and setup output tensor properties.
-  CHECK_EQ_OR_RETURN(outputs->size(), op_expr.output_size());
   for (int i = 0; i < op_expr.output_size(); ++i) {
     const std::string& obn = op_expr.indexed_obns().at(i);
     if (!(*outputs)[i]) {
@@ -676,9 +686,7 @@ Maybe<void> LazyInterpreter::ApplyImpl(const UserOpExpr& op_expr, const TensorTu
     } else {
       const std::shared_ptr<Tensor>& inplace_out = (*outputs)[i];
       JUST(CheckTensorMatchAttr(inplace_out, op_attr, obn, blob_parallel_desc, /* is_lazy= */ true,
-                                is_local,
-                                /* requires_grad */ false,
-                                /* is_leaf */ true));
+                                is_local));
     }
     TensorNameScope::Global()->Record((*outputs)[i], GenLogicalBlobName(new_op_name, obn));
   }
diff --git a/oneflow/core/job/job_build_and_infer_ctx.cpp b/oneflow/core/job/job_build_and_infer_ctx.cpp
index e24194d59f5..1aa14821ca9 100644
--- a/oneflow/core/job/job_build_and_infer_ctx.cpp
+++ b/oneflow/core/job/job_build_and_infer_ctx.cpp
@@ -153,16 +153,13 @@ Maybe<void> JobBuildAndInferCtx::AddLbiParallelConf2BlobPlacement(
 }
 
 Maybe<OperatorConf> JobBuildAndInferCtx::DecodeLbiHintAndReturnNewOpConf(
-    const Operator& op, cfg::SbpSignature* sbp_sig_conf,
-    HashMap<std::string, bool>* ibn2disable_boxing) const {
+    const Operator& op, cfg::SbpSignature* sbp_sig_conf) const {
   auto op_conf_without_split_hint = std::make_shared<OperatorConf>(op.op_conf());
   for (const std::string& ibn : op.input_bns()) {
     std::string lbn_may_with_hint = GetInputLbnInOpCustomizedConf(op.op_conf(), ibn);
     cfg::SbpParallel sbp_parallel;
     bool has_sbp_hint = JUST(GetSbpParallelInLbnOrNothing(lbn_may_with_hint, &sbp_parallel));
-    bool has_disable_boxing_hint =
-        JUST(ParseDisableBoxingFlag(lbn_may_with_hint, &(*ibn2disable_boxing)[ibn]));
-    if (has_sbp_hint || has_disable_boxing_hint) {
+    if (has_sbp_hint) {
       (*(sbp_sig_conf->mutable_bn_in_op2sbp_parallel()))[ibn] = sbp_parallel;
       const LogicalBlobId& lbi = op.BnInOp2Lbi(ibn);
       std::string lbn = GenLogicalBlobName(lbi);
@@ -372,18 +369,24 @@ void JobBuildAndInferCtx::InitIbn2DisableBoxing(const Operator& op,
   }
 }
 
-void JobBuildAndInferCtx::UpdateLbi2DisableBoxing(
-    const Operator& op, const HashMap<std::string, bool>& ibn2disable_boxing) {
-  bool disable_boxing = false;
-  for (const auto& ibn : op.input_bns()) {
-    if (ibn2disable_boxing.at(ibn)) {
-      disable_boxing = true;
-      break;
+Maybe<cfg::NdSbpSignature> JobBuildAndInferCtx::InitConstraintNdSbpSignature(
+    const Operator& op, const HashMap<std::string, bool>& ibn2disable_boxing) const {
+  auto nd_sbp_sig = std::make_shared<cfg::NdSbpSignature>();
+  for (const auto& it : ibn2disable_boxing) {
+    if (it.second) {
+      const auto& ibn = it.first;
+      const LogicalBlobId& lbi = op.BnInOp2Lbi(ibn);
+      const auto& nd_sbp_iter = lbi2nd_sbp_from_producer_view_.find(lbi);
+      if (nd_sbp_iter == lbi2nd_sbp_from_producer_view_.end()) {
+        return Error::RuntimeError()
+               << "The nd_sbp of input " << ibn << " (tensor name is " << GenLogicalBlobName(lbi)
+               << ") is not found for operation " << op.op_name()
+               << ". It may be caused by an invalid inplace operation.";
+      }
+      (*(nd_sbp_sig->mutable_bn_in_op2nd_sbp()))[ibn] = lbi2nd_sbp_from_producer_view_.at(lbi);
     }
   }
-  for (const auto& obn : op.output_bns()) {
-    lbi2disable_boxing_[op.BnInOp2Lbi(obn)] = disable_boxing;
-  }
+  return nd_sbp_sig;
 }
 
 bool JobBuildAndInferCtx::HasAnyMirroredBlobInput(const Operator& op) const {
@@ -572,12 +575,11 @@ Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferOp(const OperatorConf& op_con
   cfg::SbpSignature sbp_sig_conf;
   HashMap<std::string, bool> ibn2disable_boxing;
   InitIbn2DisableBoxing(*op, &ibn2disable_boxing);
-  auto new_op_conf = JUST(DecodeLbiHintAndReturnNewOpConf(*op, &sbp_sig_conf, &ibn2disable_boxing));
+  auto new_op_conf = JUST(DecodeLbiHintAndReturnNewOpConf(*op, &sbp_sig_conf));
   auto parallel_conf = JUST(InferOpParallelConf(*op, origin_parallel_conf, ibn2disable_boxing));
   ParallelDesc parallel_desc(*parallel_conf);
   JUST(op->FillOpParallelDesc(parallel_desc));
   JUST(AddOpNameParallelConf2Placement(op_name, *parallel_conf));
-  UpdateLbi2DisableBoxing(*op, ibn2disable_boxing);
 
   auto GetBlobDesc4BnInOp = [&](const std::string& bn) -> BlobDesc* {
     const LogicalBlobId& lbi = op->BnInOp2Lbi(bn);
@@ -592,8 +594,11 @@ Maybe<OpAttribute> JobBuildAndInferCtx::AddAndInferOp(const OperatorConf& op_con
   JUST(InferMirroredSignature(op, is_mirrored_parallel_view, parallel_desc));
 
   // infer nd_sbp signature
-  cfg::NdSbpSignature nd_sbp_sig_conf;
-  SbpSignatureToNdSbpSignature(sbp_sig_conf, &nd_sbp_sig_conf);
+  cfg::NdSbpSignature nd_sbp_sig_conf = *JUST(InitConstraintNdSbpSignature(*op, ibn2disable_boxing));
+  // Override the constraint nd_sbp if an sbp hint is given.
+  if (!sbp_sig_conf.bn_in_op2sbp_parallel().empty()) {
+    SbpSignatureToNdSbpSignature(sbp_sig_conf, &nd_sbp_sig_conf);
+  }
   AddOpAndUpdateJobParallelViewConf(*new_op_conf, parallel_desc, nd_sbp_sig_conf,
                                     is_mirrored_parallel_view);
   JUST(InferOpOutNdSbp(op, nd_sbp_sig_conf, parallel_desc));
@@ -663,7 +668,7 @@ Maybe<bool> JobBuildAndInferCtx::IsDynamic(const std::string& lbn) const {
   return lbi2logical_blob_desc_.at(GenLogicalBlobId(lbn))->is_dynamic();
 }
 
-Maybe<bool> JobBuildAndInferCtx::DisableBoxing(const std::string& lbn) const {
+Maybe<bool> JobBuildAndInferCtx::IsDisableBoxing(const std::string& lbn) const {
   JUST(CheckLbnValidAndExist(lbn));
   LogicalBlobId lbi(GenLogicalBlobId(lbn));
   const auto& iter = lbi2disable_boxing_.find(lbi);
@@ -671,6 +676,13 @@ Maybe<bool> JobBuildAndInferCtx::DisableBoxing(const std::string& lbn) const {
   return iter->second;
 }
 
+Maybe<void> JobBuildAndInferCtx::DisableBoxing(const std::string& lbn) {
+  JUST(CheckLbnValidAndExist(lbn));
+  LogicalBlobId lbi(GenLogicalBlobId(lbn));
+  lbi2disable_boxing_[lbi] = true;
+  return Maybe<void>::Ok();
+}
+
 Maybe<const Operator*> JobBuildAndInferCtx::Op4OpName(const std::string& op_name) const {
   const auto& op_iter = op_name2op_.find(op_name);
   CHECK_OR_RETURN(op_iter != op_name2op_.end());
diff --git a/oneflow/core/job/job_build_and_infer_ctx.h b/oneflow/core/job/job_build_and_infer_ctx.h
index 48a3416b56b..3466a31c63a 100644
--- a/oneflow/core/job/job_build_and_infer_ctx.h
+++ b/oneflow/core/job/job_build_and_infer_ctx.h
@@ -44,7 +44,8 @@ class JobBuildAndInferCtx {
   Maybe<Shape> GetStaticShape(const std::string& lbn) const;
   Maybe<DataType> GetDataType(const std::string& lbn) const;
   Maybe<bool> IsDynamic(const std::string& lbn) const;
-  Maybe<bool> DisableBoxing(const std::string& lbn) const;
+  Maybe<bool> IsDisableBoxing(const std::string& lbn) const;
+  Maybe<void> DisableBoxing(const std::string& lbn);
   Maybe<OptInt64> GetSplitAxisFromProducerView(const std::string& lbn) const;
   Maybe<const ParallelDesc*> GetParallelDescFromProducerView(const std::string& lbn) const;
@@ -107,13 +108,12 @@ class JobBuildAndInferCtx {
   Maybe<void> AddOpNameParallelConf2Placement(const std::string& op_name,
                                               const ParallelConf& parallel_conf);
   void InitIbn2DisableBoxing(const Operator& op, HashMap<std::string, bool>* ibn2disable_boxing);
-  void UpdateLbi2DisableBoxing(const Operator& op,
-                               const HashMap<std::string, bool>& ibn2disable_boxing);
+  Maybe<cfg::NdSbpSignature> InitConstraintNdSbpSignature(
+      const Operator& op, const HashMap<std::string, bool>& ibn2disable_boxing) const;
+  Maybe<OperatorConf> DecodeLbiHintAndReturnNewOpConf(const Operator& op,
+                                                      cfg::SbpSignature* sbp_sig_conf) const;
   Maybe<void> AddLbiParallelConf2BlobPlacement(
       const Operator* op, std::function<ParallelDesc*(const std::string&)> ParallelDesc4Obn);
-  Maybe<OperatorConf> DecodeLbiHintAndReturnNewOpConf(
-      const Operator& op, cfg::SbpSignature* sbp_sig_conf,
-      HashMap<std::string, bool>* ibn2disable_boxing) const;
   void AddOpAndUpdateJobParallelViewConf(const OperatorConf& operator_conf,
                                          const ParallelDesc& parallel_desc,
                                          const cfg::NdSbpSignature& nd_sbp_signature,
diff --git a/python/oneflow/test/graph/test_graph_inplace_add.py b/python/oneflow/test/graph/test_graph_inplace_add.py
new file mode 100644
index 00000000000..153dd12ccee
--- /dev/null
+++ b/python/oneflow/test/graph/test_graph_inplace_add.py
@@ -0,0 +1,74 @@
+"""
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import unittest
+import numpy as np
+
+import oneflow as flow
+import oneflow.unittest
+
+
+def _test_graph_lazy_inplace(test_case, x, y):
+    class LazyInplaceAdd(flow.nn.Graph):
+        def __init__(self):
+            super().__init__()
+
+        def build(self, x, y):
+            x += y
+            return x
+
+    z = LazyInplaceAdd()(x, y)
+    test_case.assertTrue(np.allclose(z.numpy(), (x + y).numpy(), 1e-05, 1e-05))
+
+
+@unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
+@flow.unittest.skip_unless_1n1d()
+class TestLocalInplace(oneflow.unittest.TestCase):
+    def test_graph_inplace_gpu(test_case):
+        x = flow.randn(10, 10, device=flow.device("cuda"))
+        y = flow.ones(10, device=flow.device("cuda"))
+        _test_graph_lazy_inplace(test_case, x, y)
+
+    def test_graph_inplace_cpu(test_case):
+        x = flow.randn(10, 10, device=flow.device("cpu"))
+        y = flow.ones(10, device=flow.device("cpu"))
+        _test_graph_lazy_inplace(test_case, x, y)
+
+
+@unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
+@flow.unittest.skip_unless_1n2d()
+class TestConsistentInplace(oneflow.unittest.TestCase):
+    def test_graph_inplace_gpu(test_case):
+        x = flow.randn(
+            10, 10, placement=flow.placement("cuda", {0: [0, 1]}), sbp=flow.sbp.split(1)
+        )
+        y = flow.ones(
+            10, placement=flow.placement("cuda", {0: [0, 1]}), sbp=flow.sbp.broadcast
+        )
+        _test_graph_lazy_inplace(test_case, x, y)
+
+    def test_graph_inplace_cpu(test_case):
+        x = flow.randn(
+            10, 10, placement=flow.placement("cpu", {0: [0, 1]}), sbp=flow.sbp.split(1)
+        )
+        y = flow.ones(
+            10, placement=flow.placement("cpu", {0: [0, 1]}), sbp=flow.sbp.broadcast
+        )
+        _test_graph_lazy_inplace(test_case, x, y)
+
+
+if __name__ == "__main__":
+    unittest.main()
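
For reviewers, here is a minimal sketch of the end-to-end behavior this change enables. It is illustrative only, not part of the PR: the class name `InplaceAddGraph` is invented, while the tensor shapes, the `flow.placement("cuda", {0: [0, 1]})` style, and the sbp values mirror the new test file.

```python
import oneflow as flow


class InplaceAddGraph(flow.nn.Graph):
    def __init__(self):
        super().__init__()

    def build(self, x, y):
        # Traced lazily as an inplace op: the output tensor reuses x's logical
        # blob name, so LazyInterpreter::ApplyImpl finds a non-empty lbn for it
        # and calls infer_ctx->DisableBoxing(lbn) on that blob.
        x += y
        return x


# Consistent (global) tensors on two devices, as in the new test file.
placement = flow.placement("cuda", {0: [0, 1]})
x = flow.randn(10, 10, placement=placement, sbp=flow.sbp.split(1))
y = flow.ones(10, placement=placement, sbp=flow.sbp.broadcast)

# During graph build, AddAndInferOp seeds the op's nd_sbp signature from the
# disable-boxing constraint (InitConstraintNdSbpSignature), so the inplace
# output keeps x's split(1) sbp instead of getting a boxing op inserted. If
# the op cannot keep that sbp, building now fails with "The input sbp is not
# valid for an inplace operation ..." instead of silently boxing.
z = InplaceAddGraph()(x, y)
```

Like the `skip_unless_1n2d` test, the consistent-tensor part of the sketch assumes two visible devices and a distributed launch (e.g. via OneFlow's `oneflow.distributed.launch` module); the local-tensor path from `TestLocalInplace` works on a single device.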