diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1f1db341c82de9..cbe1662e53788f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -83,7 +83,7 @@ repos: # | python/paddle/distributed/f.+ - # | python/paddle/distributed/[g-z].+ + | python/paddle/distributed/[g-z].+ # | python/paddle/[e-i].+ @@ -139,7 +139,7 @@ repos: | python/paddle/distributed/f.+ - | python/paddle/distributed/[g-z].+ + # | python/paddle/distributed/[g-z].+ | python/paddle/[e-i].+ diff --git a/python/paddle/distributed/launch/controllers/controller.py b/python/paddle/distributed/launch/controllers/controller.py index fc8d9261f4ff1f..20bc46aaa2876b 100644 --- a/python/paddle/distributed/launch/controllers/controller.py +++ b/python/paddle/distributed/launch/controllers/controller.py @@ -62,9 +62,9 @@ def __init__(self, ctx): self.join_server = None def deploy_pod(self): - assert ( - len(self.pod.containers) + len(self.pod.init_containers) > 0 - ), "No container in the pod" + assert len(self.pod.containers) + len(self.pod.init_containers) > 0, ( + "No container in the pod" + ) self.ctx.logger.info(f"Run {self.pod}") if len(self.pod.init_containers) > 0: @@ -309,9 +309,9 @@ def save_pod_log(self, info): self.ctx.logger.error(f"save log failed because {e}") def save_pod_env(self): - assert ( - len(self.pod.containers) + len(self.pod.init_containers) > 0 - ), "No container in the pod" + assert len(self.pod.containers) + len(self.pod.init_containers) > 0, ( + "No container in the pod" + ) if not self.ctx.args.log_dir: return diff --git a/python/paddle/distributed/launch/controllers/ipu_controller.py b/python/paddle/distributed/launch/controllers/ipu_controller.py index 651b58c13b1399..ce7f9436d8fede 100644 --- a/python/paddle/distributed/launch/controllers/ipu_controller.py +++ b/python/paddle/distributed/launch/controllers/ipu_controller.py @@ -69,9 +69,9 @@ def replace_training_script(self): num_ipus = int(self.ctx.args.devices) # The number of replicas for data parallel - assert ( - num_ipus % poprun_args.ipus_per_replica - ) == 0, f"The number of IPUs:{num_ipus} mod the number of IPUs per replica:{poprun_args.ipus_per_replica} must == 0" + assert (num_ipus % poprun_args.ipus_per_replica) == 0, ( + f"The number of IPUs:{num_ipus} mod the number of IPUs per replica:{poprun_args.ipus_per_replica} must == 0" + ) num_replicas = num_ipus // poprun_args.ipus_per_replica self.ctx.logger.info(f"The number of total replicas is {num_replicas}.") @@ -79,9 +79,9 @@ def replace_training_script(self): num_nodes = len(poprun_args.hosts.split(',')) num_procs = num_nodes * poprun_args.nproc_per_host self.ctx.logger.info(f"The number of total processes is {num_procs}.") - assert ( - num_replicas % num_procs - ) == 0, f"The number of replicas:{num_replicas} mod the number of processes:{num_procs} must == 0" + assert (num_replicas % num_procs) == 0, ( + f"The number of replicas:{num_replicas} mod the number of processes:{num_procs} must == 0" + ) # hosts and endpoints hosts = poprun_args.hosts.replace(' ', '').split(',') diff --git a/python/paddle/distributed/launch/controllers/rpc.py b/python/paddle/distributed/launch/controllers/rpc.py index 91d59adb2bef2f..b6ab3292f2e41d 100644 --- a/python/paddle/distributed/launch/controllers/rpc.py +++ b/python/paddle/distributed/launch/controllers/rpc.py @@ -27,9 +27,9 @@ def enable(cls, ctx): return False def build_pod(self): - assert ( - self.ctx.args.master is not None - ), "Master is None, Please set master address!" 
+ assert self.ctx.args.master is not None, ( + "Master is None, Please set master address!" + ) self._build_pod_with_master() def _build_pod_with_master(self): diff --git a/python/paddle/distributed/launch/job/container.py b/python/paddle/distributed/launch/job/container.py index ac83b118da3ed7..a9870efc08a5c5 100644 --- a/python/paddle/distributed/launch/job/container.py +++ b/python/paddle/distributed/launch/job/container.py @@ -94,9 +94,9 @@ def update_env(self, env={}, **kwargs): def _validate_env(self): for k, v in self._env.items(): - assert isinstance(k, str) and isinstance( - v, str - ), f'env {k}:{v} must be str' + assert isinstance(k, str) and isinstance(v, str), ( + f'env {k}:{v} must be str' + ) def _get_fd(self, pth): if not pth: diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index 88a22460dc5304..ba510d295b2f1a 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -391,9 +391,9 @@ def __init__( ) -> None: super().__init__(layers.full_name() + "_data_parallel") - assert ( - in_dynamic_mode() - ), "It's not supported to construct DataParallel in static graph mode." + assert in_dynamic_mode(), ( + "It's not supported to construct DataParallel in static graph mode." + ) self._layers = layers self.find_unused_parameters = find_unused_parameters @@ -756,12 +756,12 @@ def __init__(self): ).split(",") self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "") self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1")) - assert ( - self._nrings > 0 - ), "nccl_nrings must be an integer greater than 0." - assert ( - self._nrings < 9 - ), "nccl_nrings should be less than 9, which is enough in most scenarios." + assert self._nrings > 0, ( + "nccl_nrings must be an integer greater than 0." + ) + assert self._nrings < 9, ( + "nccl_nrings should be less than 9, which is enough in most scenarios." + ) @property def rank(self) -> int: diff --git a/python/paddle/distributed/parallel_helper.py b/python/paddle/distributed/parallel_helper.py index b8a552071eaf20..5b35f28f02ef10 100644 --- a/python/paddle/distributed/parallel_helper.py +++ b/python/paddle/distributed/parallel_helper.py @@ -33,17 +33,17 @@ def _is_parallel_ctx_initialized(): def _set_parallel_ctx(ccl_parallel_context): global __parallel_ctx__clz__ - assert ( - __parallel_ctx__clz__ is None - ), "ParallelContext can only be initialized once." + assert __parallel_ctx__clz__ is None, ( + "ParallelContext can only be initialized once." + ) __parallel_ctx__clz__ = ccl_parallel_context def _init_parallel_ctx(): global __parallel_ctx__clz__ - assert ( - __parallel_ctx__clz__ is not None - ), "ParallelContext should be initialized." + assert __parallel_ctx__clz__ is not None, ( + "ParallelContext should be initialized." + ) __parallel_ctx__clz__.init() diff --git a/python/paddle/distributed/parallel_with_gloo.py b/python/paddle/distributed/parallel_with_gloo.py index 57eb9cc59d0bbd..8f52852b9b574f 100755 --- a/python/paddle/distributed/parallel_with_gloo.py +++ b/python/paddle/distributed/parallel_with_gloo.py @@ -96,9 +96,9 @@ def gloo_init_parallel_env( ... test_gloo_init_with_multiprocess(2) """ - assert ( - rank_num < 2 - ) is False, "rank_num should greater than or equal to 2 for parallel environment initialization." + assert (rank_num < 2) is False, ( + "rank_num should greater than or equal to 2 for parallel environment initialization." 
+ ) # init gloo context manager = Manager() diff --git a/python/paddle/distributed/passes/auto_parallel_amp.py b/python/paddle/distributed/passes/auto_parallel_amp.py index 22705efe37a888..4f7303f5ff4ac5 100644 --- a/python/paddle/distributed/passes/auto_parallel_amp.py +++ b/python/paddle/distributed/passes/auto_parallel_amp.py @@ -340,9 +340,9 @@ def _cast_block(self, block): out_var = block.var(out_var_name) in_var = block._find_var_recursive(in_var_name) for in_var_name in op.input_arg_names: - assert ( - in_var.dtype == block.var(in_var_name).dtype - ), f"{in_var}, {block.var(in_var_name)}, {op}" + assert in_var.dtype == block.var(in_var_name).dtype, ( + f"{in_var}, {block.var(in_var_name)}, {op}" + ) out_var.desc.set_dtype(in_var.dtype) elif int(op.attr('op_role')) == 257: pass @@ -545,9 +545,9 @@ def _keep_fp32_output(op, out_name): cast_name, in_var_dist_attr ) else: - assert ( - in_var.dtype == dst_dtype - ), f"op [{op.type}] expect input [{in_name}] to be dtype [{dst_dtype}] BUT got [{in_var.dtype}]. {op}" + assert in_var.dtype == dst_dtype, ( + f"op [{op.type}] expect input [{in_name}] to be dtype [{dst_dtype}] BUT got [{in_var.dtype}]. {op}" + ) for out_name in op.output_names: if src_dtype == paddle.float32 and _keep_fp32_output(op, out_name): @@ -1158,13 +1158,13 @@ def _update_loss_scaling(self, grads, found_inf): e, "x", ['float16', 'float32', 'float64'], 'update_loss_scaling' ) if e.dtype == paddle.float16: - assert ( - self._loss_scaling.dtype == paddle.float32 - ), "The dtype of prev_loss_scaling should be float32 when the dtype of x is float16." + assert self._loss_scaling.dtype == paddle.float32, ( + "The dtype of prev_loss_scaling should be float32 when the dtype of x is float16." + ) else: - assert ( - self._loss_scaling.dtype == e.dtype - ), "The dtype of prev_loss_scaling should be equal to the dtype of x." + assert self._loss_scaling.dtype == e.dtype, ( + "The dtype of prev_loss_scaling should be equal to the dtype of x." + ) inputs = { 'X': grads, diff --git a/python/paddle/distributed/passes/auto_parallel_c_embedding.py b/python/paddle/distributed/passes/auto_parallel_c_embedding.py index ef3896752db2f4..fdeeb49ac3177f 100644 --- a/python/paddle/distributed/passes/auto_parallel_c_embedding.py +++ b/python/paddle/distributed/passes/auto_parallel_c_embedding.py @@ -173,9 +173,9 @@ def _update_before_dims_mapping(self, new_op): results.append(dist_attr_new) sub_name = op.name().split('.')[1] if op.num_operands() > 0: - assert ( - sub_name != "cast" - ), "Need to add support for {sub_name}." + assert sub_name != "cast", ( + "Need to add support for {sub_name}." + ) operands.append(dist_attr_new) next_op = op.operand(0).source().get_defining_op() stack.append(next_op) diff --git a/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py b/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py index 23644d464adea0..6194d7a41dd219 100644 --- a/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py +++ b/python/paddle/distributed/passes/auto_parallel_data_parallel_optimization.py @@ -150,14 +150,14 @@ def _analyze_program(self): grad_name = op.output_arg_names[0] if grad_name in self._grad_name_to_group_map: continue - assert op.has_attr( - "ring_id" - ), f"Unexpected: comm op [{op}] has NOT ring id." + assert op.has_attr("ring_id"), ( + f"Unexpected: comm op [{op}] has NOT ring id." 
+ ) group = ring_id_to_process_group(op.attr("ring_id")) - assert ( - group is not None - ), f"Unexpected: data parallel group of [{grad_name}] from op [{op}] is None" + assert group is not None, ( + f"Unexpected: data parallel group of [{grad_name}] from op [{op}] is None" + ) self._grad_name_to_group_map[grad_name] = group @@ -182,9 +182,9 @@ def _analyze_program(self): for grad_name in scaled_grads: if grad_name not in self._grad_name_to_group_map: not_synchronized_grads.append(grad_name) - assert ( - len(not_synchronized_grads) == 0 - ), f"Unexpected: gradients [{not_synchronized_grads}] is scaled BUT NOT synchronized." + assert len(not_synchronized_grads) == 0, ( + f"Unexpected: gradients [{not_synchronized_grads}] is scaled BUT NOT synchronized." + ) def is_data_parallel_applied(self): return len(self._group_to_grad_name_map) > 0 @@ -239,12 +239,12 @@ def _update_opt_rescale_grad(self): is_optimize_op(op) and op.type in __rescale_grad_supported_opts__ ): - assert op.has_attr( - 'rescale_grad' - ), f"Unexpected: op [{op}] is supported to have [rescale_grad] attribute." - assert ( - len(op.input("Grad")) == 1 - ), f"Unexpected: op [{op}] is supported to have only one input grad var." + assert op.has_attr('rescale_grad'), ( + f"Unexpected: op [{op}] is supported to have [rescale_grad] attribute." + ) + assert len(op.input("Grad")) == 1, ( + f"Unexpected: op [{op}] is supported to have only one input grad var." + ) grad_name = op.input("Grad")[0] dp_degree = len( @@ -255,9 +255,9 @@ def _update_opt_rescale_grad(self): rescale_grad = float(op.attr('rescale_grad')) / dp_degree op._set_attr('rescale_grad', rescale_grad) - assert scaled_grads == set( - self._grad_name_to_group_map.keys() - ), f"Unexpected: gradients [{set(self._grad_name_to_group_map.keys()) - scaled_grads}] are unscaled." + assert scaled_grads == set(self._grad_name_to_group_map.keys()), ( + f"Unexpected: gradients [{set(self._grad_name_to_group_map.keys()) - scaled_grads}] are unscaled." 
+ ) def _could_be_overlap(self): # NOTE current different nccl comm will use different cuda stream @@ -478,9 +478,9 @@ def _update_program(self, grad_groups): # update allreduce & scale op if group.scale_op_idx != -1: scale_op = block.ops[group.scale_op_idx] - assert ( - scale_op.type == 'scale' - ), f"should found scale op but found {scale_op}" + assert scale_op.type == 'scale', ( + f"should found scale op but found {scale_op}" + ) scale_op._rename_input( scale_op.input_arg_names[0], group.coalesce_var.name ) @@ -524,9 +524,9 @@ def _update_program(self, grad_groups): + group.remove_scale_op_indices ) for idx in sorted(remove_op_indices, reverse=True): - assert ( - block.ops[idx].type in remove_op_types - ), f"Unexpected: try to remove op {block.ops[idx]}" + assert block.ops[idx].type in remove_op_types, ( + f"Unexpected: try to remove op {block.ops[idx]}" + ) block._remove_op(idx, False) # insert coalesce op @@ -753,9 +753,9 @@ def add(self, grad_var, ring_id, i): grad_op_idx -= 1 grad_op = self.ops[grad_op_idx] - assert ( - grad_var.name in grad_op.output_arg_names - ), f"grad [{grad_var.name}] should be output of {grad_op}" + assert grad_var.name in grad_op.output_arg_names, ( + f"grad [{grad_var.name}] should be output of {grad_op}" + ) self.coalesce_op_idx = grad_op_idx def finalize(self): diff --git a/python/paddle/distributed/passes/auto_parallel_fp16.py b/python/paddle/distributed/passes/auto_parallel_fp16.py index 54b268d2571f03..c5ce33dafb85ee 100644 --- a/python/paddle/distributed/passes/auto_parallel_fp16.py +++ b/python/paddle/distributed/passes/auto_parallel_fp16.py @@ -75,9 +75,9 @@ def set_auto_cast_attr(cast_op, block): out_name = cast_op.output('Out')[0] in_var = block._find_var_recursive(in_name) out_var = block._find_var_recursive(out_name) - assert ( - in_var is not None and out_var is not None - ), f"in_var {in_name} or out_var {out_name} is None of cast op" + assert in_var is not None and out_var is not None, ( + f"in_var {in_name} or out_var {out_name} is None of cast op" + ) if is_forward_op(cast_op): cast_op._set_attr('in_dtype', in_var.dtype) out_var.desc.set_dtype(paddle.dtype(cast_op.attr('out_dtype'))) @@ -172,9 +172,7 @@ def __init__( self.input_data_var_names = input_data_var_names else: self.input_data_var_names = [] - self._op_fp16_dict = ( - {} - ) # op_id --> True/False. 'True' means that the op is should run in fp16 mode. + self._op_fp16_dict = {} # op_id --> True/False. 'True' means that the op is should run in fp16 mode. # a trick to determine leaf tensor node in program {varname: generator_op_id} self.forward_non_leaf_tensors = {} # record the cast ops that are inserted for a forward @@ -431,9 +429,9 @@ def cast_block(self, block): out_var = block.var(out_var_name) in_var = block._find_var_recursive(in_var_name) for in_var_name in op.input_arg_names: - assert ( - in_var.dtype == block.var(in_var_name).dtype - ), f"{in_var}, {block.var(in_var_name)}, {op}" + assert in_var.dtype == block.var(in_var_name).dtype, ( + f"{in_var}, {block.var(in_var_name)}, {op}" + ) out_var.desc.set_dtype(in_var.dtype) idx += num_cast_ops + 1 @@ -560,9 +558,9 @@ def _insert_backward_cast_ops( # rename input # some forward output is not need by backward computation, e.g. logit in softmax_with_cross_entropy if slot_name in op.input_names: - assert src_name in op.input( - slot_name - ), f"var: {src_name} not in op's {slot_name}. {op}" + assert src_name in op.input(slot_name), ( + f"var: {src_name} not in op's {slot_name}. 
{op}" + ) src_var_dist_attr = grad_op_attr.get_input_dist_attr(src_name) assert src_var_dist_attr is not None op._rename_input(src_name, cast_name) @@ -574,9 +572,9 @@ def _insert_backward_cast_ops( # some forward input maybe stop_gradient=True, e.g. input_mask if len(op.output(grad_slot_name)) == 0: continue - assert ( - len(op.output(grad_slot_name)) == 1 - ), f"[{grad_slot_name}], Current Op: {op}" + assert len(op.output(grad_slot_name)) == 1, ( + f"[{grad_slot_name}], Current Op: {op}" + ) grad_name = op.output(grad_slot_name)[0] grad = block.var(grad_name) grad_dist_attr = grad_op_attr.get_output_dist_attr(grad_name) @@ -692,9 +690,9 @@ def _split_grads(params_grads): grads = [g for _, g in params_grads] fp32_grads = [g for g in grads if g.dtype == paddle.float32] fp16_grads = [g for g in grads if g.dtype == __target_dtype__] - assert len(fp32_grads) + len(fp16_grads) == len( - grads - ), "Data types of all grads must be either fp16 or fp32." + assert len(fp32_grads) + len(fp16_grads) == len(grads), ( + "Data types of all grads must be either fp16 or fp32." + ) return grads, fp32_grads, fp16_grads @@ -803,9 +801,9 @@ def is_initialization_op(op): if is_initialization_op(op): output_name = op.output_arg_names[0] if param_to_dtype.get(output_name, None) == __target_dtype__: - assert op.has_attr( - 'dtype' - ), f"initialization op is supported to has dtype attribute but got {op}." + assert op.has_attr('dtype'), ( + f"initialization op is supported to has dtype attribute but got {op}." + ) out_var = startup_program.global_block().var(output_name) if out_var.dtype == paddle.float32: out_var.desc.set_dtype(__target_dtype__) diff --git a/python/paddle/distributed/passes/auto_parallel_fused_linear_promotion.py b/python/paddle/distributed/passes/auto_parallel_fused_linear_promotion.py index 9ab643db57a04e..b6b271280387bc 100644 --- a/python/paddle/distributed/passes/auto_parallel_fused_linear_promotion.py +++ b/python/paddle/distributed/passes/auto_parallel_fused_linear_promotion.py @@ -353,9 +353,9 @@ def can_match_pattern( ) else: pass - assert len(forward_segments) >= len( - backward_segments - ), "The number of forward segments should be not shorter than the number of backward segments." + assert len(forward_segments) >= len(backward_segments), ( + "The number of forward segments should be not shorter than the number of backward segments." 
+ ) logger.info(f"forward_segments: {forward_segments}") logger.info(f"backward_segments: {backward_segments}") return forward_segments, backward_segments @@ -409,21 +409,21 @@ def _transform_forward_segment( ) origin_matmul_output_name = origin_matmul_op.output_arg_names[0] origin_comm_input_name = origin_comm_op.input_arg_names[0] - assert ( - origin_matmul_output_name == origin_comm_input_name - ), f"The 0th op output name {origin_matmul_output_name} is not equal to the 1st op input name {origin_comm_input_name}" + assert origin_matmul_output_name == origin_comm_input_name, ( + f"The 0th op output name {origin_matmul_output_name} is not equal to the 1st op input name {origin_comm_input_name}" + ) origin_comm_output_name = origin_comm_op.output_arg_names[0] origin_add_input_names = origin_add_op.input_arg_names - assert ( - origin_comm_output_name == origin_add_input_names[0] - ), f"The 1st op output name {origin_comm_output_name} is not equal to the 2nd op input name {origin_add_input_names[0]}" + assert origin_comm_output_name == origin_add_input_names[0], ( + f"The 1st op output name {origin_comm_output_name} is not equal to the 2nd op input name {origin_add_input_names[0]}" + ) # 1.2 get the origin dist_attr origin_add_dist_attr = ( self._dist_context.get_op_dist_attr_for_program(origin_add_op) ) - assert ( - origin_add_dist_attr is not None - ), f"Origin add op {origin_add_op.type} has no dist attr" + assert origin_add_dist_attr is not None, ( + f"Origin add op {origin_add_op.type} has no dist attr" + ) ref_mesh = origin_add_dist_attr.process_mesh in_var_dist_attr = origin_add_dist_attr.get_input_dist_attr( origin_add_op.input_arg_names[0] diff --git a/python/paddle/distributed/passes/auto_parallel_grad_clip.py b/python/paddle/distributed/passes/auto_parallel_grad_clip.py index 7beb56529c1a14..91f070a3aa8f2f 100644 --- a/python/paddle/distributed/passes/auto_parallel_grad_clip.py +++ b/python/paddle/distributed/passes/auto_parallel_grad_clip.py @@ -287,9 +287,9 @@ def _partition_parameters(self, params): rank = sizes.index(min(sizes)) mapping[rank].append(param.name) numel = reduce(lambda x, y: x * y, param.shape, 1) - assert ( - numel > 0 - ), f"param [{param.name}] should larger than 0, but it is [{numel}]" + assert numel > 0, ( + f"param [{param.name}] should larger than 0, but it is [{numel}]" + ) sizes[rank] += numel return mapping @@ -510,13 +510,13 @@ def _remove_no_need_ops_vars(self, block): prior_op = block.ops[j] break j -= 1 - assert ( - prior_op is not None - ), "Unexpected: ClipByGlobalNorm could not find priory depend op" + assert prior_op is not None, ( + "Unexpected: ClipByGlobalNorm could not find priory depend op" + ) prior_var = block.vars[prior_op.output_arg_names[0]] - assert ( - prior_var is not None - ), "Unexpected: ClipByGlobalNorm could not find priory depend var" + assert prior_var is not None, ( + "Unexpected: ClipByGlobalNorm could not find priory depend var" + ) insert_dependencies_for_vars( block, idx, diff --git a/python/paddle/distributed/passes/auto_parallel_gradient_merge.py b/python/paddle/distributed/passes/auto_parallel_gradient_merge.py index 3a96fa040a20db..d343f99a03d95d 100644 --- a/python/paddle/distributed/passes/auto_parallel_gradient_merge.py +++ b/python/paddle/distributed/passes/auto_parallel_gradient_merge.py @@ -89,9 +89,9 @@ def _pir_append_gradient_merge_backward_op( if grad is None: continue - assert ( - not param.is_selected_row_type() - ), "SELECTED_ROWS is not supported in GradientMergeOptimizer for now" + assert not 
param.is_selected_row_type(), ( + "SELECTED_ROWS is not supported in GradientMergeOptimizer for now" + ) grad_dtype = grad.dtype grad_type = grad.type() @@ -214,9 +214,9 @@ def _insert_scale_op_after(target_value, optimizer_op, scale, bias=0.0): scale_op.op_role = int(OpRole.Optimize) full_op = scale_op.operand_source(1).get_defining_op() - assert ( - full_op.name() == "pd_op.full" - ), f"The defining op of the scale value should be `pd_op.full`, but got {full_op.name()}" + assert full_op.name() == "pd_op.full", ( + f"The defining op of the scale value should be `pd_op.full`, but got {full_op.name()}" + ) full_op.op_role = int(OpRole.Optimize) if "adam" in optimizer_op.name(): @@ -237,9 +237,9 @@ def _append_scale_op_before_comm(block, new_params_to_grads, k_steps): scale_op.op_role = int(OpRole.Optimize) full_op = scale_op.operand_source(1).get_defining_op() - assert ( - full_op.name() == "pd_op.full" - ), f"The defining op of the scale value should be `pd_op.full`, but got {full_op.name()}" + assert full_op.name() == "pd_op.full", ( + f"The defining op of the scale value should be `pd_op.full`, but got {full_op.name()}" + ) full_op.op_role = int(OpRole.Optimize) paddle.pir.set_insertion_point_to_block_end(block) @@ -255,9 +255,9 @@ def _append_scale_op_after_comm(block, optimizer_ops, k_steps): raise NotImplementedError( f"We yet support adamw, adam and sgd, but got {optimizer_op.name()}" ) - assert ( - target_value is not None - ), "target_value is not expected to be None" + assert target_value is not None, ( + "target_value is not expected to be None" + ) insertion_point = target_value.get_defining_op() if insertion_point is None: # target_value is a gradient_merge_var, which hasn't defining_op diff --git a/python/paddle/distributed/passes/auto_parallel_master_grad.py b/python/paddle/distributed/passes/auto_parallel_master_grad.py index 29d0f38b6fcefc..fc75049237439e 100644 --- a/python/paddle/distributed/passes/auto_parallel_master_grad.py +++ b/python/paddle/distributed/passes/auto_parallel_master_grad.py @@ -134,15 +134,15 @@ def _add_cast_op(self, cur_block, grad_names: list[str], dist_context): producer_op_dist_attr = ( dist_context.get_op_dist_attr_for_program(producer_op) ) - assert ( - producer_op_dist_attr is not None - ), f"The op: '{producer_op}' should be distributed" + assert producer_op_dist_attr is not None, ( + f"The op: '{producer_op}' should be distributed" + ) ref_output_dist_attr = ( producer_op_dist_attr.get_output_dist_attr(grad_name) ) - assert ( - ref_output_dist_attr is not None - ), f"The output: '{grad_name}' should be distributed" + assert ref_output_dist_attr is not None, ( + f"The output: '{grad_name}' should be distributed" + ) ref_mesh = ref_output_dist_attr.process_mesh ref_dims_mapping = ref_output_dist_attr.dims_mapping ref_chunk_id = producer_op_dist_attr.chunk_id @@ -216,9 +216,9 @@ def _regenerate_optimizer( if is_optimize_op(op) and is_gradient_clip_op(op): first_optimize_idx = idx break - assert ( - first_optimize_idx < main_ops_len - ), "The first optimizer op is not found!" + assert first_optimize_idx < main_ops_len, ( + "The first optimizer op is not found!" 
+ ) deleted_temp_var_names = [] deleted_persist_var_names = [] reserved_var_names = [] diff --git a/python/paddle/distributed/passes/auto_parallel_quantization.py b/python/paddle/distributed/passes/auto_parallel_quantization.py index e5eb98d135730b..39c1db36654c51 100644 --- a/python/paddle/distributed/passes/auto_parallel_quantization.py +++ b/python/paddle/distributed/passes/auto_parallel_quantization.py @@ -381,9 +381,9 @@ def set_dist_attr_for_qat_program( dist_origin_op = dist_context.get_dist_op_for_program( origin_op ) - assert ( - dist_origin_op is not None - ), "origin op must have dist attr." + assert dist_origin_op is not None, ( + "origin op must have dist attr." + ) origin_op_dist_attr = dist_origin_op.dist_attr quant_op_dist_attr.impl_idx = origin_op_dist_attr.impl_idx diff --git a/python/paddle/distributed/passes/auto_parallel_recompute.py b/python/paddle/distributed/passes/auto_parallel_recompute.py index cb4ecb9d6d62d8..35835e12223d49 100644 --- a/python/paddle/distributed/passes/auto_parallel_recompute.py +++ b/python/paddle/distributed/passes/auto_parallel_recompute.py @@ -94,9 +94,9 @@ def build_states(self): if seg_name not in self.seg_op_deps: self.seg_op_deps[seg_name] = [i] else: - assert ( - self.seg_op_deps[seg_name][-1] + 1 == i - ), "The recompute segment's ops should be continuous" + assert self.seg_op_deps[seg_name][-1] + 1 == i, ( + "The recompute segment's ops should be continuous" + ) self.seg_op_deps[seg_name].extend([i]) def get_recompute_segments(self, no_recompute_segments=[]): @@ -108,9 +108,9 @@ def get_recompute_segments(self, no_recompute_segments=[]): self._checkpoints.extend(self.ops[segment_idx[-1]].output_arg_names) for i in sorted(no_recompute_segments, reverse=True): - assert i < len( - segments - ), f"the no_recompute_segments idx [{i}] should be lower the number of segment [{len(segments)}]" + assert i < len(segments), ( + f"the no_recompute_segments idx [{i}] should be lower the number of segment [{len(segments)}]" + ) segments.pop(i) return segments @@ -324,9 +324,9 @@ def reset_recompute_op(op): pushed_ops_count += 1 ops_of_stages[id].append(op) op_names_of_stages[id].append(op.type) - assert ( - len(ops) == reset_ops_count + pushed_ops_count - ), f"The sum of pushed_ops_count and reset_ops_count must be the same as length of ops, but the sum is {reset_ops_count + pushed_ops_count} while length of ops is {len(ops)}" + assert len(ops) == reset_ops_count + pushed_ops_count, ( + f"The sum of pushed_ops_count and reset_ops_count must be the same as length of ops, but the sum is {reset_ops_count + pushed_ops_count} while length of ops is {len(ops)}" + ) return ops_of_stages, op_names_of_stages def _apply_single_impl(self, main_program, startup_program, context): diff --git a/python/paddle/distributed/passes/auto_parallel_recompute_pir.py b/python/paddle/distributed/passes/auto_parallel_recompute_pir.py index 0ced091ea9ee5c..425c93603f92a0 100644 --- a/python/paddle/distributed/passes/auto_parallel_recompute_pir.py +++ b/python/paddle/distributed/passes/auto_parallel_recompute_pir.py @@ -182,10 +182,10 @@ def _apply_single_impl(self, main_program, startup_program, context=None): self.program_ops = list(main_program.global_block().ops) # 1. Get the recompute segments information form program. 
segments = self.get_segments() - assert ( - len(segments) > 0 - ), "No segment found in the PIR recompute pass.\n \ + assert len(segments) > 0, ( + "No segment found in the PIR recompute pass.\n \ Please disable 'recompute.enable' or check 'recompute()' usage in model code." + ) # 2. Get the forward and backward OPs from program. fwd_ops, bwd_ops = self.get_fwd_bwd_ops() diff --git a/python/paddle/distributed/passes/auto_parallel_sequence_parallel_optimization.py b/python/paddle/distributed/passes/auto_parallel_sequence_parallel_optimization.py index e6a70aba4ca650..b45b545b9dc27e 100644 --- a/python/paddle/distributed/passes/auto_parallel_sequence_parallel_optimization.py +++ b/python/paddle/distributed/passes/auto_parallel_sequence_parallel_optimization.py @@ -118,9 +118,9 @@ def is_valid_split_op(idx, block): intersection = set(split_output_names).intersection( set(consumer_input_names) ) - assert ( - len(intersection) == 1 - ), f"Sequence Parallel ReduceScatter Output more than 1: {intersection}." + assert len(intersection) == 1, ( + f"Sequence Parallel ReduceScatter Output more than 1: {intersection}." + ) keep_output_name = intersection.pop() split_output_names.remove(keep_output_name) remove_varnames.extend(split_output_names) diff --git a/python/paddle/distributed/passes/auto_parallel_sharding.py b/python/paddle/distributed/passes/auto_parallel_sharding.py index bba86aef5c515a..c0dd66663b5c4d 100644 --- a/python/paddle/distributed/passes/auto_parallel_sharding.py +++ b/python/paddle/distributed/passes/auto_parallel_sharding.py @@ -171,9 +171,9 @@ def _apply_single_impl(self, main_program, startup_program, context): "enable_hierarchical_comm" ) if self.param_comm_stream_num > 1 or self.grad_comm_stream_num > 1: - assert ( - self.enable_overlap - ), "multiple comm stream need enable_overlap to be True" + assert self.enable_overlap, ( + "multiple comm stream need enable_overlap to be True" + ) self.param_bucket_size_numel = int( self.get_attr("param_bucket_size_numel") ) @@ -243,27 +243,27 @@ def _build_sharding_infos(self, main_block, params_grads): # partition for dp_group in self.dp_groups: - assert ( - dp_group.nranks >= self.sharding_world_size - ), f"sharding world size [{self.sharding_world_size}] should not larger than dp world size [{dp_group.nranks}]" - assert ( - dp_group.nranks % self.sharding_world_size == 0 - ), f"sharding world size [{self.sharding_world_size}] should be divisible by dp world size [{dp_group.nranks}]" - assert ( - self.global_rank in dp_group.ranks - ), f"current ranks [{self.global_rank}] does NOT belong to the data parallel group [{dp_group.ranks}]" - assert ( - len(params_grads) >= self.sharding_world_size - ), f"number of parameters [{len(params_grads)}] is not enough to be shard among [{self.sharding_world_size}] ranks" + assert dp_group.nranks >= self.sharding_world_size, ( + f"sharding world size [{self.sharding_world_size}] should not larger than dp world size [{dp_group.nranks}]" + ) + assert dp_group.nranks % self.sharding_world_size == 0, ( + f"sharding world size [{self.sharding_world_size}] should be divisible by dp world size [{dp_group.nranks}]" + ) + assert self.global_rank in dp_group.ranks, ( + f"current ranks [{self.global_rank}] does NOT belong to the data parallel group [{dp_group.ranks}]" + ) + assert len(params_grads) >= self.sharding_world_size, ( + f"number of parameters [{len(params_grads)}] is not enough to be shard among [{self.sharding_world_size}] ranks" + ) # sharding hybrid data parallel: partial sharding param within if 
dp_group.nranks > self.sharding_world_size: self.sharding_hybrid_dp = True assert self.param_comm_stream_num < 2 assert self.grad_comm_stream_num < 2 - assert ( - len(self.dp_groups) == 1 - ), "hybrid sharding and data parallelism are supported only when there is exactly one data parallel group in the network" + assert len(self.dp_groups) == 1, ( + "hybrid sharding and data parallelism are supported only when there is exactly one data parallel group in the network" + ) outer_dp_group, sharding_group = _get_dp_and_sharding_groups( dp_group.ranks, self.sharding_world_size, self.global_rank ) @@ -729,9 +729,9 @@ def _optimization_pass(self, main_program, startup_program): self.comm_op_scheduling_priority = -1 # TODO support multiple sub_blocks - assert ( - len(self.sharding_infos) == 1 - ), f"gradient synchronization optimization only support one sharding group right now, but got [{len(self.sharding_infos)}]." + assert len(self.sharding_infos) == 1, ( + f"gradient synchronization optimization only support one sharding group right now, but got [{len(self.sharding_infos)}]." + ) sharding_info = self.sharding_infos[0] with paddle.static.program_guard(main_program, startup_program): @@ -893,9 +893,9 @@ def _fuse_overlap_parameter_comm_stage_two(self, sharding_info): prior_var = main_block.vars[op.output("ParamOut")[0]] else: pre_op = main_block.ops[i - self.param_comm_stream_num] - assert is_sharding_param_broadcast_op( - pre_op - ), "Unexpected: sharding broadcast pre op should be broadcast." + assert is_sharding_param_broadcast_op(pre_op), ( + "Unexpected: sharding broadcast pre op should be broadcast." + ) prior_var = main_block.vars[pre_op.output("Out")[0]] # broadcast order dependencies dep_map[i] = [(i, [prior_var], [broadcast_var], comm_stream)] @@ -1002,9 +1002,9 @@ def op_depend_on_group(op, group): dist.ReduceOp.AVG, dist.ReduceOp.SUM, ] - assert ( - is_reduce - ), "Sharding should reduce grad first and than allreduce if Hybrid Sharding with Data-Parallel" + assert is_reduce, ( + "Sharding should reduce grad first and than allreduce if Hybrid Sharding with Data-Parallel" + ) grad_name = op.output_arg_names[0] param_name = _get_base_name_from_grad_name(grad_name) @@ -1041,10 +1041,12 @@ def op_depend_on_group(op, group): 'reduce_type' ) in [ paddle.distributed.ReduceOp.SUM, - ], "Sharding should reduce grad first and than allreduce if Hybrid Sharding with Data-Parallel" - assert ( - ops[i + 1].output_arg_names[0] == grad_name - ), "Hybrid Sharding with Data-Parallel should sync same gradient var" + ], ( + "Sharding should reduce grad first and than allreduce if Hybrid Sharding with Data-Parallel" + ) + assert ops[i + 1].output_arg_names[0] == grad_name, ( + "Hybrid Sharding with Data-Parallel should sync same gradient var" + ) cur_group.allreduce_op_indices.append(i + 1) i += 1 elif op_depend_on_group(op, cur_group): @@ -1120,9 +1122,9 @@ def op_depend_on_group(op, group): if idx in modify_reduce_op_map: group = modify_reduce_op_map[idx] grad_name = op.output_arg_names[0] - assert ( - grad_name == group.vars[-1].name - ), f"Unexpected: it is supposed to sync [{group.vars[-1].name}] but got [{grad_name}]" + assert grad_name == group.vars[-1].name, ( + f"Unexpected: it is supposed to sync [{group.vars[-1].name}] but got [{grad_name}]" + ) op._rename_input(grad_name, group.coalesce_var.name) op._rename_output(grad_name, group.coalesce_var.name) @@ -1132,9 +1134,9 @@ def op_depend_on_group(op, group): if idx in coalesce_op_map: group = coalesce_op_map[idx] first_grad_name = 
group.vars[0].name - assert ( - first_grad_name in op.output_arg_names - ), f"Unexpected: op is supposed to generate grad [{first_grad_name}] but got [{op}]" + assert first_grad_name in op.output_arg_names, ( + f"Unexpected: op is supposed to generate grad [{first_grad_name}] but got [{op}]" + ) grad_names = [grad.name for grad in group.vars] concated_shapes = [] @@ -1560,9 +1562,9 @@ def _insert_reduce_op( reduce_type, op_role=OpRole.Backward, ): - assert ( - root_id >= 0 - ), f"root id should be a positive int, but now root id is {root_id}" + assert root_id >= 0, ( + f"root id should be a positive int, but now root id is {root_id}" + ) new_op = block._insert_op_without_sync( insert_idx, type=op_type, @@ -1775,9 +1777,9 @@ def partition_by_greedy_even(params, group_size): rank = sizes.index(min(sizes)) mapping[rank].append(param) numel = reduce(lambda x, y: x * y, param.shape, 1) - assert ( - numel > 0 - ), f"param [{param.name}] should larger than 0, but it is [{numel}]" + assert numel > 0, ( + f"param [{param.name}] should larger than 0, but it is [{numel}]" + ) sizes[rank] += numel return mapping @@ -1889,9 +1891,9 @@ class ShardingInfo: def __init__(self, group, rank, params_grads, partition_algor): self.group = group self.params_grads = {p.name: (p, g) for p, g in params_grads} - assert len(self.params_grads) == len( - set(self.params_grads) - ), "found duplicated param in params_grads" + assert len(self.params_grads) == len(set(self.params_grads)), ( + "found duplicated param in params_grads" + ) self.params = [p for p, _ in params_grads] self.param_names = [p.name for p in self.params] diff --git a/python/paddle/distributed/passes/auto_parallel_sync_shared_params.py b/python/paddle/distributed/passes/auto_parallel_sync_shared_params.py index b50dd496d04a11..8fbf42c92f7f44 100644 --- a/python/paddle/distributed/passes/auto_parallel_sync_shared_params.py +++ b/python/paddle/distributed/passes/auto_parallel_sync_shared_params.py @@ -140,9 +140,9 @@ def sync_shared_parameters(self, main_program, startup_program): if tmp_param.name == param_name: dy_param = tmp_param break - assert ( - dy_param is not None - ), f"The parameter {param_name} was not found in the concrete_degram" + assert dy_param is not None, ( + f"The parameter {param_name} was not found in the concrete_degram" + ) new_dist_attr = TensorDistAttr() new_dist_attr.process_mesh = dst_mesh @@ -230,9 +230,9 @@ def sync_shared_parameter_gradient( # Only support one shared parameter. # TODO: support more shared parameters - assert ( - len(self.params_maybe_shared) == 1 - ), "Currently, only one shared parameter is supported, and it cannot support more at the moment." + assert len(self.params_maybe_shared) == 1, ( + "Currently, only one shared parameter is supported, and it cannot support more at the moment." + ) cur_rank = paddle.distributed.get_rank() @@ -256,9 +256,9 @@ def sync_shared_parameter_gradient( if p_param.is_same(param_value): grad_idx = p_idx break - assert ( - grad_idx is not None - ), f"Parameter {param_name} not found in params_grades, unable to find corresponding gradient value." + assert grad_idx is not None, ( + f"Parameter {param_name} not found in params_grades, unable to find corresponding gradient value." + ) grad_value = params_grads[p_idx][1] # Create allreduce op comm group. 
diff --git a/python/paddle/distributed/passes/cpp_pass.py b/python/paddle/distributed/passes/cpp_pass.py index 8f7974afddca9a..5cfc6e95870dcb 100755 --- a/python/paddle/distributed/passes/cpp_pass.py +++ b/python/paddle/distributed/passes/cpp_pass.py @@ -203,9 +203,9 @@ def _type(self): return PassType.CALC_OPT def _apply_single_impl(self, main_program, startup_program, context): - assert ( - 'FLAGS_allow_cinn_ops' in core.globals() - ), "PaddlePaddle is not compiled with CINN support" + assert 'FLAGS_allow_cinn_ops' in core.globals(), ( + "PaddlePaddle is not compiled with CINN support" + ) old_allow_ops = core.globals()['FLAGS_allow_cinn_ops'] old_deny_ops = core.globals()['FLAGS_deny_cinn_ops'] try: diff --git a/python/paddle/distributed/passes/pass_base.py b/python/paddle/distributed/passes/pass_base.py index 1ca91bf3e24267..d8e279474c8669 100755 --- a/python/paddle/distributed/passes/pass_base.py +++ b/python/paddle/distributed/passes/pass_base.py @@ -226,9 +226,9 @@ def rule(pass_before, pass_after): def _get_list_index(in_pass): - assert ( - in_pass.name in PassBase._PASS_PROCESS_ORDER_LIST - ), f"Pass {in_pass.name} is not in _PASS_PROCESS_ORDER_LIST" + assert in_pass.name in PassBase._PASS_PROCESS_ORDER_LIST, ( + f"Pass {in_pass.name} is not in _PASS_PROCESS_ORDER_LIST" + ) return PassBase._PASS_PROCESS_ORDER_LIST.index(in_pass.name) diff --git a/python/paddle/distributed/passes/pass_utils.py b/python/paddle/distributed/passes/pass_utils.py index c09657524eabeb..28ee34d98a35f0 100644 --- a/python/paddle/distributed/passes/pass_utils.py +++ b/python/paddle/distributed/passes/pass_utils.py @@ -147,9 +147,9 @@ def split_program(program, op_indices): op_indices.append(op_num) for idx in range(len(op_indices) - 1): - assert ( - op_indices[idx] < op_indices[idx + 1] - ), "op_indices must be strictly sorted" + assert op_indices[idx] < op_indices[idx + 1], ( + "op_indices must be strictly sorted" + ) split_programs = [] for idx in range(len(op_indices) - 1): @@ -303,9 +303,9 @@ def _set_skip_gc_vars_in_old_ir( ) if job_type in ["backward", "backward_w"]: - assert ( - len(skip_gc_vars) == 0 - ), f"When enabling pipeline parallelism strategy, the skip_gc_vars for {job_type} subprogram must be empty, but it is {skip_gc_vars}." + assert len(skip_gc_vars) == 0, ( + f"When enabling pipeline parallelism strategy, the skip_gc_vars for {job_type} subprogram must be empty, but it is {skip_gc_vars}." + ) job.set_skip_gc_vars(skip_gc_vars) suffixed_required_vars[micro_batch_id] |= required_vars @@ -355,9 +355,9 @@ def _set_skip_gc_vars_in_pir(num_micro_batches, job_types, sub_programs, jobs): ) if job_type in ["send_backward", "backward_w"]: - assert ( - len(skip_gc_vars) == 0 - ), f"When enabling pipeline parallelism strategy, the skip_gc_vars for {job_type} subprogram must be empty, but it is {skip_gc_vars}." + assert len(skip_gc_vars) == 0, ( + f"When enabling pipeline parallelism strategy, the skip_gc_vars for {job_type} subprogram must be empty, but it is {skip_gc_vars}." + ) job.set_skip_gc_vars(skip_gc_vars) suffixed_required_vars[micro_batch_id] |= required_vars @@ -603,9 +603,9 @@ def forward_complete_op_role(main_program): while right_idx < ops_len and all_ops[right_idx].op_role == -1: right_idx += 1 if right_idx >= ops_len: # [first_left_op_role, xx, xx, xx, xx] - assert ( - first_left_op_role == -1 - ), "first_left_op_role can't be -1." + assert first_left_op_role == -1, ( + "first_left_op_role can't be -1." 
+ ) for idx in range(iop, right_idx): all_ops[idx].op_role = first_left_op_role break @@ -614,7 +614,9 @@ def forward_complete_op_role(main_program): assert ( first_left_op_role == -1 or first_left_op_role == first_right_op_role - ), f"The left and right operators of (idx[{iop}]) have different op_role." + ), ( + f"The left and right operators of (idx[{iop}]) have different op_role." + ) for idx in range(iop, right_idx): all_ops[idx].op_role = first_right_op_role iop = right_idx + 1 @@ -985,13 +987,13 @@ def split_matmul_grad_to_matmul( matmul_grad_op = ops[matmul_grad_id] tran_x = matmul_grad_op.attr("trans_x") - assert ( - not tran_x - ), f"matmul_grad(id={matmul_grad_id}) with tran_x == True is not supported for splitting matmul_grad to matmul" + assert not tran_x, ( + f"matmul_grad(id={matmul_grad_id}) with tran_x == True is not supported for splitting matmul_grad to matmul" + ) tran_y = matmul_grad_op.attr("trans_y") - assert ( - not tran_y - ), f"matmul_grad(id={matmul_grad_id}) with tran_y == True is not supported for splitting matmul_grad to matmul" + assert not tran_y, ( + f"matmul_grad(id={matmul_grad_id}) with tran_y == True is not supported for splitting matmul_grad to matmul" + ) x = matmul_grad_op.input("X") y = matmul_grad_op.input("Y") @@ -1008,13 +1010,13 @@ def split_matmul_grad_to_matmul( out_grad_dims = var_out_grad.shape y_grad_dims = var_y_grad.shape - assert len(x_dims) == len( - out_grad_dims - ), f"The rank of x must be equal to that of out_grad, but got x rank = {len(x_dims)} and out_grad rank = {len(out_grad_dims)}." + assert len(x_dims) == len(out_grad_dims), ( + f"The rank of x must be equal to that of out_grad, but got x rank = {len(x_dims)} and out_grad rank = {len(out_grad_dims)}." + ) if len(x_dims) > 2: - assert ( - x_dims[0:2] == out_grad_dims[0:2] - ), f"The first two dimensions of x must be equal to that of out_grad, but got x_dims:{x_dims} and out_grad_dims:{out_grad_dims}." + assert x_dims[0:2] == out_grad_dims[0:2], ( + f"The first two dimensions of x must be equal to that of out_grad, but got x_dims:{x_dims} and out_grad_dims:{out_grad_dims}." + ) new_x_dims = [x_dims[0] * x_dims[1], *list(x_dims[2:])] new_out_grad_dims = [ out_grad_dims[0] * out_grad_dims[1], @@ -1124,13 +1126,13 @@ def _pir_split_matmul_grad_to_matmul(block, matmul_grad_id): ops = block.ops matmul_grad_op = ops[matmul_grad_id] - assert not matmul_grad_op.has_attr( - "trans_x" - ), f"matmul_grad(id={matmul_grad_id}) with tran_x == True is not supported for splitting matmul_grad to matmul" + assert not matmul_grad_op.has_attr("trans_x"), ( + f"matmul_grad(id={matmul_grad_id}) with tran_x == True is not supported for splitting matmul_grad to matmul" + ) - assert not matmul_grad_op.has_attr( - "trans_y" - ), f"matmul_grad(id={matmul_grad_id}) with tran_y == True is not supported for splitting matmul_grad to matmul" + assert not matmul_grad_op.has_attr("trans_y"), ( + f"matmul_grad(id={matmul_grad_id}) with tran_y == True is not supported for splitting matmul_grad to matmul" + ) x = matmul_grad_op.operand_source(0) y = matmul_grad_op.operand_source(1) @@ -1143,14 +1145,14 @@ def _pir_split_matmul_grad_to_matmul(block, matmul_grad_id): out_grad_dims = out_grad.shape y_grad_dims = y_grad.shape - assert len(x_dims) == len( - out_grad_dims - ), f"The rank of x must be equal to that of out_grad, but got x rank = {len(x_dims)} and out_grad rank = {len(out_grad_dims)}." 
+ assert len(x_dims) == len(out_grad_dims), ( + f"The rank of x must be equal to that of out_grad, but got x rank = {len(x_dims)} and out_grad rank = {len(out_grad_dims)}." + ) if len(x_dims) > 2: - assert ( - x_dims[0:2] == out_grad_dims[0:2] - ), f"The first two dimensions of x must be equal to that of out_grad, but got x_dims:{x_dims} and out_grad_dims:{out_grad_dims}." + assert x_dims[0:2] == out_grad_dims[0:2], ( + f"The first two dimensions of x must be equal to that of out_grad, but got x_dims:{x_dims} and out_grad_dims:{out_grad_dims}." + ) new_x_dims = [x_dims[0] * x_dims[1], *list(x_dims[2:])] new_out_grad_dims = [ @@ -1236,9 +1238,9 @@ def set_program_skip_gc_vars(self, type_to_program, program_types): skip_gc_vars = required_vars & suffixed_required_vars if job_type in ["backward", "backward_w"]: - assert ( - len(skip_gc_vars) == 0 - ), f"When enabling pipeline parallelism strategy, the skip_gc_vars for {job_type} subprogram must be empty, but it is {skip_gc_vars}." + assert len(skip_gc_vars) == 0, ( + f"When enabling pipeline parallelism strategy, the skip_gc_vars for {job_type} subprogram must be empty, but it is {skip_gc_vars}." + ) skip_gc_vars = dict(zip(skip_gc_vars, [-1] * len(skip_gc_vars))) self.type_to_skip_gc_vars[job_type] = skip_gc_vars diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass/__init__.py b/python/paddle/distributed/passes/pipeline_scheduler_pass/__init__.py index 9daa49a8f2a8dc..9a0dfea48a07d7 100644 --- a/python/paddle/distributed/passes/pipeline_scheduler_pass/__init__.py +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass/__init__.py @@ -35,7 +35,9 @@ def apply_pass(main_program, startup_program, pass_name, pass_attr={}): "VPP", "ZBH1", "ZBVPP", - ], f"pipeline scheduler only support FThenB, 1F1B, Eager1F1B, VPP and ZBH1, but receive {pass_name}" + ], ( + f"pipeline scheduler only support FThenB, 1F1B, Eager1F1B, VPP and ZBH1, but receive {pass_name}" + ) if pass_name == "1F1B": # TODO(Ruibiao): Move FLAGS_1f1b_backward_forward_overlap and diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_1f1b.py b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_1f1b.py index 7fe4e91beff335..27ce8712d7bd01 100644 --- a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_1f1b.py +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_1f1b.py @@ -59,9 +59,9 @@ def _create_job_list_in_pir(self): pp_degree = self.get_attr("pp_degree") job_list = [] - assert ( - pp_degree <= num_micro_batches - ), "Num of micro batches should larger than or equal to pp degree." + assert pp_degree <= num_micro_batches, ( + "Num of micro batches should larger than or equal to pp degree." + ) micro_batch_in_warmup = pp_degree - pp_stage micro_batch_in_1f1b = num_micro_batches - micro_batch_in_warmup @@ -113,9 +113,9 @@ def _partial_programs(self, program): def _partial_pir_programs(self, program): enable_send_recv_overlap = self.get_attr("enable_send_recv_overlap") - assert ( - not enable_send_recv_overlap - ), "PIR does not support 1F1B with enable_send_recv_overlap yet." + assert not enable_send_recv_overlap, ( + "PIR does not support 1F1B with enable_send_recv_overlap yet." 
+ ) self._overlap_send_recv(program) forward_complete_op_role(program) diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_eager_1f1b.py b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_eager_1f1b.py index 27d0c6adae8407..633d837d02896d 100644 --- a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_eager_1f1b.py +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_eager_1f1b.py @@ -34,9 +34,9 @@ def _create_job_list(self): pp_degree = self.get_attr("pp_degree") job_list = [] - assert ( - 2 * (pp_degree - pp_stage) - 1 <= num_micro_batches - ), "Num of micro batches should larger than 2 * (pp_degree - pp_stage) - 1." + assert 2 * (pp_degree - pp_stage) - 1 <= num_micro_batches, ( + "Num of micro batches should larger than 2 * (pp_degree - pp_stage) - 1." + ) micro_batch_in_warmup = 2 * (pp_degree - pp_stage) - 1 micro_batch_in_1f1b = num_micro_batches - micro_batch_in_warmup diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_vpp.py b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_vpp.py index d11c61d834df98..38a64ed6998aff 100644 --- a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_vpp.py +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_vpp.py @@ -357,9 +357,9 @@ def _partial_pir_programs(self, program): if accumulate_steps != num_stages: split_backward = False - assert ( - not enable_send_recv_overlap - ), "PIR does not support VPP with enable_send_recv_overlap yet." + assert not enable_send_recv_overlap, ( + "PIR does not support VPP with enable_send_recv_overlap yet." + ) if split_backward: self._pir_split_matmul_grad_ops_to_matmul(program) diff --git a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_zero_bubble.py b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_zero_bubble.py index 733d454ec9af4f..8a3fff483667e6 100644 --- a/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_zero_bubble.py +++ b/python/paddle/distributed/passes/pipeline_scheduler_pass/pipeline_zero_bubble.py @@ -41,9 +41,9 @@ def _create_job_list(self): pp_degree = self.get_attr("pp_degree") job_list = [] - assert ( - pp_degree <= num_micro_batches - ), "Num of micro batches should larger than or equal to pp degree." + assert pp_degree <= num_micro_batches, ( + "Num of micro batches should larger than or equal to pp degree." + ) micro_batch_in_warmup = pp_degree - pp_stage micro_batch_in_zero_bubble = num_micro_batches - pp_degree @@ -134,9 +134,9 @@ def _create_job_list(self): assert num_micro_batches % pp_degree == 0 # TODO(luchang): Fix the gradient explosion issue when num_model_chunks(accumulate steps) > pp_degree - assert ( - num_micro_batches <= pp_degree - ), "zbvpp now only supports accumulate steps <= pp degree. It will cause gradient exploitation when accumulate steps > pp degree." + assert num_micro_batches <= pp_degree, ( + "zbvpp now only supports accumulate steps <= pp degree. It will cause gradient exploitation when accumulate steps > pp degree." 
+ ) program_runtimes = self.get_attr("program_runtimes") diff --git a/python/paddle/distributed/passes/ps_server_pass.py b/python/paddle/distributed/passes/ps_server_pass.py index 0e72ed013f7e6e..70492f7b269fb9 100755 --- a/python/paddle/distributed/passes/ps_server_pass.py +++ b/python/paddle/distributed/passes/ps_server_pass.py @@ -61,9 +61,9 @@ def _add_tensor_table( tensor_table_dict[feed_var_name]["fetch_var_name"] = fetch_var_name tensor_table_dict[feed_var_name]["startup_program"] = startup_program tensor_table_dict[feed_var_name]["main_program"] = main_program - tensor_table_dict[feed_var_name][ - "tensor_table_class" - ] = tensor_table_class + tensor_table_dict[feed_var_name]["tensor_table_class"] = ( + tensor_table_class + ) attrs['tensor_table'] = tensor_table_dict def _get_lr_scheduler_program(self, lr_scheduler, lr_decay_steps): diff --git a/python/paddle/distributed/ps/coordinator.py b/python/paddle/distributed/ps/coordinator.py index b2c0abcd49a997..656ea72268d3ad 100755 --- a/python/paddle/distributed/ps/coordinator.py +++ b/python/paddle/distributed/ps/coordinator.py @@ -64,15 +64,15 @@ def parse_from_string(self): bytes(info, encoding="utf8"), self.fl_client_info_desc ) self.clients_info[client_id] = {} - self.clients_info[client_id][ - ClientInfoAttr.DEVICE_TYPE - ] = self.fl_client_info_desc.device_type - self.clients_info[client_id][ - ClientInfoAttr.COMPUTE_CAPACITY - ] = self.fl_client_info_desc.compute_capacity - self.clients_info[client_id][ - ClientInfoAttr.BANDWIDTH - ] = self.fl_client_info_desc.bandwidth + self.clients_info[client_id][ClientInfoAttr.DEVICE_TYPE] = ( + self.fl_client_info_desc.device_type + ) + self.clients_info[client_id][ClientInfoAttr.COMPUTE_CAPACITY] = ( + self.fl_client_info_desc.compute_capacity + ) + self.clients_info[client_id][ClientInfoAttr.BANDWIDTH] = ( + self.fl_client_info_desc.bandwidth + ) @abc.abstractmethod def select(self): diff --git a/python/paddle/distributed/ps/the_one_ps.py b/python/paddle/distributed/ps/the_one_ps.py index c34aca1cc49215..89a8b08cd53740 100755 --- a/python/paddle/distributed/ps/the_one_ps.py +++ b/python/paddle/distributed/ps/the_one_ps.py @@ -983,13 +983,9 @@ def build_fl_client_desc(self, client_info): def build_worker_desc(self): for table in self.tables: - table_proto = ( - self.ps_desc.worker_param.downpour_worker_param.downpour_table_param.add() - ) + table_proto = self.ps_desc.worker_param.downpour_worker_param.downpour_table_param.add() table._set(table_proto) - table_proto = ( - self.ps_desc.server_param.downpour_server_param.downpour_table_param.add() - ) + table_proto = self.ps_desc.server_param.downpour_server_param.downpour_table_param.add() table._set(table_proto) if type(table) == BarrierTable and self.barrier_table_id is None: self.barrier_table_id = table.idx @@ -1002,9 +998,7 @@ def build_worker_desc(self): def build_server_desc(self): self.sparse_table_maps = {} for table in self.tables: - table_proto = ( - self.ps_desc.server_param.downpour_server_param.downpour_table_param.add() - ) + table_proto = self.ps_desc.server_param.downpour_server_param.downpour_table_param.add() table._set(table_proto) if ( table_proto.type == the_one_ps_pb2.PS_SPARSE_TABLE @@ -1402,9 +1396,9 @@ def _stop_worker(self): self._communicator.stop() self._worker.stop_worker() if self.is_heter_ps_mode: - assert ( - self._heter_client is not None - ), "heter client should not be None in heterps mode" + assert self._heter_client is not None, ( + "heter client should not be None in heterps mode" + ) 
self._heter_client.stop() @staticmethod diff --git a/python/paddle/distributed/ps/utils/public.py b/python/paddle/distributed/ps/utils/public.py index 3844b3a070ef72..934a085047cf69 100755 --- a/python/paddle/distributed/ps/utils/public.py +++ b/python/paddle/distributed/ps/utils/public.py @@ -842,9 +842,9 @@ def _append_heter_op(op, current_heter_block_ops, heter_ops): # for cpu-op block append if len(current_default_block_ops) > 1: - default_ops[default_device][ - block_index - ] = current_default_block_ops + default_ops[default_device][block_index] = ( + current_default_block_ops + ) program_block_ops.append(current_default_block_ops) current_default_block_ops = [] block_index += 1 @@ -918,9 +918,9 @@ def union_forward_gradient_op(program_block_ops_list): """ block_length = len(program_block_ops_list) union_program_block_ops_list = [] - assert ( - block_length % 2 != 0 - ), "the length of program_block_ops_list should be odd" + assert block_length % 2 != 0, ( + "the length of program_block_ops_list should be odd" + ) for i in range(0, block_length // 2): block_op_list = {"forward": program_block_ops_list[i]} block_op_list.update( @@ -1499,12 +1499,12 @@ def build_var_distributed(context): for merged in merged_variables_pairs: m_param, m_grad = merged - context["merged_variable_map"][ - m_param.merged_var.name - ] = m_param.merged_var - context["merged_variable_map"][ - m_grad.merged_var.name - ] = m_grad.merged_var + context["merged_variable_map"][m_param.merged_var.name] = ( + m_param.merged_var + ) + context["merged_variable_map"][m_grad.merged_var.name] = ( + m_grad.merged_var + ) param_merges = [] param_merges.extend(origin_for_sparse) diff --git a/python/paddle/distributed/rpc/rpc.py b/python/paddle/distributed/rpc/rpc.py index cdfc97694f9fa0..077727e2d3908c 100644 --- a/python/paddle/distributed/rpc/rpc.py +++ b/python/paddle/distributed/rpc/rpc.py @@ -67,9 +67,9 @@ def _exchange_all_service_infos(world_size): s = set() for rank in range(world_size): info = pickle.loads(_barrier_store.get(str(rank))) - assert ( - info.name not in s - ), "The Worker name must be unique, but name `{}` is repeated." + assert info.name not in s, ( + "The Worker name must be unique, but name `{}` is repeated." + ) s.add(info.name) all_infos.append(info) return all_infos diff --git a/python/paddle/distributed/sharding/group_sharded.py b/python/paddle/distributed/sharding/group_sharded.py index 7f4d25d24b318f..3dfbe9f820dfac 100644 --- a/python/paddle/distributed/sharding/group_sharded.py +++ b/python/paddle/distributed/sharding/group_sharded.py @@ -127,9 +127,9 @@ def group_sharded_parallel( or device in paddle.device.get_all_custom_device_type() ), "group_sharded_parallel only support gpu, xpu and custom_device now" # check option type - assert isinstance( - model, paddle.nn.Layer - ), "The model must be the instance of paddle.nn.Layer." + assert isinstance(model, paddle.nn.Layer), ( + "The model must be the instance of paddle.nn.Layer." + ) assert isinstance(optimizer, (MixPrecisionOptimizer, Optimizer)), ( "The optimizer must be the instance of paddle.optimizer.Optimizer " "or MixPrecisionOptimizer for main grad." 
@@ -248,9 +248,9 @@ def save_group_sharded_model( logger_.info( "==========Begin to save group sharded model and optimizer==========" ) - assert not os.path.isfile( - output - ), f"Saving directory ({output}) should be a directory, not a file" + assert not os.path.isfile(output), ( + f"Saving directory ({output}) should be a directory, not a file" + ) os.makedirs(output, exist_ok=True) output_model = os.path.join(output, "model.pdmodel") if isinstance(model, GroupShardedStage2): @@ -265,9 +265,9 @@ def save_group_sharded_model( ) if optimizer is not None: - assert hasattr( - optimizer, "_optim" - ), "Please use the optimizer which is wrapped with group_sharded_parallel." + assert hasattr(optimizer, "_optim"), ( + "Please use the optimizer which is wrapped with group_sharded_parallel." + ) output_opt = os.path.join(output, "model.pdopt") paddle.save(optimizer._optim.state_dict(), output_opt) logger_.info( diff --git a/python/paddle/distributed/spawn.py b/python/paddle/distributed/spawn.py index bf1e347969f5c6..a225b2b434c85a 100644 --- a/python/paddle/distributed/spawn.py +++ b/python/paddle/distributed/spawn.py @@ -274,15 +274,15 @@ def _get_subprocess_env_list(nprocs, options): args.paddle_cpuonly = True args.selected_devices = None args.ips = args.cluster_node_ips - assert ( - options.get('use_paddlecloud', None) is None - ), "CPUONLY spawn doesn't support use paddle cloud" - assert ( - len(args.cluster_node_ips.split(',')) <= 1 - ), "CPUONLY spawn only support single trainer, that is len(ips)=1, but got %s." - assert ( - _get_trainers_num() == 1 - ), "CPUONLY spawn doesn't support multi-trainer" + assert options.get('use_paddlecloud', None) is None, ( + "CPUONLY spawn doesn't support use paddle cloud" + ) + assert len(args.cluster_node_ips.split(',')) <= 1, ( + "CPUONLY spawn only support single trainer, that is len(ips)=1, but got %s." 
+ ) + assert _get_trainers_num() == 1, ( + "CPUONLY spawn doesn't support multi-trainer" + ) elif options['backend'] == 'xccl': args.selected_devices = None custom_device_name = core.get_all_custom_device_type()[0] diff --git a/python/paddle/distributed/transpiler/distribute_transpiler.py b/python/paddle/distributed/transpiler/distribute_transpiler.py index 8cd4b180330496..e64b1ec7b2711a 100644 --- a/python/paddle/distributed/transpiler/distribute_transpiler.py +++ b/python/paddle/distributed/transpiler/distribute_transpiler.py @@ -667,13 +667,17 @@ def transpile( assert ( trainers_num > self.config.hierarchical_allreduce_inter_nranks - ), f"trainers_num:{trainers_num} < hierarchical_allreduce_inter_nranks:{self.config.hierarchical_allreduce_inter_nranks}" + ), ( + f"trainers_num:{trainers_num} < hierarchical_allreduce_inter_nranks:{self.config.hierarchical_allreduce_inter_nranks}" + ) assert ( trainers_num % self.config.hierarchical_allreduce_inter_nranks == 0 - ), f"trainers_num:{trainers_num} mod hierarchical_allreduce_inter_nranks:{self.config.hierarchical_allreduce_inter_nranks} != 0" + ), ( + f"trainers_num:{trainers_num} mod hierarchical_allreduce_inter_nranks:{self.config.hierarchical_allreduce_inter_nranks} != 0" + ) self.origin_program._hierarchical_allreduce_inter_nranks = int( self.config.hierarchical_allreduce_inter_nranks @@ -842,10 +846,10 @@ def transpile( name=framework.generate_control_dev_var_name() ) if self.has_distributed_lookup_table: - self.grad_name_to_send_dummy_out[ - self.table_name - ] = program.global_block().create_var( - name=framework.generate_control_dev_var_name() + self.grad_name_to_send_dummy_out[self.table_name] = ( + program.global_block().create_var( + name=framework.generate_control_dev_var_name() + ) ) input_deps = list(self.grad_name_to_send_dummy_out.values()) diff --git a/python/paddle/distributed/utils/launch_utils.py b/python/paddle/distributed/utils/launch_utils.py index a9d52da552dc5d..6200f708bac569 100644 --- a/python/paddle/distributed/utils/launch_utils.py +++ b/python/paddle/distributed/utils/launch_utils.py @@ -168,9 +168,9 @@ def pods_endpoints(self): r = [] for pod in self.pods: ep = f"{pod.addr}:{pod.port}" - assert ( - pod.port is not None and pod.addr is not None - ), f"{ep} not a valid endpoint" + assert pod.port is not None and pod.addr is not None, ( + f"{ep} not a valid endpoint" + ) r.append(ep) return r @@ -286,9 +286,9 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus): pod.addr = ip cur_node_endpoints = trainer_endpoints[node_rank] # when use paddlecloud, endpoints may > selected_gpus(user_defined) - assert len(cur_node_endpoints) >= len( - selected_gpus - ), "current trainer_endpoints size should be greater equal than selected_gpus size." + assert len(cur_node_endpoints) >= len(selected_gpus), ( + "current trainer_endpoints size should be greater equal than selected_gpus size." + ) for i in range(len(selected_gpus)): trainer = Trainer() trainer.gpus.append(selected_gpus[i])
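
Taken together, the Python hunks in this patch apply one mechanical reformatting, presumably driven by the pre-commit scope change at the top (the `python/paddle/distributed/[g-z].+` pattern is moved from one formatting hook's file list to the other): assert statements keep the condition on a single line and wrap only the message in parentheses, and long assignments parenthesize the right-hand side instead of splitting the left-hand subscript. A minimal, self-contained sketch of the before/after pattern, with hypothetical variable names modeled on the controller.py hunk:

# Hypothetical values, for illustration only.
containers = ["trainer-0"]
init_containers = []

# Old layout (the removed "-" lines): parentheses wrap the condition and the
# message follows the closing parenthesis.
assert (
    len(containers) + len(init_containers) > 0
), "No container in the pod"

# New layout (the added "+" lines): the condition stays on one line and the
# parentheses wrap only the message.
assert len(containers) + len(init_containers) > 0, (
    "No container in the pod"
)

Both forms are semantically equivalent; only the line wrapping changes, so these hunks should not alter runtime behavior.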