Skip to content
This repository has been archived by the owner on Mar 15, 2021. It is now read-only.

Commit

Permalink
fix: Avoid collecting unnecessary snapshots
Browse files Browse the repository at this point in the history
When logs are collected in relaxed mode, there is no need to compute
the current snapshot for every single entry.

This fix speeds up large registers by ~2x.

Signed-off-by: Arnau Siches <arnau.siches@digital.cabinet-office.gov.uk>
  • Loading branch information
Arnau Siches committed Apr 18, 2019
1 parent 3ffa630 commit f59ec22
Showing 1 changed file with 45 additions and 35 deletions.
80 changes: 45 additions & 35 deletions registers/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,43 +149,10 @@ def collect(commands: List[Command], # pylint: disable=too-many-branches
data = log or Log()
metadata = metalog or Log()
blobs = {**data.blobs, **metadata.blobs}
errors = []
errors: List[ValidationError] = []

for command in commands:
if command.action == Action.AssertRootHash:
digest = cast(Hash, command.value)

if digest != data.digest():
raise InconsistentLog(digest, data.digest(), data.size)

elif command.action == Action.AddItem:
blobs[command.value.digest()] = cast(Blob, command.value)

elif command.action == Action.AppendEntry:
entry = cast(Entry, command.value)
blob = blobs.get(entry.blob_hash)

if blob is None:
raise OrphanEntry(entry)

if entry.scope == Scope.System:
metadata.insert(blob)
record = metadata.snapshot().get(entry.key)
(_, err) = _collect_entry(entry, record)

if err and not relaxed:
errors.append(err)
else:
metadata.insert(entry)
else:
data.insert(blob)
record = data.snapshot().get(entry.key)
(_, err) = _collect_entry(entry, record)

if err and not relaxed:
errors.append(err)
else:
data.insert(entry)
_collect_command(command, data, metadata, blobs, errors, relaxed)

return {"data": data, "metadata": metadata, "errors": errors}

Expand All @@ -207,6 +174,49 @@ def slice(log: Log, start_position: int) -> List[Command]:
# Pair produced by entry validation (see ``_collect_entry``): the first slot
# may hold an entry and the second a validation error; either may be ``None``.
Result = Tuple[Optional[Entry], Optional[ValidationError]]


def _collect_command(command: Command,
                     data: Log,
                     metadata: Log,
                     blobs: Dict[Hash, Blob],
                     errors: List[ValidationError],
                     relaxed: bool):
    """Apply a single command to the logs being collected.

    * ``Action.AssertRootHash``: the command value is compared with
      ``data.digest()``; a mismatch raises ``InconsistentLog``.
    * ``Action.AddItem``: the command value is stored in ``blobs`` under
      its own digest.
    * ``Action.AppendEntry``: the blob referenced by the entry is looked up
      in ``blobs`` (raising ``OrphanEntry`` when absent) and the pair is
      handed to ``_collect_pair`` together with ``metadata`` for
      ``Scope.System`` entries or ``data`` for any other scope.

    Commands carrying any other action are ignored.

    ``blobs`` is mutated in place; ``errors`` and ``relaxed`` are passed
    through to ``_collect_pair`` untouched.
    """
    action = command.action

    if action == Action.AssertRootHash:
        expected = cast(Hash, command.value)

        if expected != data.digest():
            raise InconsistentLog(expected, data.digest(), data.size)

        return

    if action == Action.AddItem:
        item = cast(Blob, command.value)
        blobs[item.digest()] = item

        return

    if action != Action.AppendEntry:
        return

    entry = cast(Entry, command.value)
    blob = blobs.get(entry.blob_hash)

    if blob is None:
        raise OrphanEntry(entry)

    # System-scoped entries go to the metadata log, everything else to data.
    target = metadata if entry.scope == Scope.System else data
    _collect_pair(entry, blob, target, errors, relaxed)


def _collect_pair(entry: Entry, blob: Blob, log: Log,
                  errors: List[ValidationError], relaxed: bool):
    """Insert ``blob`` and then ``entry`` into ``log``.

    When ``relaxed`` is false, the record currently stored under
    ``entry.key`` in ``log.snapshot()`` is checked against the entry with
    ``_collect_entry`` and any resulting error is appended to ``errors``.
    When ``relaxed`` is true, no snapshot is taken and no validation runs.
    """
    log.insert(blob)

    if relaxed:
        # Skipping the snapshot here is the whole point of relaxed mode:
        # it is only needed to validate the entry.
        log.insert(entry)
        return

    current = log.snapshot().get(entry.key)
    _unused, failure = _collect_entry(entry, current)

    if failure:
        errors.append(failure)

    # NOTE(review): the entry is inserted even when validation reported an
    # error above -- confirm this is intended for strict mode.
    log.insert(entry)


def _collect_entry(entry: Entry, record: Optional[Record]) -> Result:
if record and record.blob.digest() == entry.blob_hash:
return (None, DuplicatedEntry(entry.key, record.blob))
Expand Down

0 comments on commit f59ec22

Please sign in to comment.