Skip to content

Commit

Permalink
chore: fix linting issues (canonical#470)
Browse files Browse the repository at this point in the history
  • Loading branch information
a-dubs committed Feb 18, 2025
1 parent 23e2d0b commit 47048c3
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 154 deletions.
3 changes: 2 additions & 1 deletion examples/oracle/oracle-cluster-demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

import pycloudlib


def demo_cluster(
availability_domain: str = None,
compartment_id: str = None,
vcn_name: str = None,
):
"""Show example of using the OCI library to launch a cluster instance and ping between them."""

with pycloudlib.OCI(
"pycl-oracle-cluster-demo",
availability_domain=availability_domain,
Expand All @@ -37,6 +37,7 @@ def demo_cluster(
else:
print(f"Successfully pinged {private_ip} from {instance.private_ip}")


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
if len(sys.argv) != 3:
Expand Down
188 changes: 121 additions & 67 deletions examples/oracle/oracle-example-cluster-test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,37 @@
# This file is part of pycloudlib. See LICENSE file for license information.
"""Basic examples of various lifecycle with an OCI instance."""

from datetime import datetime
import json
import logging
import sys
import threading
import time
from datetime import datetime
from typing import Generator

import pytest

import pycloudlib
from pycloudlib.oci.instance import OciInstance
from typing import Generator
import logging
import threading

logger = logging.getLogger(__name__)

EXISTING_INSTANCE_IDS = [
EXISTING_INSTANCE_IDS: list[str] = [
# add the OCIDs of the instances you want to use for testing here
]


# change this to either "class" or "module" as you see fit
@pytest.fixture(scope="module")
def cluster() -> Generator[list[OciInstance], None, None]:
"""
Launch a cluster of BM instances and yield them.
"""
Launch a cluster of BM instances.
Yields:
list[OciInstance]: The created or retrieved cluster instances.
"""
with pycloudlib.OCI(
"pycl-oracle-cluster-test",
# use the already created "mofed-vcn" for cluster testing
vcn_name="mofed-vcn" # THIS WILL OVERRIDE THE VCN_NAME IN THE CONFIG FILE
vcn_name="mofed-vcn", # THIS WILL OVERRIDE THE VCN_NAME IN THE CONFIG FILE
) as client:
if EXISTING_INSTANCE_IDS:
instances = [client.get_instance(instance_id) for instance_id in EXISTING_INSTANCE_IDS]
Expand All @@ -42,33 +42,49 @@ def cluster() -> Generator[list[OciInstance], None, None]:
# so once this function returns, the instances are ready
instances = client.create_compute_cluster(
# if you create a custom image, specify its OCID here
image_id=client.released_image("noble"),
image_id=client.released_image("noble"),
instance_count=2,
)
yield instances


class TestOracleClusterBasic:
def test_basic_ping_on_private_ips(self, cluster: list[OciInstance]):
"""Test basic functionalities of Oracle Cluster."""

def test_basic_ping_on_private_ips(self, cluster: list[OciInstance]): # pylint: disable=W0621
"""
Verifies that the instances in the cluster can reach each other on their private IPs.
Test that cluster instances can ping each other on private IPs.
Args:
cluster (list[OciInstance]): Instances in the cluster.
"""
# get the private ips of the instances
private_ips = [instance.private_ip for instance in cluster]
# try to ping each instance from each other instance at their private ip
for instance in cluster:
for private_ip in private_ips:
if private_ip != instance.private_ip:
logger.info(f"Pinging {private_ip} from {instance.private_ip}")
logger.info("Pinging %s from %s", private_ip, instance.private_ip)
# ping once with a timeout of 5 seconds
r = instance.execute(f"ping -c 1 -W 5 {private_ip}")
assert r.ok, f"Failed to ping {private_ip} from {instance.private_ip}"
logger.info(f"Successfully pinged {private_ip} from {instance.private_ip}")
logger.info("Successfully pinged %s from %s", private_ip, instance.private_ip)


def setup_mofed_iptables_rules(instance: OciInstance):
"""
Set up IPTABLES rules for RDMA usage.
Args:
instance (OciInstance): Target instance to configure.
Returns:
OciInstance: The same instance after configuration.
"""
# Update the cloud.cfg file to set preserve_hostname to true
instance.execute("sed -i 's/preserve_hostname: false/preserve_hostname: true/' /etc/cloud/cloud.cfg")
instance.execute(
"sed -i 's/preserve_hostname: false/preserve_hostname: true/' /etc/cloud/cloud.cfg"
)
# Backup the existing iptables rules
backup_file = f"/etc/iptables/rules.v4.bak.{datetime.now().strftime('%F-%T')}"
instance.execute(f"cp -v /etc/iptables/rules.v4 {backup_file}")
Expand Down Expand Up @@ -101,23 +117,31 @@ def setup_mofed_iptables_rules(instance: OciInstance):


def ensure_image_is_rdma_ready(instance: OciInstance):
"""
Check if the image supports RDMA.
Args:
instance (OciInstance): The instance to verify.
"""
r = instance.execute("ibstatus")
if not r.stdout or not r.ok:
logger.info("Infiniband status: %s", r.stdout + "\n" + r.stderr)
pytest.skip("The image beiing used is not RDMA ready")


class TestOracleClusterRdma:
"""Test RDMA functionalities of Oracle Cluster."""

@pytest.fixture(scope="class")
def mofed_cluster(self, cluster: list[OciInstance]) -> Generator[list[OciInstance], None, None]:
def mofed_cluster(
self,
cluster: list[OciInstance], # pylint: disable=W0621
) -> Generator[list[OciInstance], None, None]:
"""
Custom fixture to configure the instances in the cluster for RDMA testing.
This fixture will:
- Ensure the image being used is RDMA ready
- Create a secondary VNIC on the private subnet for each instance in the cluster
- Configure the secondary VNIC for RDMA usage
- Set up the necessary iptables rules for RDMA usage on each instance's secondary NIC
Configure cluster for RDMA testing.
Yields:
list[OciInstance]: RDMA-ready cluster instances.
"""
ensure_image_is_rdma_ready(cluster[0])
for instance in cluster:
Expand All @@ -130,38 +154,54 @@ def mofed_cluster(self, cluster: list[OciInstance]) -> Generator[list[OciInstanc
# create a secondary VNIC on the 2nd vnic on the private subnet for RDMA usage
instance.add_network_interface(
nic_index=1,
subnet_name="private subnet-mofed-vcn", # use the private subnet for mofed testing
subnet_name="private subnet-mofed-vcn", # use the private subnet for mofed testing
)
instance.configure_secondary_vnic()
setup_mofed_iptables_rules(instance)

yield cluster

def test_basic_ping_on_new_rdma_ips(
self,
mofed_cluster: list[OciInstance],
):
# get the private ips of the instances

def test_basic_ping_on_new_rdma_ips(self, mofed_cluster: list[OciInstance]):
"""
Test ping on RDMA-enabled private IPs.
Args:
mofed_cluster (list[OciInstance]): RDMA-enabled cluster instances.
"""
# get the private ips of the instances that are on the same RDMA-enabled subnet
rdma_ips = [instance.secondary_vnic_private_ip for instance in mofed_cluster]
# try to ping each instance from each other instance at their private ip

for instance in mofed_cluster:
for rdma_ip in rdma_ips:
if rdma_ip != instance.secondary_vnic_private_ip:
logger.info(f"Pinging {rdma_ip} from {instance.secondary_vnic_private_ip}")
# ping once with a timeout of 5 seconds
logger.info(
"Pinging %s from %s",
rdma_ip,
instance.secondary_vnic_private_ip,
)
# ping once with a timeout of 5 seconds so it doesn't hang
r = instance.execute(f"ping -c 1 -W 5 {rdma_ip}")
assert r.ok, f"Failed to ping {rdma_ip} from {instance.secondary_vnic_private_ip}"
logger.info(f"Successfully pinged {rdma_ip} from {instance.secondary_vnic_private_ip}")

def test_rping(
self,
mofed_cluster: list[OciInstance],
):
assert (
r.ok
), f"Failed to ping {rdma_ip} from {instance.secondary_vnic_private_ip}"
logger.info(
"Successfully pinged %s from %s",
rdma_ip,
instance.secondary_vnic_private_ip,
)

def test_rping(self, mofed_cluster: list[OciInstance]):
"""
Test rping between two instances.
Args:
mofed_cluster (list[OciInstance]): RDMA-enabled cluster instances
"""
server_instance = mofed_cluster[0]
client_instance = mofed_cluster[1]

def start_server():
# start the rping server on the first instance
"""Start the rping server on the "server_instance"."""
server_instance.execute(f"rping -s -a {server_instance.secondary_vnic_private_ip} -v &")

server_thread = threading.Thread(target=start_server)
Expand All @@ -170,74 +210,88 @@ def start_server():
# Wait for rping server to start
time.sleep(5)
# start the rping client on the second instance (only send 10 packets so it doesn't hang)
r = client_instance.execute(f"rping -c -a {server_instance.secondary_vnic_private_ip} -C 10 -v")
r = client_instance.execute(
f"rping -c -a {server_instance.secondary_vnic_private_ip} -C 10 -v"
)
logger.info("rping output: %s", r.stdout)
assert r.ok, "Failed to run rping"

def test_ucmatose(
self,
mofed_cluster: list[OciInstance],
):
def test_ucmatose(self, mofed_cluster: list[OciInstance]):
"""
Test ucmatose connections.
Args:
mofed_cluster (list[OciInstance]): RDMA-enabled cluster instances
"""
server_instance = mofed_cluster[0]
client_instance = mofed_cluster[1]

def start_server():
# start the rping server on the first instance
server_instance.execute(f"ucmatose &")
"""Start the ucmatose server on the "server_instance"."""
server_instance.execute("ucmatose &")

server_thread = threading.Thread(target=start_server)
server_thread.start()

# Wait for server to start
time.sleep(5)
# start the client on the second instance (only send 10 packets so it doesn't hang)
# start the ucmatose client
r = client_instance.execute(f"ucmatose -s {server_instance.secondary_vnic_private_ip}")
logger.info("ucmatose output: %s", r.stdout)
assert r.ok, "Failed to run ucmatose"

def test_ucx_perftest_lat_one_node(
self,
mofed_cluster: list[OciInstance],
):
def test_ucx_perftest_lat_one_node(self, mofed_cluster: list[OciInstance]):
"""
Run ucx_perftest latency on a single node.
Args:
mofed_cluster (list[OciInstance]): RDMA-enabled cluster instances
"""
server_instance = mofed_cluster[0]
# ucx_perftest only works within a single instance on all MOFED stacks right now, so this
# being 0 is intentional. (Will adjust if Oracle provides config info to resolve this)
client_instance = mofed_cluster[0]

def start_server():
# start the rping server on the first instance
server_instance.execute(f"ucx_perftest -c 0 &")
"""Start the ucx_perftest server on the "server_instance"."""
server_instance.execute("ucx_perftest -c 0 &")

server_thread = threading.Thread(target=start_server)
server_thread.start()

# Wait for server to start
time.sleep(5)
# start the client on the second instance (only send 10 packets so it doesn't hang)
r = client_instance.execute(f"ucx_perftest {server_instance.secondary_vnic_private_ip} -t tag_lat -c 1")
# start the ucx_perftest client
r = client_instance.execute(
f"ucx_perftest {server_instance.secondary_vnic_private_ip} -t tag_lat -c 1"
)
logger.info("ucx_perftest output: %s", r.stdout)
assert r.ok, "Failed to run ucx_perftest"

def test_ucx_perftest_bw_one_node(self, mofed_cluster: list[OciInstance]):
"""
Run ucx_perftest bandwidth on a single node.
def test_ucx_perftest_bw_one_node(
self,
mofed_cluster: list[OciInstance],
):
Args:
mofed_cluster (list[OciInstance]): RDMA-enabled cluster instances
"""
server_instance = mofed_cluster[0]
# ucx_perftest only works within a single instance on all MOFED stacks right now, so this
# being 0 is intentional. (Will adjust if Oracle provides config info to resolve this)
client_instance = mofed_cluster[0]

def start_server():
# start the rping server on the first instance
server_instance.execute(f"ucx_perftest -c 0 &")
"""Start the ucx_perftest server on the "server_instance"."""
server_instance.execute("ucx_perftest -c 0 &")

server_thread = threading.Thread(target=start_server)
server_thread.start()

# Wait for server to start
time.sleep(5)
# start the client on the second instance (only send 10 packets so it doesn't hang)
r = client_instance.execute(f"ucx_perftest {server_instance.secondary_vnic_private_ip} -t tag_bw -c 1")
# start the ucx_perftest client
r = client_instance.execute(
f"ucx_perftest {server_instance.secondary_vnic_private_ip} -t tag_bw -c 1"
)
logger.info("ucx_perftest output: %s", r.stdout)
assert r.ok, "Failed to run ucx_perftest"
2 changes: 1 addition & 1 deletion pycloudlib/instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __exit__(self, _type, _value, _traceback):
def id(self) -> str:
"""Return instance id."""
raise NotImplementedError

@property
def private_ip(self) -> str:
"""Return instance private ip."""
Expand Down
Loading

0 comments on commit 47048c3

Please sign in to comment.