Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CSI-5277/ separate addon code from controller server #656

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controllers/scripts/csi_general/csi_pb2.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cd ./proto/${PB2_DIR}
curl -O https://raw.githubusercontent.com/container-storage-interface/spec/${CSI_VERSION}/csi.proto
curl -O https://raw.githubusercontent.com/IBM/csi-volume-group/${VG_VERSION}/volumegroup/volumegroup.proto
curl -O https://raw.githubusercontent.com/csi-addons/spec/v0.2.0/replication/replication.proto
curl -O https://raw.githubusercontent.com/csi-addons/spec/main/identity/identity.proto
sed -i 's|github.com/container-storage-interface/spec/lib/go/csi/csi.proto|csi_general/csi.proto|g' replication.proto
cd -

Expand Down
57 changes: 0 additions & 57 deletions controllers/servers/csi/controller_server_manager.py

This file was deleted.

Empty file.
57 changes: 54 additions & 3 deletions controllers/servers/csi/main.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,70 @@
import os
from argparse import ArgumentParser
from threading import Thread
from concurrent import futures
import grpc
from concurrent import futures

from csi_general import csi_pb2_grpc, volumegroup_pb2_grpc, identity_pb2_grpc, replication_pb2_grpc

from controllers.common.csi_logger import set_log_level
from controllers.servers.csi.controller_server_manager import ControllerServerManager
from controllers.common.settings import CSI_CONTROLLER_SERVER_WORKERS
from controllers.servers.csi.server_manager import ServerManager
from controllers.servers.csi.csi_controller_server import CSIControllerServicer
from controllers.servers.csi.volume_group_server import VolumeGroupControllerServicer
from controllers.servers.csi.csi_addons_server.replication_controller_servicer import ReplicationControllerServicer


def main():
parser = ArgumentParser()
parser.add_argument("-e", "--csi-endpoint", dest="endpoint", help="grpc endpoint")
parser.add_argument("-a", "--csi-addons-endpoint", dest="addonsendpoint", help="CSI-Addons grpc endpoint")
parser.add_argument("-l", "--loglevel", dest="loglevel", help="log level")
arguments = parser.parse_args()

set_log_level(arguments.loglevel)
controller_server = _create_grpc_server()
csi_addons_server = _create_grpc_server()

csi_controller_server_manager = ServerManager(arguments.endpoint, "Controller",
_add_csi_controller_servicers(controller_server))
csi_addons_server_manager = ServerManager(arguments.addonsendpoint, "CSI Addons",
_add_csi_addons_servicers(csi_addons_server))
_start_servers(csi_controller_server_manager, csi_addons_server_manager)


def _create_grpc_server():
max_workers = _get_max_workers_count()
return grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))


def _get_max_workers_count():
cpu_count = (os.cpu_count() or 1) + 4
return CSI_CONTROLLER_SERVER_WORKERS if cpu_count < CSI_CONTROLLER_SERVER_WORKERS else None


def _add_csi_controller_servicers(controller_server):
csi_servicer = CSIControllerServicer()
volume_group_servicer = VolumeGroupControllerServicer()
csi_pb2_grpc.add_ControllerServicer_to_server(csi_servicer, controller_server)
csi_pb2_grpc.add_IdentityServicer_to_server(csi_servicer, controller_server)
volumegroup_pb2_grpc.add_ControllerServicer_to_server(volume_group_servicer, controller_server)
return controller_server


def _add_csi_addons_servicers(csi_addons_server):
replication_servicer = ReplicationControllerServicer()
replication_pb2_grpc.add_ControllerServicer_to_server(replication_servicer, csi_addons_server)
return csi_addons_server


server_manager = ControllerServerManager(arguments.endpoint)
server_manager.start_server()
def _start_servers(csi_controller_server_manager, csi_addons_server_manager):
servers = (
csi_controller_server_manager.start_server,
csi_addons_server_manager.start_server)
for server_function in servers:
thread = Thread(target=server_function,)
thread.start()


if __name__ == '__main__':
Expand Down
34 changes: 34 additions & 0 deletions controllers/servers/csi/server_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import time

from controllers.common.config import config
from controllers.common.csi_logger import get_stdout_logger

logger = get_stdout_logger()


class ServerManager:
def __init__(self, array_endpoint, server_type, grpc_server):
self.endpoint = array_endpoint
self.server_type = server_type
self.grpc_server = grpc_server

def start_server(self):
# bind the server to the port defined above
# grpc_server.add_insecure_port('[::]:{}'.format(self.server_port))
# grpc_server.add_insecure_port('unix://{}'.format(self.server_port))
self.grpc_server.add_insecure_port(self.endpoint)

logger.info("{} version: {}".format(self.server_type, config.identity.version))

# start the server
logger.debug("Listening for connections on endpoint address: {}".format(self.endpoint))

self.grpc_server.start()
logger.debug('{} Server running ...'.format(self.server_type))

try:
while True:
time.sleep(60 * 60 * 60)
except KeyboardInterrupt:
self.grpc_server.stop(0)
logger.debug('Controller Server Stopped ...')
4 changes: 2 additions & 2 deletions controllers/tests/controller_server/addons_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from controllers.servers.settings import PARAMETERS_SYSTEM_ID, PARAMETERS_COPY_TYPE, PARAMETERS_REPLICATION_POLICY
from controllers.array_action.settings import REPLICATION_TYPE_MIRROR, REPLICATION_TYPE_EAR, REPLICATION_COPY_TYPE_SYNC
from controllers.array_action.array_action_types import ReplicationRequest
from controllers.servers.csi.addons_server import ReplicationControllerServicer
from controllers.servers.csi.csi_addons_server.replication_controller_servicer import ReplicationControllerServicer
from controllers.tests import utils
from controllers.tests.common.test_settings import VOLUME_NAME, VOLUME_UID, OBJECT_INTERNAL_ID, \
OTHER_OBJECT_INTERNAL_ID, REPLICATION_NAME, SYSTEM_ID, COPY_TYPE, SECRET_USERNAME_VALUE, SECRET_PASSWORD_VALUE, \
Expand All @@ -16,7 +16,7 @@
from controllers.tests.controller_server.csi_controller_server_test import (CommonControllerTest)
from controllers.tests.utils import ProtoBufMock

ADDON_SERVER_PATH = "controllers.servers.csi.addons_server"
ADDON_SERVER_PATH = "controllers.servers.csi.csi_addons_server.replication_controller_servicer"


class BaseReplicationSetUp(unittest.TestCase):
Expand Down