BUG: SPEC filewriter scan numbering and newfile() when file exists #138

Merged: 20 commits, May 9, 2019
23 changes: 15 additions & 8 deletions apstools/filewriters.py
@@ -338,7 +338,7 @@ def descriptor(self, doc):
         doc_hints_names = []
         for k, d in doc["hints"].items():
             doc_hints_names.append(k)
-            doc_hints_names += doc["hints"][k]["fields"]
+            doc_hints_names += d["fields"]
 
         # independent variable(s) first
         # assumes start["motors"] was defined
@@ -528,22 +528,29 @@ def newfile(self, filename=None, scan_id=None, RE=None):
         self.clear()
         filename = filename or self.make_default_filename()
         if os.path.exists(filename):
-            ValueError(f"file {filename} exists")
+            from spec2nexus.spec import SpecDataFile
+            sdf = SpecDataFile(filename)
+            scan_list = sdf.getScanNumbers()
+            l = len(scan_list)
+            m = max(map(float, scan_list))
+            highest = int(max(l, m) + 0.9999)  # solves issue #128
+            scan_id = max(scan_id or 0, highest)
         self.spec_filename = filename
         self.spec_epoch = int(time.time())  # ! no roundup here!!!
         self.spec_host = socket.gethostname() or 'localhost'
         self.spec_user = getpass.getuser() or 'BlueSkyUser'
         self.write_file_header = True  # don't write the file yet
 
         # backwards-compatibility
-        if scan_id == True:
-            scan_id = SCAN_ID_RESET_VALUE
-        elif scan_id == False:
-            scan_id = None
+        if isinstance(scan_id, bool):
+            # True means reset the scan ID to default
+            # False means do not modify it
+            scan_id = {True: SCAN_ID_RESET_VALUE, False: None}[scan_id]
         if scan_id is not None and RE is not None:
-            # assume isinstance(RE, bluesky.run_engine.RunEngine)
+            # RE is an instance of bluesky.run_engine.RunEngine
+            # (or duck type for testing)
             RE.md["scan_id"] = scan_id
             print(f"scan ID set to {scan_id}")
         self.scan_id = scan_id
         return self.spec_filename
 
     def usefile(self, filename):
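For readers tracing the `# solves issue #128` line above, here is a small standalone sketch of the scan-numbering arithmetic that the revised `newfile()` performs when the target file already exists. The scan list below is made up for illustration; it is not data from this PR.

    # Hypothetical input: the kind of list SpecDataFile(filename).getScanNumbers() returns
    scan_list = ["1", "2", "5.1", "108"]

    l = len(scan_list)                   # how many scans are recorded (4)
    m = max(map(float, scan_list))       # largest scan number found (108.0)
    highest = int(max(l, m) + 0.9999)    # 108; fractional scan numbers round up
    print(highest)

In the diff above, `scan_id = max(scan_id or 0, highest)` then also honors a caller-supplied `scan_id` when it is larger than anything already in the file.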
1 change: 1 addition & 0 deletions conda-recipe/meta.yaml
@@ -43,6 +43,7 @@ requirements:
     - pyRestTable
     - pandas
     - xlrd
+    - spec2nexus
 
 test:
   imports:
16 changes: 16 additions & 0 deletions packaging.md
@@ -0,0 +1,16 @@
# Packaging Hints

## PyPI upload

Precede the wildcard with the tag text (`apstools-1.1.1*`):

    python setup.py sdist bdist_wheel
    twine upload dist/*

## Conda upload

In the upload command below, use the package file path reported
near the end of a successful conda build.

    conda build ./conda-recipe/
    anaconda upload /home/mintadmin/Apps/anaconda/conda-bld/noarch/apstools-1.1.1-py_0.tar.bz2
26 changes: 0 additions & 26 deletions pypi.txt

This file was deleted.

1 change: 1 addition & 0 deletions requirements.txt
@@ -5,3 +5,4 @@ ophyd
 pandas
 pyRestTable
 xlrd
+spec2nexus
2 changes: 2 additions & 0 deletions tests/__main__.py
@@ -13,9 +13,11 @@
 def suite(*args, **kw):
 
     import test_simple
+    import test_filewriter
     # import test_excel
     test_list = [
         test_simple,
+        test_filewriter,
         # test_excel
         ]
 
282 changes: 282 additions & 0 deletions tests/test_filewriter.py
@@ -0,0 +1,282 @@

"""
unit tests for the SPEC filewriter
"""

import json
import os
import shutil
import sys
import tempfile
import unittest
import zipfile

_test_path = os.path.dirname(__file__)
_path = os.path.join(_test_path, '..')
if _path not in sys.path:
sys.path.insert(0, _path)

from apstools.filewriters import SpecWriterCallback


ZIP_FILE = os.path.join(_test_path, "usaxs_docs.json.zip")
JSON_FILE = "usaxs_docs.json.txt"


def write_stream(specwriter, stream):
"""write the doc stream to the file"""
for document in stream:
tag, doc = document
specwriter.receiver(tag, doc)


def get_test_data():
"""get document streams as dict from zip file"""
with zipfile.ZipFile(ZIP_FILE, "r") as fp:
buf = fp.read(JSON_FILE).decode("utf-8")
return json.loads(buf)


class Test_Data_is_Readable(unittest.TestCase):

    def test_00_testdata_exist(self):
        self.assertTrue(
            os.path.exists(ZIP_FILE),
            "zip file with test data")
        with zipfile.ZipFile(ZIP_FILE, "r") as fp:
            self.assertIn(JSON_FILE, fp.namelist(), "JSON test data")

    def test_testfile_content(self):
        # get our test document stream
        datasets = get_test_data()

        census = {}
        for document in datasets["tune_mr"]:
            tag, _doc = document
            if tag not in census:
                census[tag] = 0
            census[tag] += 1

        # test that tune_mr content arrived intact
        keys = dict(start=1, descriptor=2, event=33, stop=1)
        self.assertEqual(
            len(census.keys()),
            len(keys),
            "four document types")
        for k, v in keys.items():
            self.assertIn(k, census, f"{k} document exists")
            self.assertEqual(
                census[k],
                v,
                f"expected {v} '{k}' document(s)")


class Test_SpecWriterCallback(unittest.TestCase):

    def setUp(self):
        self.tempdir = tempfile.mkdtemp()
        self.db = get_test_data()

    def tearDown(self):
        if os.path.exists(self.tempdir):
            shutil.rmtree(self.tempdir, ignore_errors=True)

    def test_writer_default_name(self):
        specwriter = SpecWriterCallback()
        path = os.path.abspath(
            os.path.dirname(
                specwriter.spec_filename))
        self.assertNotEqual(
            path,
            self.tempdir,
            "default file not in tempdir")
        self.assertEqual(
            path,
            os.path.abspath(os.getcwd()),
            "default file to go in pwd")

        # change the directory
        specwriter.spec_filename = os.path.join(
            self.tempdir,
            specwriter.spec_filename)

        self.assertFalse(
            os.path.exists(specwriter.spec_filename),
            "data file not created yet")
        write_stream(specwriter, self.db["tune_mr"])
        self.assertTrue(
            os.path.exists(specwriter.spec_filename),
            "data file created")

    def test_writer_filename(self):
        self.assertTrue(len(self.db) > 0, "test data ready")

        testfile = os.path.join(self.tempdir, "tune_mr.dat")
        if os.path.exists(testfile):
            os.remove(testfile)
        specwriter = SpecWriterCallback(filename=testfile)

        self.assertIsInstance(
            specwriter, SpecWriterCallback,
            "specwriter object")
        self.assertEqual(
            specwriter.spec_filename,
            testfile,
            "output data file")

        self.assertFalse(
            os.path.exists(testfile),
            "data file not created yet")
        write_stream(specwriter, self.db["tune_mr"])
        self.assertTrue(os.path.exists(testfile), "data file created")

    def test_newfile_exists(self):
        testfile = os.path.join(self.tempdir, "tune_mr.dat")
        if os.path.exists(testfile):
            os.remove(testfile)
        specwriter = SpecWriterCallback(filename=testfile)

        from apstools.filewriters import SCAN_ID_RESET_VALUE
        self.assertEqual(SCAN_ID_RESET_VALUE, 0, "default reset scan id")

        write_stream(specwriter, self.db["tune_mr"])
        self.assertTrue(os.path.exists(testfile), "data file created")

        try:
            specwriter.newfile(filename=testfile)
            raised = False
        except ValueError:
            raised = True
        finally:
            self.assertFalse(raised, "file exists")
        self.assertEqual(specwriter.reset_scan_id, 0, "check scan id")

        class my_RunEngine:
            # duck type for testing _here_
            md = dict(scan_id=SCAN_ID_RESET_VALUE)
        RE = my_RunEngine()

        specwriter.scan_id = -5  # an unusual value for testing only
        RE.md["scan_id"] = -10  # an unusual value for testing only
        specwriter.newfile(filename=testfile, scan_id=None, RE=RE)
        self.assertEqual(specwriter.scan_id, 108, "scan_id unchanged")
        self.assertEqual(RE.md["scan_id"], 108, "RE.md['scan_id'] unchanged")

        specwriter.scan_id = -5  # an unusual value for testing only
        RE.md["scan_id"] = -10  # an unusual value for testing only
        specwriter.newfile(filename=testfile, scan_id=False, RE=RE)
        self.assertEqual(specwriter.scan_id, 108, "scan_id unchanged")
        self.assertEqual(RE.md["scan_id"], 108, "RE.md['scan_id'] unchanged")

        specwriter.scan_id = -5  # an unusual value for testing only
        RE.md["scan_id"] = -10  # an unusual value for testing only
        specwriter.newfile(filename=testfile, scan_id=True, RE=RE)
        self.assertEqual(specwriter.scan_id, 108, "scan_id reset")
        self.assertEqual(RE.md["scan_id"], 108, "RE.md['scan_id'] reset")

        for n, s in {'0': 108, '108': 108, '110': 110}.items():
            specwriter.scan_id = -5  # an unusual value for testing only
            RE.md["scan_id"] = -10  # an unusual value for testing only
            specwriter.newfile(filename=testfile, scan_id=int(n), RE=RE)
            self.assertEqual(specwriter.scan_id, s, f"scan_id set to {n}, actually {s}")
            self.assertEqual(RE.md["scan_id"], s, f"RE.md['scan_id'] set to {n}, actually {s}")

    def test__rebuild_scan_command(self):
        from apstools.filewriters import _rebuild_scan_command

        self.assertTrue(len(self.db) > 0, "test data ready")

        start_docs = []
        for header in self.db["tune_mr"]:
            tag, doc = header
            if tag == "start":
                start_docs.append(doc)
        self.assertEqual(len(start_docs), 1, "unique start doc found")

        doc = start_docs[0]
        expected = "108 tune_mr()"
        result = _rebuild_scan_command(doc)
        self.assertEqual(result, expected, "rebuilt #S line")

    def test_spec_comment(self):
        from apstools.filewriters import spec_comment

        # spec_comment(comment, doc=None, writer=None)
        testfile = os.path.join(self.tempdir, "spec_comment.dat")
        if os.path.exists(testfile):
            os.remove(testfile)
        specwriter = SpecWriterCallback(filename=testfile)

        for category in "buffered_comments comments".split():
            for k in "start stop descriptor event".split():
                o = getattr(specwriter, category)
                self.assertEqual(len(o[k]), 0, f"no '{k}' {category}")

        # insert comments with every document
        spec_comment(
            "TESTING: Should appear within start doc",
            doc=None,
            writer=specwriter)

        for idx, document in enumerate(self.db["tune_mr"]):
            tag, doc = document
            msg = f"TESTING: document {idx+1}: '{tag}' %s specwriter.receiver"
            spec_comment(
                msg % "before",
                doc=tag,
                writer=specwriter)
            specwriter.receiver(tag, doc)
            if tag == "stop":
                # since stop doc was received, this appears in the next scan
                spec_comment(
                    str(msg % "before") + " (appears at END of next scan)",
                    doc=tag,
                    writer=specwriter)
            else:
                spec_comment(
                    msg % "after",
                    doc=tag,
                    writer=specwriter)

        self.assertEqual(
            len(specwriter.buffered_comments['stop']),
            1,
            "last 'stop' comment buffered")

        # since stop doc was received, this appears in the next scan
        spec_comment(
            "TESTING: Appears at END of next scan",
            doc="stop",
            writer=specwriter)

        self.assertEqual(
            len(specwriter.buffered_comments['stop']),
            2,
            "last end of scan comment buffered")
        write_stream(specwriter, self.db["tune_ar"])

        for k in "start descriptor event".split():
            o = specwriter.buffered_comments
            self.assertEqual(len(o[k]), 0, f"no '{k}' {category}")
        expected = dict(start=2, stop=5, event=0, descriptor=0)
        for k, v in expected.items():
            self.assertEqual(
                len(specwriter.comments[k]),
                v,
                f"'{k}' comments")


def suite(*args, **kw):
    test_list = [
        Test_Data_is_Readable,
        Test_SpecWriterCallback,
        ]
    test_suite = unittest.TestSuite()
    for test_case in test_list:
        test_suite.addTest(unittest.makeSuite(test_case))
    return test_suite


if __name__ == "__main__":
    runner = unittest.TextTestRunner()
    runner.run(suite())
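As an aside, the duck-typed RunEngine in `test_newfile_exists` above suggests a minimal way to exercise `newfile()` without a full Bluesky session. The sketch below follows that pattern; the `FakeRunEngine` class and the `example.dat` file name are illustrative, not part of the committed code.

    # Minimal sketch, mirroring the duck-typed RE used in test_newfile_exists.
    from apstools.filewriters import SpecWriterCallback, SCAN_ID_RESET_VALUE

    class FakeRunEngine:
        """Stand-in for bluesky.run_engine.RunEngine; only .md is needed here."""
        md = dict(scan_id=SCAN_ID_RESET_VALUE)

    RE = FakeRunEngine()
    specwriter = SpecWriterCallback(filename="example.dat")

    # If example.dat already holds scans, newfile() reads it with spec2nexus and
    # continues from the highest scan number found; otherwise the requested
    # scan_id is used.  Either way the result is written back to RE.md["scan_id"].
    specwriter.newfile(filename="example.dat", scan_id=1, RE=RE)
    print(RE.md["scan_id"])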
Binary file added tests/usaxs_docs.json.zip