Improved testing.
jcejohnson committed May 28, 2023
1 parent 0ce536d commit 91f7198
Showing 5 changed files with 166 additions and 97 deletions.
8 changes: 6 additions & 2 deletions src/json_expand_o_matic/expander.py
@@ -23,10 +23,14 @@ def __init__(self, *, logger, path, data, leaf_nodes, **options):

# options will not include pool or zip options when called recursively.
self.pool_options = {
key: self.options.pop(key) for key in {key for key in self.options.keys() if key.startswith("pool_")}
# See ExpansionPool
key: self.options.pop(key)
for key in {key for key in self.options.keys() if key.startswith("pool_")}
}
self.zip_options = {
key: self.options.pop(key) for key in {key for key in self.options.keys() if key.startswith("zip_")}
# See ExpansionZipper
key: self.options.pop(key)
for key in {key for key in self.options.keys() if key.startswith("zip_")}
}

assert (
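Before moving on to the next file, here is a minimal sketch of the prefix-based option splitting this hunk reformats. The helper name `split_prefixed_options` and the sample option names are hypothetical, not the library's API; the point is only that the matching keys are snapshotted first so the dict can be popped safely while the sub-dict is built.

```python
# Illustrative only; split_prefixed_options is a hypothetical helper.
def split_prefixed_options(options: dict, prefix: str) -> dict:
    """Pop every key starting with `prefix` out of `options` and return them."""
    keys = [key for key in options if key.startswith(prefix)]  # snapshot before mutating
    return {key: options.pop(key) for key in keys}


options = {"pool_size": 4, "zip_file": "out.zip", "leaf_nodes": ["/root/.*"]}
pool_options = split_prefixed_options(options, "pool_")  # {'pool_size': 4}
zip_options = split_prefixed_options(options, "zip_")     # {'zip_file': 'out.zip'}
# `options` now holds only the expansion options, e.g. {'leaf_nodes': ['/root/.*']}
```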
53 changes: 28 additions & 25 deletions src/json_expand_o_matic/expansion_pool.py
@@ -13,13 +13,13 @@
logger = logging.getLogger(__name__)


class Modes(Enum):
class InitArgsType(Enum):
ArrayOfTuples = "ArrayOfTuples"
SharedMemoryArray = "SharedMemoryArray"


__mode__ = Modes.SharedMemoryArray
__unpack__ = None
__initargsmode__ = InitArgsType.SharedMemoryArray
__unpackfunc__ = None
__work__ = None


@@ -41,26 +41,26 @@ def __iter__(self):


def _initialize(mode, data):
global __mode__
global __initargsmode__
global __work__
global __unpack__
global __unpackfunc__

__mode__ = mode
__initargsmode__ = mode
__work__ = data

if __mode__ == Modes.SharedMemoryArray:
__unpack__ = lambda request: [ # noqa: E731
if __initargsmode__ == InitArgsType.SharedMemoryArray:
__unpackfunc__ = lambda request: [ # noqa: E731
string_at(element).decode("utf-8") for element in __work__[request]
]
elif __mode__ == Modes.ArrayOfTuples:
__unpack__ = lambda request: __work__[request] # noqa: E731
elif __initargsmode__ == InitArgsType.ArrayOfTuples:
__unpackfunc__ = lambda request: __work__[request] # noqa: E731


def _write_file(request):
global __unpack__
global __unpackfunc__

begin = time.time()
directory, filename, data, checksum_filename, checksum = __unpack__(request)
directory, filename, data, checksum_filename, checksum = __unpackfunc__(request)

def do():
with open(f"{directory}/{filename}", "w") as f:
@@ -89,18 +89,18 @@ def __init__(
pool_ratio: Optional[float] = None,
pool_size: Optional[int] = None,
pool_disable: Optional[bool] = False,
pool_mode: Union[str, Modes] = Modes.SharedMemoryArray,
pool_mode: Union[str, InitArgsType] = InitArgsType.SharedMemoryArray,
):
assert logger, "logger is required"
self.logger = logger
self.work: list = list()

self.mode = Modes(pool_mode)
self.init_style = InitArgsType(pool_mode)

self._set_pool_size(pool_ratio, pool_size, pool_disable)

if self.pool_size > 1:
logger.info(f"PoolSize: [{self.pool_size}]. Mode [{self.mode.value}].")
logger.info(f"PoolSize: [{self.pool_size}]. Mode [{self.init_style.value}].")
else:
logger.info(f"PoolSize: [{self.pool_size}].")

@@ -111,7 +111,7 @@ def finalize(self):
begin = time.time()

if self.pool_size == 1:
_initialize(Modes.ArrayOfTuples, self.work)
_initialize(InitArgsType.ArrayOfTuples, self.work)
results = [_write_file(i) for i in range(0, len(self.work))]

else:
@@ -122,23 +122,26 @@ def finalize(self):
self.work_time = sum(results)
self.overhead = self.elapsed - self.work_time

def _pooled_processing(self, chunksize, data):
if self.mode == Modes.SharedMemoryArray:
def _pooled_processing(self):
if self.init_style == InitArgsType.SharedMemoryArray:
data = self._prepare_shared_memory_array()
elif self.mode == Modes.ArrayOfTuples:
elif self.init_style == InitArgsType.ArrayOfTuples:
data = self.work

chunksize = 1 + int(len(self.work) / self.pool_size)

with mp.Pool(processes=self.pool_size, initializer=_initialize, initargs=(self.mode, data)) as pool:
with mp.Pool(processes=self.pool_size, initializer=_initialize, initargs=(self.init_style, data)) as pool:
futures = pool.map(_write_file, range(0, len(self.work)), chunksize=chunksize)
results = [f for f in futures]
return results

def _prepare_shared_memory_array(self):
value_list = [
WorkTuple(
*[cast(create_string_buffer(component.encode("utf-8")), POINTER(c_ubyte)) for component in work_unit]
*[
cast(create_string_buffer((component or "").encode("utf-8")), POINTER(c_ubyte))
for component in work_unit
]
)
for work_unit in self.work
]
@@ -153,9 +156,9 @@ def _set_pool_size(self, pool_ratio, pool_size, pool_disable):
elif pool_size == 0 and not pool_ratio:
self.pool_size = os.cpu_count()
elif pool_ratio:
assert pool_size is None, "Programmer error."
self.pool_size = abs(int(os.cpu_count() * self.pool_ratio))
assert (
pool_size is None
), f"Programmer error: pool_ratio [{pool_ratio}] cannot be used with pool_size [{pool_size}]."
self.pool_size = abs(int(os.cpu_count() * pool_ratio))
else:
assert pool_size is None, "Programmer error."
assert pool_ratio is None, "Programmer error."
self.pool_size = 1
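For readers unfamiliar with the worker-priming pattern this module is built around, the sketch below illustrates the ArrayOfTuples style: the pool initializer stashes the work list as a module-level global in each worker process, so `map()` only ships small integer indexes rather than the payloads. The names `_WORK`, `_init`, and `_handle` are hypothetical and not part of the library; the chunksize rule mirrors the one in `_pooled_processing` above.

```python
# Illustrative sketch only; assumes the ArrayOfTuples style used by ExpansionPool.
import multiprocessing as mp

_WORK = None  # populated once per worker process by _init()


def _init(work):
    # Pool initializer: runs once in every worker and stores the work list,
    # so each dispatched task only has to carry an integer index.
    global _WORK
    _WORK = work


def _handle(index):
    directory, filename, data = _WORK[index]
    # A real worker would write `data` to f"{directory}/{filename}" here.
    return len(data)


if __name__ == "__main__":
    work = [(".", f"file{i}.json", "{}") for i in range(8)]
    pool_size = 4
    chunksize = 1 + len(work) // pool_size  # same chunking rule as above
    with mp.Pool(processes=pool_size, initializer=_init, initargs=(work,)) as pool:
        results = pool.map(_handle, range(len(work)), chunksize=chunksize)
    print(sum(results))
```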
31 changes: 25 additions & 6 deletions src/json_expand_o_matic/expansion_zipper.py
@@ -2,26 +2,45 @@
import logging
import os
import zipfile
from typing import Optional, Tuple
from enum import Enum
from typing import Optional, Tuple, Union


class OutputChoice(Enum):
KeepZip = "KeepZip"
UnZipped = "UnZipped"
Zipped = "Zipped"


class ExpansionZipper:
def __init__(
self,
*,
logger: logging.Logger,
output_path: Optional[str] = None,
zip_root: Optional[str] = None,
zip_file: Optional[str] = None,
output_path: Optional[str] = None, # . Where the output will be written.
zip_root: Optional[str] = None, # .... Where all the files are within the zip.
zip_file: Optional[str] = None, # .... Name of the zip file to create in `output_path`.
zip_output: Union[str, OutputChoice] = OutputChoice.UnZipped, # Keep zipped, unzip or both.
):
assert logger, "logger is required"
self.logger = logger
self.work: list = list()

self.output_mode = OutputChoice(zip_output)

if output_path:
if not zip_file:
if zip_file and zip_root:
...
elif not zip_file and not zip_root:
zip_file = os.path.basename(output_path)
output_path = os.path.dirname(output_path) or "."
zip_root = os.path.basename(output_path)
output_path = os.path.dirname(output_path)
elif zip_file:
zip_root = os.path.basename(output_path)
output_path = os.path.dirname(output_path)
elif zip_root:
zip_file = os.path.basename(output_path)

else:
output_path = "."

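A short sketch of the path-defaulting rules the constructor above introduces: whichever of `zip_file` and `zip_root` is missing is derived from the tail of `output_path`. The helper name and the `or "."` fallback are assumptions for illustration, not the library's API.

```python
# Hypothetical helper illustrating the defaulting logic above.
import os
from typing import Optional, Tuple


def derive_zip_paths(
    output_path: str, zip_file: Optional[str] = None, zip_root: Optional[str] = None
) -> Tuple[str, str, str]:
    if not zip_file and not zip_root:
        zip_file = os.path.basename(output_path)
        zip_root = os.path.basename(output_path)
        output_path = os.path.dirname(output_path) or "."
    elif zip_file and not zip_root:
        zip_root = os.path.basename(output_path)
        output_path = os.path.dirname(output_path) or "."
    elif zip_root and not zip_file:
        zip_file = os.path.basename(output_path)
    return output_path, zip_root, zip_file


# e.g. derive_zip_paths("build/data") == ("build", "data", "data")
#      derive_zip_paths("build/data", zip_file="out.zip") == ("build", "data", "out.zip")
```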
39 changes: 16 additions & 23 deletions tests/test_leaves.py
@@ -32,37 +32,36 @@ def test_data(self, raw_data):
def original_data(self, raw_data):
return json.loads(json.dumps(raw_data))

def test_actors1(self, tmpdir, test_data, original_data, threaded):
def test_actors1(self, tmpdir, test_data, original_data):
"""Verify that we can create a json file for each actor and not recurse any further."""

self._actors_test(tmpdir, test_data, original_data, "/root/actors/.*", threaded)
self._actors_test(tmpdir, test_data, original_data, "/root/actors/.*")

def test_actors2(self, tmpdir, test_data, original_data, threaded):
def test_actors2(self, tmpdir, test_data, original_data):
"""Same as test_actors1 but with a more precise regex."""

self._actors_test(tmpdir, test_data, original_data, "/root/actors/[^/]+", threaded)
self._actors_test(tmpdir, test_data, original_data, "/root/actors/[^/]+")

def test_charlie1(self, tmpdir, test_data, original_data, threaded):
def test_charlie1(self, tmpdir, test_data, original_data):
"""Verify that we can single out an actor."""
self._charlie_test(tmpdir, test_data, original_data, "/root/actors/charlie_chaplin", threaded)
self._charlie_test(tmpdir, test_data, original_data, "/root/actors/charlie_chaplin")

def test_charlie2(self, tmpdir, test_data, original_data, threaded):
def test_charlie2(self, tmpdir, test_data, original_data):
"""Like test_charlie1 but with a loose wildcard."""
self._charlie_test(tmpdir, test_data, original_data, "/root/actors/[abcxyz].*", threaded)
self._charlie_test(tmpdir, test_data, original_data, "/root/actors/[abcxyz].*")

def test_charlie3(self, tmpdir, test_data, original_data, threaded):
def test_charlie3(self, tmpdir, test_data, original_data):
"""Like test_charlie1 but with tighter regex."""
self._charlie_test(tmpdir, test_data, original_data, "/root/actors/[abcxyz][^/]+", threaded)
self._charlie_test(tmpdir, test_data, original_data, "/root/actors/[abcxyz][^/]+")

def test_threaded_nested1(self, tmpdir, test_data, original_data, threaded):
def test_threaded_nested1(self, tmpdir, test_data, original_data):
"""Test a simple leaf_nodes scenario."""

expanded = JsonExpandOMatic(path=tmpdir).expand(
test_data,
root_element="root",
preserve=False,
leaf_nodes=[{"/root/actors/.*": ["/[^/]+/movies/.*", "/[^/]+/filmography"]}],
threaded=threaded,
)
assert expanded == {"root": {"$ref": f"{tmpdir.basename}/root.json"}}

@@ -90,7 +89,7 @@ def test_threaded_nested1(self, tmpdir, test_data, original_data, threaded):
assert os.path.exists(f"{tmpdir}/root/actors/dwayne_johnson/hobbies.json")
assert not os.path.exists(f"{tmpdir}/root/actors/dwayne_johnson/hobbies")

def test_nested1_equivalency(self, tmpdir, test_data, original_data, threaded):
def test_nested1_equivalency(self, tmpdir, test_data, original_data):
"""
In a nested leaf-node expression the dict key is treated as it
would be in the non-nested case.
@@ -110,7 +109,6 @@ def test_nested1_equivalency(self, tmpdir, test_data, original_data, threaded):
root_element="root",
preserve=False,
leaf_nodes=[{"/root/actors/.*": ["/[^/]+/movies/.*", "/[^/]+/filmography"]}],
threaded=threaded,
)
nested_files = [x.replace(f"{tmpdir}/n", "") for x in glob.glob(f"{tmpdir}/n", recursive=True)]

@@ -119,13 +117,12 @@ def test_nested1_equivalency(self, tmpdir, test_data, original_data, threaded):
root_element="root",
preserve=False,
leaf_nodes=["/root/actors/.*/movies/.*", "/root/actors/.*/filmography"],
threaded=threaded,
)
flattened_files = [x.replace(f"{tmpdir}/f", "") for x in glob.glob(f"{tmpdir}/f", recursive=True)]

assert nested_files == flattened_files

def test_nested2(self, tmpdir, test_data, original_data, threaded):
def test_nested2(self, tmpdir, test_data, original_data):
"""Test a targeted leaf_node exmple.
The expressions listed in the dict value are relative to the
@@ -141,7 +138,6 @@ def test_nested2(self, tmpdir, test_data, original_data, threaded):
root_element="root",
preserve=False,
leaf_nodes=[{"/root/actors/.*": ["/dwayne_johnson/movies", "/charlie_chaplin/spouses"]}],
threaded=threaded,
)
assert expanded == {"root": {"$ref": f"{tmpdir.basename}/root.json"}}

@@ -161,7 +157,7 @@ def test_nested2(self, tmpdir, test_data, original_data, threaded):
assert os.path.exists(f"{tmpdir}/root/actors/dwayne_johnson/movies.json")
assert not os.path.exists(f"{tmpdir}/root/actors/dwayne_johnson/movies")

def xtest_enhanced_nested1(self, tmpdir, test_data, original_data, threaded):
def xtest_enhanced_nested1(self, tmpdir, test_data, original_data):
"""Enhanced nested #1...
But what if we want a single json file per actor to include
@@ -210,7 +206,6 @@ def xtest_enhanced_nested1(self, tmpdir, test_data, original_data, threaded):
root_element="root",
preserve=False,
leaf_nodes=[{"/root/actors/.*": ["/[^/]+/movies/.*", "<A:/.*"]}],
threaded=threaded,
)

# This is the same thing you would expect in the non-nested case.
@@ -242,13 +237,12 @@ def xtest_enhanced_nested1(self, tmpdir, test_data, original_data, threaded):
assert data.get["spouses"].get("lita_grey", None)
assert data.get["spouses"]["lita_grey"].get("children", None)

def _actors_test(self, tmpdir, test_data, original_data, regex, threaded):
def _actors_test(self, tmpdir, test_data, original_data, regex):
expanded = JsonExpandOMatic(path=tmpdir).expand(
test_data,
root_element="root",
preserve=False,
leaf_nodes=[regex],
threaded=threaded,
)

# preserve=True allows mangling of test_data by expand()
@@ -268,13 +262,12 @@ def _not(x):
self._assert_actor_dirs(tmpdir, f=_not)
self._assert_movies(tmpdir, f=_not)

def _charlie_test(self, tmpdir, test_data, original_data, regex, threaded):
def _charlie_test(self, tmpdir, test_data, original_data, regex):
expanded = JsonExpandOMatic(path=tmpdir).expand(
test_data,
root_element="root",
preserve=False,
leaf_nodes=[regex],
threaded=threaded,
)
assert expanded == {"root": {"$ref": f"{tmpdir.basename}/root.json"}}

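As a usage note for the leaf_nodes expressions exercised above, the nested and flattened forms from test_nested1_equivalency are expected to produce the same file layout. The snippet below only restates the expressions from the tests; the commented call shape mirrors the tests and is illustrative, with `tmpdir` and `data` assumed to exist.

```python
# Equivalent leaf_nodes expressions, taken from the tests above.
nested = [{"/root/actors/.*": ["/[^/]+/movies/.*", "/[^/]+/filmography"]}]
flattened = ["/root/actors/.*/movies/.*", "/root/actors/.*/filmography"]

# Call shape mirrored from the tests (illustrative only):
# JsonExpandOMatic(path=tmpdir).expand(
#     data, root_element="root", preserve=False, leaf_nodes=nested
# )
```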
