Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix edit behavior on lan jobs - properly host the edited job #171

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions continuousprint/integration_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import unittest
import datetime
import time
import tempfile
from unittest.mock import MagicMock, ANY
from .driver import Driver, Action as DA, Printer as DP
from pathlib import Path
import logging
import traceback
from .storage.database_test import DBTest
from .storage.database import DEFAULT_QUEUE, MODELS, populate_queues
from .storage import queries
from .storage.lan import LANJobView
from .queues.multi import MultiQueue
from .queues.local import LocalQueue
from .queues.lan import LANQueue
Expand Down Expand Up @@ -341,18 +344,22 @@ def onupdate():

self.locks = {}
self.peers = []
lqpeers = {}
lqjobs = TestReplDict(lambda a, b: None)
for i, db in enumerate(self.dbs):
with db.bind_ctx(MODELS):
populate_queues()
fsm = MagicMock(host="fsaddr", port=0)
profile = dict(name="profile")
lq = LANQueue(
"LAN",
f"peer{i}:{12345+i}",
logging.getLogger(f"peer{i}:LAN"),
Strategy.IN_ORDER,
onupdate,
MagicMock(),
dict(name="profile"),
lambda path: path,
fsm,
profile,
lambda path, sd: path,
)
mq = MultiQueue(queries, Strategy.IN_ORDER, onupdate)
mq.add(lq.ns, lq)
Expand All @@ -368,11 +375,9 @@ def onupdate():
lq.ns, lq.addr, MagicMock(), logging.getLogger("lantestbase")
)
lq.lan.q.locks = LocalLockManager(self.locks, f"peer{i}")
lq.lan.q.jobs = TestReplDict(lambda a, b: None)
lq.lan.q.peers = self.peers
if i > 0:
lq.lan.q.peers = self.peers[0][2].lan.q.peers
lq.lan.q.jobs = self.peers[0][2].lan.q.jobs
lq.lan.q.jobs = lqjobs
lq.lan.q.peers = lqpeers
lq.update_peer_state(lq.addr, "status", "run", profile)
self.peers.append((d, mq, lq, db))

def test_ordered_acquisition(self):
Expand Down Expand Up @@ -428,6 +433,48 @@ def test_ordered_acquisition(self):
d2.action(DA.SUCCESS, DP.IDLE) # -> idle
self.assertEqual(d2.state.__name__, d2._state_idle.__name__)

def test_non_local_edit(self):
(d1, _, lq1, db1) = self.peers[0]
(d2, _, lq2, db2) = self.peers[1]
with tempfile.TemporaryDirectory() as tdir:
(Path(tdir) / "test.gcode").touch()
j = LANJobView(
dict(
id="jobhash",
name="job",
created=0,
sets=[
dict(
path="test.gcode",
count=1,
remaining=1,
profiles=["profile"],
),
],
count=1,
remaining=1,
peer_=None,
),
lq1,
)
lq1._path_on_disk = lambda p, sd: str(Path(tdir) / p)
lq1.import_job_from_view(j, j.id)
lq2._fileshare.post.assert_not_called()

# LQ2 edits the job
lq2._fileshare.fetch.return_value = str(Path(tdir) / "unpack/")
(Path(tdir) / "unpack").mkdir()
lq2dest = Path(tdir) / "unpack/test.gcode"
lq2dest.touch()
lq2.edit_job("jobhash", dict(draft=True))

lq2._fileshare.post.assert_called_once()
# Job posts with lan 2 address, from pov of lq1
self.assertEqual(list(lq1.lan.q.jobs.values())[0][0], lq2.addr)
# Uses resolved file path
c = lq2._fileshare.post.call_args[0]
self.assertEqual(c[1], {str(lq2dest): str(lq2dest)})


if __name__ == "__main__":
unittest.main()
8 changes: 8 additions & 0 deletions continuousprint/queues/lan.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def _gen_uuid(self) -> str:
def edit_job(self, job_id, data) -> bool:
# For lan queues, "editing" a job is basically resubmission of the whole thing.
# This is because the backing .gjob format is a single file containing the full manifest.

j = self.get_job_view(job_id)
for (k, v) in data.items():
if k in ("id", "peer_", "queue"):
Expand All @@ -320,6 +321,13 @@ def edit_job(self, job_id, data) -> bool:
else:
setattr(j, k, v)

# We must resolve the set paths so we have them locally, as editing can
# also occur on servers other than the one that submitted the job.
j.remap_set_paths()

# We are also now the source of this job
j.peer = self.addr

# Exchange the old job for the new job (reuse job ID)
jid = self.import_job_from_view(j, j.id)
return self._get_job(jid)
7 changes: 6 additions & 1 deletion continuousprint/queues/lan_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ class LANQueueTest(unittest.TestCase, PeerPrintLANTest):
def setUp(self):
PeerPrintLANTest.setUp(self) # Generate peerprint LANQueue as self.q
self.q.q.syncPeer(
dict(profile=dict(name="profile")), addr=self.q.q.addr
dict(
profile=dict(name="profile"),
fs_addr="mock_fs_addr",
),
addr=self.q.q.addr,
) # Helps pass validation
ppq = self.q # Rename to make way for CPQ LANQueue

self.ucb = MagicMock()
self.fs = MagicMock()
self.fs.fetch.return_value = "asdf.gcode"
self.q = LANQueue(
"ns",
"localhost:1234",
Expand Down
5 changes: 5 additions & 0 deletions continuousprint/storage/lan.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ def __init__(self, manifest, lq):
def get_base_dir(self):
return self.queue.lq.get_gjob_dirpath(self.peer, self.hash)

def remap_set_paths(self):
# Replace all relative/local set paths with fully resolved paths
for s in self.sets:
s.path = s.resolve()

def updateSets(self, sets_list):
self.sets = [LANSetView(s, self, i) for i, s in enumerate(sets_list)]

Expand Down
5 changes: 5 additions & 0 deletions continuousprint/storage/lan_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ def test_resolve_file(self):
self.lq.get_gjob_dirpath.return_value = "/path/to/"
self.assertEqual(self.s.resolve(), "/path/to/a.gcode")

def test_remap_set_paths(self):
self.lq.get_gjob_dirpath.return_value = "/path/to/"
self.j.remap_set_paths()
self.assertEqual(self.s.path, "/path/to/a.gcode")

def test_resolve_http_error(self):
self.lq.get_gjob_dirpath.side_effect = HTTPError
with self.assertRaises(ResolveError):
Expand Down