Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
CORTX-31068: Refactoring needed for the Hare-ha communication module
Browse files Browse the repository at this point in the history
Solution:
Added new base classes for resources and events. These calsses will be
used as parent classes for all the resources and events.

Signed-off-by: Swapnil Gaonkar <swapnil.gaonkar@seagate.com>
  • Loading branch information
SwapnilGaonkar7 committed Jun 10, 2022
1 parent c239790 commit 929122f
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 46 deletions.
41 changes: 39 additions & 2 deletions hax/hax/ha/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,48 @@
# opensource@seagate.com or cortx-questions@seagate.com.

import logging
from hax.ha.const import HEALTH_EVENT_SOURCES, NOT_DEFINED
from hax.ha.utils import HaUtils
from hax.util import ConsulUtil
from cortx.utils.event_framework.health import HealthAttr, HealthEvent


class HaEvent():
def __init__(self):
def __init__(self,
source: str,
cluster_id: str,
site_id: str,
rack_id: str,
storageset_id: str,
node_id: str,
resource_type: str,
resource_id: str,
resource_status: str,
specific_info):
logging.debug('Inside HaEvent')
payload = {
HealthAttr.SOURCE.value: HEALTH_EVENT_SOURCES.HARE.value,
HealthAttr.CLUSTER_ID.value: NOT_DEFINED,
HealthAttr.SITE_ID.value: NOT_DEFINED,
HealthAttr.RACK_ID.value: NOT_DEFINED,
HealthAttr.STORAGESET_ID.value: NOT_DEFINED,
HealthAttr.NODE_ID.value: node_id,
HealthAttr.RESOURCE_TYPE.value: resource_type,
HealthAttr.RESOURCE_ID.value: resource_id,
HealthAttr.RESOURCE_STATUS.value: resource_status,
HealthAttr.SPECIFIC_INFO.value: specific_info
}

def create_event(self, node_id, node_name, health_status):
self.event = HealthEvent(**payload)
# 'specific_info' is resource type specific information.
# For e.g. incase of Node 'generation_id' will be pod name
self.event.set_specific_info(payload['specific_info'])

def send(self, util):
raise NotImplementedError()

def get_subscribers(self, consul_util: ConsulUtil,
resourse_type: str):
ha_util = HaUtils(consul_util)
subscriber_list = ha_util.get_subscribers(consul_util, resourse_type)
return subscriber_list
31 changes: 7 additions & 24 deletions hax/hax/ha/event/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,16 @@
# opensource@seagate.com or cortx-questions@seagate.com.

import logging
from hax.ha.const import HEALTH_EVENT_SOURCES, NOT_DEFINED
from cortx.utils.event_framework.health import HealthAttr, HealthEvent
from hax.ha.event.event import HaEvent
from hax.ha.types import InterfaceMapping


class NodeEvent(HaEvent):
def __init__(self):
logging.debug('Inside NodeEvent')
def send(self, util):
logging.debug('Inside NodeEvent:send')

def create_event(self, node_id, node_name, health_status):
logging.debug('Inside create_event of NodeEvent')
self.payload = {
HealthAttr.SOURCE.value: HEALTH_EVENT_SOURCES.HARE.value,
HealthAttr.CLUSTER_ID.value: NOT_DEFINED,
HealthAttr.SITE_ID.value: NOT_DEFINED,
HealthAttr.RACK_ID.value: NOT_DEFINED,
HealthAttr.STORAGESET_ID.value: NOT_DEFINED,
HealthAttr.NODE_ID.value: node_id,
HealthAttr.RESOURCE_TYPE.value: 'node',
HealthAttr.RESOURCE_ID.value: node_id,
HealthAttr.RESOURCE_STATUS.value: health_status,
HealthAttr.SPECIFIC_INFO.value: {}
}
subscribers = self.get_subscribers(util, 'node')

self.event = HealthEvent(**self.payload)

# 'specific_info' is resource type specific information.
# For e.g. incase of Node 'generation_id' will be pod name
self.event.set_specific_info({"generation_id": node_name})
return self.event.json
for interface in subscribers:
sender = InterfaceMapping[interface]()
sender.send(self.event.json, util)
36 changes: 16 additions & 20 deletions hax/hax/ha/ha.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,52 +17,48 @@

import logging
from typing import List, NamedTuple, Optional
from hax.ha.event.node import NodeEvent
from hax.ha.message_type.message_type import HealthMessage
from hax.types import HAState, ObjHealth, ObjT, Fid
from hax.ha.resource.node import Node
from hax.ha.resource.resource import ResourceType
from hax.ha.event.event import HaEvent
from hax.util import ConsulUtil
from cortx.utils.conf_store import Conf

LOG = logging.getLogger('hax')


Resource = NamedTuple('Resource', [('type', ObjT), ('id', str),
Resource = NamedTuple('Resource', [('type', ResourceType), ('id', str),
('name', str), ('status', str)])


class Ha():
resources = {ObjT.NODE: NodeEvent}
# currently only online is supported for node status
message_types = {ObjT.NODE: [HealthMessage]}

def __init__(self, util: ConsulUtil):
self.util = util

def send_event(self, res: Resource):
resource = self.resources[res.type]()
event = resource.create_event(res.id,
res.name,
res.status)
event: HaEvent = res.type.create_event(res.id,
res.name,
res.status)

interfaces = self.message_types[res.type]
for interface in interfaces:
sender = interface()
sender.send(event, self.util)
event.send(self.util)

def check_and_send(self,
parent_resource_type: ObjT,
fid: Fid,
resource_status: str):
resources = {ObjT.NODE: Node}

# TODO Need to have generic function to get resource status
logging.debug('Inside check_and_send')
if (self.util.get_local_node_status() == resource_status and
self.util.is_proc_local(fid)):
resource = self.util.get_process_node(fid)
resource_name = self.util.get_process_node(fid)
resource_id = str(Conf.machine_id)
LOG.debug('Sending %s event for resource %s',
resource_status, resource)
self.send_event(Resource(type=parent_resource_type,
resource_status, resource_name)
self.send_event(Resource(type=resources[parent_resource_type](),
id=resource_id,
name=resource,
name=resource_name,
status=resource_status))

def generate_event_for_process(self,
Expand All @@ -81,7 +77,7 @@ def generate_event_for_process(self,
# when node is offline, rc node sends the offline event.
resource_id = cns.get_machineid_by_nodename(node_name)
if resource_id:
resource = Resource(type=ObjT.NODE,
resource = Resource(type=Node(),
id=resource_id,
name=node_name,
status=state)
Expand Down
19 changes: 19 additions & 0 deletions hax/hax/ha/resource/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For any questions about this software or licensing,
# please email opensource@seagate.com or cortx-questions@seagate.com.
#

# empty
41 changes: 41 additions & 0 deletions hax/hax/ha/resource/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
# for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# For any questions about this software or licensing, please email
# opensource@seagate.com or cortx-questions@seagate.com.

import logging
from hax.ha.const import HEALTH_EVENT_SOURCES, NOT_DEFINED
from hax.ha.event.event import HaEvent
from hax.ha.event.node import NodeEvent
from hax.ha.resource.resource import ResourceType


class Node(ResourceType):
def create_event(self, resource_id,
resource_name, resource_status) -> HaEvent:
logging.debug('Inside Node:create_event')
event: NodeEvent = NodeEvent(source=HEALTH_EVENT_SOURCES.HARE.value,
cluster_id=NOT_DEFINED,
site_id=NOT_DEFINED,
rack_id=NOT_DEFINED,
storageset_id=NOT_DEFINED,
node_id=resource_id,
resource_type='node',
resource_id=resource_id,
resource_status=resource_status,
specific_info={"generation_id":
resource_name})

return event
26 changes: 26 additions & 0 deletions hax/hax/ha/resource/resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
# for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# For any questions about this software or licensing, please email
# opensource@seagate.com or cortx-questions@seagate.com.

import logging


class ResourceType():
def __init__(self):
logging.debug('Inside ResourceType')

def create_event(self, resource_id, resource_name, resource_status):
raise NotImplementedError()
23 changes: 23 additions & 0 deletions hax/hax/ha/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For any questions about this software or licensing,
# please email opensource@seagate.com or cortx-questions@seagate.com.
#

from hax.ha.message_type.message_type import HealthMessage

InterfaceMapping = {
'health_message': HealthMessage
}
94 changes: 94 additions & 0 deletions hax/hax/ha/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright (c) 2022 Seagate Technology LLC and/or its Affiliates
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
# for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# For any questions about this software or licensing, please email
# opensource@seagate.com or cortx-questions@seagate.com.

import json
from typing import Dict, List, Optional
from hax.util import ConsulUtil

cached_subscriber_list: List = []
is_subscriber_list_cached: bool = False


class HaUtils:
resource_types = ['node']
interface_types = ['health_message']

def __init__(self, consul_util: ConsulUtil):
self.util = consul_util
self.get_subscribers(self.util, None)

# If resourse_type is None then all resource_types will be fetched
def get_subscribers(self, consul_util: ConsulUtil,
resourse_type: Optional[str]):
global cached_subscriber_list, is_subscriber_list_cached
subscriber_list: List = []
if is_subscriber_list_cached:
return cached_subscriber_list

if resourse_type:
subscribers = consul_util.kv.kv_get(
f'events/subscription/{resourse_type}',
allow_null=True)
subscriber_list = json.loads(subscribers['Value'])
else:
for resourse_type in self.resource_types:
subscribers = consul_util.kv.kv_get(
f'events/subscription/{resourse_type}',
allow_null=True)
if subscribers:
subscriber_list.append(json.loads(subscribers['Value']))
cached_subscriber_list = subscriber_list
is_subscriber_list_cached = True
return subscriber_list

def event_subscribe(self, data: Dict[str, str]):
global cached_subscriber_list
for item in data.keys():
if item not in self.resource_types:
raise Exception(f'Invalid resource type({item})')
if data[item] not in self.interface_types:
raise Exception(f'Invalid interface type({data[item]})')

subscriber = self.util.kv.kv_get(f'events/subscription/{item}',
allow_null=True)
subscriber_list = []
if subscriber:
subscriber_list = json.loads(subscriber['Value'])
if data[item] not in subscriber_list:
subscriber_list.append(data[item])
self.util.kv.kv_put(f'events/subscription/{item}',
json.dumps(subscriber_list))
cached_subscriber_list = subscriber_list

def event_unsubscribe(self, data: Dict[str, str]):
global cached_subscriber_list
for item in data.keys():
if item not in self.resource_types:
raise Exception(f'Invalid reource type({item})')
if data[item] not in self.interface_types:
raise Exception(f'Invalid interface type({data[item]})')

subscriber = self.util.kv.kv_get(f'events/subscription/{item}',
allow_null=True)
subscriber_list = []
if subscriber:
subscriber_list = json.loads(subscriber['Value'])
if data[item] in subscriber_list:
subscriber_list.remove(data[item])
self.util.kv.kv_put(f'events/subscription/{item}',
json.dumps(subscriber_list))
cached_subscriber_list = subscriber_list
4 changes: 4 additions & 0 deletions hax/hax/hax.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from hax.exception import HAConsistencyException
from hax.filestats import FsStatsUpdater
from hax.ha import create_ha_thread
from hax.ha.utils import HaUtils
from hax.handler import ConsumerThread
from hax.log import setup_logging
from hax.motr import Motr
Expand Down Expand Up @@ -190,6 +191,9 @@ def handle_signal(sig, frame):
cfg: HL_Fids = _get_motr_fids(util)
hax_http_port = util.get_hax_http_port()
util.init_motr_processes_status()
# By default health_message will be subscribed to 'node' events
ha_util = HaUtils(util)
ha_util.event_subscribe({'node': 'health_message'})

LOG.info('Welcome to HaX')
LOG.info(f'Setting up ha_link interface with the options as follows: '
Expand Down
Loading

0 comments on commit 929122f

Please sign in to comment.