Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More log cleanup #982

Merged
merged 4 commits into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,13 @@ def execute(self, dummy, generation, cache):
start = datetime.datetime.now()
ctx = handler.HandlerContext(self.resource, logger=self.logger)

ctx.debug("Start run for resource %(resource)s because %(reason)s",
resource=str(self.resource.id),
deploy_id=self.gid,
agent=self.scheduler.agent.name,
reason=self.reason)
ctx.debug(
"Start run for resource %(resource)s because %(reason)s",
resource=str(self.resource.id),
deploy_id=self.gid,
agent=self.scheduler.agent.name,
reason=self.reason
)

self.running = True
if self.is_done():
Expand Down Expand Up @@ -228,19 +230,20 @@ def execute(self, dummy, generation, cache):
send_event = False
elif not result.success:
ctx.set_status(const.ResourceState.skipped)
ctx.info("Resource %(resource)s skipped due to failed dependency %(failed)s",
resource=str(self.resource.id),
failed=self.skipped_because(results))
ctx.info(
"Resource %(resource)s skipped due to failed dependency %(failed)s",
resource=str(self.resource.id),
failed=self.skipped_because(results)
)
success = False
send_event = False
yield self._execute(ctx=ctx, events=received_events, cache=cache, event_only=True, start=start)
else:
success, send_event = yield self._execute(ctx=ctx, events=received_events, cache=cache, start=start)

LOGGER.info("end run %s", self.resource)
ctx.debug("end run for resource %(resource)s with id %(deploy_id)s",
resource=str(self.resource.id),
deploy_id=self.gid)
ctx.debug(
"End run for resource %(resource)s in deploy %(deploy_id)s", resource=str(self.resource.id), deploy_id=self.gid
)

end = datetime.datetime.now()
changes = {str(self.resource.id): ctx.changes}
Expand Down Expand Up @@ -461,7 +464,7 @@ def mark_deployment_as_finished(self, resource_actions, reason, gid):

def notify_ready(self, resourceid, send_events, state, change, changes):
if resourceid not in self.cad:
self.logger.warning("received CAD notification that was not required, %s", resourceid)
# received a CAD notification for which no resources are waiting, so return
return
self.cad[resourceid].notify(send_events, state, change, changes)

Expand Down
2 changes: 1 addition & 1 deletion src/inmanta/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ def deploy_version(self, key, mod, persist=False):
:param version The version of the deployed modules
:modules modules A list of module names and the hashes of the code files
"""
LOGGER.info("Deploying code (key=%s)" % key)
# deploy the new code
name = mod[1]
source_code = mod[2]

# if the module is new, or update
if name not in self.__modules or key != self.__modules[name][0]:
LOGGER.info("Deploying code (key=%s, module=%s)", key, mod[1])
# write the new source
source_file = os.path.join(self.__code_dir, MODULE_DIR, name + ".py")

Expand Down
9 changes: 7 additions & 2 deletions src/inmanta/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,13 @@ def validate_fact_renew(value):
server_purge_resource_action_logs_interval = Option("server", "purge-resource-action-logs-interval", 3600,
"The number of seconds between resource-action log purging", is_time)

server_resource_action_log = Option("server", "resource_action_log", "resource-actions.log",
"File in log-dir, containing the resource-action logs", is_str_opt)
# Prefix of the per-environment resource-action log files that are written in log-dir.
# The full file name is <prefix><environment uuid>.log
server_resource_action_log_prefix = Option(
    "server",
    "resource_action_log_prefix",
    "resource-actions-",
    "File prefix in log-dir, containing the resource-action logs. "
    "The environment uuid and .log are appended to the prefix",
    is_str_opt
)


#############################
Expand Down
175 changes: 135 additions & 40 deletions src/inmanta/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from uuid import UUID
import uuid
import shutil
import json

import dateutil
import pymongo
Expand All @@ -42,17 +43,40 @@
from inmanta.ast import type
from inmanta.resources import Id
from inmanta.server import config as opt
import json
from inmanta.util import hash_file
from inmanta.const import UNDEPLOYABLE_STATES
from inmanta.protocol import encode_token, methods

from typing import List

LOGGER = logging.getLogger(__name__)
agent_lock = locks.Lock()

DBLIMIT = 100000


class ResourceActionLogLine(logging.LogRecord):
    """ A special log record that is used to report log lines that come from the agent

    The record keeps the moment at which the line was originally logged instead of the moment
    at which this record object is constructed. No args are attached to the record, so
    ``getMessage()`` returns ``msg`` without applying any formatting to it.
    """

    def __init__(self, logger_name: str, level: int, msg: str, created: datetime.datetime) -> None:
        """
        :param logger_name: The name of the logger that will handle this record
        :param level: The numeric log level of the line (for example ``logging.INFO``)
        :param msg: The message of the line, it is emitted as is
        :param created: The moment at which the line was originally logged
        """
        super().__init__(
            name=logger_name,
            level=level,
            pathname="(unknown file)",
            lineno=0,
            msg=msg,
            args=(),
            exc_info=None,
            func=None,
            sinfo=None,
        )

        # The base class stamped the record with the construction time. Remember it so that
        # relativeCreated can be shifted by the same offset as created. This avoids reading the
        # private logging._startTime, which is not guaranteed to keep the same unit across
        # Python versions.
        constructed_at = self.created

        self.created = created.timestamp()
        self.msecs = (self.created - int(self.created)) * 1000
        self.relativeCreated += (self.created - constructed_at) * 1000


class Server(protocol.ServerSlice):
"""
The central Inmanta server that communicates with clients and agents and persists configuration
Expand All @@ -75,17 +99,15 @@ def __init__(self, database_host=None, database_port=None, agent_no_log=False):
self._database_host = database_host
self._database_port = database_port

self._resource_action_logger = None
self._resource_action_file_handler = None
self._resource_action_loggers: Dict[uuid.UUID, logging.Logger] = {}
self._resource_action_handlers: Dict[uuid.UUID, logging.Handler] = {}

@gen.coroutine
def prestart(self, server):
self.agentmanager = server.get_slice("agentmanager")

@gen.coroutine
def start(self):
yield self._create_resource_action_logger()

if self._database_host is None:
self._database_host = opt.db_host.get()

Expand All @@ -108,32 +130,81 @@ def start(self):
@gen.coroutine
def stop(self):
yield super().stop()
yield self._close_resource_action_logger()
self._close_resource_action_loggers()

@gen.coroutine
def _create_resource_action_logger(self):
resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get())
self._file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+')
formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(message)s")
self._file_handler.setFormatter(formatter)
self._file_handler.setLevel(logging.DEBUG)
self._resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER)
self._resource_action_logger.setLevel(logging.DEBUG)
self._resource_action_logger.addHandler(self._file_handler)
@staticmethod
def get_resource_action_log_file(environment: uuid.UUID) -> str:
    """Build the path of the resource action log file that belongs to one environment

    :param environment: The id of the environment to get the file for
    :return: The configured log dir joined with the configured prefix, the environment id and ".log"
    """
    log_dir = opt.log_dir.get()
    file_name = "".join([opt.server_resource_action_log_prefix.get(), str(environment), ".log"])
    return os.path.join(log_dir, file_name)

def get_resource_action_logger(self, environment: uuid.UUID) -> logging.Logger:
"""Get the resource action logger for the given environment. If the logger was not created, create it.
:param environment: The environment to get a logger for
:return: The logger for the given environment.
"""
if environment in self._resource_action_loggers:
return self._resource_action_loggers[environment]

@gen.coroutine
def _close_resource_action_logger(self):
if self._resource_action_logger:
logger_copy = self._resource_action_logger
self._resource_action_logger = None
logger_copy.removeHandler(self._file_handler)
self._file_handler.flush()
self._file_handler.close()
self._file_handler = None
resource_action_log = self.get_resource_action_log_file(environment)

file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+')
# Most logs will come from agents. We need to use their level and timestamp and their formatted message
file_handler.setFormatter(logging.Formatter(fmt="%(message)s"))
file_handler.setLevel(logging.DEBUG)

resource_action_logger = logging.getLogger(const.NAME_RESOURCE_ACTION_LOGGER).getChild(str(environment))
resource_action_logger.setLevel(logging.DEBUG)
resource_action_logger.addHandler(file_handler)

self._resource_action_loggers[environment] = resource_action_logger
self._resource_action_handlers[environment] = file_handler

def _write_to_resource_action_log(self, log_line):
if self._resource_action_logger:
log_line.write_to_logger(self._resource_action_logger)
return resource_action_logger

def _close_resource_action_loggers(self) -> None:
    """Close all resource action loggers and their associated handlers

    Every entry is removed from ``self._resource_action_loggers`` and passed to
    ``_close_resource_action_logger``. The mapping is empty when this method returns.
    """
    # Loop on the mapping itself instead of waiting for popitem() to raise KeyError. A KeyError
    # raised while closing a logger must not be mistaken for "the mapping is empty", because
    # that would silently leave the remaining loggers open.
    while self._resource_action_loggers:
        env, logger = self._resource_action_loggers.popitem()
        self._close_resource_action_logger(env, logger)

def _close_resource_action_logger(self, env: uuid.UUID, logger: logging.Logger = None) -> None:
    """Close the given logger for the given env.

    Closing is idempotent: nothing happens when no logger or no handler is registered for env.

    :param env: The environment to close the logger for
    :param logger: The logger to close, if the logger is none it is retrieved (and removed)
                   from the registered loggers
    """
    if logger is None:
        logger = self._resource_action_loggers.pop(env, None)
        if logger is None:
            return

    # Do not fail when the handler is already gone, there is nothing left to detach or close
    handler = self._resource_action_handlers.pop(env, None)
    if handler is None:
        return

    logger.removeHandler(handler)
    handler.flush()
    handler.close()

def log_resource_action(
    self, env: uuid.UUID, resource_ids: List[str], log_level: int, ts: datetime.datetime, message: str
) -> None:
    """Write the given log to the correct resource action logger

    The message is prefixed with the resource id when exactly one resource is given, and with a
    marker when no or multiple resources are given.

    :param env: The environment to which the log line belongs
    :param resource_ids: The resources the log line applies to
    :param log_level: The numeric log level of the line
    :param ts: The timestamp to record for the line
    :param message: The message of the line
    """
    target_logger = self.get_resource_action_logger(env)

    resource_count = len(resource_ids)
    if resource_count == 1:
        line = resource_ids[0] + ": " + message
    elif resource_count > 1:
        line = "multiple resources: " + message
    else:
        line = "no resources: " + message

    record = ResourceActionLogLine(target_logger.name, log_level, line, ts)
    target_logger.handle(record)

def get_agent_client(self, tid: UUID, endpoint):
return self.agentmanager.get_agent_client(tid, endpoint)
Expand Down Expand Up @@ -709,7 +780,7 @@ def get_all_resources_for_agent(self, env: Environment, agent: str, version: str
now = datetime.datetime.now()

log_line = data.LogLine.log(logging.INFO, "Resource version pulled by client for agent %(agent)s state", agent=agent)
self._write_to_resource_action_log(log_line)
self.log_resource_action(env.id, resource_ids, logging.INFO, now, log_line.msg)
ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_ids, action=const.ResourceAction.pull,
action_id=uuid.uuid4(), started=started, finished=now, messages=[log_line])
yield ra.insert()
Expand Down Expand Up @@ -942,11 +1013,18 @@ def safe_get(input, key, default):
for agent in agents:
yield self.agentmanager.ensure_agent_registered(env, agent)

now = datetime.datetime.now()
log_line = data.LogLine.log(logging.INFO, "Successfully stored version %(version)d", version=version)
self._write_to_resource_action_log(log_line)
ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_version_ids, action_id=uuid.uuid4(),
action=const.ResourceAction.store, started=started, finished=datetime.datetime.now(),
messages=[log_line])
self.log_resource_action(env.id, resource_version_ids, logging.INFO, now, log_line.msg)
ra = data.ResourceAction(
environment=env.id,
resource_version_ids=resource_version_ids,
action_id=uuid.uuid4(),
action=const.ResourceAction.store,
started=started,
finished=now,
messages=[log_line]
)
yield ra.insert()
LOGGER.debug("Successfully stored version %d", version)

Expand Down Expand Up @@ -995,7 +1073,12 @@ def release_version(self, env, version_id, push, agent_trigger_method=None):

# all resources already deployed
deployed = yield model.get_increment(negative=True)
logline = {"level": "INFO", "msg": "Setting deployed due to known good status", "timestamp": now, "args": []}
logline = {
"level": "INFO",
"msg": "Setting deployed due to known good status",
"timestamp": now.isoformat(),
"args": []
}
yield self.resource_action_update(env, deployed, action_id=uuid.uuid4(),
started=now, finished=now, status=const.ResourceState.deployed,
# does this require a different ResourceAction?
Expand Down Expand Up @@ -1233,11 +1316,15 @@ def resource_action_update(self, env, resource_ids, action_id, action, started,

if len(messages) > 0:
resource_action.add_logs(messages)
if self._resource_action_logger:
for msg in messages:
# All other data is stored in the database. The msg was already formatted at the client side.
# The only way to disable formatting is, by not passing any args
self._resource_action_logger.log(level=const.LogLevel[msg["level"]].value, msg=msg["msg"])
for msg in messages:
# All other data is stored in the database. The msg was already formatted at the client side.
self.log_resource_action(
env.id,
resource_ids,
const.LogLevel[msg["level"]].value,
datetime.datetime.strptime(msg["timestamp"], const.TIME_ISOFMT),
msg["msg"]
)

if len(changes) > 0:
resource_action.add_changes(changes)
Expand Down Expand Up @@ -1310,7 +1397,13 @@ def delete_project(self, project_id):
if project is None:
return 404, {"message": "The project with given id does not exist."}

yield project.delete_cascade()
environments = yield Environment.get_list(project=project.id)
for env in environments:
yield env.delete_cascade()
self._close_resource_action_logger(env)

yield project.delete()

return 200, {}

@protocol.handle(methods.modify_project, project_id="id")
Expand Down Expand Up @@ -1438,6 +1531,8 @@ def delete_environment(self, environment_id):

yield env.delete_cascade()

self._close_resource_action_logger(environment_id)

return 200

@protocol.handle(methods.list_settings, env="tid")
Expand Down
4 changes: 2 additions & 2 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pytest
from inmanta.agent.agent import Agent
from inmanta import data, const, config
from inmanta.server import config as opt, SLICE_AGENT_MANAGER, SLICE_SESSION_MANAGER
from inmanta.server import config as opt, SLICE_AGENT_MANAGER, SLICE_SESSION_MANAGER, server
from datetime import datetime
from uuid import UUID
from inmanta.util import hash_file
Expand Down Expand Up @@ -1054,7 +1054,7 @@ async def test_resource_action_log(motor, server_multi, client_multi, environmen
res = await client_multi.put_version(tid=environment, version=version, resources=resources, unknowns=[], version_info={})
assert res.code == 200

resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get())
resource_action_log = server.Server.get_resource_action_log_file(environment)
assert os.path.isfile(resource_action_log)
assert os.stat(resource_action_log).st_size != 0

Expand Down