diff --git a/hax/hax/ha/event/event.py b/hax/hax/ha/event/event.py index e0bb74352..44750aa54 100644 --- a/hax/hax/ha/event/event.py +++ b/hax/hax/ha/event/event.py @@ -16,11 +16,48 @@ # opensource@seagate.com or cortx-questions@seagate.com. import logging +from hax.ha.const import HEALTH_EVENT_SOURCES, NOT_DEFINED +from hax.ha.utils import HaUtils +from hax.util import ConsulUtil +from cortx.utils.event_framework.health import HealthAttr, HealthEvent class HaEvent(): - def __init__(self): + def __init__(self, + source: str, + cluster_id: str, + site_id: str, + rack_id: str, + storageset_id: str, + node_id: str, + resource_type: str, + resource_id: str, + resource_status: str, + specific_info): logging.debug('Inside HaEvent') + payload = { + HealthAttr.SOURCE.value: HEALTH_EVENT_SOURCES.HARE.value, + HealthAttr.CLUSTER_ID.value: NOT_DEFINED, + HealthAttr.SITE_ID.value: NOT_DEFINED, + HealthAttr.RACK_ID.value: NOT_DEFINED, + HealthAttr.STORAGESET_ID.value: NOT_DEFINED, + HealthAttr.NODE_ID.value: node_id, + HealthAttr.RESOURCE_TYPE.value: resource_type, + HealthAttr.RESOURCE_ID.value: resource_id, + HealthAttr.RESOURCE_STATUS.value: resource_status, + HealthAttr.SPECIFIC_INFO.value: specific_info + } - def create_event(self, node_id, node_name, health_status): + self.event = HealthEvent(**payload) + # 'specific_info' is resource type specific information. + # For e.g. incase of Node 'generation_id' will be pod name + self.event.set_specific_info(payload['specific_info']) + + def send(self, util): raise NotImplementedError() + + def get_subscribers(self, consul_util: ConsulUtil, + resourse_type: str): + ha_util = HaUtils(consul_util) + subscriber_list = ha_util.get_subscribers(consul_util, resourse_type) + return subscriber_list diff --git a/hax/hax/ha/event/node.py b/hax/hax/ha/event/node.py index 4b448d0a1..00dc63ca0 100644 --- a/hax/hax/ha/event/node.py +++ b/hax/hax/ha/event/node.py @@ -16,33 +16,16 @@ # opensource@seagate.com or cortx-questions@seagate.com. import logging -from hax.ha.const import HEALTH_EVENT_SOURCES, NOT_DEFINED -from cortx.utils.event_framework.health import HealthAttr, HealthEvent from hax.ha.event.event import HaEvent +from hax.ha.types import InterfaceMapping class NodeEvent(HaEvent): - def __init__(self): - logging.debug('Inside NodeEvent') + def send(self, util): + logging.debug('Inside NodeEvent:send') - def create_event(self, node_id, node_name, health_status): - logging.debug('Inside create_event of NodeEvent') - self.payload = { - HealthAttr.SOURCE.value: HEALTH_EVENT_SOURCES.HARE.value, - HealthAttr.CLUSTER_ID.value: NOT_DEFINED, - HealthAttr.SITE_ID.value: NOT_DEFINED, - HealthAttr.RACK_ID.value: NOT_DEFINED, - HealthAttr.STORAGESET_ID.value: NOT_DEFINED, - HealthAttr.NODE_ID.value: node_id, - HealthAttr.RESOURCE_TYPE.value: 'node', - HealthAttr.RESOURCE_ID.value: node_id, - HealthAttr.RESOURCE_STATUS.value: health_status, - HealthAttr.SPECIFIC_INFO.value: {} - } + subscribers = self.get_subscribers(util, 'node') - self.event = HealthEvent(**self.payload) - - # 'specific_info' is resource type specific information. - # For e.g. incase of Node 'generation_id' will be pod name - self.event.set_specific_info({"generation_id": node_name}) - return self.event.json + for interface in subscribers: + sender = InterfaceMapping[interface]() + sender.send(self.event.json, util) diff --git a/hax/hax/ha/ha.py b/hax/hax/ha/ha.py index 24f3316a2..68c2a8f88 100644 --- a/hax/hax/ha/ha.py +++ b/hax/hax/ha/ha.py @@ -17,52 +17,48 @@ import logging from typing import List, NamedTuple, Optional -from hax.ha.event.node import NodeEvent -from hax.ha.message_type.message_type import HealthMessage from hax.types import HAState, ObjHealth, ObjT, Fid +from hax.ha.resource.node import Node +from hax.ha.resource.resource import ResourceType +from hax.ha.event.event import HaEvent from hax.util import ConsulUtil from cortx.utils.conf_store import Conf LOG = logging.getLogger('hax') -Resource = NamedTuple('Resource', [('type', ObjT), ('id', str), +Resource = NamedTuple('Resource', [('type', ResourceType), ('id', str), ('name', str), ('status', str)]) class Ha(): - resources = {ObjT.NODE: NodeEvent} - # currently only online is supported for node status - message_types = {ObjT.NODE: [HealthMessage]} - def __init__(self, util: ConsulUtil): self.util = util def send_event(self, res: Resource): - resource = self.resources[res.type]() - event = resource.create_event(res.id, - res.name, - res.status) + event: HaEvent = res.type.create_event(res.id, + res.name, + res.status) - interfaces = self.message_types[res.type] - for interface in interfaces: - sender = interface() - sender.send(event, self.util) + event.send(self.util) def check_and_send(self, parent_resource_type: ObjT, fid: Fid, resource_status: str): + resources = {ObjT.NODE: Node} + # TODO Need to have generic function to get resource status + logging.debug('Inside check_and_send') if (self.util.get_local_node_status() == resource_status and self.util.is_proc_local(fid)): - resource = self.util.get_process_node(fid) + resource_name = self.util.get_process_node(fid) resource_id = str(Conf.machine_id) LOG.debug('Sending %s event for resource %s', - resource_status, resource) - self.send_event(Resource(type=parent_resource_type, + resource_status, resource_name) + self.send_event(Resource(type=resources[parent_resource_type](), id=resource_id, - name=resource, + name=resource_name, status=resource_status)) def generate_event_for_process(self, @@ -81,7 +77,7 @@ def generate_event_for_process(self, # when node is offline, rc node sends the offline event. resource_id = cns.get_machineid_by_nodename(node_name) if resource_id: - resource = Resource(type=ObjT.NODE, + resource = Resource(type=Node(), id=resource_id, name=node_name, status=state) diff --git a/hax/hax/ha/resource/__init__.py b/hax/hax/ha/resource/__init__.py new file mode 100644 index 000000000..a0f2b6595 --- /dev/null +++ b/hax/hax/ha/resource/__init__.py @@ -0,0 +1,19 @@ +# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For any questions about this software or licensing, +# please email opensource@seagate.com or cortx-questions@seagate.com. +# + +# empty diff --git a/hax/hax/ha/resource/node.py b/hax/hax/ha/resource/node.py new file mode 100644 index 000000000..fcab0b207 --- /dev/null +++ b/hax/hax/ha/resource/node.py @@ -0,0 +1,41 @@ +# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, +# or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# For any questions about this software or licensing, please email +# opensource@seagate.com or cortx-questions@seagate.com. + +import logging +from hax.ha.const import HEALTH_EVENT_SOURCES, NOT_DEFINED +from hax.ha.event.event import HaEvent +from hax.ha.event.node import NodeEvent +from hax.ha.resource.resource import ResourceType + + +class Node(ResourceType): + def create_event(self, resource_id, + resource_name, resource_status) -> HaEvent: + logging.debug('Inside Node:create_event') + event: NodeEvent = NodeEvent(source=HEALTH_EVENT_SOURCES.HARE.value, + cluster_id=NOT_DEFINED, + site_id=NOT_DEFINED, + rack_id=NOT_DEFINED, + storageset_id=NOT_DEFINED, + node_id=resource_id, + resource_type='node', + resource_id=resource_id, + resource_status=resource_status, + specific_info={"generation_id": + resource_name}) + + return event diff --git a/hax/hax/ha/resource/resource.py b/hax/hax/ha/resource/resource.py new file mode 100644 index 000000000..4c98ae0b0 --- /dev/null +++ b/hax/hax/ha/resource/resource.py @@ -0,0 +1,26 @@ +# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, +# or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# For any questions about this software or licensing, please email +# opensource@seagate.com or cortx-questions@seagate.com. + +import logging + + +class ResourceType(): + def __init__(self): + logging.debug('Inside ResourceType') + + def create_event(self, resource_id, resource_name, resource_status): + raise NotImplementedError() diff --git a/hax/hax/ha/types.py b/hax/hax/ha/types.py new file mode 100644 index 000000000..26eb2b704 --- /dev/null +++ b/hax/hax/ha/types.py @@ -0,0 +1,23 @@ +# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# For any questions about this software or licensing, +# please email opensource@seagate.com or cortx-questions@seagate.com. +# + +from hax.ha.message_type.message_type import HealthMessage + +InterfaceMapping = { + 'health_message': HealthMessage +} diff --git a/hax/hax/ha/utils.py b/hax/hax/ha/utils.py new file mode 100644 index 000000000..c17340552 --- /dev/null +++ b/hax/hax/ha/utils.py @@ -0,0 +1,94 @@ +# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates +# +# This program is free software: you can redistribute it and/or modify it +# under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, +# or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License +# for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +# For any questions about this software or licensing, please email +# opensource@seagate.com or cortx-questions@seagate.com. + +import json +from typing import Dict, List, Optional +from hax.util import ConsulUtil + +cached_subscriber_list: List = [] +is_subscriber_list_cached: bool = False + + +class HaUtils: + resource_types = ['node'] + interface_types = ['health_message'] + + def __init__(self, consul_util: ConsulUtil): + self.util = consul_util + self.get_subscribers(self.util, None) + + # If resourse_type is None then all resource_types will be fetched + def get_subscribers(self, consul_util: ConsulUtil, + resourse_type: Optional[str]): + global cached_subscriber_list, is_subscriber_list_cached + subscriber_list: List = [] + if is_subscriber_list_cached: + return cached_subscriber_list + + if resourse_type: + subscribers = consul_util.kv.kv_get( + f'events/subscription/{resourse_type}', + allow_null=True) + subscriber_list = json.loads(subscribers['Value']) + else: + for resourse_type in self.resource_types: + subscribers = consul_util.kv.kv_get( + f'events/subscription/{resourse_type}', + allow_null=True) + if subscribers: + subscriber_list.append(json.loads(subscribers['Value'])) + cached_subscriber_list = subscriber_list + is_subscriber_list_cached = True + return subscriber_list + + def event_subscribe(self, data: Dict[str, str]): + global cached_subscriber_list + for item in data.keys(): + if item not in self.resource_types: + raise Exception(f'Invalid resource type({item})') + if data[item] not in self.interface_types: + raise Exception(f'Invalid interface type({data[item]})') + + subscriber = self.util.kv.kv_get(f'events/subscription/{item}', + allow_null=True) + subscriber_list = [] + if subscriber: + subscriber_list = json.loads(subscriber['Value']) + if data[item] not in subscriber_list: + subscriber_list.append(data[item]) + self.util.kv.kv_put(f'events/subscription/{item}', + json.dumps(subscriber_list)) + cached_subscriber_list = subscriber_list + + def event_unsubscribe(self, data: Dict[str, str]): + global cached_subscriber_list + for item in data.keys(): + if item not in self.resource_types: + raise Exception(f'Invalid reource type({item})') + if data[item] not in self.interface_types: + raise Exception(f'Invalid interface type({data[item]})') + + subscriber = self.util.kv.kv_get(f'events/subscription/{item}', + allow_null=True) + subscriber_list = [] + if subscriber: + subscriber_list = json.loads(subscriber['Value']) + if data[item] in subscriber_list: + subscriber_list.remove(data[item]) + self.util.kv.kv_put(f'events/subscription/{item}', + json.dumps(subscriber_list)) + cached_subscriber_list = subscriber_list diff --git a/hax/hax/hax.py b/hax/hax/hax.py index d676709e2..8465634b3 100755 --- a/hax/hax/hax.py +++ b/hax/hax/hax.py @@ -33,6 +33,7 @@ from hax.exception import HAConsistencyException from hax.filestats import FsStatsUpdater from hax.ha import create_ha_thread +from hax.ha.utils import HaUtils from hax.handler import ConsumerThread from hax.log import setup_logging from hax.motr import Motr @@ -190,6 +191,9 @@ def handle_signal(sig, frame): cfg: HL_Fids = _get_motr_fids(util) hax_http_port = util.get_hax_http_port() util.init_motr_processes_status() + # By default health_message will be subscribed to 'node' events + ha_util = HaUtils(util) + ha_util.event_subscribe({'node': 'health_message'}) LOG.info('Welcome to HaX') LOG.info(f'Setting up ha_link interface with the options as follows: ' diff --git a/hax/hax/server.py b/hax/hax/server.py index 0d15c6568..90b4e5002 100644 --- a/hax/hax/server.py +++ b/hax/hax/server.py @@ -47,6 +47,7 @@ from hax.util import ConsulUtil, create_process_fid, dump_json from helper.exec import Executor, Program from hax.util import repeat_if_fails +from hax.ha.utils import HaUtils LOG = logging.getLogger('hax') @@ -283,6 +284,40 @@ def fn(): return _process +def event_subscription_handle(consul_util: ConsulUtil): + async def _process(request): + data = await request.json() + + loop = asyncio.get_event_loop() + + try: + ha_util = HaUtils(consul_util) + await loop.run_in_executor(None, ha_util.event_subscribe, data) + except Exception as e: + LOG.exception(f'Event subscribe error: {e}') + return web.Response(text=f'Event subscribe error: {e}') + return web.Response() + + return _process + + +def event_unsubscription_handle(consul_util: ConsulUtil): + async def _process(request): + data = await request.json() + + loop = asyncio.get_event_loop() + + try: + ha_util = HaUtils(consul_util) + await loop.run_in_executor(None, ha_util.event_unsubscribe, data) + except Exception as e: + LOG.exception(f'Event unsubscribe error: {e}') + return web.Response(text=f'Event unsubscribe error: {e}') + return web.Response() + + return _process + + @web.middleware async def encode_exception(request, handler): def error_response(e: Exception, code=500, reason=""): @@ -369,6 +404,10 @@ def _configure(self) -> None: get_sns_status(planner, SnsRepairStatus)), web.get('/api/v1/sns/rebalance-status', get_sns_status(planner, SnsRebalanceStatus)), + web.post('/v1/events/subscribe', + event_subscription_handle(consul_util)), + web.post('/v1/events/unsubscribe', + event_unsubscription_handle(consul_util)), ]) self.app = app except Exception as e: