Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove snapshot restore functionality #823

Merged
merged 6 commits into from
Dec 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
v 2018.4 (2018-12-xx)
Changes in this release:
- Various bugfixes and performance enhancements
- Dependency updates
- Removal of snapshot and restore functionality from the server (#789)

v 2018.3 (2018-12-07)
Changes in this release:
- Various bugfixes and performance enhancements
Expand Down
162 changes: 0 additions & 162 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
Contact: code@inmanta.com
"""

import base64
from concurrent.futures.thread import ThreadPoolExecutor
import datetime
import hashlib
import logging
import os
import random
Expand Down Expand Up @@ -598,144 +596,6 @@ def do_run_dryrun(self, version, dry_run_id):

self._cache.close_version(version)

@gen.coroutine
def do_restore(self, restore_id, snapshot_id, resources):
with (yield self.ratelimiter.acquire()):

LOGGER.info("Start a restore %s", restore_id)

yield self.process._ensure_code(self._env_id, resources[0][1]["model"],
[res[1]["resource_type"] for res in resources])

version = resources[0][1]["model"]
self._cache.open_version(version)

for restore, resource in resources:
start = datetime.datetime.now()
provider = None
try:
data = resource["attributes"]
data["id"] = resource["id"]
resource_obj = Resource.deserialize(data)
provider = yield self.get_provider(resource_obj)
if not hasattr(resource_obj, "allow_restore") or not resource_obj.allow_restore:
yield self.get_client().update_restore(tid=self._env_id,
id=restore_id,
resource_id=str(resource_obj.id),
start=start,
stop=datetime.datetime.now(),
success=False,
error=False,
msg="Resource %s does not allow restore" % resource["id"])
continue

try:
yield self.thread_pool.submit(provider.restore, resource_obj, restore["content_hash"])
yield self.get_client().update_restore(tid=self._env_id, id=restore_id,
resource_id=str(resource_obj.id),
success=True, error=False,
start=start, stop=datetime.datetime.now(), msg="")
except NotImplementedError:
yield self.get_client().update_restore(tid=self._env_id, id=restore_id,
resource_id=str(resource_obj.id),
success=False, error=False,
start=start, stop=datetime.datetime.now(),
msg="The handler for resource "
"%s does not support restores" % resource["id"])

except Exception:
LOGGER.exception("Unable to find a handler for %s", resource["id"])
yield self.get_client().update_restore(tid=self._env_id, id=restore_id,
resource_id=resource_obj.id.resource_str(),
success=False, error=False,
start=start, stop=datetime.datetime.now(),
msg="Unable to find a handler to restore a snapshot of resource %s" %
resource["id"])
finally:
if provider is not None:
provider.close()
self._cache.close_version(version)

return 200

@gen.coroutine
def do_snapshot(self, snapshot_id, resources):
with (yield self.ratelimiter.acquire()):
LOGGER.info("Start snapshot %s", snapshot_id)

yield self.process._ensure_code(self._env_id, resources[0]["model"],
[res["resource_type"] for res in resources])

version = resources[0]["model"]
self._cache.open_version(version)

for resource in resources:
start = datetime.datetime.now()
provider = None
try:
data = resource["attributes"]
data["id"] = resource["id"]
resource_obj = Resource.deserialize(data)
provider = yield self.get_provider(resource_obj)

if not hasattr(resource_obj, "allow_snapshot") or not resource_obj.allow_snapshot:
yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id,
resource_id=resource_obj.id.resource_str(), snapshot_data="",
start=start, stop=datetime.datetime.now(), size=0,
success=False, error=False,
msg="Resource %s does not allow snapshots" % resource["id"])
continue

try:
result = yield self.thread_pool.submit(provider.snapshot, resource_obj)
if result is not None:
sha1sum = hashlib.sha1()
sha1sum.update(result)
content_id = sha1sum.hexdigest()
yield self.get_client().upload_file(id=content_id, content=base64.b64encode(result).decode("ascii"))

yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id,
resource_id=resource_obj.id.resource_str(),
snapshot_data=content_id,
start=start, stop=datetime.datetime.now(),
size=len(result), success=True, error=False,
msg="")
else:
raise Exception("Snapshot returned no data")

except NotImplementedError:
yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id, error=False,
resource_id=resource_obj.id.resource_str(),
snapshot_data="",
start=start, stop=datetime.datetime.now(),
size=0, success=False,
msg="The handler for resource "
"%s does not support snapshots" % resource["id"])
except Exception:
LOGGER.exception("An exception occurred while creating the snapshot of %s", resource["id"])
yield self.get_client().update_snapshot(tid=self._env_id, id=snapshot_id, snapshot_data="",
resource_id=resource_obj.id.resource_str(), error=True,
start=start,
stop=datetime.datetime.now(),
size=0, success=False,
msg="The handler for resource "
"%s does not support snapshots" % resource["id"])

except Exception:
LOGGER.exception("Unable to find a handler for %s", resource["id"])
yield self.get_client().update_snapshot(tid=self._env_id,
id=snapshot_id, snapshot_data="",
resource_id=resource_obj.id.resource_str(), error=False,
start=start, stop=datetime.datetime.now(),
size=0, success=False,
msg="Unable to find a handler for %s" % resource["id"])
finally:
if provider is not None:
provider.close()

self._cache.close_version(version)
return 200

@gen.coroutine
def get_facts(self, resource):
with (yield self.ratelimiter.acquire()):
Expand Down Expand Up @@ -977,28 +837,6 @@ def check_storage(self):

return dir_map

@protocol.handle(methods.AgentRestore.do_restore, env="tid")
@gen.coroutine
def do_restore(self, env, agent, restore_id, snapshot_id, resources):
"""
Restore a snapshot
"""
if agent not in self._instances:
return 200

return (yield self._instances[agent].do_restore(restore_id, snapshot_id, resources))

@protocol.handle(methods.AgentSnapshot.do_snapshot, env="tid")
@gen.coroutine
def do_snapshot(self, env, agent, snapshot_id, resources):
"""
Create a snapshot of stateful resources managed by this agent
"""
if agent not in self._instances:
return 200

return (yield self._instances[agent].do_snapshot(snapshot_id, resources))

@protocol.handle(methods.AgentParameterMethod.get_parameter, env="tid")
@gen.coroutine
def get_facts(self, env, agent, resource):
Expand Down
20 changes: 0 additions & 20 deletions src/inmanta/agent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,26 +584,6 @@ def available(self, resource: resources.Resource) -> bool:
"""
return True

def snapshot(self, resource: resources.Resource) -> bytes:
"""
Create a new snapshot and upload it to the server

:param resource: The state of the resource for which a snapshot is created
:return: The data that needs to be uploaded to the server. This data is passed back to the restore method on
snapshot restore.
"""
raise NotImplementedError()

def restore(self, resource: resources.Resource, snapshot_id: str) -> None:
"""
Restore a resource from a snapshot.

:param resource: The resource for which a snapshot needs to be restored.
:param snapshot_id: The id of the "file" on the server that contains the snapshot data. This data can be retrieved
with the :func:`~inmanta.agent.handler.ResourceHandler.get_file` method
"""
raise NotImplementedError()

def get_file(self, hash_id) -> bytes:
"""
Retrieve a file from the fileserver identified with the given id. The convention is to use the sha1sum of the
Expand Down
2 changes: 0 additions & 2 deletions src/inmanta/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ class ResourceAction(Enum):
pull = 3
deploy = 4
dryrun = 5
snapshot = 6
restore = 7
other = 8


Expand Down
122 changes: 1 addition & 121 deletions src/inmanta/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1260,20 +1260,6 @@ def get(cls, environment, resource_version_id):
if value is not None:
return cls(from_mongo=True, **value)

@classmethod
@gen.coroutine
def get_with_state(cls, environment, version):
"""
Get all resources from the given version that have "state_id" defined
"""
cursor = cls._coll.find({"environment": environment, "model": version, "attributes.state_id": {"$exists": True}})

resources = []
while (yield cursor.fetch_next):
resources.append(cls(from_mongo=True, **cursor.next_object()))

return resources

@classmethod
def new(cls, environment, resource_version_id, **kwargs):
vid = Id.parse_id(resource_version_id)
Expand Down Expand Up @@ -1477,9 +1463,6 @@ def delete_cascade(self):
resources = yield Resource.get_list(environment=self.environment, model=self.version)
for res in resources:
yield res.delete_cascade()
snaps = yield Snapshot.get_list(environment=self.environment, model=self.version)
for snap in snaps:
yield snap.delete_cascade()
yield UnknownParameter.delete_all(environment=self.environment, version=self.version)
yield Code.delete_all(environment=self.environment, version=self.version)
yield DryRun.delete_all(environment=self.environment, model=self.version)
Expand Down Expand Up @@ -1624,111 +1607,8 @@ def to_dict(self):
return dict_result


class ResourceSnapshot(BaseDocument):
"""
Snapshot of a resource

:param error Indicates if an error made the snapshot fail
"""
environment = Field(field_type=uuid.UUID, required=True)
snapshot = Field(field_type=uuid.UUID, required=True)
resource_id = Field(field_type=str, required=True)
state_id = Field(field_type=str, required=True)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
content_hash = Field(field_type=str)
success = Field(field_type=bool)
error = Field(field_type=bool)
msg = Field(field_type=str)
size = Field(field_type=int)


class ResourceRestore(BaseDocument):
"""
A restore of a resource from a snapshot
"""
environment = Field(field_type=uuid.UUID, required=True)
restore = Field(field_type=uuid.UUID, required=True)
state_id = Field(field_type=str)
resource_id = Field(field_type=str)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
success = Field(field_type=bool)
error = Field(field_type=bool)
msg = Field(field_type=str)


class SnapshotRestore(BaseDocument):
"""
Information about a snapshot restore
"""
environment = Field(field_type=uuid.UUID, required=True)
snapshot = Field(field_type=uuid.UUID, required=True)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
resources_todo = Field(field_type=int, default=0)

@gen.coroutine
def delete_cascade(self):
yield ResourceRestore.delete_all(restore=self.id)
yield self.delete()

@gen.coroutine
def resource_updated(self):
yield SnapshotRestore._coll.update_one({"_id": self.id}, {"$inc": {"resources_todo": int(-1)}})
self.resources_todo -= 1

now = datetime.datetime.now()
result = yield SnapshotRestore._coll.update_one({"_id": self.id, "resources_todo": 0}, {"$set": {"finished": now}})
if result.matched_count == 1 and (result.modified_count == 1 or result.modified_count is None):
# modified_count is None for mongodb < 2.6
self.finished = now


class Snapshot(BaseDocument):
"""
A snapshot of an environment

:param id The id of the snapshot
:param environment A reference to the environment
:param started When was this snapshot started
:param finished When was this snapshot finished
:param total_size The total size of this snapshot
"""
environment = Field(field_type=uuid.UUID, required=True)
model = Field(field_type=int, required=True)
name = Field(field_type=str)
started = Field(field_type=datetime.datetime, default=None)
finished = Field(field_type=datetime.datetime, default=None)
total_size = Field(field_type=int, default=0)
resources_todo = Field(field_type=int, default=0)

@gen.coroutine
def delete_cascade(self):
yield ResourceSnapshot.delete_all(snapshot=self.id)
restores = yield SnapshotRestore.get_list(snapshot=self.id)
for restore in restores:
yield restore.delete_cascade()

yield self.delete()

@gen.coroutine
def resource_updated(self, size):
yield Snapshot._coll.update_one({"_id": self.id},
{"$inc": {"resources_todo": int(-1), "total_size": size}})
self.total_size += size
self.resources_todo -= 1

now = datetime.datetime.now()
result = yield Snapshot._coll.update_one({"_id": self.id, "resources_todo": 0}, {"$set": {"finished": now}})
if result.matched_count == 1 and (result.modified_count == 1 or result.modified_count is None):
# modified_count is None for mongodb < 2.6
self.finished = now


_classes = [Project, Environment, Parameter, UnknownParameter, AgentProcess, AgentInstance, Agent, Report, Compile, Form,
FormRecord, Resource, ResourceAction, ConfigurationModel, Code, DryRun, ResourceSnapshot, ResourceRestore,
SnapshotRestore, Snapshot]
FormRecord, Resource, ResourceAction, ConfigurationModel, Code, DryRun]


def use_motor(motor):
Expand Down
Loading