diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 6f21adf07b..a502eb8426 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -185,11 +185,13 @@ def execute(self, dummy, generation, cache): start = datetime.datetime.now() ctx = handler.HandlerContext(self.resource, logger=self.logger) - ctx.debug("Start run for resource %(resource)s because %(reason)s", - resource=str(self.resource.id), - deploy_id=self.gid, - agent=self.scheduler.agent.name, - reason=self.reason) + ctx.debug( + "Start run for resource %(resource)s because %(reason)s", + resource=str(self.resource.id), + deploy_id=self.gid, + agent=self.scheduler.agent.name, + reason=self.reason + ) self.running = True if self.is_done(): @@ -228,19 +230,20 @@ def execute(self, dummy, generation, cache): send_event = False elif not result.success: ctx.set_status(const.ResourceState.skipped) - ctx.info("Resource %(resource)s skipped due to failed dependency %(failed)s", - resource=str(self.resource.id), - failed=self.skipped_because(results)) + ctx.info( + "Resource %(resource)s skipped due to failed dependency %(failed)s", + resource=str(self.resource.id), + failed=self.skipped_because(results) + ) success = False send_event = False yield self._execute(ctx=ctx, events=received_events, cache=cache, event_only=True, start=start) else: success, send_event = yield self._execute(ctx=ctx, events=received_events, cache=cache, start=start) - LOGGER.info("end run %s", self.resource) - ctx.debug("end run for resource %(resource)s with id %(deploy_id)s", - resource=str(self.resource.id), - deploy_id=self.gid) + ctx.debug( + "End run for resource %(resource)s in deploy %(deploy_id)s", resource=str(self.resource.id), deploy_id=self.gid + ) end = datetime.datetime.now() changes = {str(self.resource.id): ctx.changes} @@ -461,7 +464,7 @@ def mark_deployment_as_finished(self, resource_actions, reason, gid): def notify_ready(self, resourceid, send_events, state, change, changes): if resourceid not in self.cad: - self.logger.warning("received CAD notification that was not required, %s", resourceid) + # received CAD notification for which no resource are waiting, so return return self.cad[resourceid].notify(send_events, state, change, changes) diff --git a/src/inmanta/loader.py b/src/inmanta/loader.py index 5fb788bb39..fc980f3577 100644 --- a/src/inmanta/loader.py +++ b/src/inmanta/loader.py @@ -107,13 +107,13 @@ def deploy_version(self, key, mod, persist=False): :param version The version of the deployed modules :modules modules A list of module names and the hashes of the code files """ - LOGGER.info("Deploying code (key=%s)" % key) # deploy the new code name = mod[1] source_code = mod[2] # if the module is new, or update if name not in self.__modules or key != self.__modules[name][0]: + LOGGER.info("Deploying code (key=%s, module=%s)", key, mod[1]) # write the new source source_file = os.path.join(self.__code_dir, MODULE_DIR, name + ".py") diff --git a/src/inmanta/server/config.py b/src/inmanta/server/config.py index 6e5bb7274a..66a05354f7 100644 --- a/src/inmanta/server/config.py +++ b/src/inmanta/server/config.py @@ -112,8 +112,13 @@ def validate_fact_renew(value): server_purge_resource_action_logs_interval = Option("server", "purge-resource-action-logs-interval", 3600, "The number of seconds between resource-action log purging", is_time) -server_resource_action_log = Option("server", "resource_action_log", "resource-actions.log", - "File in log-dir, containing the resource-action logs", is_str_opt) +server_resource_action_log_prefix = Option( + "server", + "resource_action_log_prefix", + "resource-actions-", + "File prefix in log-dir, containing the resource-action logs. The after the prefix the environment uuid and .log is added", + is_str_opt +) ############################# diff --git a/src/inmanta/server/server.py b/src/inmanta/server/server.py index cd968933ff..ef38bbd3e6 100644 --- a/src/inmanta/server/server.py +++ b/src/inmanta/server/server.py @@ -29,6 +29,7 @@ from uuid import UUID import uuid import shutil +import json import dateutil import pymongo @@ -42,17 +43,40 @@ from inmanta.ast import type from inmanta.resources import Id from inmanta.server import config as opt -import json from inmanta.util import hash_file from inmanta.const import UNDEPLOYABLE_STATES from inmanta.protocol import encode_token, methods +from typing import List + LOGGER = logging.getLogger(__name__) agent_lock = locks.Lock() DBLIMIT = 100000 +class ResourceActionLogLine(logging.LogRecord): + """ A special log record that is used to report log lines that come from the agent + """ + def __init__(self, logger_name: str, level: str, msg: str, created: datetime.datetime) -> None: + super().__init__( + name=logger_name, + level=level, + pathname="(unknown file)", + lineno=0, + msg=msg, + args=[], + exc_info=None, + func=None, + sinfo=None + ) + + self.created = created.timestamp() + self.created = self.created + self.msecs = (self.created - int(self.created)) * 1000 + self.relativeCreated = (self.created - logging._startTime) * 1000 + + class Server(protocol.ServerSlice): """ The central Inmanta server that communicates with clients and agents and persists configuration @@ -75,8 +99,8 @@ def __init__(self, database_host=None, database_port=None, agent_no_log=False): self._database_host = database_host self._database_port = database_port - self._resource_action_logger = None - self._resource_action_file_handler = None + self._resource_action_loggers: Dict[uuid.UUID, logging.Logger] = {} + self._resource_action_handlers: Dict[uuid.UUID, logging.Handler] = {} @gen.coroutine def prestart(self, server): @@ -84,8 +108,6 @@ def prestart(self, server): @gen.coroutine def start(self): - yield self._create_resource_action_logger() - if self._database_host is None: self._database_host = opt.db_host.get() @@ -108,32 +130,81 @@ def start(self): @gen.coroutine def stop(self): yield super().stop() - yield self._close_resource_action_logger() + self._close_resource_action_loggers() - @gen.coroutine - def _create_resource_action_logger(self): - resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get()) - self._file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+') - formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(message)s") - self._file_handler.setFormatter(formatter) - self._file_handler.setLevel(logging.DEBUG) - self._resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER) - self._resource_action_logger.setLevel(logging.DEBUG) - self._resource_action_logger.addHandler(self._file_handler) + @staticmethod + def get_resource_action_log_file(environment: uuid.UUID) -> str: + """Get the correct filename for the given environment + :param environment: The environment id to get the file for + :return: The path to the logfile + """ + return os.path.join( + opt.log_dir.get(), + opt.server_resource_action_log_prefix.get() + str(environment) + ".log" + ) + + def get_resource_action_logger(self, environment: uuid.UUID) -> logging.Logger: + """Get the resource action logger for the given environment. If the logger was not created, create it. + :param environment: The environment to get a logger for + :return: The logger for the given environment. + """ + if environment in self._resource_action_loggers: + return self._resource_action_loggers[environment] - @gen.coroutine - def _close_resource_action_logger(self): - if self._resource_action_logger: - logger_copy = self._resource_action_logger - self._resource_action_logger = None - logger_copy.removeHandler(self._file_handler) - self._file_handler.flush() - self._file_handler.close() - self._file_handler = None + resource_action_log = self.get_resource_action_log_file(environment) + + file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+') + # Most logs will come from agents. We need to use their level and timestamp and their formatted message + file_handler.setFormatter(logging.Formatter(fmt="%(message)s")) + file_handler.setLevel(logging.DEBUG) + + resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER).getChild(str(environment)) + resource_action_logger.setLevel(logging.DEBUG) + resource_action_logger.addHandler(file_handler) + + self._resource_action_loggers[environment] = resource_action_logger + self._resource_action_handlers[environment] = file_handler - def _write_to_resource_action_log(self, log_line): - if self._resource_action_logger: - log_line.write_to_logger(self._resource_action_logger) + return resource_action_logger + + def _close_resource_action_loggers(self) -> None: + """Close all resource action loggers and their associated handlers""" + try: + while True: + env, logger = self._resource_action_loggers.popitem() + self._close_resource_action_logger(env, logger) + except KeyError: + pass + + def _close_resource_action_logger(self, env: uuid.UUID, logger: logging.Logger = None) -> None: + """Close the given logger for the given env. + :param env: The environment to close the logger for + :param logger: The logger to close, if the logger is none it is retrieved + """ + if logger is None: + if env in self._resource_action_loggers: + logger = self._resource_action_loggers.pop(env) + else: + return + + handler = self._resource_action_handlers.pop(env) + logger.removeHandler(handler) + handler.flush() + handler.close() + + def log_resource_action( + self, env: uuid.UUID, resource_ids: List[str], log_level: int, ts: datetime.datetime, message: str + ) -> None: + """Write the given log to the correct resource action logger""" + logger = self.get_resource_action_logger(env) + if len(resource_ids) == 0: + message = "no resources: " + message + elif len(resource_ids) > 1: + message = "multiple resources: " + message + else: + message = resource_ids[0] + ": " + message + log_record = ResourceActionLogLine(logger.name, log_level, message, ts) + logger.handle(log_record) def get_agent_client(self, tid: UUID, endpoint): return self.agentmanager.get_agent_client(tid, endpoint) @@ -709,7 +780,7 @@ def get_all_resources_for_agent(self, env: Environment, agent: str, version: str now = datetime.datetime.now() log_line = data.LogLine.log(logging.INFO, "Resource version pulled by client for agent %(agent)s state", agent=agent) - self._write_to_resource_action_log(log_line) + self.log_resource_action(env.id, resource_ids, logging.INFO, now, log_line.msg) ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_ids, action=const.ResourceAction.pull, action_id=uuid.uuid4(), started=started, finished=now, messages=[log_line]) yield ra.insert() @@ -942,11 +1013,18 @@ def safe_get(input, key, default): for agent in agents: yield self.agentmanager.ensure_agent_registered(env, agent) + now = datetime.datetime.now() log_line = data.LogLine.log(logging.INFO, "Successfully stored version %(version)d", version=version) - self._write_to_resource_action_log(log_line) - ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_version_ids, action_id=uuid.uuid4(), - action=const.ResourceAction.store, started=started, finished=datetime.datetime.now(), - messages=[log_line]) + self.log_resource_action(env.id, resource_version_ids, logging.INFO, now, log_line.msg) + ra = data.ResourceAction( + environment=env.id, + resource_version_ids=resource_version_ids, + action_id=uuid.uuid4(), + action=const.ResourceAction.store, + started=started, + finished=now, + messages=[log_line] + ) yield ra.insert() LOGGER.debug("Successfully stored version %d", version) @@ -995,7 +1073,12 @@ def release_version(self, env, version_id, push, agent_trigger_method=None): # all resources already deployed deployed = yield model.get_increment(negative=True) - logline = {"level": "INFO", "msg": "Setting deployed due to known good status", "timestamp": now, "args": []} + logline = { + "level": "INFO", + "msg": "Setting deployed due to known good status", + "timestamp": now.isoformat(), + "args": [] + } yield self.resource_action_update(env, deployed, action_id=uuid.uuid4(), started=now, finished=now, status=const.ResourceState.deployed, # does this require a different ResourceAction? @@ -1233,11 +1316,15 @@ def resource_action_update(self, env, resource_ids, action_id, action, started, if len(messages) > 0: resource_action.add_logs(messages) - if self._resource_action_logger: - for msg in messages: - # All other data is stored in the database. The msg was already formatted at the client side. - # The only way to disable formatting is, by not passing any args - self._resource_action_logger.log(level=const.LogLevel[msg["level"]].value, msg=msg["msg"]) + for msg in messages: + # All other data is stored in the database. The msg was already formatted at the client side. + self.log_resource_action( + env.id, + resource_ids, + const.LogLevel[msg["level"]].value, + datetime.datetime.strptime(msg["timestamp"], const.TIME_ISOFMT), + msg["msg"] + ) if len(changes) > 0: resource_action.add_changes(changes) @@ -1310,7 +1397,13 @@ def delete_project(self, project_id): if project is None: return 404, {"message": "The project with given id does not exist."} - yield project.delete_cascade() + environments = yield Environment.get_list(project=project.id) + for env in environments: + yield env.delete_cascade() + self._close_resource_action_logger(env) + + yield project.delete() + return 200, {} @protocol.handle(methods.modify_project, project_id="id") @@ -1438,6 +1531,8 @@ def delete_environment(self, environment_id): yield env.delete_cascade() + self._close_resource_action_logger(environment_id) + return 200 @protocol.handle(methods.list_settings, env="tid") diff --git a/tests/test_server.py b/tests/test_server.py index e3965d5103..d2daa85c64 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -25,7 +25,7 @@ import pytest from inmanta.agent.agent import Agent from inmanta import data, const, config -from inmanta.server import config as opt, SLICE_AGENT_MANAGER, SLICE_SESSION_MANAGER +from inmanta.server import config as opt, SLICE_AGENT_MANAGER, SLICE_SESSION_MANAGER, server from datetime import datetime from uuid import UUID from inmanta.util import hash_file @@ -1054,7 +1054,7 @@ async def test_resource_action_log(motor, server_multi, client_multi, environmen res = await client_multi.put_version(tid=environment, version=version, resources=resources, unknowns=[], version_info={}) assert res.code == 200 - resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get()) + resource_action_log = server.Server.get_resource_action_log_file(environment) assert os.path.isfile(resource_action_log) assert os.stat(resource_action_log).st_size != 0