WIP: Mongo Data Migration #804

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,11 @@ Write the date in place of the "Unreleased" in the case a new version is release

## Unreleased

### Added

- Automatic reshaping of TIFF data by the adapter to account for
  extra/missing singleton dimensions.
- Refactor `CSVAdapter` to allow passing `pd.read_csv` keyword arguments.
- Add adapters for reading back assets with the `image/jpeg` and
  `multipart/related;type=image/jpeg` mimetypes.

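The TIFF reshaping entry is exercised by the new `test_forced_reshaping` test in `tiled/_tests/test_tiff.py` below. As a minimal sketch (not part of the diff; file paths are illustrative, and it assumes `TiffSequenceAdapter.read()` with no arguments returns the full array), a TIFF sequence can be registered with an explicit `ArrayStructure` carrying an extra singleton dimension, and the adapter reshapes the stacked data to match:

```python
import numpy
import tifffile as tf

from tiled.adapters.tiff import TiffSequenceAdapter
from tiled.structures.array import ArrayStructure, BuiltinDtype
from tiled.utils import ensure_uri

# Write three small 8-bit TIFF files (hypothetical scratch paths).
filepaths = [f"/tmp/temp{i:05}.tif" for i in range(3)]
rng = numpy.random.default_rng(0)
for path in filepaths:
    tf.imwrite(path, rng.integers(0, 255, size=(5, 7, 4), dtype="uint8"))

# The raw stack is (3, 5, 7, 4); requesting a 5-D structure makes the
# adapter insert the missing singleton dimension.
adapter = TiffSequenceAdapter.from_uris(
    [ensure_uri(p) for p in filepaths],
    structure=ArrayStructure(
        shape=(3, 1, 5, 7, 4),
        chunks=((1, 1, 1), (1,), (5,), (7,), (4,)),
        data_type=BuiltinDtype.from_numpy_dtype(numpy.dtype("uint8")),
    ),
)
assert adapter.read().shape == (3, 1, 5, 7, 4)
```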
73 changes: 73 additions & 0 deletions scripts/mongo_migration/load.py
@@ -0,0 +1,73 @@
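# Export Bluesky documents for a beamline from its MongoDB (databroker) store
# into a JSON file (data/docs_<beamline>.json) for later ingestion by TiledWriter.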
import importlib
import os
import re
import sys

import event_model
import yaml
from bson import json_util
from databroker.mongo_normalized import MongoAdapter

def import_from_yaml(spec):
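    # Resolve a "module:callable" spec string (as used in Tiled profiles) to the callable it names.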
module_name, func_name = spec.split(':')
return getattr(importlib.import_module(module_name), func_name)


def unbatch_documents(docs):
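    # Expand batched event_page/datum_page documents into individual event/datum documents.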
for item in docs:
if item['name'] == 'event_page':
for _doc in event_model.unpack_event_page(item['doc']):
yield {"name": "event", "doc": _doc}
elif item['name'] == 'datum_page':
for _doc in event_model.unpack_datum_page(item['doc']):
            yield {"name": "datum", "doc": _doc}
else:
yield item


beamline = 'iss'
mongo_user = f"{beamline.lower()}_read"
mongo_pass = os.environ["MONGO_PASSWORD"]  # must be provided via the environment

# Load the beamline-specific patches and handlers
tiled_config_dir = f"{os.getenv('HOME')}/.config/tiled"
sys.path.append(tiled_config_dir)
with open(tiled_config_dir + "/profiles/profiles.yml", 'r') as f:
profiles = yaml.safe_load(f)
args = profiles[beamline]['direct']['trees'][0]['args']
transforms = args.get('transforms', {})
handlers = args.get('handler_registry', {})
uri = args['uri']
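# Fill the ${MONGO_USER_*} and ${MONGO_PASSWORD_*} placeholders in the profile URI.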
uri = re.sub(r'\$\{(?:MONGO_USER_)\w{3,}\}', mongo_user, uri)
uri = re.sub(r'\$\{(?:MONGO_PASSWORD_)\w{3,}\}', mongo_pass, uri)

ma = MongoAdapter.from_uri(
    uri,
    transforms={key: import_from_yaml(val) for key, val in transforms.items()},
    handler_registry={key: import_from_yaml(val) for key, val in handlers.items()},
)
coll = ma._run_start_collection
start_doc_cursor = coll.find()


docs = []
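# Export the documents of the first 100 runs, unbatching event/datum pages along the way.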
for i in range(100):
bs_run = ma._get_run(start_doc_cursor.next())
g = bs_run.documents(fill=False, size=25)
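    # Drop Mongo's internal "_id" field from each document before serializing.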
g = map(lambda item : {"name":item[0], "doc": (item[1], item[1].pop("_id", None))[0]}, g)
g = unbatch_documents(g)
docs += list(g)


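# Alternative: select runs by time range instead of taking the first 100 start documents.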
# cur = coll.find(filter={'time':{'$gt':dateutil.parser.parse('2024-10-22 00:00').timestamp(),
# '$lt':dateutil.parser.parse('2024-10-23 00:00').timestamp()}}
# , projection={'_id':False})
# docs = []
# for doc0 in cur:
# bs_run = ma._get_run(doc0)
# g = bs_run.documents(fill=False, size=25)
# g = map(lambda item : {"name":item[0], "doc": (item[1], item[1].pop("_id", None))[0]}, g)
# g = unbatch_documents(g)
# docs += list(g)

with open(f'data/docs_{beamline.lower()}.json', 'w') as f:
f.write(json_util.dumps(docs))
30 changes: 30 additions & 0 deletions scripts/mongo_migration/migrate.py
@@ -0,0 +1,30 @@
# tiled serve catalog --temp --api-key=secret -w /tmp -r /Users/eugene/code/demo_stream_documents
# tiled serve catalog --temp --api-key=secret -w /tmp -r /nsls2/data
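# Replay previously exported documents (see load.py) into a running Tiled catalog via TiledWriter.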

import json
import traceback
from pathlib import Path

from tqdm import tqdm

from bluesky.callbacks.tiled_writer import TiledWriter
from tiled.client import from_uri

documents = json.loads(Path("data/docs_smi.json").read_text())

client = from_uri("http://localhost:8000", api_key="secret", include_data_sources=True)
tw = TiledWriter(client)

start_doc_uid = None
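# Feed each document to TiledWriter, remembering the current run's start uid for error reporting.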
for item in tqdm(documents[:30]):
name = item["name"]
doc = item["doc"]
if name == "start":
start_doc_uid = doc['uid']
try:
tw(name, doc)
    except Exception as e:
        tb = "".join(traceback.format_exception(e))
        print(f"Error while processing Bluesky run with {start_doc_uid=}\n{tb}")
110 changes: 110 additions & 0 deletions scripts/mongo_migration/validate.py
@@ -0,0 +1,110 @@
# tiled serve catalog --temp --api-key=secret -w /tmp -r /Users/eugene/code/demo_stream_documents
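# Compare runs between the original databroker/mongo_normalized catalog and the migrated Tiled catalog.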

import numpy as np

from deepdiff import DeepDiff
from tiled.client import from_uri

class ValidationException(Exception):

def __init__(self, message, uid=None):
super().__init__(message)
self.uid = uid

class RunValidationException(ValidationException):
pass

class MetadataValidationException(ValidationException):
pass

class TableValidationException(ValidationException):
pass

class ContainerValidationException(ValidationException):
pass

class DataValidationException(ValidationException):
pass


def validate(c0, c1, uid=None):
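    # Compare the run with the given uid between the source catalog `c0` and the
    # migrated catalog `c1`, raising a ValidationException subclass on the first mismatch.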
run0 = c0[uid]
run1 = c1[uid]

# Check the Run metadata. Ignore formatting of summary.datetime
meta0 = dict(run0.metadata)
meta1 = dict(run1.metadata)
diff = DeepDiff(meta0, meta1, exclude_paths="root['summary']['datetime']")
if diff:
raise MetadataValidationException(diff, uid)

# Check the data stream names
stream_names = set(run0.keys())
if stream_names != set(run1.keys()):
raise ValidationException("Inconsistent stream names", uid)

# Check (descriptor) metadata for each stream
for name in stream_names:
md0 = dict(run0[name].metadata)
md1 = dict(run1[name].metadata)
conf_dict, time_dict = {}, {}
for desc in md0['descriptors']:
diff = DeepDiff(desc, md1, exclude_paths=["root['configuration']", "root['time']", "root['uid']"])
if diff:
raise MetadataValidationException(diff, uid)
conf_dict[desc['uid']] = desc.get('configuration')
time_dict[desc['uid']] = desc.get('time')
diff = DeepDiff(conf_dict, md1['configuration'])
if diff:
raise MetadataValidationException(diff, uid)
diff = DeepDiff(time_dict, md1['time'])
if diff:
raise MetadataValidationException(diff, uid)

# Check structure for each stream
for name in stream_names:
# Check internal data
external_data_cols = set()
if ("data" in run0[name].keys()) and ("internal" not in run1[name].keys()):
raise TableValidationException(f"Missing internal data table in stream {name}", uid)
else:
ds1 = run1[name]['internal']
            for old_key, prefix in {"data": "", "timestamps": "ts_"}.items():
if old_key in run0[name].keys():
ds0 = run0[name][old_key]
external_data_cols = external_data_cols.union([prefix+k for k in ds0.keys()]).difference(ds1.columns+['time'])

# Check external data
if external_data_cols and ("external" not in run1[name].keys()):
raise ContainerValidationException(f"Missing external data in stream {name}", uid)
if external_data_cols != set(run1[name]['external'].keys()):
raise ContainerValidationException(f"Inconsistent external data in stream {name}", uid)
for key in external_data_cols:
dat0 = run0[name]['data'][key].read()
dat1 = run1[name]['external'][key].read()
if not np.array_equal(dat0, dat1):
raise DataValidationException(f"External data mismatch for {key=} in stream {name}", uid)

# Check config for each stream
for name in stream_names:
if ("config" in run0[name].keys()) and ("config" not in run1[name].keys()):
raise TableValidationException(f"Missing config container in stream {name}", uid)
else:
config_keys = set(run0[name]['config'].keys())
if set(config_keys) != set(run1[name]['config'].keys()):
raise ContainerValidationException(f"Inconsistent config keys in stream {name}", uid)

for ckey in config_keys:
ds1 = run1[name]['config'][ckey]
ds0 = run0[name]['config'][ckey]
missing_conf_cols = set(ds0.keys()).difference(ds1.columns)
if missing_conf_cols:
raise TableValidationException(f"Missing config dataset columns {missing_conf_cols} in {name}/config/{ckey}", uid)

36 changes: 33 additions & 3 deletions tiled/_tests/test_tiff.py
@@ -10,9 +10,11 @@
from ..client import Context, from_context
from ..client.register import IMG_SEQUENCE_EMPTY_NAME_ROOT, register
from ..server.app import build_app
from ..structures.array import ArrayStructure, BuiltinDtype
from ..utils import ensure_uri

COLOR_SHAPE = (11, 17, 3)
rng = numpy.random.default_rng(12345)


@pytest.fixture(scope="module")
@@ -21,20 +23,27 @@ def client(tmpdir_module):
sequence_directory.mkdir()
filepaths = []
for i in range(3):
data = numpy.random.random((5, 7, 4))
data = rng.integers(0, 255, size=(5, 7, 4), dtype="uint8")
filepath = sequence_directory / f"temp{i:05}.tif"
tf.imwrite(filepath, data)
filepaths.append(filepath)
color_data = numpy.random.randint(0, 255, COLOR_SHAPE, dtype="uint8")
color_data = rng.integers(0, 255, size=COLOR_SHAPE, dtype="uint8")
path = Path(tmpdir_module, "color.tif")
tf.imwrite(path, color_data)

tree = MapAdapter(
{
"color": TiffAdapter(ensure_uri(path)),
"sequence": TiffSequenceAdapter.from_uris(
[ensure_uri(filepath) for filepath in filepaths]
),
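            # Same files as "sequence", but exposed with an explicitly requested
            # 5-D structure; the adapter reshapes to add the singleton axis.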
"5d_sequence": TiffSequenceAdapter.from_uris(
[ensure_uri(filepath) for filepath in filepaths],
structure=ArrayStructure(
shape=(3, 1, 5, 7, 4),
chunks=((1, 1, 1), (1,), (5,), (7,), (4,)),
data_type=BuiltinDtype.from_numpy_dtype(numpy.dtype("uint8")),
),
),
}
)
app = build_app(tree)
@@ -62,6 +71,27 @@ def test_tiff_sequence(client, slice_input, correct_shape):
assert arr.shape == correct_shape


@pytest.mark.parametrize(
"slice_input, correct_shape",
[
(None, (3, 1, 5, 7, 4)),
(..., (3, 1, 5, 7, 4)),
((), (3, 1, 5, 7, 4)),
(0, (1, 5, 7, 4)),
(slice(0, 3, 2), (2, 1, 5, 7, 4)),
((1, slice(0, 10), slice(0, 3), slice(0, 3)), (1, 3, 3, 4)),
((slice(0, 3), 0, slice(0, 3), slice(0, 3)), (3, 3, 3, 4)),
((..., 0, 0, 0, 0), (3,)),
((0, slice(0, 1), slice(0, 1), slice(0, 2), ...), (1, 1, 2, 4)),
((0, ..., slice(0, 2)), (1, 5, 7, 2)),
((..., slice(0, 1)), (3, 1, 5, 7, 1)),
],
)
def test_forced_reshaping(client, slice_input, correct_shape):
arr = client["5d_sequence"].read(slice=slice_input)
assert arr.shape == correct_shape


@pytest.mark.parametrize("block_input, correct_shape", [((0, 0, 0, 0), (1, 5, 7, 4))])
def test_tiff_sequence_block(client, block_input, correct_shape):
arr = client["sequence"].read_block(block_input)
6 changes: 5 additions & 1 deletion tiled/_tests/test_writing.py
@@ -537,7 +537,11 @@ def test_write_with_specified_mimetype(tree):
df = pandas.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
structure = TableStructure.from_pandas(df)

for mimetype in [PARQUET_MIMETYPE, "text/csv", APACHE_ARROW_FILE_MIME_TYPE]:
for mimetype in [
PARQUET_MIMETYPE,
"text/csv",
APACHE_ARROW_FILE_MIME_TYPE,
]:
x = client.new(
"table",
[