Issue ooici#198 - Storage container defragmentation implemented.
Defragmentation still needs to be kicked off somewhere as a scheduled task.
lldaj committed Jul 30, 2014
1 parent 0f4c015 commit ad880ff
Showing 7 changed files with 276 additions and 5 deletions.
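The commit message notes that defragmentation still needs a scheduled trigger. A minimal sketch of one way it might be kicked off periodically (assuming a plain threading.Timer loop; the interval and the top-level entry point are placeholders, not part of this commit):

    # Hypothetical scheduling hook -- not part of this commit.
    import threading

    from coverage_model.storage.storage_respan_task import StorageRespanTask

    RESPAN_INTERVAL_SECONDS = 6 * 60 * 60  # placeholder interval

    def _respan_all_coverages():
        try:
            # No coverage_ids given: the task discovers every persisted coverage
            StorageRespanTask(sort_parameter='time').do_respan()
        finally:
            # re-arm the timer whether or not this pass succeeded
            t = threading.Timer(RESPAN_INTERVAL_SECONDS, _respan_all_coverages)
            t.daemon = True
            t.start()

    _respan_all_coverages()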
2 changes: 2 additions & 0 deletions coverage_model/config.py
@@ -28,6 +28,7 @@ class CoverageConfig(object):
_default_span_id_db_key = 'span_address'
_default_span_coverage_id_db_key = 'coverage_id'
_default_storage_location = None
_default_ideal_span_size = 100

def __init__(self):
self.ordered_time_key_preferences = self._default_ordered_time_key_preferences
@@ -44,6 +45,7 @@ def __init__(self):
self.config_time = 0
self.read_and_set_config()
self.ingest_time_key = self._default_ingest_time_key
self.ideal_span_size = self._default_ideal_span_size

def read_and_set_config(self):
one_from_config = False
73 changes: 71 additions & 2 deletions coverage_model/data_span.py
@@ -20,7 +20,8 @@
class Span(object):
ingest_time_str = CoverageConfig().ingest_time_key

def __init__(self, span_uuid, coverage_id, param_dict, ingest_time=None, compressors=None, mutable=False,
fill_mask=None, sort_parameter=None):
self.param_dict = param_dict
self.ingest_time = ingest_time
self.ingest_times = []
@@ -32,6 +33,7 @@ def __init__(self, span_uuid, coverage_id, param_dict, ingest_time=None, compres
self.coverage_id = coverage_id
self.compressors = compressors
self.mutable = mutable
self.sort_parameter = sort_parameter

def _set_ingest_times(self, ingest_times=None):
if self.ingest_time_str in self.param_dict:
@@ -58,6 +60,15 @@ def get_param_data_length(self):
            raise IndexError("Parameter arrays are different sizes in the 0th dimension.")
return size

    def get_numpy_bytes(self):
        size = 0
        for param_data in self.param_dict.itervalues():
            if not isinstance(param_data, NumpyParameterData):
                continue
            size += param_data.get_data().nbytes
        return size

@property
def ingest_time_array(self, dtype=np.float64):
arr = np.empty(self.get_param_data_length(), dtype=dtype)
@@ -73,6 +84,8 @@ def get_span_stats(self, params=None):
param_stat_dict = {}
if params is None:
params = self.param_dict.keys()
elif isinstance(params, basestring):
params = [params]
for param in params:
if param in self.param_dict:
if isinstance(self.param_dict[param], ConstantOverTime):
@@ -155,6 +168,7 @@ def from_json(cls, js_str, decompressors=None):
span = Span(str(json_dict['id']), str(json_dict['coverage_id']), uncompressed_params, ingest_time=json_dict['ingest_time'], mutable=json_dict['mutable'])
span.ingest_times = json_dict['ingest_time_dict']
span.param_dict[cls.ingest_time_str] = NumpyParameterData(cls.ingest_time_str, span.ingest_time_array)
span.compressors = decompressors
return span

def get_hash(self):
@@ -198,11 +212,66 @@ def __eq__(self, other):
return False

def __gt__(self, other):
if self.sort_parameter != other.sort_parameter:
raise RuntimeError('Sort parameters for objects do not match [%s vs. %s]' % (self.sort_parameter, other.sort_parameter))
if self.sort_parameter is None:
return self.ingest_time > other.ingest_time
else:
my_stats = self.get_span_stats(self.sort_parameter)
other_stats = other.get_span_stats(self.sort_parameter)
return my_stats.params[self.sort_parameter][0] > other_stats.params[self.sort_parameter][0]

    def __lt__(self, other):
        return other.__gt__(self)

@classmethod
def merge_spans(cls, spans, sort_param=None, fill_value_dict=None):
num_observations = 0
cov_id = None
param_key_set = set()
compressors = {}
for span in spans:
param_key_set.update(span.param_dict.keys())
num_observations += span.get_param_data_length()
if cov_id is None:
cov_id = span.coverage_id
elif cov_id != span.coverage_id:
raise RuntimeError('coverages ids do not match across spans')
if span.compressors is not None:
for k, v in span.compressors.iteritems():
compressors[k] = v

new_pdict = {}
current_pos = 0
for span in spans:
span_size = span.get_param_data_length()
            for key in param_key_set:
                if key not in span.param_dict:
                    continue
                data = span.param_dict[key].get_data()
                if key not in new_pdict:
                    new_pdict[key] = np.empty(num_observations, dtype=data.dtype)
                    if fill_value_dict is not None and key in fill_value_dict:
                        new_pdict[key][:] = fill_value_dict[key]
                    else:
                        new_pdict[key][:] = 0
                new_pdict[key][current_pos:current_pos+span_size] = data
            current_pos += span_size

from coverage_model.util.numpy_utils import NumpyUtils
if sort_param is not None:
new_pdict = NumpyUtils.sort_flat_arrays(new_pdict, sort_param)

        replace_dict = dict((k, NumpyParameterData(k, v)) for k, v in new_pdict.iteritems())

from coverage_model.utils import create_guid
new_span_id = create_guid()
merged_span = Span(new_span_id, coverage_id=cov_id, param_dict=replace_dict, compressors=compressors)
return merged_span



class SpanStats(object):

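For intuition, a minimal numpy-only sketch of the merge algorithm above on two toy spans (plain dicts of arrays stand in for the NumpyParameterData wrappers, and the -9999.0 fill is an assumed placeholder):

    import numpy as np

    # Two toy 'spans'; the second lacks 'conductivity', so the merged result
    # must be padded with a fill value for that parameter.
    span_a = {'time': np.array([5., 6., 7.]), 'conductivity': np.array([30., 31., 32.])}
    span_b = {'time': np.array([1., 2.])}

    num_obs = 5
    fill = -9999.0
    merged = {}
    pos = 0
    for span in (span_a, span_b):
        size = len(span['time'])
        for key, data in span.iteritems():
            if key not in merged:
                merged[key] = np.empty(num_obs, dtype=data.dtype)
                merged[key][:] = fill
            merged[key][pos:pos + size] = data
        pos += size

    # sort every array by the sort parameter, as NumpyUtils.sort_flat_arrays does
    order = np.argsort(merged['time'])
    merged = dict((k, v[order]) for k, v in merged.iteritems())
    # merged['time']         -> [1., 2., 5., 6., 7.]
    # merged['conductivity'] -> [-9999., -9999., 30., 31., 32.]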
1 change: 0 additions & 1 deletion coverage_model/storage/parameter_persisted_storage.py
@@ -459,7 +459,6 @@ def _create_param_dict_from_spans_dict(self, params, span_dict):
def _sort_flat_arrays(cls, np_dict, sort_parameter=None):
sorted_array_dict = {}
if sort_parameter is None or sort_parameter not in np_dict.keys():
sort_parameter = 'time'
sort_array = np_dict[sort_parameter]
sorted_indexes = np.argsort(sort_array)
37 changes: 37 additions & 0 deletions coverage_model/storage/postgres_span_storage.py
@@ -57,6 +57,14 @@ def write_span(self, span):
with self.span_store.pool.cursor(**self.span_store.cursor_args) as cur:
cur.execute(sql_str, [SpanJsonDumper(span) for i in range(data_times)])

def replace_spans(self, new_span, old_spans):
stats_sql, bin_sql = self.get_span_stats_and_bin_insert_sql(new_span)
span_sql, data_times = self.get_span_insert_sql(new_span)
delete_sql = self.get_span_delete_sql(old_spans)
sql_str = "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; %s %s %s %s COMMIT;" % (span_sql, stats_sql, bin_sql, delete_sql)
with self.span_store.pool.cursor(**self.span_store.cursor_args) as cur:
cur.execute(sql_str, [SpanJsonDumper(new_span) for i in range(data_times)])

def get_span_insert_sql(self, span):
data_times = 1
if True:
@@ -122,6 +130,22 @@ def get_span_stats_and_bin_insert_sql(self, span):
bin_sql = ""
return stats_sql, bin_sql

def get_span_delete_sql(self, spans):
cov_id = None
span_ids = set()
for span in spans:
if cov_id is None:
cov_id = span.coverage_id
elif cov_id != span.coverage_id:
raise RuntimeError('Cannot delete spans from multiple coverages in one call')
span_ids.add("'" + span.id + "'")
span_ids = ", ".join(span_ids)
statement = """DELETE FROM %s where coverage_id = '%s' AND id IN (%s); \
DELETE FROM %s WHERE coverage_id = '%s' AND span_address IN (%s);""" \
% (self.span_table_name, cov_id, span_ids, self.span_stats_table_name, cov_id, span_ids)
return statement


def get_spans(self, span_ids=None, coverage_ids=None, params=None, start_time=None, stop_time=None, decompressors=None):
statement = """SELECT data::text from %s where coverage_id = '%s'""" % (self.span_table_name, coverage_ids)
if span_ids is not None:
@@ -140,6 +164,19 @@ def get_spans(self, span_ids=None, coverage_ids=None, params=None, start_time=No

return spans

def get_stored_coverage_ids(self):
statement = """SELECT coverage_id from %s""" % (self.span_stats_table_name)
with self.span_store.pool.cursor(**self.span_store.cursor_args) as cur:
cur.execute(statement)
results = cur.fetchall()

cov_ids = set()
for row in results:
cov_id, = row
cov_ids.add(cov_id)

return cov_ids

def has_data(self, coverage_id):
statement = """SELECT coverage_id FROM %s WHERE coverage_id = '%s'""" % (self.span_stats_table_name, coverage_id)
results = []
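replace_spans above gets its atomicity by bundling the insert and delete statements into a single SERIALIZABLE transaction string. A rough standalone illustration of the same pattern with raw psycopg2 (the DSN, table, and ids are placeholders; the real code goes through the span_store pool and SpanJsonDumper):

    # Illustration only; assumes psycopg2 and placeholder names.
    import psycopg2

    conn = psycopg2.connect('dbname=cov_test')  # placeholder DSN
    conn.autocommit = True  # let the explicit BEGIN/COMMIT below own the transaction
    try:
        cur = conn.cursor()
        # Everything between BEGIN and COMMIT applies atomically, so readers
        # never observe the new merged span alongside the old spans.
        cur.execute(
            "BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;"
            " INSERT INTO spans (coverage_id, id, data) VALUES (%s, %s, %s);"
            " DELETE FROM spans WHERE coverage_id = %s AND id IN ('old-a', 'old-b');"
            " COMMIT;",
            ('cov-1', 'new-span', '{}', 'cov-1'))
    finally:
        conn.close()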
8 changes: 7 additions & 1 deletion coverage_model/storage/span_storage.py
@@ -15,4 +15,10 @@ def search(self, search_criteria, limit=None):
raise NotImplementedError('Not implemented in base class')

    def has_data(self, coverage_id):
        raise NotImplementedError('Not implemented in base class')

def get_stored_coverage_ids(self):
raise NotImplementedError('Not implemented in base class')

    def replace_spans(self, new_span, old_spans):
raise NotImplementedError('Not implemented in base class')
97 changes: 97 additions & 0 deletions coverage_model/storage/storage_respan_task.py
@@ -0,0 +1,97 @@
__author__ = 'casey'

from ooi.logging import log
from threading import Thread
from coverage_model.config import CoverageConfig
from coverage_model.coverage import AbstractCoverage, SimplexCoverage
from coverage_model.data_span import Span
from coverage_model.storage.span_storage_factory import SpanStorageFactory
from coverage_model.storage.span_storage import SpanStorage
from coverage_model.metadata_factory import MetadataManagerFactory


class StorageRespanTask(object):
"""
Repackage data writes into contiguous memory packages for more optimal data access.
    Each individual data write is stored in its own location to support asynchronous writes.
    Repackaging accounts for writes that arrive out of observation (chronological) order, as well as
    write sizes that can hurt read performance (e.g. many small writes).
    Repackaging attempts to merge many small 'files' into one optimally-sized 'file'.
    The current implementation does not split larger 'files' into multiple smaller ones, though that
functionality should be considered in the future.
"""

    def __init__(self, storage_name=None, coverage_ids=None, time_segment=None,
                 sort_parameter=None):
        store = SpanStorageFactory.get_span_storage_obj(storage_name)
        if not isinstance(store, SpanStorage):
            raise TypeError("Retrieved storage object must implement %s type. Found %s." % (SpanStorage.__name__, store.__class__.__name__))
        self.store = store

if coverage_ids is None:
self.coverage_ids = set()
coverage_ids = self.store.get_stored_coverage_ids()
for cov_id in coverage_ids:
if MetadataManagerFactory.is_persisted(cov_id):
self.coverage_ids.add(cov_id)

        elif isinstance(coverage_ids, (list, set)):
            self.coverage_ids = set(coverage_ids)
        elif isinstance(coverage_ids, basestring):
            self.coverage_ids = set([coverage_ids])
        else:
            raise TypeError("Unhandled coverage_ids type - %s" % type(coverage_ids))

        if time_segment is not None and (not isinstance(time_segment, tuple) or len(time_segment) != 2):
            raise TypeError("time_segment must be a (start, stop) tuple")
self.time_segment = time_segment
self.sort_parameter_name = sort_parameter

def do_respan(self, asynchronous=False):
if asynchronous:
thread = Thread(target=self.do_respan, args=(False,))
thread.start()
return thread

        for cov_id in self.coverage_ids:
            self.respan_coverage(cov_id)

def respan_coverage(self, cov_id):
cov = AbstractCoverage.resurrect(cov_id, 'r')
if not isinstance(cov, SimplexCoverage):
return
log.info('Respanning coverage %s' % cov_id)
decompressors = cov._persistence_layer.value_list
fill_value_dict = {}
for k in decompressors:
fill_value_dict[k] = cov.get_parameter_context(k).fill_value
if fill_value_dict[k] is None:
fill_value_dict[k] = -9999.0
spans = self.store.get_spans(coverage_ids=cov_id, decompressors=decompressors)
for span in spans:
span.sort_parameter = cov.temporal_parameter_name
starting_num_spans = len(spans)

ideal_span_size = CoverageConfig().ideal_span_size
span_sets = [[]]
current_size = 0
spans = sorted(spans)
        for span in spans:
            span_size = span.get_numpy_bytes()
            if (current_size > ideal_span_size and len(span_sets[-1]) > 0) or \
                    abs(ideal_span_size - current_size) < abs(ideal_span_size - span_size):
                span_sets.append([])
                current_size = 0
            span_sets[-1].append(span)
            current_size += span_size

        new_span_ids = []
        for span_set in span_sets:
            if len(span_set) == 0:
                # the packing heuristic can leave an empty leading set
                continue
            new_span = Span.merge_spans(span_set, sort_param=self.sort_parameter_name, fill_value_dict=fill_value_dict)
            self.store.replace_spans(new_span, span_set)
            new_span_ids.append(new_span.id)

        log.info('Respanned coverage %s from %s spans to %s spans' % (cov_id, starting_num_spans, len(new_span_ids)))
return new_span_ids
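The grouping loop in respan_coverage is a greedy packer: it keeps adding spans to the current set until the set's byte count passes ideal_span_size, or the incoming span alone is closer to the target than the current set is. A small self-contained sketch of the same heuristic over plain byte counts (the ideal size of 100 just mirrors the config default added above):

    def pack_spans(span_sizes, ideal_size=100):
        """Greedy grouping mirroring StorageRespanTask.respan_coverage."""
        sets = [[]]
        current = 0
        for size in span_sizes:
            start_new = (current > ideal_size and len(sets[-1]) > 0) or \
                abs(ideal_size - current) < abs(ideal_size - size)
            if start_new:
                sets.append([])
                current = 0
            sets[-1].append(size)
            current += size
        # an oversized first span can leave an empty leading set; drop it
        return [s for s in sets if s]

    print pack_spans([40, 40, 40, 250, 10, 10])
    # -> [[40, 40, 40], [250], [10, 10]]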
63 changes: 62 additions & 1 deletion coverage_model/test/test_postgres_storage.py
@@ -17,6 +17,7 @@
from coverage_model.parameter_data import *
from coverage_test_base import get_parameter_dict
from coverage_model.parameter_types import *
from coverage_model.storage.storage_respan_task import StorageRespanTask


@attr('UNIT',group='cov')
@@ -266,7 +267,6 @@ def test_add_constant(self):
expected_array[1:4] = 4096
scov.set_parameter_values({'sparseness': ConstantOverTime('sparseness', 4096, time_start=10000, time_end=10002)})
returned_array = scov.get_parameter_values([scov.temporal_parameter_name, 'sparseness'], as_record_array=True).get_data()
np.testing.assert_array_equal(expected_array, returned_array['sparseness'])

expected_array[-3:] = 17
@@ -1045,6 +1045,67 @@ def test_order_by_ingest(self):
print return_values['ingestion_timestamp'][x+1], sorted_values[x+1]
np.testing.assert_array_equal(sorted_values, return_values['ingestion_timestamp'])

def test_respan(self):
data_ctx = ParameterContext('data', param_type=RecordType(), fill_value=-999.0)
cov = _make_cov(self.working_dir, ['conductivity',data_ctx], nt=10)

cov.set_parameter_values({'time': np.arange(50,55), 'data': np.arange(150,155)})
cov.set_parameter_values({'time': np.arange(250,253), 'data': np.arange(200,203)})
cov.set_parameter_values({'time': np.arange(1500,1610), 'data': np.arange(13000,13110), 'conductivity': np.arange(100,210)})
cov.set_parameter_values({'time': np.arange(100,110), 'data': np.arange(2000,2010), 'conductivity': np.arange(0,10)})
cov.refresh()

orig_cov_data = cov.get_parameter_values().get_data()
respan_task = StorageRespanTask(coverage_ids=cov.persistence_guid, sort_parameter='time')
respan_task.do_respan()

respan_cov = AbstractCoverage.resurrect(cov.persistence_guid, 'r')
respan_cov_data = respan_cov.get_parameter_values().get_data()
self.assertEqual(respan_cov_data.keys(), orig_cov_data.keys())
for key in orig_cov_data.keys():
np.testing.assert_array_equal(orig_cov_data[key], respan_cov_data[key])

    @unittest.skip("Test has the potential to take considerable time; only run if you're testing respan of all coverages")
def test_respan_all_coverages(self):
data_ctx = ParameterContext('data', param_type=RecordType(), fill_value=-999.0)
cov = _make_cov(self.working_dir, ['conductivity',data_ctx], nt=10)

cov.set_parameter_values({'time': np.arange(50,55), 'data': np.arange(150,155)})
import time
time.sleep(1)
cov.set_parameter_values({'time': np.arange(250,253), 'data': np.arange(200,203)})
time.sleep(1)
cov.set_parameter_values({'time': np.arange(100,110), 'data': np.arange(2000,2010), 'conductivity': np.arange(0,10)})
cov.refresh()

cov2 = _make_cov(self.working_dir, ['conductivity',data_ctx], nt=10)

cov2.set_parameter_values({'time': np.arange(1050,1055), 'data': np.arange(1150,1155)})
cov2.set_parameter_values({'time': np.arange(1250,1253), 'data': np.arange(1200,1203)})
cov2.set_parameter_values({'time': np.arange(1500,1610), 'data': np.arange(13000,13110), 'conductivity': np.arange(100,210)})
cov2.set_parameter_values({'time': np.arange(1100,1110), 'data': np.arange(12000,12010), 'conductivity': np.arange(0,10)})
cov2.refresh()

orig_cov_data = cov.get_parameter_values().get_data()
orig_cov2_data = cov2.get_parameter_values().get_data()

respan_task = StorageRespanTask(sort_parameter='time')
respan_task.do_respan()

respan_cov = AbstractCoverage.resurrect(cov.persistence_guid, 'r')
respan_cov_data = respan_cov.get_parameter_values().get_data()
self.assertEqual(respan_cov_data.keys(), orig_cov_data.keys())
for key in orig_cov_data.keys():
np.testing.assert_array_equal(orig_cov_data[key], respan_cov_data[key])

respan_cov2 = AbstractCoverage.resurrect(cov2.persistence_guid, 'r')
respan_cov2_data = respan_cov2.get_parameter_values().get_data()
self.assertEqual(respan_cov2_data.keys(), orig_cov2_data.keys())
for key in orig_cov2_data.keys():
np.testing.assert_array_equal(orig_cov2_data[key], respan_cov2_data[key])


def identity(x):
return np.copy(x)*3
