Commit fdf85fa

fix format

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Little-Wallace committed Mar 19, 2020
1 parent 7c0ffad commit fdf85fa
Showing 9 changed files with 105 additions and 85 deletions.
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_files.cc
@@ -6,11 +6,11 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl/db_impl.h"

#include <cinttypes>
#include <set>
#include <unordered_set>

#include "db/db_impl/db_impl.h"
#include "db/event_helpers.h"
#include "db/memtable_list.h"
#include "file/file_util.h"
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
@@ -121,7 +121,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.avoid_flush_during_recovery = false;
}

// Multi-thread write does not support two write queues or pipelined write.
// Multi-thread write does not support two write queues or 2PC.
if (result.two_write_queues || result.allow_2pc) {
result.enable_multi_thread_write = false;
}
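Note that the sanitizer above silently disables the fork's multi-batch write rather than failing the open. A minimal opt-in sketch under that constraint; `enable_multi_thread_write` is this fork's DBOptions field, the helper name `OpenWithMultiThreadWrite` is hypothetical, and the rest is stock RocksDB API.

#include <string>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Hypothetical helper: opens a DB with the fork's multi-batch write enabled.
rocksdb::Status OpenWithMultiThreadWrite(const std::string& path,
                                         rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.enable_multi_thread_write = true;  // this fork's option
  options.two_write_queues = false;  // must stay off, per the hunk above
  options.allow_2pc = false;         // must stay off, per the hunk above
  return rocksdb::DB::Open(options, path, db);
}
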
10 changes: 7 additions & 3 deletions db/db_impl/db_impl_write.cc
@@ -201,15 +201,19 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options,
WriteBatchInternal::AsyncInsertInto(
&writer, writer.sequence, version_set, &flush_scheduler_,
ignore_missing_faimly, this, &write_thread_.write_queue_);
// Because `LaunchParallelMemTableWriters` has added `write_group->size` to
// `running`, the value of `running` stays larger than one until the leader
// thread calls `CompleteParallelMemTableWriter`.
while (writer.write_group->running.load(std::memory_order_acquire) > 1) {
// A write thread may exit this loop and block itself if it is not the leader thread.
if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) {
break;
}
}
// We only allow the leader thread to finish this WriteGroup, because a task may be done by a thread
// that is not in this WriteGroup, and that thread would not notify the threads in this WriteGroup.
// So someone in this WriteGroup must complete it, and the leader thread is the easiest one to pick.
// We only allow the leader thread to finish this WriteGroup, because a task
// may be done by a thread that is not in this WriteGroup, and that thread
// would not notify the threads in this WriteGroup. So someone in this
// WriteGroup must complete it, and the leader thread is the easiest one to pick.
if (is_leader_thread) {
MemTableInsertStatusCheck(writer.status);
versions_->SetLastSequence(writer.write_group->last_sequence);
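The loop above is a help-while-waiting scheme: until the group's `running` counter drops to one, a writer executes queued memtable-insert tasks instead of blocking, and only the leader may finish the WriteGroup. Below is a self-contained sketch of that pattern with a simplified stand-in for `SafeFuncQueue`; the bookkeeping is illustrative, not the fork's exact accounting.

#include <atomic>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

// Simplified stand-in for SafeFuncQueue: a mutex-guarded task queue.
class FuncQueue {
 public:
  void Push(std::function<void()> f) {
    std::lock_guard<std::mutex> lk(mu_);
    q_.push(std::move(f));
  }
  // Runs one queued task; returns false if the queue was empty.
  bool RunFunc() {
    std::function<void()> f;
    {
      std::lock_guard<std::mutex> lk(mu_);
      if (q_.empty()) return false;
      f = std::move(q_.front());
      q_.pop();
    }
    f();
    return true;
  }

 private:
  std::mutex mu_;
  std::queue<std::function<void()>> q_;
};

int main() {
  FuncQueue queue;
  std::atomic<int> running{1 + 4};  // the leader's own slot plus four tasks
  for (int i = 0; i < 4; ++i) {
    queue.Push([&running] { running.fetch_sub(1, std::memory_order_release); });
  }
  // A non-leader writer helps drain the queue, then leaves.
  std::thread helper([&queue] {
    while (queue.RunFunc()) {
    }
  });
  // Leader: help run tasks until only its own slot remains, then it alone
  // completes the group.
  while (running.load(std::memory_order_acquire) > 1) {
    queue.RunFunc();
  }
  helper.join();
  return 0;
}
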
14 changes: 8 additions & 6 deletions db/write_batch.cc
@@ -1986,11 +1986,12 @@ Status WriteBatchInternal::InsertInto(
return s;
}

void WriteBatchInternal::AsyncInsertInto(
WriteThread::Writer* writer, SequenceNumber sequence,
ColumnFamilySet* version_set, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, DB* db,
SafeFuncQueue* pool) {
void WriteBatchInternal::AsyncInsertInto(WriteThread::Writer* writer,
SequenceNumber sequence,
ColumnFamilySet* version_set,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
DB* db, SafeFuncQueue* pool) {
auto write_group = writer->write_group;
auto batch_size = writer->batches.size();
write_group->running.fetch_add(batch_size);
@@ -2013,7 +2014,8 @@ void WriteBatchInternal::AsyncInsertInto(
};
if (i + 1 == batch_size) {
// The thread shall run the last WriteBatch by itself, because this
// batch may be large.
// batch may be large. Having every thread run its last batch inline also
// reduces the cost of calling `SafeFuncQueue::Push`.
f();
} else {
pool->Push(std::move(f));
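The updated comment describes a small optimization: each thread queues every batch except its last, which it runs inline, saving one `SafeFuncQueue::Push` per thread and keeping a potentially large batch on the producing thread. A toy sketch of that control flow follows; the vector pool is a placeholder, not the real `SafeFuncQueue`.

#include <cstdio>
#include <functional>
#include <vector>

int main() {
  // Placeholder pool: collects deferred tasks (stands in for SafeFuncQueue).
  std::vector<std::function<void()>> pool;
  const size_t batch_size = 4;
  for (size_t i = 0; i < batch_size; ++i) {
    auto f = [i] { std::printf("insert batch %zu into memtable\n", i); };
    if (i + 1 == batch_size) {
      f();  // run the final batch inline: no Push cost, and it may be large
    } else {
      pool.push_back(std::move(f));  // hand earlier batches to helpers
    }
  }
  // In the real code helper threads drain the pool; we drain it here.
  for (auto& task : pool) task();
  return 0;
}
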
3 changes: 1 addition & 2 deletions include/rocksdb/c.h
@@ -966,8 +966,7 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_hash_skip_list_rep(
rocksdb_options_t*, size_t, int32_t, int32_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_hash_link_list_rep(
rocksdb_options_t*, size_t);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_doubly_skip_list_rep(
rocksdb_options_t* opt);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_doubly_skip_list_rep(rocksdb_options_t *opt);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_plain_table_factory(
rocksdb_options_t*, uint32_t, int, double, size_t);

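The declaration above is the C-API switch for the doubly-linked skiplist memtable. A hedged usage sketch: `rocksdb_options_create`, `rocksdb_options_set_create_if_missing`, and `rocksdb_options_destroy` are standard RocksDB C-API calls, while `rocksdb_options_set_doubly_skip_list_rep` exists only in this fork.

#include "rocksdb/c.h"

int main() {
  rocksdb_options_t* options = rocksdb_options_create();
  rocksdb_options_set_create_if_missing(options, 1);
  // Select the doubly-linked skiplist memtable so reverse scans avoid the
  // forward re-seek that the default skiplist's Prev() falls back to.
  rocksdb_options_set_doubly_skip_list_rep(options);
  rocksdb_options_destroy(options);
  return 0;
}
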
78 changes: 43 additions & 35 deletions memtable/doubly_skiplist.h
@@ -7,8 +7,8 @@
// this source code is governed by a BSD-style license that can be found
// in the LICENSE file. See the AUTHORS file for names of contributors.
//
// DoublySkipList is derived from InlineSkipList (inlineskiplist.h), but it
// optimizes Prev(), which is used in reverse scan.
// DoublySkipList is derived from InlineSkipList (inlineskiplist.h), but it optimizes
// Prev(), which is used in reverse scan.

//
// Thread safety -------------
@@ -37,11 +37,9 @@
#pragma once
#include <assert.h>
#include <stdlib.h>

#include <algorithm>
#include <atomic>
#include <type_traits>

#include "memory/allocator.h"
#include "port/likely.h"
#include "port/port.h"
@@ -58,8 +56,8 @@ class DoublySkipList {
struct Splice;

public:
using DecodedKey =
typename std::remove_reference<Comparator>::type::DecodedType;
using DecodedKey = \
typename std::remove_reference<Comparator>::type::DecodedType;

static const uint16_t kMaxPossibleHeight = 32;

@@ -188,7 +186,7 @@ class DoublySkipList {
// Immutable after construction
Comparator const compare_;
Node* const head_;
Node* const tail_; // Only use for prev pointer
Node* const tail_; // Only use for prev pointer

// Modified only by Insert(). Read racily by readers, but stale
// values are ok.
@@ -247,9 +245,9 @@
// point to a node that is before the key, and after should point to
// a node that is after the key. after should be nullptr if a good after
// node isn't conveniently available.
template <bool prefetch_before>
void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after,
int level, Node** out_prev, Node** out_next);
template<bool prefetch_before>
void FindSpliceForLevel(const DecodedKey& key, Node* before, Node* after, int level,
Node** out_prev, Node** out_next);

// Recomputes Splice levels from highest_level (inclusive) down to
// lowest_level (inclusive).
@@ -342,16 +340,24 @@ struct DoublySkipList<Comparator>::Node {
}

bool CASPrev(Node** expected, Node* x) {
return prev_.compare_exchange_strong(*expected, x);
return prev_.compare_exchange_strong(*expected, x);
}

Node* NoBarrier_Prev() { return prev_.load(std::memory_order_relaxed); }
Node* NoBarrier_Prev() {
return prev_.load(std::memory_order_relaxed);
}

void SetPrev(Node* x) { prev_.store(x, std::memory_order_release); }
void SetPrev(Node* x) {
prev_.store(x, std::memory_order_release);
}

Node* Prev() { return prev_.load(std::memory_order_acquire); }
Node* Prev() {
return prev_.load(std::memory_order_acquire);
}

void NoBarrier_SetPrev(Node* x) { prev_.store(x, std::memory_order_relaxed); }
void NoBarrier_SetPrev(Node* x) {
prev_.store(x, std::memory_order_relaxed);
}

private:
// next_[0] is the lowest level link (level 0). Higher levels are
@@ -398,16 +404,14 @@ inline void DoublySkipList<Comparator>::Iterator::Prev() {
auto current = node_;
do {
node_ = node_->Prev();
// Because we insert the node into the prev linked list, there may be some
// nodes still being inserted into the next linked list. Just ignore them,
// because their log SequenceNumber must be less than LastSequence().
// Because we insert the node into the prev linked list, there may be some nodes still being inserted into the next linked list.
// Just ignore them, because their log SequenceNumber must be less than LastSequence().
} while (node_ != list_->head_ && node_->Next(0) == nullptr);
if (node_ == list_->head_) {
node_ = nullptr;
} else if (node_->Prev()->Next(0) != node_) {
// The Prev() operation may happen before prev.CASNext(node_), so node_ may
// not have been inserted into the skiplist by the other thread yet. Find the
// prev position via the next linked list again.
// The Prev() operation may happen before prev.CASNext(node_), so node_ may not have been inserted into the skiplist by the other thread yet.
// Find the prev position via the next linked list again.
node_ = list_->FindLessThan(current->Key());
if (node_ == list_->head_) {
node_ = nullptr;
@@ -717,8 +721,8 @@ void DoublySkipList<Comparator>::FindSpliceForLevel(const DecodedKey& key,
PREFETCH(next->Next(level), 0, 1);
}
if (prefetch_before == true) {
if (next != nullptr && level > 0) {
PREFETCH(next->Next(level - 1), 0, 1);
if (next != nullptr && level>0) {
PREFETCH(next->Next(level-1), 0, 1);
}
}
assert(before == head_ || next == nullptr ||
@@ -735,15 +739,16 @@
}

template <class Comparator>
bool DoublySkipList<Comparator>::InsertPrevListCAS(Node* x, Splice* splice,
const DecodedKey& key) {
bool DoublySkipList<Comparator>::InsertPrevListCAS(Node* x, Splice* splice, const DecodedKey& key){
Node* prev = splice->prev_[0];
Node* next = splice->next_[0];
if (next != nullptr && compare_(x->Key(), next->Key()) >= 0) {
if (next != nullptr &&
compare_(x->Key(), next->Key()) >= 0) {
// duplicate key
return false;
}
if (prev != head_ && compare_(prev->Key(), x->Key()) >= 0) {
if (prev != head_ &&
compare_(prev->Key(), x->Key()) >= 0) {
// duplicate key
return false;
}
@@ -753,9 +758,8 @@ bool DoublySkipList<Comparator>::InsertPrevListCAS(Node* x, Splice* splice,
}

while (!next->CASPrev(&prev, x)) {
// If a node was inserted between prev and x, we only need to try
// setting next.prev to x again. If it was inserted between x and next,
// we must adjust the insert position of x.
// If a node was inserted between prev and x, we only need to try setting next.prev to x again.
// If it was inserted between x and next, we must adjust the insert position of x.
if (prev != head_ && !KeyIsAfterNode(key, prev)) {
do {
next = prev;
@@ -775,15 +779,18 @@ bool DoublySkipList<Comparator>::InsertPrevListCAS(Node* x, Splice* splice,
return true;
}


template <class Comparator>
bool DoublySkipList<Comparator>::InsertPrevList(Node* x, Splice* splice) {
bool DoublySkipList<Comparator>::InsertPrevList(Node* x, Splice* splice){
Node* prev = splice->prev_[0];
Node* next = splice->next_[0];
if (next != nullptr && compare_(x->Key(), next->Key()) >= 0) {
if (next != nullptr &&
compare_(x->Key(), next->Key()) >= 0) {
// duplicate key
return false;
}
if (prev != head_ && compare_(prev->Key(), x->Key()) >= 0) {
if (prev != head_ &&
compare_(prev->Key(), x->Key()) >= 0) {
// duplicate key
return false;
}
@@ -803,7 +810,7 @@ void DoublySkipList<Comparator>::RecomputeSpliceLevels(const DecodedKey& key,
assert(recompute_level <= splice->height_);
for (int i = recompute_level - 1; i >= 0; --i) {
FindSpliceForLevel<true>(key, splice->prev_[i + 1], splice->next_[i + 1], i,
&splice->prev_[i], &splice->next_[i]);
&splice->prev_[i], &splice->next_[i]);
}
}

@@ -893,7 +900,8 @@ bool DoublySkipList<Comparator>::Insert(const char* key, Splice* splice,
// we're pessimistic, recompute everything
recompute_height = max_height;
}
} else if (KeyIsAfterNode(key_decoded, splice->next_[recompute_height])) {
} else if (KeyIsAfterNode(key_decoded,
splice->next_[recompute_height])) {
// key is from after splice
if (allow_partial_splice_fix) {
Node* bad = splice->next_[recompute_height];
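The hunks in Iterator::Prev() and InsertPrevListCAS() rest on one invariant: the next list is the source of truth, while the prev list is maintained by CAS and may briefly be stale or reference half-inserted nodes, so a reader validates `prev->next == node` and otherwise recomputes the position from the next list. A single-threaded toy model of that reader-side fallback; plain linked list, no skiplist levels, all names illustrative.

#include <atomic>
#include <cstdio>

struct Node {
  int key;
  std::atomic<Node*> next{nullptr};
  std::atomic<Node*> prev{nullptr};
  explicit Node(int k) : key(k) {}
};

// Forward-scan fallback over the authoritative next list, in the spirit of
// FindLessThan(): returns the last node whose key is < k.
Node* FindLessThan(Node* head, int k) {
  Node* x = head;
  while (Node* nx = x->next.load(std::memory_order_acquire)) {
    if (nx->key >= k) break;
    x = nx;
  }
  return x;
}

int main() {
  Node head(-1), a(1), b(3), c(5);
  head.next = &a; a.next = &b; b.next = &c;  // next list: source of truth
  a.prev = &head; c.prev = &b;
  b.prev = &head;  // simulate a stale prev pointer not yet fixed up by CAS
  Node* p = b.prev.load(std::memory_order_acquire);
  if (p->next.load(std::memory_order_acquire) != &b) {
    // prev is stale (prev->next does not point back): recompute via next list.
    p = FindLessThan(&head, b.key);
  }
  std::printf("prev of %d is %d\n", b.key, p->key);  // prints: prev of 3 is 1
  return 0;
}
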
62 changes: 35 additions & 27 deletions memtable/inlineskiplist_test.cc
@@ -8,12 +8,10 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "memtable/inlineskiplist.h"

#include "memtable/doubly_skiplist.h"
#include <set>
#include <unordered_set>

#include "memory/concurrent_arena.h"
#include "memtable/doubly_skiplist.h"
#include "rocksdb/env.h"
#include "test_util/testharness.h"
#include "util/hash.h"
@@ -337,7 +335,7 @@ TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) {
// calls to Next() and Seek(). For every key we encounter, we
// check that it is either expected given the initial snapshot or has
// been concurrently added since the iterator started.
template <template <typename U> class SkipList>
template<template<typename U> class SkipList>
class ConcurrentTest {
public:
static const uint32_t K = 8;
@@ -485,12 +483,13 @@ class ConcurrentTest {
}
};

template <template <typename U> class SkipList>
template<template <typename U> class SkipList>
const uint32_t ConcurrentTest<SkipList>::K;

// Simple test that does single-threaded testing of the ConcurrentTest
// scaffolding.
TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) {

{
ConcurrentTest<rocksdb::InlineSkipList> test;
Random rnd(test::RandomSeed());
@@ -528,23 +527,25 @@ TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
}

class TestState {
public:
TestState(int s) : seed_(s), quit_flag_(false) {}

enum ReaderState { STARTING, RUNNING, DONE };
virtual ~TestState() {}
virtual void Wait(ReaderState s) = 0;
virtual void Change(ReaderState s) = 0;
virtual void AdjustPendingWriters(int delta) = 0;
virtual void WaitForPendingWriters() = 0;
// REQUIRES: No concurrent calls for the same k
virtual void ConcurrentWriteStep(uint32_t k) = 0;
virtual void ReadStep(Random* rnd) = 0;

public:
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
public:
TestState(int s)
: seed_(s),
quit_flag_(false) {}

enum ReaderState { STARTING, RUNNING, DONE };
virtual ~TestState() {}
virtual void Wait(ReaderState s) = 0;
virtual void Change(ReaderState s) = 0;
virtual void AdjustPendingWriters(int delta) = 0;
virtual void WaitForPendingWriters() = 0;
// REQUIRES: No concurrent calls for the same k
virtual void ConcurrentWriteStep(uint32_t k) = 0;
virtual void ReadStep(Random* rnd) = 0;

public:
int seed_;
std::atomic<bool> quit_flag_;
std::atomic<uint32_t> next_writer_;
};

template <template <typename U> class SkipList>
@@ -553,7 +554,10 @@ class TestStateImpl : public TestState {
ConcurrentTest<SkipList> t_;

explicit TestStateImpl(int s)
: TestState(s), state_(STARTING), pending_writers_(0), state_cv_(&mu_) {}
: TestState(s),
state_(STARTING),
pending_writers_(0),
state_cv_(&mu_) {}

void Wait(ReaderState s) override {
mu_.Lock();
@@ -587,8 +591,13 @@ class TestStateImpl : public TestState {
mu_.Unlock();
}

void ConcurrentWriteStep(uint32_t k) override { t_.ConcurrentWriteStep(k); }
void ReadStep(Random* rnd) override { t_.ReadStep(rnd); }
void ConcurrentWriteStep(uint32_t k) override {
t_.ConcurrentWriteStep(k);
}
void ReadStep(Random* rnd) override {
t_.ReadStep(rnd);
}


private:
port::Mutex mu_;
Expand All @@ -611,8 +620,7 @@ static void ConcurrentReader(void* arg) {

static void ConcurrentWriter(void* arg) {
TestState* state = reinterpret_cast<TestState*>(arg);
uint32_t k =
state->next_writer_++ % ConcurrentTest<rocksdb::InlineSkipList>::K;
uint32_t k = state->next_writer_++ % ConcurrentTest<rocksdb::InlineSkipList>::K;
state->ConcurrentWriteStep(k);
state->AdjustPendingWriters(-1);
}
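The test changes thread a template-template parameter through ConcurrentTest and TestStateImpl so one harness can drive both InlineSkipList and DoublySkipList. A minimal sketch of that technique with stub list templates (not the real skiplists):

#include <cstdio>

template <typename Comparator>
struct InlineList {
  static constexpr const char* kName = "inline";
};

template <typename Comparator>
struct DoublyList {
  static constexpr const char* kName = "doubly";
};

// The harness takes the skiplist *template* itself, mirroring
// `template <template <typename U> class SkipList> class ConcurrentTest`.
template <template <typename U> class SkipList>
struct Harness {
  SkipList<int> list;  // instantiated with the harness's own element type
  void Run() const { std::printf("running tests against %s\n", list.kName); }
};

int main() {
  Harness<InlineList>().Run();
  Harness<DoublyList>().Run();
  return 0;
}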