-
Notifications
You must be signed in to change notification settings - Fork 8
/
pluginbase.py
130 lines (105 loc) · 3.55 KB
/
pluginbase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import logging
import threading
import typing
from abc import ABC
from abc import abstractmethod
from typing import Optional
from ktoolbox import common
from tftbase import PluginOutput
from tftbase import PluginResult
from tftbase import TestMetadata
# Module-level logger; its name is this module's name prefixed with "tft.".
logger = logging.getLogger("tft." + __name__)
class Plugin(ABC):
    """Abstract base class for plugins.

    Concrete plugins set ``PLUGIN_NAME`` and implement ``_enable()``.
    Callers use the public ``enable()`` wrapper, which adds logging, and
    ``eval_plugin_output()`` to turn a ``PluginOutput`` into a
    ``PluginResult``.
    """

    # Name identifying the plugin. It is used in log messages and stored as
    # ``plugin_name`` in the PluginResult built by eval_plugin_output().
    PLUGIN_NAME = ""

    @property
    def log_name(self) -> str:
        """Return the prefix for this plugin's log lines, ``plugin[<name>]``."""
        return f"plugin[{self.PLUGIN_NAME}]"

    def enable(
        self,
        *,
        ts: "TestSettings",
        node_server_name: str,
        node_client_name: str,
        perf_server: "ServerTask",
        perf_client: "ClientTask",
        tenant: bool,
    ) -> list["PluginTask"]:
        """Create the plugin's tasks and log them.

        All keyword arguments are forwarded unchanged to ``_enable()``;
        their meaning is defined by the concrete plugin implementation.

        Returns:
            The list of tasks returned by ``_enable()``.
        """
        tasks = self._enable(
            ts=ts,
            node_server_name=node_server_name,
            node_client_name=node_client_name,
            perf_server=perf_server,
            perf_client=perf_client,
            tenant=tenant,
        )
        logger.debug(f"{self.log_name}: enable ({len(tasks)} tasks, {tasks})")
        return tasks

    @abstractmethod
    def _enable(
        self,
        *,
        ts: "TestSettings",
        node_server_name: str,
        node_client_name: str,
        perf_server: "ServerTask",
        perf_client: "ClientTask",
        tenant: bool,
    ) -> list["PluginTask"]:
        """Build and return the plugin's tasks.

        Must be implemented by subclasses. It receives exactly the keyword
        arguments that were passed to ``enable()``.
        """

    def eval_plugin_output(
        self,
        md: TestMetadata,
        plugin_output: PluginOutput,
    ) -> Optional[PluginResult]:
        """Log the outcome of a plugin run and wrap it in a ``PluginResult``.

        A failed output is logged as an error together with its
        ``err_msg``; a successful one is logged at debug level.

        Args:
            md: Metadata of the test the output belongs to. It is included
                (as JSON) in the log message and stored in the result.
            plugin_output: The output to evaluate.

        Returns:
            A ``PluginResult`` carrying the metadata, this plugin's name,
            and the success flag, error message and full output taken from
            ``plugin_output``.
        """
        if not plugin_output.success:
            logger.error(
                f"{self.PLUGIN_NAME} plugin failed for {common.dataclass_to_json(md)}: {plugin_output.err_msg}"
            )
        else:
            logger.debug(
                f"{self.PLUGIN_NAME} plugin succeeded for {common.dataclass_to_json(md)}"
            )
        return PluginResult(
            tft_metadata=md,
            plugin_name=self.PLUGIN_NAME,
            success=plugin_output.success,
            msg=plugin_output.err_msg,
            plugin_output=plugin_output,
        )
# Lock held by the functions in this module while they read or modify
# _plugin_registry.
_plugin_registry_lock = threading.Lock()
# Registered plugin instances, keyed by their PLUGIN_NAME.
_plugin_registry: dict[str, Plugin] = {}
def _get_plugin_registry() -> dict[str, Plugin]:
    """Return the module-level plugin registry after loading the plugins.

    The plugin modules call pluginbase.register_plugin() on themselves as
    a side effect of being imported, so they are imported here before the
    registry is handed out. The registry dict itself is returned, not a
    copy.
    """
    import pluginMeasureCpu  # noqa: F401
    import pluginMeasurePower  # noqa: F401
    import pluginValidateOffload  # noqa: F401

    registry = _plugin_registry
    return registry
def register_plugin(plugin: Plugin) -> Plugin:
    """Add ``plugin`` to the registry under its ``PLUGIN_NAME``.

    Registering the very same instance again is a no-op.

    Returns:
        The ``plugin`` that was passed in.

    Raises:
        ValueError: If a different plugin instance is already registered
            under the same name.
    """
    name = plugin.PLUGIN_NAME
    with _plugin_registry_lock:
        # Only the first registration for a name is stored.
        if name not in _plugin_registry:
            _plugin_registry[name] = plugin
        registered = _plugin_registry[name]
    if registered is plugin:
        return plugin
    raise ValueError(f"plugin {repr(name)} is already registered")
def get_all() -> list[Plugin]:
    """Return every registered plugin, ordered by plugin name."""
    plugins_by_name = _get_plugin_registry()
    with _plugin_registry_lock:
        ordered = sorted(plugins_by_name.items(), key=lambda entry: entry[0])
        return [plugin for _, plugin in ordered]
def get_by_name(plugin_name: str) -> Plugin:
    """Look up a registered plugin by its name.

    Raises:
        ValueError: If no plugin is registered under ``plugin_name``.
    """
    plugins_by_name = _get_plugin_registry()
    with _plugin_registry_lock:
        found = plugins_by_name.get(plugin_name)
    if found is not None:
        return found
    raise ValueError(f'Plugin "{plugin_name}" does not exist')
if typing.TYPE_CHECKING:
# "pluginbase" cannot import modules like perf, task or testSettings, because
# those modules import "pluginbase" in turn. However, to forward-declare
# type annotations, we do need those modules here. Import them under
# TYPE_CHECKING only, which avoids the cyclic dependency between the
# modules at runtime.
from task import ClientTask
from task import ServerTask
from task import PluginTask
from testSettings import TestSettings