Skip to content

Commit

Permalink
Move the PIM tests in their own module
Browse files Browse the repository at this point in the history
  • Loading branch information
plule-ansys committed May 18, 2022
1 parent f975aa8 commit 6433bcf
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 65 deletions.
65 changes: 0 additions & 65 deletions tests/test_launcher.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
from unittest.mock import create_autospec
import grpc
import pytest
import sys
import ansys.platform.instancemanagement as pypim

from ansys.dpf import core
from ansys.dpf.core import server
from ansys.dpf.core.misc import is_ubuntu
from ansys.dpf.core.server import DpfServer, __ansys_version__

ansys_path = core.misc.find_ansys()

Expand Down Expand Up @@ -35,70 +29,11 @@ def test_start_local():
# ensure global channel didn't change
assert starting_server == id(core.SERVER)

def test_start_remote(monkeypatch):
# Test for the Product Instance Management API integration

# Start a local DPF server and create a mock PyPIM pretending it is starting it
local_server = core.start_local_server(as_global=False, ansys_path=core.SERVER.ansys_path)
server_address = local_server._address
mock_instance = pypim.Instance(
definition_name="definitions/fake-dpf",
name="instances/fake-dpf",
ready=True,
status_message=None,
services={"grpc": pypim.Service(uri=server_address, headers={})},
)
# Mock the wait_for_ready method so that it immediately returns
mock_instance.wait_for_ready = create_autospec(mock_instance.wait_for_ready)
# Mock the deletion method
mock_instance.delete = create_autospec(mock_instance.delete)

# Mock the PyPIM client, so that on the "create_instance" call it returns the mock instance
# Note: the host and port here will not be used.
mock_client = pypim.Client(channel=grpc.insecure_channel("localhost:12345"))
mock_client.create_instance = create_autospec(
mock_client.create_instance, return_value=mock_instance
)

# Mock the general pypim connection and configuration check method to expose the mock client.
mock_connect = create_autospec(pypim.connect, return_value=mock_client)
monkeypatch.setattr(pypim, "connect", mock_connect)
monkeypatch.setenv("ANSYS_PLATFORM_INSTANCEMANAGEMENT_CONFIG", "/fake/config.json")

# Call the generic startup sequence with no indication on how to launch it
server = DpfServer(as_global=False)

# It detected the environment and connected to pypim
assert mock_connect.called

# It created a remote instance through PyPIM
mock_client.create_instance.assert_called_with(
product_name="dpf", product_version=__ansys_version__
)

# It waited for this instance to be ready
assert mock_instance.wait_for_ready.called

# It connected using the address provided by PyPIM
assert server._address == server_address

# Stop the server
server.shutdown()

# The delete instance is called
assert mock_instance.delete.called

def test_start_local_failed():
with pytest.raises(NotADirectoryError):
core.start_local_server(ansys_path="", use_docker_by_default=False)

def test_start_remote_failed(monkeypatch):
# Verifies that the error includes the package name to install when using
# launch_remote_dpf() without the requirements installed.
monkeypatch.setitem(sys.modules, "ansys.platform.instancemanagement", None)
with pytest.raises(ImportError) as exc:
server.launch_remote_dpf()
assert "ansys-platform-instancemanagement" in str(exc)

def test_server_ip():
assert core.SERVER.ip != None
Expand Down
72 changes: 72 additions & 0 deletions tests/test_launcher_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import sys
from unittest.mock import create_autospec

import ansys.platform.instancemanagement as pypim
import grpc
import pytest
from ansys.dpf import core
from ansys.dpf.core import server
from ansys.dpf.core.server import DpfServer, __ansys_version__


def test_start_remote(monkeypatch):
# Test for the Product Instance Management API integration

# Start a local DPF server and create a mock PyPIM pretending it is starting it
local_server = core.start_local_server(as_global=False)
server_address = local_server._address
mock_instance = pypim.Instance(
definition_name="definitions/fake-dpf",
name="instances/fake-dpf",
ready=True,
status_message=None,
services={"grpc": pypim.Service(uri=server_address, headers={})},
)
# Mock the wait_for_ready method so that it immediately returns
mock_instance.wait_for_ready = create_autospec(mock_instance.wait_for_ready)
# Mock the deletion method
mock_instance.delete = create_autospec(mock_instance.delete)

# Mock the PyPIM client, so that on the "create_instance" call it returns the mock instance
# Note: the host and port here will not be used.
mock_client = pypim.Client(channel=grpc.insecure_channel("localhost:12345"))
mock_client.create_instance = create_autospec(
mock_client.create_instance, return_value=mock_instance
)

# Mock the general pypim connection and configuration check method to expose the mock client.
mock_connect = create_autospec(pypim.connect, return_value=mock_client)
monkeypatch.setattr(pypim, "connect", mock_connect)
monkeypatch.setenv("ANSYS_PLATFORM_INSTANCEMANAGEMENT_CONFIG", "/fake/config.json")

# Call the generic startup sequence with no indication on how to launch it
server = DpfServer(as_global=False)

# It detected the environment and connected to pypim
assert mock_connect.called

# It created a remote instance through PyPIM
mock_client.create_instance.assert_called_with(
product_name="dpf", product_version=__ansys_version__
)

# It waited for this instance to be ready
assert mock_instance.wait_for_ready.called

# It connected using the address provided by PyPIM
assert server._address == server_address

# Stop the server
server.shutdown()

# The delete instance is called
assert mock_instance.delete.called


def test_start_remote_failed(monkeypatch):
# Verifies that the error includes the package name to install when using
# launch_remote_dpf() without the requirements installed.
monkeypatch.setitem(sys.modules, "ansys.platform.instancemanagement", None)
with pytest.raises(ImportError) as exc:
server.launch_remote_dpf()
assert "ansys-platform-instancemanagement" in str(exc)

0 comments on commit 6433bcf

Please sign in to comment.