Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move settings scripts into sqlite DB #148

Merged
merged 7 commits into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions continuousprint/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def get_template_vars(self):
return dict(
printer_profiles=list(PRINTER_PROFILES.values()),
gcode_scripts=list(GCODE_SCRIPTS.values()),
custom_events=[e.as_dict() for e in CustomEvents],
local_ip=self._plugin.get_local_ip(),
)

Expand Down
46 changes: 41 additions & 5 deletions continuousprint/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@


class Permission(Enum):
GETSTATE = (
"Get state",
"Allows for fetching queue and management state of Continuous Print",
False,
)
STARTSTOP = (
"Start and Stop Queue",
"Allows for starting and stopping the queue",
Expand All @@ -31,17 +36,31 @@ class Permission(Enum):
"Allows for fetching history of print runs by Continuous Print",
False,
)
CLEARHISTORY = (
"Clear history",
RESETHISTORY = (
"Reset history",
"Allows for deleting all continuous print history data",
True,
)
GETQUEUES = ("Get queue", "Allows for fetching metadata on all print queues", False)
GETQUEUES = (
"Get queues",
"Allows for fetching metadata on all print queues",
False,
)
EDITQUEUES = (
"Edit queues",
"Allows for adding/removing queues and rearranging them",
True,
)
GETAUTOMATION = (
"Get automation scripts and events",
"Allows for fetching metadata on all scripts and the events they're configured for",
False,
)
EDITAUTOMATION = (
"Edit automation scripts and events",
"Allows for adding/removing gcode scripts and registering them to execute when events happen",
True,
)

def __init__(self, longname, desc, dangerous):
self.longname = longname
Expand Down Expand Up @@ -133,6 +152,7 @@ def _sync_history(self):
# (e.g. 1.4.1 -> 2.0.0)
@octoprint.plugin.BlueprintPlugin.route("/state/get", methods=["GET"])
@restricted_access
@cpq_permission(Permission.GETSTATE)
def get_state(self):
return self._state_json()

Expand Down Expand Up @@ -248,7 +268,7 @@ def export_job(self):
# PRIVATE API METHOD - may change without warning.
@octoprint.plugin.BlueprintPlugin.route("/job/rm", methods=["POST"])
@restricted_access
@cpq_permission(Permission.EDITJOB)
@cpq_permission(Permission.RMJOB)
def rm_job(self):
return json.dumps(
self._get_queue(flask.request.form["queue"]).remove_jobs(
Expand Down Expand Up @@ -277,7 +297,7 @@ def get_history(self):
# PRIVATE API METHOD - may change without warning.
@octoprint.plugin.BlueprintPlugin.route("/history/reset", methods=["POST"])
@restricted_access
@cpq_permission(Permission.CLEARHISTORY)
@cpq_permission(Permission.RESETHISTORY)
def reset_history(self):
queries.resetHistory()
return json.dumps("OK")
Expand All @@ -298,3 +318,19 @@ def edit_queues(self):
(absent_names, added) = queries.assignQueues(queues)
self._commit_queues(added, absent_names)
return json.dumps("OK")

# PRIVATE API METHOD - may change without warning.
@octoprint.plugin.BlueprintPlugin.route("/automation/edit", methods=["POST"])
@restricted_access
@cpq_permission(Permission.EDITAUTOMATION)
def edit_automation(self):
data = json.loads(flask.request.form.get("json"))
queries.assignScriptsAndEvents(data["scripts"], data["events"])
return json.dumps("OK")

# PRIVATE API METHOD - may change without warning.
@octoprint.plugin.BlueprintPlugin.route("/automation/get", methods=["GET"])
@restricted_access
@cpq_permission(Permission.GETAUTOMATION)
def get_automation(self):
return json.dumps(queries.getScriptsAndEvents())
191 changes: 174 additions & 17 deletions continuousprint/api_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import unittest
import json
import logging
from .driver import Action as DA
from unittest.mock import patch, MagicMock
from unittest.mock import patch, MagicMock, call
import imp
from flask import Flask
from .api import Permission, cpq_permission
Expand Down Expand Up @@ -60,19 +62,65 @@ def kill_patches():
self.api = continuousprint.api.ContinuousPrintAPI()
self.api._basefolder = "notexisty"
self.api._identifier = "continuousprint"
self.api._get_queue = MagicMock()
self.api._logger = logging.getLogger()
self.app.register_blueprint(self.api.get_blueprint())
self.app.config.update({"TESTING": True})
self.client = self.app.test_client()
self.api._state_json = lambda: "foo"

def test_role_access_denied(self):
testcases = [
("GETSTATE", "/state/get"),
("STARTSTOP", "/set_active"),
("ADDSET", "/set/add"),
("ADDJOB", "/job/add"),
("EDITJOB", "/job/mv"),
("EDITJOB", "/job/edit"),
("ADDJOB", "/job/import"),
("EXPORTJOB", "/job/export"),
("RMJOB", "/job/rm"),
("EDITJOB", "/job/reset"),
("GETHISTORY", "/history/get"),
("RESETHISTORY", "/history/reset"),
("GETQUEUES", "/queues/get"),
("EDITQUEUES", "/queues/edit"),
("GETAUTOMATION", "/automation/get"),
("EDITAUTOMATION", "/automation/edit"),
]
self.api._get_queue = None # MagicMock interferes with checking

num_handlers_tested = len(set([tc[1] for tc in testcases]))
handlers = [
f
for f in dir(self.api)
if hasattr(getattr(self.api, f), "_blueprint_rules")
]
self.assertEqual(num_handlers_tested, len(handlers))

num_perms_tested = len(set([tc[0] for tc in testcases]))
num_perms = len([p for p in Permission])
self.assertEqual(num_perms_tested, num_perms)

for (role, endpoint) in testcases:
p = getattr(self.perm, f"PLUGIN_CONTINUOUSPRINT_{role}")
p.can.return_value = False
if role.startswith("GET"):
rep = self.client.get(endpoint)
else:
rep = self.client.post(endpoint)
self.assertEqual(rep.status_code, 403)

def test_get_state(self):
self.perm.PLUGIN_CONTINUOUSPRINT_GETSTATE.can.return_value = True
self.api._state_json = lambda: "foo"
rep = self.client.get("/state/get")
self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.data, b"foo")

def test_set_active(self):
self.perm.PLUGIN_CONTINUOUSPRINT_STARTSTOP.can.return_value = True
self.api._update = MagicMock()
self.api._state_json = lambda: "foo"
rep = self.client.post("/set_active", data=dict(active="true"))
self.assertEqual(rep.status_code, 200)
self.api._update.assert_called_with(DA.ACTIVATE)
Expand All @@ -93,37 +141,146 @@ def test_set_active(self):
self.api._update.assert_called_with(DA.DEACTIVATE)

def test_add_set(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_ADDSET.can.return_value = True
data = dict(foo="bar", job="jid")
self.api._get_queue().add_set.return_value = "ret"
self.api._preprocess_set = lambda s: s

rep = self.client.post("/set/add", data=dict(json=json.dumps(data)))

self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.get_data(as_text=True), '"ret"')
self.api._get_queue().add_set.assert_called_with("jid", data)

def test_add_job(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_ADDJOB.can.return_value = True
data = dict(name="jobname")
self.api._get_queue().add_job().as_dict.return_value = "ret"

rep = self.client.post("/job/add", data=dict(json=json.dumps(data)))

self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.get_data(as_text=True), '"ret"')
self.api._get_queue().add_job.assert_called_with("jobname")

def test_mv_job(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_EDITJOB.can.return_value = True
data = dict(id="foo", after_id="bar", src_queue="q1", dest_queue="q2")

rep = self.client.post("/job/mv", data=data)

self.assertEqual(rep.status_code, 200)
self.api._get_queue().mv_job.assert_called_with(data["id"], data["after_id"])

def test_edit_job(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_EDITJOB.can.return_value = True
data = dict(id="foo", queue="queue")
self.api._get_queue().edit_job.return_value = "ret"
rep = self.client.post("/job/edit", data=dict(json=json.dumps(data)))

self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.get_data(as_text=True), '"ret"')
self.api._get_queue().edit_job.assert_called_with(data["id"], data)

def test_import_job(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_ADDJOB.can.return_value = True
data = dict(path="path", queue="queue")
self.api._get_queue().import_job().as_dict.return_value = "ret"
rep = self.client.post("/job/import", data=data)

self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.get_data(as_text=True), '"ret"')
self.api._get_queue().import_job.assert_called_with(data["path"])

def test_export_job(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_EXPORTJOB.can.return_value = True
data = {"job_ids[]": ["1", "2", "3"]}
self.api._get_queue().export_job.return_value = "ret"
self.api._path_in_storage = lambda p: p
self.api._path_on_disk = lambda p, sd: p
rep = self.client.post("/job/export", data=data)

self.assertEqual(rep.status_code, 200)
self.assertEqual(
json.loads(rep.get_data(as_text=True)),
dict(errors=[], paths=["ret", "ret", "ret"]),
)
self.api._get_queue().export_job.assert_has_calls(
[call(int(i), "/") for i in data["job_ids[]"]]
)

def test_rm_job(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_RMJOB.can.return_value = True
data = {"queue": "q", "job_ids[]": ["1", "2", "3"]}
self.api._get_queue().remove_jobs.return_value = "ret"

rep = self.client.post("/job/rm", data=data)

self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.get_data(as_text=True), '"ret"')
self.api._get_queue().remove_jobs.assert_called_with(data["job_ids[]"])

def test_reset_multi(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_EDITJOB.can.return_value = True
data = {"queue": "q", "job_ids[]": ["1", "2", "3"]}
self.api._get_queue().reset_jobs.return_value = "ret"

rep = self.client.post("/job/reset", data=data)

self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.get_data(as_text=True), '"ret"')
self.api._get_queue().reset_jobs.assert_called_with(data["job_ids[]"])

def test_get_history(self):
self.skipTest("TODO")
self.perm.PLUGIN_CONTINUOUSPRINT_GETHISTORY.can.return_value = True
self.api._history_json = lambda: "foo"
rep = self.client.get("/history/get")
self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.data, b"foo")

def test_reset_history(self):
self.skipTest("TODO")
@patch("continuousprint.api.queries")
def test_reset_history(self, q):
self.perm.PLUGIN_CONTINUOUSPRINT_RESETHISTORY.can.return_value = True
rep = self.client.post("/history/reset")
q.resetHistory.assert_called_once()
self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.data, b'"OK"')

def test_get_queues(self):
self.skipTest("TODO")
@patch("continuousprint.api.queries")
def test_get_queues(self, q):
self.perm.PLUGIN_CONTINUOUSPRINT_GETQUEUES.can.return_value = True
mq = MagicMock()
mq.as_dict.return_value = dict(foo="bar")
q.getQueues.return_value = [mq]
rep = self.client.get("/queues/get")
self.assertEqual(rep.status_code, 200)
self.assertEqual(json.loads(rep.get_data(as_text=True)), [dict(foo="bar")])

def edit_queues(self):
self.skipTest("TODO")
@patch("continuousprint.api.queries")
def test_edit_queues(self, q):
self.perm.PLUGIN_CONTINUOUSPRINT_EDITQUEUES.can.return_value = True
q.assignQueues.return_value = ("absent", "added")
self.api._commit_queues = MagicMock()
rep = self.client.post("/queues/edit", data=dict(json='"foo"'))
self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.data, b'"OK"')
self.api._commit_queues.assert_called_with("added", "absent")

@patch("continuousprint.api.queries")
def test_edit_automation(self, q):
self.perm.PLUGIN_CONTINUOUSPRINT_EDITAUTOMATION.can.return_value = True
rep = self.client.post(
"/automation/edit",
data=dict(json=json.dumps(dict(scripts="scripts", events="events"))),
)
self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.data, b'"OK"')
q.assignScriptsAndEvents.assert_called_with("scripts", "events")

@patch("continuousprint.api.queries")
def test_get_automation(self, q):
self.perm.PLUGIN_CONTINUOUSPRINT_GETHISTORY.can.return_value = True
q.getScriptsAndEvents.return_value = "foo"
rep = self.client.get("/automation/get")
self.assertEqual(rep.status_code, 200)
self.assertEqual(rep.data, b'"foo"')
Loading