Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

purge resource actions #886

Merged
merged 6 commits into from
Feb 13, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/inmanta/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,7 @@ def convert_agent_map(value):
AUTOSTART_AGENT_INTERVAL = "autostart_agent_interval"
AGENT_AUTH = "agent_auth"
SERVER_COMPILE = "server_compile"
RESOURCE_ACTION_LOGS_RETENTION = "resource_action_logs_retention"


class Setting(object):
Expand Down Expand Up @@ -573,6 +574,8 @@ class Environment(BaseDocument):
doc="Agent interval for autostarted agents in seconds", agent_restart=True),
SERVER_COMPILE: Setting(name=SERVER_COMPILE, default=True, typ="bool",
validator=convert_boolean, doc="Allow the server to compile the configuration model."),
RESOURCE_ACTION_LOGS_RETENTION: Setting(name=RESOURCE_ACTION_LOGS_RETENTION, default=7, typ="int",
validator=convert_int, doc="Amount of days to retain resource-action logs"),
wouterdb marked this conversation as resolved.
Show resolved Hide resolved
}

__indexes__ = [
Expand Down Expand Up @@ -977,10 +980,21 @@ class FormRecord(BaseDocument):


class LogLine(DataDocument):

@property
def msg(self):
return self._data["msg"]

@property
def args(self):
return self._data["args"]

def get_log_level_as_int(self):
return self._data["level"].value

def write_to_logger(self, logger):
logger.log(self.get_log_level_as_int(), self.msg, *self.args)

@classmethod
def log(cls, level, msg, timestamp=None, **kwargs):
if timestamp is None:
Expand Down Expand Up @@ -1089,6 +1103,15 @@ def save(self):
yield ResourceAction._coll.update_one(query, self._updates)
self._updates = {}

@classmethod
@gen.coroutine
def purge_logs(cls):
environments = yield Environment.get_list()
for env in environments:
time_to_retain_logs = yield env.get(RESOURCE_ACTION_LOGS_RETENTION)
keep_logs_until = datetime.datetime.now() - datetime.timedelta(days=time_to_retain_logs)
yield cls._coll.delete_many({"started": {"$lt": keep_logs_until}})


class Resource(BaseDocument):
"""
Expand Down
9 changes: 8 additions & 1 deletion src/inmanta/server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Contact: code@inmanta.com
"""

from inmanta.config import Option, is_int, is_bool, is_time, is_list, is_str_opt
from inmanta.config import Option, is_int, is_bool, is_time, is_str_opt
from inmanta.config import state_dir, log_dir
import logging

Expand Down Expand Up @@ -109,6 +109,13 @@ def validate_fact_renew(value):
"server will also delete the file, so on subsequent compiles the missing file will be "
"recreated.", is_bool)

server_purge_resource_action_logs_interval = Option("server", "purge-resource-action-logs-interval", 3600,
"The number of seconds between resource-action log purging", is_time)

server_resource_action_log = Option("server", "resource_action_log", "resource-actions.log",
"File in log-dir, containing the resource-action logs", is_str_opt)


#############################
# Dashboard
#############################
Expand Down
50 changes: 44 additions & 6 deletions src/inmanta/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,17 @@ def __init__(self, database_host=None, database_port=None, agent_no_log=False):
self._database_host = database_host
self._database_port = database_port

self._resource_action_logger = None
self._resource_action_file_handler = None

@gen.coroutine
def prestart(self, server):
self.agentmanager = server.get_endpoint("agentmanager")

@gen.coroutine
def start(self):
yield self._create_resource_action_logger()

if self._database_host is None:
self._database_host = opt.db_host.get()

Expand All @@ -93,6 +98,7 @@ def start(self):

self.schedule(self.renew_expired_facts, self._fact_renew)
self.schedule(self._purge_versions, opt.server_purge_version_interval.get())
self.schedule(data.ResourceAction.purge_logs, opt.server_purge_resource_action_logs_interval.get())

ioloop.IOLoop.current().add_callback(self._purge_versions)

Expand All @@ -101,6 +107,32 @@ def start(self):
@gen.coroutine
def stop(self):
yield super().stop()
yield self._close_resource_action_logger()

@gen.coroutine
def _create_resource_action_logger(self):
resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get())
self._file_handler = logging.handlers.WatchedFileHandler(filename=resource_action_log, mode='a+')
formatter = logging.Formatter(fmt="%(asctime)s %(levelname)-8s %(message)s")
self._file_handler.setFormatter(formatter)
self._file_handler.setLevel(logging.DEBUG)
self._resource_action_logger = logging.getLogger("RESOURCE_ACTION_LOG_FILE")
wouterdb marked this conversation as resolved.
Show resolved Hide resolved
self._resource_action_logger.setLevel(logging.DEBUG)
self._resource_action_logger.addHandler(self._file_handler)

@gen.coroutine
def _close_resource_action_logger(self):
if self._resource_action_logger:
logger_copy = self._resource_action_logger
self._resource_action_logger = None
logger_copy.removeHandler(self._file_handler)
self._file_handler.flush()
self._file_handler.close()
self._file_handler = None

def _write_to_resource_action_log(self, log_line):
if self._resource_action_logger:
log_line.write_to_logger(self._resource_action_logger)

def get_agent_client(self, tid: UUID, endpoint):
return self.agentmanager.get_agent_client(tid, endpoint)
Expand Down Expand Up @@ -651,6 +683,7 @@ def get_resource(self, env, resource_id, logs, status, log_action, log_limit):
action_name = None
if log_action is not None:
action_name = log_action.name

actions = yield data.ResourceAction.get_log(environment=env.id, resource_version_id=resource_id,
action=action_name, limit=log_limit)

Expand Down Expand Up @@ -682,11 +715,11 @@ def get_resources_for_agent(self, env, agent, version):
resource_ids.append(rv.resource_version_id)

now = datetime.datetime.now()

log_line = data.LogLine.log(logging.INFO, "Resource version pulled by client for agent %(agent)s state", agent=agent)
self._write_to_resource_action_log(log_line)
ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_ids, action=const.ResourceAction.pull,
action_id=uuid.uuid4(), started=started, finished=now,
messages=[data.LogLine.log(logging.INFO,
"Resource version pulled by client for agent %(agent)s state",
agent=agent)])
action_id=uuid.uuid4(), started=started, finished=now, messages=[log_line])
yield ra.insert()

return 200, {"environment": env.id, "agent": agent, "version": version, "resources": deploy_model}
Expand Down Expand Up @@ -887,10 +920,11 @@ def safe_get(input, key, default):
for agent in agents:
yield self.agentmanager.ensure_agent_registered(env, agent)

log_line = data.LogLine.log(logging.INFO, "Successfully stored version %(version)d", version=version)
self._write_to_resource_action_log(log_line)
ra = data.ResourceAction(environment=env.id, resource_version_ids=resource_version_ids, action_id=uuid.uuid4(),
action=const.ResourceAction.store, started=started, finished=datetime.datetime.now(),
messages=[data.LogLine.log(logging.INFO, "Successfully stored version %(version)d",
version=version)])
messages=[log_line])
yield ra.insert()
LOGGER.debug("Successfully stored version %d", version)

Expand Down Expand Up @@ -1157,6 +1191,10 @@ def resource_action_update(self, env, resource_ids, action_id, action, started,

if len(messages) > 0:
resource_action.add_logs(messages)
for msg in messages:
loglevel = const.LogLevel[msg["level"]].value
log_line = data.LogLine.log(loglevel, msg["msg"], *msg["args"])
self._write_to_resource_action_log(log_line)
wouterdb marked this conversation as resolved.
Show resolved Hide resolved

if len(changes) > 0:
resource_action.add_changes(changes)
Expand Down
31 changes: 31 additions & 0 deletions tests/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1223,3 +1223,34 @@ async def test_data_document_recursion(data_module):
messages=[data.LogLine.log(logging.INFO, "Successfully stored version %(version)d",
version=2)])
await ra.insert()


@pytest.mark.asyncio
async def test_purgelog_test(data_module):
project = data.Project(name="test")
await project.insert()

env = data.Environment(name="dev", project=project.id, repo_url="", repo_branch="")
await env.insert()

# ResourceAction 1
timestamp_ra1 = datetime.datetime.now() - datetime.timedelta(days=8)
log_line_ra1 = data.LogLine.log(logging.INFO, "Successfully stored version %(version)d", version=1)
ra1 = data.ResourceAction(environment=env.id, resource_version_ids=["id"], action_id=uuid.uuid4(),
action=const.ResourceAction.store, started=timestamp_ra1, finished=datetime.datetime.now(),
messages=[log_line_ra1])
await ra1.insert()

# ResourceAction 2
timestamp_ra2 = datetime.datetime.now() - datetime.timedelta(days=6)
log_line_ra2 = data.LogLine.log(logging.INFO, "Successfully stored version %(version)d", version=2)
ra2 = data.ResourceAction(environment=env.id, resource_version_ids=["id"], action_id=uuid.uuid4(),
action=const.ResourceAction.store, started=timestamp_ra2, finished=datetime.datetime.now(),
messages=[log_line_ra2])
await ra2.insert()

assert len(await data.ResourceAction.get_list()) == 2
await data.ResourceAction.purge_logs()
assert len(await data.ResourceAction.get_list()) == 1
remaining_resource_action = (await data.ResourceAction.get_list())[0]
assert remaining_resource_action.id == ra2.id
25 changes: 25 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import time
import logging
import uuid
import os

from utils import retry_limited
import pytest
Expand All @@ -31,6 +32,7 @@
from inmanta.export import upload_code, unknown_parameters
import asyncio


LOGGER = logging.getLogger(__name__)


Expand Down Expand Up @@ -1028,3 +1030,26 @@ async def test_legacy_code(motor, server_multi, client_multi, environment):
res = await agent.get_code(tid=environment, id=version, resource="std::File")
assert res.code == 200
assert res.result["sources"] == sources


@pytest.mark.asyncio(timeout=30)
async def test_resource_action_log(motor, server_multi, client_multi, environment):
version = 1
resources = [{'group': 'root',
'hash': '89bf880a0dc5ffc1156c8d958b4960971370ee6a',
'id': 'std::File[vm1.dev.inmanta.com,path=/etc/sysconfig/network],v=%d' % version,
'owner': 'root',
'path': '/etc/sysconfig/network',
'permissions': 644,
'purged': False,
'reload': False,
'requires': [],
'version': version}]
res = await client_multi.put_version(tid=environment, version=version, resources=resources, unknowns=[], version_info={})
assert res.code == 200

await server_multi.stop() # Make sure that logs are flushed to disk

resource_action_log = os.path.join(opt.log_dir.get(), opt.server_resource_action_log.get())
assert os.path.isfile(resource_action_log)
assert os.stat(resource_action_log).st_size != 0
1 change: 0 additions & 1 deletion tests/test_server_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2768,7 +2768,6 @@ async def test_deploy_and_events_failed(client, server, environment, resource_co
@pytest.mark.parametrize("dep_state", dep_states_reload, ids=lambda x: x.name)
@pytest.mark.asyncio(timeout=5000)
async def test_reload(client, server, environment, resource_container, dep_state):

agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER)

resource_container.Provider.reset()
Expand Down