Only write ref key once when writing with prune_previous_versions (#1560)

#### What does this implement or fix?

We should only write the version ref key once when we write with
`prune_previous_versions=True`. Currently we write it twice: once after
we write the tombstone-all key and once when we write the new version.
This leaves a window during which the symbol is unreadable.

This was fixed a while ago with PR #1104 but regressed with PR #1355.
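
To make the failure mode concrete, here is a minimal Python sketch of the ordering problem. This is a toy model, not ArcticDB's actual storage code: the dictionary stands in for the storage backend, and the function names only mirror the flow of the C++ internals changed below.

```python
# Toy model: readers resolve a symbol through a single ref entry, so every
# intermediate state published there is visible to concurrent readers.
ref_store = {}

def buggy_write_with_prune(symbol, new_version_key):
    # First ref write: point the symbol at the tombstone-all state.
    ref_store[symbol] = "tombstone_all"
    # A concurrent read(symbol) in this window finds only tombstoned
    # versions and fails, even though a new version is about to appear.
    ref_store[symbol] = new_version_key  # second ref write

def fixed_write_with_prune(symbol, new_version_key):
    # Tombstoning and journalling touch only the version chain; the ref key
    # is published exactly once, after the new version exists, so readers
    # never observe a state with no live version.
    ref_store[symbol] = new_version_key
```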
poodlewars committed May 15, 2024
1 parent 259d1bd commit d2e49ff
Showing 3 changed files with 69 additions and 31 deletions.
20 changes: 10 additions & 10 deletions build_tooling/parallel_test.sh
@@ -9,16 +9,16 @@ echo Saving results to ${TEST_OUTPUT_DIR:="$(realpath "$tooling_dir/../cpp/out")

catch=`{ which catchsegv 2>/dev/null || echo ; } | tail -n 1`

set -o xtrace -o pipefail

# Build a directory that's just the test assets, so can't access other Python source not in the wheel
mkdir -p $PARALLEL_TEST_ROOT
MSYS=winsymlinks:nativestrict ln -s "$(realpath "$tooling_dir/../python/tests")" $PARALLEL_TEST_ROOT/
cd $PARALLEL_TEST_ROOT

export ARCTICDB_RAND_SEED=$RANDOM

$catch python -m pytest --timeout=3600 $PYTEST_XDIST_MODE -v --log-file="$TEST_OUTPUT_DIR/pytest-logger.$group.log" \
    --junitxml="$TEST_OUTPUT_DIR/pytest.$group.xml" \
    --basetemp="$PARALLEL_TEST_ROOT/temp-pytest-output" \
    "$@" 2>&1 | sed -ur "s#^(tests/.*/([^/]+\.py))?#\2#"
31 changes: 10 additions & 21 deletions cpp/arcticdb/version/version_map.hpp
@@ -246,15 +246,6 @@ class VersionMapImpl {
log_write(store, key.id(), key.version_id());
}

- AtomKey write_tombstone_all_key(
-     const std::shared_ptr<Store>& store,
-     const AtomKey& previous_key,
-     const std::shared_ptr<VersionMapEntry>& entry) {
-     auto tombstone_key = write_tombstone_all_key_internal(store, previous_key, entry);
-     write_symbol_ref(store, tombstone_key, std::nullopt, entry->head_.value());
-     return tombstone_key;
- }

/**
* Tombstone all non-deleted versions of the given stream and do the related housekeeping.
* @param first_key_to_tombstone The first key in the version chain that should be tombstoned. When empty
@@ -298,8 +289,8 @@
__FUNCTION__);
auto [_, result] = tombstone_from_key_or_all_internal(store, key.id(), previous_key, entry);

- do_write(store, key, entry);
- write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
+ auto previous_index = do_write(store, key, entry);
+ write_symbol_ref(store, *entry->keys_.cbegin(), previous_index, entry->head_.value());

if (log_changes_)
log_write(store, key.id(), key.version_id());
@@ -369,7 +360,6 @@
}
}
new_entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, new_entry);
- write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
remove_entry_version_keys(store, entry, stream_id);
if (validate_)
new_entry->validate();
@@ -477,7 +467,6 @@
entry->keys_.assign(std::begin(index_keys), std::end(index_keys));
auto new_version_id = index_keys[0].version_id();
entry->head_ = write_entry_to_storage(store, stream_id, new_version_id, entry);
- write_symbol_ref(store, *entry->keys_.cbegin(), std::nullopt, entry->head_.value());
if (validate_)
entry->validate();
}
@@ -502,7 +491,10 @@
return storage_reload(store, stream_id, load_param, load_param.iterate_on_failure_);
}

- void do_write(
+ /**
+  * Returns the second undeleted index (after the write).
+  */
+ std::optional<AtomKey> do_write(
std::shared_ptr<Store> store,
const AtomKey &key,
const std::shared_ptr<VersionMapEntry> &entry) {
@@ -512,7 +504,7 @@
auto journal_key = to_atom(std::move(journal_single_key(store, key, entry->head_)));
write_to_entry(entry, key, journal_key);
auto previous_index = entry->get_second_undeleted_index();
- write_symbol_ref(store, key, previous_index, journal_key);
+ return previous_index;
}

AtomKey write_tombstone(
@@ -902,8 +894,7 @@
entry->clear();
load_via_iteration(store, stream_id, entry, false);
remove_duplicate_index_keys(entry);
- auto new_entry = rewrite_entry(store, stream_id, entry);
- write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+ rewrite_entry(store, stream_id, entry);
}

void remove_and_rewrite_version_keys(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -913,9 +904,8 @@
entry->clear();
load_via_iteration(store, stream_id, entry, true);
remove_duplicate_index_keys(entry);
- auto new_entry = rewrite_entry(store, stream_id, entry);
+ rewrite_entry(store, stream_id, entry);
remove_entry_version_keys(store, old_entry, stream_id);
- write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
}

void fix_ref_key(std::shared_ptr<Store> store, const StreamId& stream_id) {
@@ -964,8 +954,7 @@

entry->keys_.insert(std::begin(entry->keys_), std::begin(missing_versions), std::end(missing_versions));
entry->sort();
- auto new_entry = rewrite_entry(store, stream_id, entry);
- write_symbol_ref(store, *new_entry->keys_.cbegin(), std::nullopt, new_entry->head_.value());
+ rewrite_entry(store, stream_id, entry);
}

std::shared_ptr<Lock> get_lock_object(const StreamId& stream_id) const {
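The common thread in these hunks is that the symbol ref key is now written exactly once per operation: `do_write` returns the previous undeleted index instead of writing the ref itself, and the redundant `write_symbol_ref` calls after `rewrite_entry` and the tombstone-all path are gone. A rough Python model of what `get_second_undeleted_index` computes (hypothetical types; the real logic lives in `VersionMapEntry` in the C++ above):

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class IndexKey:
    version_id: int
    deleted: bool = False  # set once a tombstone covers this version

def get_second_undeleted_index(keys: List[IndexKey]) -> Optional[IndexKey]:
    """Given a newest-first version chain, the first undeleted key is the
    version just written; the second is the surviving 'previous' version."""
    undeleted = [k for k in keys if not k.deleted]
    return undeleted[1] if len(undeleted) > 1 else None

# With prune_previous_versions=True, writing v3 tombstones v1 and v2,
# so no previous undeleted index remains to reference:
assert get_second_undeleted_index(
    [IndexKey(3), IndexKey(2, deleted=True), IndexKey(1, deleted=True)]) is None
# Without pruning, v2 survives and is returned as the previous version:
assert get_second_undeleted_index(
    [IndexKey(3), IndexKey(2), IndexKey(1, deleted=True)]).version_id == 2
```

The new test below exercises exactly this scenario: one process writes with pruning while another reads the same symbol in a loop.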
@@ -0,0 +1,49 @@
import time
import pytest
from multiprocessing import Process, Queue


# Both fixtures resolve to the same underlying LMDB store (with delayed
# deletes enabled), so the reader and writer processes share one library.
@pytest.fixture
def writer_store(lmdb_version_store_delayed_deletes_v2):
    return lmdb_version_store_delayed_deletes_v2


@pytest.fixture
def reader_store(lmdb_version_store_delayed_deletes_v2):
    return lmdb_version_store_delayed_deletes_v2


def read_repeatedly(version_store, queue: Queue):
    while True:
        try:
            version_store.read("sym")
        except Exception as e:
            queue.put(e)
            raise  # don't get stuck in the while loop when we already know there's an issue
        time.sleep(0.1)


def write_repeatedly(version_store):
    while True:
        version_store.write("sym", [1, 2, 3], prune_previous_version=True)
        time.sleep(0.1)


def test_concurrent_read_write(writer_store, reader_store):
    """When using delayed deletes, a reader should always be able to read a symbol even if it is being modified
    and pruned by another process."""
    writer_store.write("sym", [1, 2, 3], prune_previous_version=True)
    exceptions_in_reader = Queue()
    reader = Process(target=read_repeatedly, args=(reader_store, exceptions_in_reader))
    writer = Process(target=write_repeatedly, args=(writer_store,))

    try:
        reader.start()
        writer.start()
        reader.join(5)  # run the race for ~5 seconds; the reader exits early only if a read failed
        writer.join(0.001)
    finally:
        writer.terminate()
        reader.terminate()

    assert exceptions_in_reader.empty()
