diff --git a/qiskit/transpiler/passes/utils/check_gate_direction.py b/qiskit/transpiler/passes/utils/check_gate_direction.py index a9d339788744..7a32800ab882 100644 --- a/qiskit/transpiler/passes/utils/check_gate_direction.py +++ b/qiskit/transpiler/passes/utils/check_gate_direction.py @@ -12,7 +12,8 @@ """Check if the gates follow the right direction with respect to the coupling map.""" -from qiskit.transpiler.layout import Layout +from qiskit.circuit import ControlFlowOp +from qiskit.converters import circuit_to_dag from qiskit.transpiler.basepasses import AnalysisPass @@ -33,6 +34,41 @@ def __init__(self, coupling_map, target=None): self.coupling_map = coupling_map self.target = target + def _coupling_map_visit(self, dag, wire_map, edges=None): + if edges is None: + edges = self.coupling_map.get_edges() + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): + if isinstance(node.op, ControlFlowOp): + for block in node.op.blocks: + inner_wire_map = { + inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) + } + if not self._coupling_map_visit(circuit_to_dag(block), inner_wire_map, edges): + return False + elif ( + len(node.qargs) == 2 + and (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) not in edges + ): + return False + return True + + def _target_visit(self, dag, wire_map): + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): + if isinstance(node.op, ControlFlowOp): + for block in node.op.blocks: + inner_wire_map = { + inner: wire_map[outer] for outer, inner in zip(node.qargs, block.qubits) + } + if not self._target_visit(circuit_to_dag(block), inner_wire_map): + return False + elif len(node.qargs) == 2 and not self.target.instruction_supported( + node.op.name, (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) + ): + return False + return True + def run(self, dag): """Run the CheckGateDirection pass on `dag`. @@ -42,22 +78,9 @@ def run(self, dag): Args: dag (DAGCircuit): DAG to check. """ - self.property_set["is_direction_mapped"] = True - edges = self.coupling_map.get_edges() - trivial_layout = Layout.generate_trivial_layout(*dag.qregs.values()) - if self.target is None: - for gate in dag.two_qubit_ops(): - physical_q0 = trivial_layout[gate.qargs[0]] - physical_q1 = trivial_layout[gate.qargs[1]] - - if (physical_q0, physical_q1) not in edges: - self.property_set["is_direction_mapped"] = False - return - else: - for gate in dag.two_qubit_ops(): - physical_q0 = trivial_layout[gate.qargs[0]] - physical_q1 = trivial_layout[gate.qargs[1]] - - if (physical_q0, physical_q1) not in self.target[gate.op.name]: - self.property_set["is_direction_mapped"] = False - return + wire_map = {bit: i for i, bit in enumerate(dag.qubits)} + self.property_set["is_direction_mapped"] = ( + self._coupling_map_visit(dag, wire_map) + if self.target is None + else self._target_visit(dag, wire_map) + ) diff --git a/qiskit/transpiler/passes/utils/gate_direction.py b/qiskit/transpiler/passes/utils/gate_direction.py index 3c331441d83e..350b0584fa85 100644 --- a/qiskit/transpiler/passes/utils/gate_direction.py +++ b/qiskit/transpiler/passes/utils/gate_direction.py @@ -14,11 +14,11 @@ from math import pi -from qiskit.transpiler.layout import Layout from qiskit.transpiler.basepasses import TransformationPass from qiskit.transpiler.exceptions import TranspilerError -from qiskit.circuit import QuantumRegister +from qiskit.converters import dag_to_circuit, circuit_to_dag +from qiskit.circuit import QuantumRegister, ControlFlowOp from qiskit.dagcircuit import DAGCircuit from qiskit.circuit.library.standard_gates import RYGate, HGate, CXGate, CZGate, ECRGate, RZXGate @@ -83,6 +83,8 @@ def __init__(self, coupling_map, target=None): self._cz_dag.add_qreg(qr) self._cz_dag.apply_operation_back(CZGate(), [qr[1], qr[0]], []) + self._static_replacements = {"cx": self._cx_dag, "cz": self._cz_dag, "ecr": self._ecr_dag} + @staticmethod def _rzx_dag(parameter): _rzx_dag = DAGCircuit() @@ -95,6 +97,100 @@ def _rzx_dag(parameter): _rzx_dag.apply_operation_back(HGate(), [qr[1]], []) return _rzx_dag + def _run_coupling_map(self, dag, wire_map, edges=None): + if edges is None: + edges = set(self.coupling_map.get_edges()) + if not edges: + return dag + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): + if isinstance(node.op, ControlFlowOp): + node.op = node.op.replace_blocks( + dag_to_circuit( + self._run_coupling_map( + circuit_to_dag(block), + { + inner: wire_map[outer] + for outer, inner in zip(node.qargs, block.qubits) + }, + edges, + ) + ) + for block in node.op.blocks + ) + continue + if len(node.qargs) != 2: + continue + qargs = (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) + if qargs not in edges and (qargs[1], qargs[0]) not in edges: + raise TranspilerError( + f"The circuit requires a connection between physical qubits {qargs}" + ) + if qargs not in edges: + replacement = self._static_replacements.get(node.name) + if replacement is not None: + dag.substitute_node_with_dag(node, replacement) + elif node.name == "rzx": + dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) + else: + raise TranspilerError( + f"Flipping of gate direction is only supported " + f"for {list(self._static_replacements)} at this time, not '{node.name}'." + ) + return dag + + def _run_target(self, dag, wire_map): + # Don't include directives to avoid things like barrier, which are assumed always supported. + for node in dag.op_nodes(include_directives=False): + if isinstance(node.op, ControlFlowOp): + node.op = node.op.replace_blocks( + dag_to_circuit( + self._run_target( + circuit_to_dag(block), + { + inner: wire_map[outer] + for outer, inner in zip(node.qargs, block.qubits) + }, + ) + ) + for block in node.op.blocks + ) + continue + if len(node.qargs) != 2: + continue + qargs = (wire_map[node.qargs[0]], wire_map[node.qargs[1]]) + swapped = (qargs[1], qargs[0]) + if node.name in self._static_replacements: + if self.target.instruction_supported(node.name, qargs): + continue + if self.target.instruction_supported(node.name, swapped): + dag.substitute_node_with_dag(node, self._static_replacements[node.name]) + else: + raise TranspilerError( + f"The circuit requires a connection between physical qubits {qargs}" + f" for {node.name}" + ) + elif node.name == "rzx": + if self.target.instruction_supported( + qargs=qargs, operation_class=RZXGate, parameters=node.op.params + ): + continue + if self.target.instruction_supported( + qargs=swapped, operation_class=RZXGate, parameters=node.op.params + ): + dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) + else: + raise TranspilerError( + f"The circuit requires a connection between physical qubits {qargs}" + f" for {node.name}" + ) + else: + raise TranspilerError( + f"Flipping of gate direction is only supported " + f"for {list(self._static_replacements)} at this time, not '{node.name}'." + ) + return dag + def run(self, dag): """Run the GateDirection pass on `dag`. @@ -111,104 +207,12 @@ def run(self, dag): TranspilerError: If the circuit cannot be mapped just by flipping the cx nodes. """ - trivial_layout = Layout.generate_trivial_layout(*dag.qregs.values()) - layout_map = trivial_layout.get_virtual_bits() + layout_map = {bit: i for i, bit in enumerate(dag.qubits)} if len(dag.qregs) > 1: raise TranspilerError( "GateDirection expects a single qreg input DAG," "but input DAG had qregs: {}.".format(dag.qregs) ) if self.target is None: - cmap_edges = set(self.coupling_map.get_edges()) - if not cmap_edges: - return dag - - self.coupling_map.compute_distance_matrix() - - dist_matrix = self.coupling_map.distance_matrix - - for node in dag.two_qubit_ops(): - control = node.qargs[0] - target = node.qargs[1] - - physical_q0 = layout_map[control] - physical_q1 = layout_map[target] - - if dist_matrix[physical_q0, physical_q1] != 1: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s" % (physical_q0, physical_q1) - ) - - if (physical_q0, physical_q1) not in cmap_edges: - if node.name == "cx": - dag.substitute_node_with_dag(node, self._cx_dag) - elif node.name == "cz": - dag.substitute_node_with_dag(node, self._cz_dag) - elif node.name == "ecr": - dag.substitute_node_with_dag(node, self._ecr_dag) - elif node.name == "rzx": - dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) - else: - raise TranspilerError( - f"Flipping of gate direction is only supported " - f"for CX, ECR, and RZX at this time, not {node.name}." - ) - else: - # TODO: Work with the gate instances and only use names as look up keys. - # This will require iterating over the target names to build a mapping - # of names to gates that implement CXGate, ECRGate, RZXGate (including - # fixed angle variants) - for node in dag.two_qubit_ops(): - control = node.qargs[0] - target = node.qargs[1] - - physical_q0 = layout_map[control] - physical_q1 = layout_map[target] - - if node.name == "cx": - if (physical_q0, physical_q1) in self.target["cx"]: - continue - if (physical_q1, physical_q0) in self.target["cx"]: - dag.substitute_node_with_dag(node, self._cx_dag) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for cx" % (physical_q0, physical_q1) - ) - elif node.name == "cz": - if (physical_q0, physical_q1) in self.target["cz"]: - continue - if (physical_q1, physical_q0) in self.target["cz"]: - dag.substitute_node_with_dag(node, self._cz_dag) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for cz" % (physical_q0, physical_q1) - ) - elif node.name == "ecr": - if (physical_q0, physical_q1) in self.target["ecr"]: - continue - if (physical_q1, physical_q0) in self.target["ecr"]: - dag.substitute_node_with_dag(node, self._ecr_dag) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for ecr" % (physical_q0, physical_q1) - ) - elif node.name == "rzx": - if (physical_q0, physical_q1) in self.target["rzx"]: - continue - if (physical_q1, physical_q0) in self.target["rzx"]: - dag.substitute_node_with_dag(node, self._rzx_dag(*node.op.params)) - else: - raise TranspilerError( - "The circuit requires a connection between physical " - "qubits %s and %s for rzx" % (physical_q0, physical_q1) - ) - else: - raise TranspilerError( - f"Flipping of gate direction is only supported " - f"for CX, ECR, and RZX at this time, not {node.name}." - ) - return dag + return self._run_coupling_map(dag, layout_map) + return self._run_target(dag, layout_map) diff --git a/releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml b/releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml new file mode 100644 index 000000000000..a0a56ff0d85a --- /dev/null +++ b/releasenotes/notes/gate-direction-target-a9f0acd0cf30ed66.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + The :class:`.GateDirection` transpiler pass will now respect the available + values for gate parameters when handling parametrised gates with a + :class:`.Target`. diff --git a/test/python/transpiler/test_check_gate_direction.py b/test/python/transpiler/test_check_gate_direction.py index f6cf038fe70b..48131f9705d4 100644 --- a/test/python/transpiler/test_check_gate_direction.py +++ b/test/python/transpiler/test_check_gate_direction.py @@ -14,13 +14,17 @@ import unittest +import ddt + from qiskit import QuantumRegister, QuantumCircuit +from qiskit.circuit.library import CXGate, CZGate, ECRGate from qiskit.transpiler.passes import CheckGateDirection -from qiskit.transpiler import CouplingMap +from qiskit.transpiler import CouplingMap, Target from qiskit.converters import circuit_to_dag from qiskit.test import QiskitTestCase +@ddt.ddt class TestCheckGateDirection(QiskitTestCase): """Tests the CheckGateDirection pass""" @@ -196,6 +200,99 @@ def test_ecr_gate(self): self.assertFalse(pass_.property_set["is_direction_mapped"]) + @ddt.data(CXGate(), CZGate(), ECRGate()) + def test_target_static(self, gate): + """Test that static 2q gates are detected correctly both if available and not available.""" + circuit = QuantumCircuit(2) + circuit.append(gate, [0, 1], []) + + matching = Target(num_qubits=2) + matching.add_instruction(gate, {(0, 1): None}) + pass_ = CheckGateDirection(None, target=matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + + swapped = Target(num_qubits=2) + swapped.add_instruction(gate, {(1, 0): None}) + pass_ = CheckGateDirection(None, target=swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + def test_coupling_map_control_flow(self): + """Test recursing into control-flow operations with a coupling map.""" + matching = CouplingMap.from_line(5, bidirectional=True) + swapped = CouplingMap.from_line(5, bidirectional=False) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + circuit.cx(1, 0) + + pass_ = CheckGateDirection(matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.cz(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.ecr(4, 3) + + pass_ = CheckGateDirection(matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + def test_target_control_flow(self): + """Test recursing into control-flow operations with a coupling map.""" + swapped = Target(num_qubits=5) + for gate in (CXGate(), CZGate(), ECRGate()): + swapped.add_instruction(gate, {qargs: None for qargs in zip(range(4), range(1, 5))}) + + matching = Target(num_qubits=5) + for gate in (CXGate(), CZGate(), ECRGate()): + matching.add_instruction(gate, {None: None}) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + circuit.cx(1, 0) + + pass_ = CheckGateDirection(None, target=matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(None, target=swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + + circuit = QuantumCircuit(5, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((2,)): + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.cz(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.ecr(4, 3) + + pass_ = CheckGateDirection(None, target=matching) + pass_(circuit) + self.assertTrue(pass_.property_set["is_direction_mapped"]) + pass_ = CheckGateDirection(None, target=swapped) + pass_(circuit) + self.assertFalse(pass_.property_set["is_direction_mapped"]) + if __name__ == "__main__": unittest.main() diff --git a/test/python/transpiler/test_gate_direction.py b/test/python/transpiler/test_gate_direction.py index 59fa04a517a2..6fd3edd6033a 100644 --- a/test/python/transpiler/test_gate_direction.py +++ b/test/python/transpiler/test_gate_direction.py @@ -11,18 +11,23 @@ # that they have been altered from the originals. """Test the CX Direction pass""" + import unittest from math import pi +import ddt + from qiskit import ClassicalRegister, QuantumRegister, QuantumCircuit +from qiskit.circuit import Parameter +from qiskit.circuit.library import CXGate, CZGate, ECRGate, RZXGate from qiskit.compiler import transpile -from qiskit.transpiler import TranspilerError -from qiskit.transpiler import CouplingMap +from qiskit.transpiler import TranspilerError, CouplingMap, Target from qiskit.transpiler.passes import GateDirection from qiskit.converters import circuit_to_dag from qiskit.test import QiskitTestCase +@ddt.ddt class TestGateDirection(QiskitTestCase): """Tests the GateDirection pass.""" @@ -261,6 +266,142 @@ def test_regression_gh_8387(self): optimization_level=2, ) + @ddt.data(CXGate(), CZGate(), ECRGate()) + def test_target_static(self, gate): + """Test that static 2q gates are swapped correctly both if available and not available.""" + circuit = QuantumCircuit(2) + circuit.append(gate, [0, 1], []) + + matching = Target(num_qubits=2) + matching.add_instruction(gate, {(0, 1): None}) + self.assertEqual(GateDirection(None, target=matching)(circuit), circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(gate, {(1, 0): None}) + self.assertNotEqual(GateDirection(None, target=swapped)(circuit), circuit) + + def test_target_parameter_any(self): + """Test that a parametrised 2q gate is replaced correctly both if available and not + available.""" + circuit = QuantumCircuit(2) + circuit.rzx(1.5, 0, 1) + + matching = Target(num_qubits=2) + matching.add_instruction(RZXGate(Parameter("a")), {(0, 1): None}) + self.assertEqual(GateDirection(None, target=matching)(circuit), circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(RZXGate(Parameter("a")), {(1, 0): None}) + self.assertNotEqual(GateDirection(None, target=swapped)(circuit), circuit) + + def test_target_parameter_exact(self): + """Test that a parametrised 2q gate is detected correctly both if available and not + available.""" + circuit = QuantumCircuit(2) + circuit.rzx(1.5, 0, 1) + + matching = Target(num_qubits=2) + matching.add_instruction(RZXGate(1.5), {(0, 1): None}) + self.assertEqual(GateDirection(None, target=matching)(circuit), circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(RZXGate(1.5), {(1, 0): None}) + self.assertNotEqual(GateDirection(None, target=swapped)(circuit), circuit) + + def test_target_parameter_mismatch(self): + """Test that the pass raises if a gate is not supported due to a parameter mismatch.""" + circuit = QuantumCircuit(2) + circuit.rzx(1.5, 0, 1) + + matching = Target(num_qubits=2) + matching.add_instruction(RZXGate(2.5), {(0, 1): None}) + pass_ = GateDirection(None, target=matching) + with self.assertRaises(TranspilerError): + pass_(circuit) + + swapped = Target(num_qubits=2) + swapped.add_instruction(RZXGate(2.5), {(1, 0): None}) + pass_ = GateDirection(None, target=swapped) + with self.assertRaises(TranspilerError): + pass_(circuit) + + def test_coupling_map_control_flow(self): + """Test that gates are replaced within nested control-flow blocks.""" + circuit = QuantumCircuit(4, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((1, 2)): + circuit.cx(1, 0) + circuit.cx(0, 1) + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.ecr(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.rzx(2.3, 2, 1) + + expected = QuantumCircuit(4, 1) + expected.h(0) + expected.measure(0, 0) + with expected.for_loop((1, 2)): + expected.h([0, 1]) + expected.cx(0, 1) + expected.h([0, 1]) + expected.cx(0, 1) + with expected.if_test((circuit.clbits[0], True)) as else_: + expected.ry(pi / 2, 2) + expected.ry(-pi / 2, 3) + expected.ecr(2, 3) + expected.h([2, 3]) + with else_: + with expected.while_loop((circuit.clbits[0], True)): + expected.h([1, 2]) + expected.rzx(2.3, 1, 2) + expected.h([1, 2]) + + coupling = CouplingMap.from_line(4, bidirectional=False) + pass_ = GateDirection(coupling) + self.assertEqual(pass_(circuit), expected) + + def test_target_control_flow(self): + """Test that gates are replaced within nested control-flow blocks.""" + circuit = QuantumCircuit(4, 1) + circuit.h(0) + circuit.measure(0, 0) + with circuit.for_loop((1, 2)): + circuit.cx(1, 0) + circuit.cx(0, 1) + with circuit.if_test((circuit.clbits[0], True)) as else_: + circuit.ecr(3, 2) + with else_: + with circuit.while_loop((circuit.clbits[0], True)): + circuit.rzx(2.3, 2, 1) + + expected = QuantumCircuit(4, 1) + expected.h(0) + expected.measure(0, 0) + with expected.for_loop((1, 2)): + expected.h([0, 1]) + expected.cx(0, 1) + expected.h([0, 1]) + expected.cx(0, 1) + with expected.if_test((circuit.clbits[0], True)) as else_: + expected.ry(pi / 2, 2) + expected.ry(-pi / 2, 3) + expected.ecr(2, 3) + expected.h([2, 3]) + with else_: + with expected.while_loop((circuit.clbits[0], True)): + expected.h([1, 2]) + expected.rzx(2.3, 1, 2) + expected.h([1, 2]) + + target = Target(num_qubits=4) + target.add_instruction(CXGate(), {(0, 1): None}) + target.add_instruction(ECRGate(), {(2, 3): None}) + target.add_instruction(RZXGate(Parameter("a")), {(1, 2): None}) + pass_ = GateDirection(None, target) + self.assertEqual(pass_(circuit), expected) + if __name__ == "__main__": unittest.main()