Merge pull request #19 from materialsproject/useability
Useability updates
shyamd committed Mar 31, 2018
2 parents 6834960 + 0456b00 commit 2ee043f
Showing 8 changed files with 644 additions and 189 deletions.
12 changes: 7 additions & 5 deletions examples/runner_sample.py
@@ -1,8 +1,10 @@
# Usage:
# with multiprocessing:
# python runner_sample.py
# with mpi (needs mpi4py package):
# mpiexec -n 5 python runner_sample.py
"""
Usage:
with multiprocessing:
python runner_sample.py
with mpi (needs mpi4py package):
mpiexec -n 5 python runner_sample.py
"""

from maggma.stores import MemoryStore
from maggma.builder import Builder
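For orientation, here is a minimal end-to-end sketch of the pattern runner_sample.py exercises, written against maggma's Builder/Runner API as it stood around this commit. The AddOneBuilder class, the store names, and the document contents are made up for illustration; the sketch assumes the Builder base class accepts sources and targets lists and that Runner is importable from maggma.runner, as the sample's imports suggest.

from maggma.stores import MemoryStore
from maggma.builder import Builder
from maggma.runner import Runner

class AddOneBuilder(Builder):
    """Illustrative builder: copies documents from a source store to a target store,
    incrementing their 'value' field along the way."""

    def __init__(self, source, target):
        self.source = source
        self.target = target
        super(AddOneBuilder, self).__init__(sources=[source], targets=[target])

    def get_items(self):
        # Hand every source document to process_item
        return self.source.query()

    def process_item(self, item):
        item["value"] = item.get("value", 0) + 1
        return item

    def update_targets(self, items):
        self.target.update(items, update_lu=False)

source = MemoryStore("source")
target = MemoryStore("target")
source.connect()
target.connect()
source.update([{"task_id": i, "value": i} for i in range(5)], update_lu=False)

runner = Runner([AddOneBuilder(source, target)])
runner.run()  # multiprocessing locally; "mpiexec -n 5 python ..." would switch to MPI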
297 changes: 254 additions & 43 deletions maggma/advanced_stores.py
@@ -2,20 +2,24 @@
"""
Advanced Stores for behavior outside normal access patterns
"""
from maggma.stores import Store, MongoStore
from pydash.objects import set_, get, has
from pydash.utilities import to_path
import pydash.objects
import os
import hvac
import json
import os
import boto3
import botocore
import zlib
from datetime import datetime
from maggma.stores import Store, MongoStore
from maggma.utils import lazy_substitute, substitute
from monty.json import jsanitize


class VaultStore(MongoStore):
"""
Extends MongoStore to read credentials out of Vault server
and uses these values to initialize MongoStore instance
"""

def __init__(self, collection_name, vault_secret_path):
"""
collection (string): name of mongo collection
@@ -58,12 +62,7 @@ def __init__(self, collection_name, vault_secret_path):
username = db_creds.get("username", "")
password = db_creds.get("password", "")

super(VaultStore, self).__init__(database,
collection_name,
host,
port,
username,
password)
super(VaultStore, self).__init__(database, collection_name, host, port, username, password)


class AliasingStore(Store):
@@ -77,7 +76,9 @@ def __init__(self, store, aliases, **kwargs):
aliases (dict): dict of aliases of the form external key: internal key
"""
self.store = store
# Given an external key tells what the internal key is
self.aliases = aliases
# Given the internal key tells us what the external key is
self.reverse_aliases = {v: k for k, v in aliases.items()}
self.kwargs = kwargs

@@ -109,11 +110,36 @@ def query_one(self, properties=None, criteria=None, **kwargs):
return d

def distinct(self, key, criteria=None, **kwargs):
if key in self.aliases:
key = self.aliases[key]
if isinstance(key, list):
criteria = criteria if criteria else {}
# Update to ensure keys are there
if all_exist:
criteria.update({k: {"$exists": True} for k in key if k not in criteria})

results = []
for d in self.groupby(key, properties=key, criteria=criteria):
results.append(d["_id"])
return results

else:
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.reverse_aliases)
key = self.aliases[key] if key in self.aliases else key
return self.collection.distinct(key, filter=criteria, **kwargs)

def groupby(self, keys, properties=None, criteria=None, **kwargs):
# Convert to a list
keys = keys if isinstance(keys, list) else [keys]

# Make the aliasing transformations on keys
keys = [self.aliases[k] if k in self.aliases else k for k in keys]

# Update criteria and properties based on aliases
criteria = criteria if criteria else {}
lazy_substitute(criteria, self.aliases)
return self.store.distinct(key, criteria, **kwargs)
substitute(properties, self.reverse_aliases)
lazy_substitute(criteria, self.reverse_aliases)

return self.store.groupby(keys=keys, properties=properties, criteria=criteria, **kwargs)

def update(self, docs, update_lu=True, key=None):
key = key if key else self.key
@@ -126,10 +152,10 @@ def update(self, docs, update_lu=True, key=None):

self.store.update(docs, update_lu=update_lu, key=key)

def ensure_index(self, key, unique=False):
def ensure_index(self, key, unique=False, **kwargs):
if key in self.aliases:
key = self.aliases
return self.store.ensure_index(key, unique)
return self.store.ensure_index(key, unique, **kwargs)

def close(self):
self.store.close()
@@ -138,37 +164,222 @@ def close(self):
def collection(self):
return self.store.collection

def connect(self):
self.store.connect()
def connect(self, force_reset=False):
self.store.connect(force_reset=force_reset)


def lazy_substitute(d, aliases):
class AmazonS3Store(Store):
"""
Simple top level substitute that doesn't dive into mongo like strings
GridFS like storage using Amazon S3 and a regular store for indexing
Assumes Amazon AWS key and secret key are set in environment or default config file
"""
for alias, key in aliases.items():
if key in d:
d[alias] = d[key]
del d[key]

def __init__(self, index, bucket, **kwargs):
"""
Initializes an S3 Store
Args:
index (Store): a store to use to index the S3 Bucket
bucket (str) : name of the bucket
"""
self.index = index
self.bucket = bucket
self.s3 = None
self.s3_bucket = None
# Force the key to be the same as the index
kwargs["key"] = index.key
super(AmazonS3Store, self).__init__(**kwargs)

def connect(self, force_reset=False):
self.index.connect(force_reset=force_reset)
if not self.s3:
self.s3 = boto3.resource("s3")
# TODO: Provide configuration variable to create bucket if not present
if self.bucket not in self.s3.list_buckets():
raise Exception("Bucket not present on AWS: {}".format(self.bucket))
self.s3_bucket = self.s3.Bucket(self.bucket)

def substitute(d, aliases):
"""
Substitutes keys in dictionary
Accepts multilevel mongo like keys
"""
for alias, key in aliases.items():
if has(d, key):
set_(d, alias, get(d, key))
unset(d, key)
def close(self):
self.index.close()

@property
def collection(self):
# For now returns the index collection since that is what we would "search" on
return self.index

def unset(d, key):
"""
Unsets a key
"""
pydash.objects.unset(d, key)
path = to_path(key)
for i in reversed(range(1, len(path))):
if len(get(d, path[:i])) == 0:
unset(d, path[:i])
def query(self, properties=None, criteria=None, **kwargs):
"""
Function that gets data from Amazon S3. This store ignores all
property projections as its designed for whole document access
Args:
properties (list or dict): This will be ignored by the S3
Store
criteria (dict): filter for query, matches documents
against key-value pairs
**kwargs (kwargs): further kwargs to Collection.find
"""
for f in self.index.query(criteria=criteria, **kwargs):
try:
data = self.s3_bucket.Object(f[self.key]).get()
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response['Error']['Code'])
if error_code == 404:
self.logger.error("Could not find S3 object {}".format(f[self.key]))
break

if f.get("compression", "") is "zlib":
data = zlib.decompress(data)

yield json.loads(data)

def query_one(self, properties=None, criteria=None, **kwargs):
"""
Function that gets a single document from Amazon S3. This store
ignores all property projections as its designed for whole
document access
Args:
properties (list or dict): This will be ignored by the S3
Store
criteria (dict): filter for query, matches documents
against key-value pairs
**kwargs (kwargs): further kwargs to Collection.find
"""
f = self.index.query_one(criteria=criteria, **kwargs)
if f:
try:
data = self.s3_bucket.Object(f[self.key]).get()
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a 404 error.
# If it was a 404 error, then the object does not exist.
error_code = int(e.response['Error']['Code'])
if error_code == 404:
self.logger.error("Could not find S3 object {}".format(f[self.key]))
return None

if f.get("compression", "") is "zlib":
data = zlib.decompress(data)

return json.loads(data)
else:
return None

def distinct(self, key, criteria=None, all_exist=False, **kwargs):
"""
Function get to get all distinct values of a certain key in the
AmazonS3 Store. This searches the index collection for this data
Args:
key (mongolike key or list of mongolike keys): key or keys
for which to find distinct values or sets of values.
criteria (filter criteria): criteria for filter
all_exist (bool): whether to ensure all keys in list exist
in each document, defaults to False
**kwargs (kwargs): kwargs corresponding to collection.distinct
"""
# Index is a store so it should have its own distinct function
return self.index.distinct(key, filter=criteria, **kwargs)

def groupby(self, keys, properties=None, criteria=None, **kwargs):
"""
Simple grouping function that will group documents
by keys. Only searches the index collection
Args:
keys (list or string): fields to group documents
properties (list): properties to return in grouped documents
criteria (dict): filter for documents to group
allow_disk_use (bool): whether to allow disk use in aggregation
Returns:
command cursor corresponding to grouped documents
elements of the command cursor have the structure:
{'_id': {"KEY_1": value_1, "KEY_2": value_2 ...,
'docs': [list_of_documents corresponding to key values]}
"""
self.index.groupby(keys, properties, criteria, **kwargs)

def ensure_index(self, key, unique=False):
"""
Wrapper for pymongo.Collection.ensure_index for the files collection
"""
return self.index.ensure_index(key, unique=unique, background=True)

def update(self, docs, update_lu=True, key=None, compress=False):
"""
Function to update associated MongoStore collection.
Args:
docs ([dict]): list of documents
key ([str] or str): keys to use to build search doc
compress (bool): compress the document or not
"""
now = datetime.now()
search_docs = []
for d in docs:
if isinstance(key, list):
search_doc = {k: d[k] for k in key}
elif key:
search_doc = {key: d[key]}
else:
search_doc = {}

# Always include our main key
search_doc[self.key] = d[self.key]

# Remove MongoDB _id from search
if "_id" in search_doc:
del search_doc["_id"]

# Add a timestamp
if update_lu:
search_doc[self.lu_field] = now
d[self.lu_field] = now

data = json.dumps(jsanitize(d)).encode()

# Compress with zlib if chosen
if compress:
search_doc["compression"] = "zlib"
data = zlib.compress(data)

self.s3_bucket.put_object(Key=d[self.key], Body=data, Metadata=search_doc)
search_docs.append(search_doc)

# Use store's update to remove key clashes
self.index.update(search_docs)

@property
def last_updated(self):
return self.index.last_updated

def lu_filter(self, targets):
"""Creates a MongoDB filter for new documents.
By "new", we mean documents in this Store that were last updated later
than any document in targets.
Args:
targets (list): A list of Stores
"""
self.index.lu_filter(targets)

def __hash__(self):
return hash((self.index.__hash__, self.bucket))

def rebuild_index_from_s3_data(self):
"""
Rebuilds the index Store from the data in S3
Relies on the index document being stores as the metadata for the file
"""
index_docs = []
for file in self.s3_bucket.objects.all():
# TODO: Transform the data back from strings and remove AWS S3 specific keys
index_docs.append(file.metadata)

self.index.update(index_docs)
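A hedged usage sketch for the stores this file adds or extends. The database name, collection, Vault secret path, alias mapping, and bucket name below are placeholders; AmazonS3Store assumes AWS credentials are already visible to boto3 and that the bucket exists (connect() above raises otherwise), and VaultStore assumes the Vault address and token are supplied through the environment.

from maggma.stores import MemoryStore, MongoStore
from maggma.advanced_stores import AliasingStore, AmazonS3Store, VaultStore

# VaultStore: MongoStore whose credentials are read from a Vault secret (hypothetical path)
tasks = VaultStore("tasks", "secret/mongo/tasks")

# AliasingStore: expose "energy" externally while the backing collection stores "output.energy"
backing = MongoStore("my_db", "tasks")
aliased = AliasingStore(backing, {"energy": "output.energy"})
aliased.connect()
docs = list(aliased.query(properties=["energy"], criteria={"energy": {"$exists": True}}))

# AmazonS3Store: whole documents live in S3; a regular store acts as the searchable index
index = MemoryStore("s3_index")
s3_store = AmazonS3Store(index, "my-maggma-bucket")
s3_store.connect()
s3_store.update([{"task_id": "mp-1", "data": list(range(1000))}], compress=True)
doc = s3_store.query_one(criteria={"task_id": "mp-1"})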
4 changes: 3 additions & 1 deletion maggma/runner.py
@@ -7,7 +7,6 @@

import abc
import logging
import time
from collections import defaultdict, deque
from threading import Thread, Condition, BoundedSemaphore
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
@@ -225,6 +224,9 @@ def update_targets(self):


class MultiprocProcessor(BaseProcessor):
"""
Processor to run builders using python multiprocessing
"""
def __init__(self, builders, num_workers=None):
# multiprocessing only if mpi is not used, no mixing
self.num_workers = num_workers
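The runner.py change mainly documents MultiprocProcessor, the processor used when MPI is not involved. A short sketch of requesting it with a worker count, assuming Runner forwards num_workers to the multiprocessing processor (matching the signature shown above); if it does not, MultiprocProcessor(builders, num_workers=4) can be constructed directly instead.

from maggma.runner import Runner

# Reusing AddOneBuilder, source, and target from the runner_sample sketch earlier
runner = Runner([AddOneBuilder(source, target)], num_workers=4)
runner.run()  # MultiprocProcessor drives the builders unless the script runs under mpiexec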