Skip to content

Commit

Permalink
MNT #479 #449 pytest using new test data files
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Mar 22, 2021
1 parent f93919c commit decdc6c
Showing 1 changed file with 129 additions and 154 deletions.
283 changes: 129 additions & 154 deletions tests/test_filewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import tempfile

CATALOG = "usaxs_test"
TUNE_MR = "2ffe4d8"
TUNE_AR = 103 # <-- scan_id, uid: "3554003"
TUNE_MR = 108 # <-- scan_id, uid: "2ffe4d8"


@pytest.fixture(scope="function")
Expand Down Expand Up @@ -285,156 +286,130 @@ def test_SpecWriterCallback_writer_filename(cat, tempdir):
assert os.path.exists(testfile)


# ---------------------------------------


# class Test_SpecWriterCallback(MyTestBase):

# def test_newfile_exists(self):
# testfile = os.path.join(self.tempdir, "tune_mr.dat")
# if os.path.exists(testfile):
# os.remove(testfile)
# specwriter = apstools.filewriters.SpecWriterCallback(
# filename=testfile
# )

# from apstools.filewriters import SCAN_ID_RESET_VALUE

# self.assertEqual(SCAN_ID_RESET_VALUE, 0, "default reset scan id")

# write_stream(specwriter, self.db["tune_mr"])
# self.assertTrue(os.path.exists(testfile), "data file created")

# raised = False
# try:
# specwriter.newfile(filename=testfile)
# except ValueError:
# raised = True
# finally:
# self.assertFalse(raised, "file exists")
# self.assertEqual(specwriter.reset_scan_id, 0, "check scan id")

# class my_RunEngine:
# # dick type for testing _here_
# md = dict(scan_id=SCAN_ID_RESET_VALUE)

# RE = my_RunEngine()

# specwriter.scan_id = -5 # an unusual value for testing only
# RE.md["scan_id"] = -10 # an unusual value for testing only
# specwriter.newfile(filename=testfile, scan_id=None, RE=RE)
# self.assertEqual(specwriter.scan_id, 108, "scan_id unchanged")
# self.assertEqual(
# RE.md["scan_id"], 108, "RE.md['scan_id'] unchanged"
# )

# specwriter.scan_id = -5 # an unusual value for testing only
# RE.md["scan_id"] = -10 # an unusual value for testing only
# specwriter.newfile(filename=testfile, scan_id=False, RE=RE)
# self.assertEqual(specwriter.scan_id, 108, "scan_id unchanged")
# self.assertEqual(
# RE.md["scan_id"], 108, "RE.md['scan_id'] unchanged"
# )

# specwriter.scan_id = -5 # an unusual value for testing only
# RE.md["scan_id"] = -10 # an unusual value for testing only
# specwriter.newfile(filename=testfile, scan_id=True, RE=RE)
# self.assertEqual(specwriter.scan_id, 108, "scan_id reset")
# self.assertEqual(RE.md["scan_id"], 108, "RE.md['scan_id'] reset")

# for n, s in {"0": 108, "108": 108, "110": 110}.items():
# specwriter.scan_id = -5 # an unusual value for testing only
# RE.md["scan_id"] = -10 # an unusual value for testing only
# specwriter.newfile(filename=testfile, scan_id=int(n), RE=RE)
# self.assertEqual(
# specwriter.scan_id, s, f"scan_id set to {n}, actually {s}"
# )
# self.assertEqual(
# RE.md["scan_id"],
# s,
# f"RE.md['scan_id'] set to {n}, actually {s}",
# )

# def test__rebuild_scan_command(self):
# from apstools.filewriters import _rebuild_scan_command

# self.assertTrue(len(self.db) > 0, "test data ready")

# start_docs = []
# for header in self.db["tune_mr"]:
# tag, doc = header
# if tag == "start":
# start_docs.append(doc)
# self.assertEqual(len(start_docs), 1, "unique start doc found")

# doc = start_docs[0]
# expected = "108 tune_mr()"
# result = _rebuild_scan_command(doc)
# self.assertEqual(result, expected, "rebuilt #S line")

# def test_spec_comment(self):
# from apstools.filewriters import spec_comment

# # spec_comment(comment, doc=None, writer=None)
# testfile = os.path.join(self.tempdir, "spec_comment.dat")
# if os.path.exists(testfile):
# os.remove(testfile)
# specwriter = apstools.filewriters.SpecWriterCallback(
# filename=testfile
# )

# for category in "buffered_comments comments".split():
# for k in "start stop descriptor event".split():
# o = getattr(specwriter, category)
# self.assertEqual(len(o[k]), 0, f"no '{k}' {category}")

# # insert comments with every document
# spec_comment(
# "TESTING: Should appear within start doc",
# doc=None,
# writer=specwriter,
# )

# for idx, document in enumerate(self.db["tune_mr"]):
# tag, doc = document
# msg = f"TESTING: document {idx+1}: '{tag}' %s specwriter.receiver"
# spec_comment(msg % "before", doc=tag, writer=specwriter)
# specwriter.receiver(tag, doc)
# if tag == "stop":
# # since stop doc was received, this appears in the next scan
# spec_comment(
# str(msg % "before") + " (appears at END of next scan)",
# doc=tag,
# writer=specwriter,
# )
# else:
# spec_comment(msg % "after", doc=tag, writer=specwriter)

# self.assertEqual(
# len(specwriter.buffered_comments["stop"]),
# 1,
# "last 'stop' comment buffered",
# )

# # since stop doc was received, this appears in the next scan
# spec_comment(
# "TESTING: Appears at END of next scan",
# doc="stop",
# writer=specwriter,
# )

# self.assertEqual(
# len(specwriter.buffered_comments["stop"]),
# 2,
# "last end of scan comment buffered",
# )
# write_stream(specwriter, self.db["tune_ar"])

# for k in "start descriptor event".split():
# o = specwriter.buffered_comments
# self.assertEqual(len(o[k]), 0, f"no '{k}' {category}")
# expected = dict(start=2, stop=5, event=0, descriptor=0)
# for k, v in expected.items():
# self.assertEqual(
# len(specwriter.comments[k]), v, f"'{k}' comments"
# )
def test_SpecWriterCallback_newfile_exists(cat, tempdir):
testfile = os.path.join(tempdir, "tune_mr.dat")
if os.path.exists(testfile):
os.remove(testfile)
specwriter = apstools.filewriters.SpecWriterCallback(
filename=testfile
)

from apstools.filewriters import SCAN_ID_RESET_VALUE

assert SCAN_ID_RESET_VALUE == 0 # "default reset scan id"

write_stream(specwriter, cat.v1[TUNE_MR].documents())
assert os.path.exists(testfile) # "data file created"

raised = False
try:
specwriter.newfile(filename=testfile)
except ValueError:
raised = True
finally:
assert not raised # "file exists"
assert specwriter.reset_scan_id == 0 # "check scan id"

class my_RunEngine:
# duck type model *strictly* for testing *here*
md = dict(scan_id=SCAN_ID_RESET_VALUE)

RE = my_RunEngine()

specwriter.scan_id = -5 # an unusual value for testing only
RE.md["scan_id"] = -10 # an unusual value for testing only
specwriter.newfile(filename=testfile, scan_id=None, RE=RE)
assert specwriter.scan_id == 108 # "scan_id unchanged"
assert RE.md["scan_id"] == 108 # "RE.md['scan_id'] unchanged"

specwriter.scan_id = -5 # an unusual value for testing only
RE.md["scan_id"] = -10 # an unusual value for testing only
specwriter.newfile(filename=testfile, scan_id=False, RE=RE)
assert specwriter.scan_id == 108 # "scan_id unchanged"
assert RE.md["scan_id"] == 108 # "RE.md['scan_id'] unchanged"

specwriter.scan_id = -5 # an unusual value for testing only
RE.md["scan_id"] = -10 # an unusual value for testing only
specwriter.newfile(filename=testfile, scan_id=True, RE=RE)
assert specwriter.scan_id == 108 # "scan_id reset"
assert RE.md["scan_id"] == 108 # "RE.md['scan_id'] reset"

for n, s in {"0": 108, "108": 108, "110": 110}.items():
specwriter.scan_id = -5 # an unusual value for testing only
RE.md["scan_id"] = -10 # an unusual value for testing only
specwriter.newfile(filename=testfile, scan_id=int(n), RE=RE)
assert specwriter.scan_id == s
assert RE.md["scan_id"] == s


def test_SpecWriterCallback__rebuild_scan_command(cat, tempdir):
from apstools.filewriters import _rebuild_scan_command

start_docs = []
for tag, doc in cat.v1[TUNE_MR].documents():
if tag == "start":
start_docs.append(doc)
assert len(start_docs) == 1 # "unique start doc found"

doc = start_docs[0]
expected = "108 tune_mr()"
result = _rebuild_scan_command(doc)
assert result == expected # "rebuilt #S line"


def test_SpecWriterCallback_spec_comment(cat, tempdir):
from apstools.filewriters import spec_comment

# spec_comment(comment, doc=None, writer=None)
testfile = os.path.join(tempdir, "spec_comment.dat")
if os.path.exists(testfile):
os.remove(testfile)
specwriter = apstools.filewriters.SpecWriterCallback(
filename=testfile
)

for category in "buffered_comments comments".split():
for k in "start stop descriptor event".split():
o = getattr(specwriter, category)
assert len(o[k]) == 0

# insert comments with every document
spec_comment(
"TESTING: Should appear within start doc",
doc=None,
writer=specwriter,
)

for idx, document in enumerate(cat.v1[TUNE_MR].documents()):
tag, doc = document
msg = f"TESTING: document {idx+1}: '{tag}' %s specwriter.receiver"
spec_comment(msg % "before", doc=tag, writer=specwriter)
specwriter.receiver(tag, doc)
if tag == "stop":
# since stop doc was received, this appears in the next scan
spec_comment(
str(msg % "before") + " (appears at END of next scan)",
doc=tag,
writer=specwriter,
)
else:
spec_comment(msg % "after", doc=tag, writer=specwriter)

assert len(specwriter.buffered_comments["stop"]) == 1

# since stop doc was received, this appears in the next scan
spec_comment(
"TESTING: Appears at END of next scan",
doc="stop",
writer=specwriter,
)

assert len(specwriter.buffered_comments["stop"]) == 2
write_stream(specwriter, cat.v1[TUNE_AR].documents())

for k in "start descriptor event".split():
o = specwriter.buffered_comments
assert len(o[k]) == 0
expected = dict(start=2, stop=5, event=0, descriptor=0)
for k, v in expected.items():
assert len(specwriter.comments[k]) == v

0 comments on commit decdc6c

Please sign in to comment.