Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
{
// add_batch may concurrency with inc_open but not under _lock.
// so need to protect it with _tablet_writers_lock.
std::lock_guard<SpinLock> l(_tablet_writers_lock);
std::lock_guard<std::mutex> l(_tablet_writers_lock);
for (auto& [tablet_id, _] : tablet_to_rowidxs) {
auto tablet_writer_it = _tablet_writers.find(tablet_id);
if (tablet_writer_it == _tablet_writers.end()) {
Expand Down
12 changes: 5 additions & 7 deletions be/src/common/object_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
#include <mutex>
#include <vector>

#include "util/spinlock.h"

namespace doris {

// An ObjectPool maintains a list of C++ objects which are deallocated
Expand All @@ -36,20 +34,20 @@ class ObjectPool {
template <class T>
T* add(T* t) {
// TODO: Consider using a lock-free structure.
std::lock_guard<SpinLock> l(_lock);
std::lock_guard<std::mutex> l(_lock);
_objects.emplace_back(Element {t, [](void* obj) { delete reinterpret_cast<T*>(obj); }});
return t;
}

template <class T>
T* add_array(T* t) {
std::lock_guard<SpinLock> l(_lock);
std::lock_guard<std::mutex> l(_lock);
_objects.emplace_back(Element {t, [](void* obj) { delete[] reinterpret_cast<T*>(obj); }});
return t;
}

void clear() {
std::lock_guard<SpinLock> l(_lock);
std::lock_guard<std::mutex> l(_lock);
// reverse delete object to make sure the obj can
// safe access the member object construt early by
// object pool
Expand All @@ -65,7 +63,7 @@ class ObjectPool {
}

uint64_t size() {
std::lock_guard<SpinLock> l(_lock);
std::lock_guard<std::mutex> l(_lock);
return _objects.size();
}

Expand All @@ -83,7 +81,7 @@ class ObjectPool {
};

std::vector<Element> _objects;
SpinLock _lock;
std::mutex _lock;
};

} // namespace doris
1 change: 0 additions & 1 deletion be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "util/spinlock.h"
#include "util/uid_util.h"

namespace doris {
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "util/spinlock.h"
#include "util/uid_util.h"

namespace doris {
Expand Down
18 changes: 7 additions & 11 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@

#include <fmt/format.h>

#include <filesystem>
#include <ostream>
#include <string>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
Expand All @@ -40,8 +38,6 @@
#include "olap/storage_engine.h"
#include "olap/tablet_schema.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
Expand Down Expand Up @@ -152,12 +148,12 @@ Status MemTableWriter::_flush_memtable_async() {
DCHECK(_flush_token != nullptr);
std::shared_ptr<MemTable> memtable;
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
memtable = _mem_table;
_mem_table = nullptr;
}
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
memtable->update_mem_type(MemType::WRITE_FINISHED);
_freezed_mem_tables.push_back(memtable);
}
Expand Down Expand Up @@ -211,7 +207,7 @@ Status MemTableWriter::wait_flush() {

void MemTableWriter::_reset_mem_table() {
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
_mem_table.reset(new MemTable(_req.tablet_id, _tablet_schema, _req.slots, _req.tuple_desc,
_unique_key_mow, _partial_update_info.get()));
}
Expand All @@ -237,7 +233,7 @@ Status MemTableWriter::close() {

auto s = _flush_memtable_async();
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
_mem_table.reset();
}
_is_closed = true;
Expand Down Expand Up @@ -336,7 +332,7 @@ Status MemTableWriter::cancel_with_status(const Status& st) {
return Status::OK();
}
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
_mem_table.reset();
}
if (_flush_token != nullptr) {
Expand Down Expand Up @@ -364,7 +360,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
}
int64_t mem_usage = 0;
{
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
for (const auto& mem_table : _freezed_mem_tables) {
auto mem_table_sptr = mem_table.lock();
if (mem_table_sptr != nullptr && mem_table_sptr->get_mem_type() == mem) {
Expand All @@ -376,7 +372,7 @@ int64_t MemTableWriter::mem_consumption(MemType mem) {
}

int64_t MemTableWriter::active_memtable_mem_consumption() {
std::lock_guard<SpinLock> l(_mem_table_ptr_lock);
std::lock_guard<std::mutex> l(_mem_table_ptr_lock);
return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
}

Expand Down
6 changes: 1 addition & 5 deletions be/src/olap/memtable_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
#include <cstdint>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_set>
#include <vector>

#include "common/status.h"
Expand All @@ -38,8 +36,6 @@
#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "util/spinlock.h"
#include "util/uid_util.h"

namespace doris {

Expand Down Expand Up @@ -130,7 +126,7 @@ class MemTableWriter {
// Save the not active memtable that is in flush queue or under flushing.
std::vector<std::weak_ptr<MemTable>> _freezed_mem_tables;
// The lock to protect _memtable and _freezed_mem_tables structure to avoid concurrency modification or read
SpinLock _mem_table_ptr_lock;
std::mutex _mem_table_ptr_lock;
QueryThreadContext _query_thread_context;

std::mutex _lock;
Expand Down
5 changes: 2 additions & 3 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
#include "olap/rowset/segment_creator.h"
#include "segment_v2/inverted_index_file_writer.h"
#include "segment_v2/segment.h"
#include "util/spinlock.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -80,7 +79,7 @@ class SegmentFileCollection {
}

private:
mutable SpinLock _lock;
mutable std::mutex _lock;
std::unordered_map<int /* seg_id */, io::FileWriterPtr> _file_writers;
bool _closed {false};
};
Expand Down Expand Up @@ -109,7 +108,7 @@ class InvertedIndexFileCollection {
int64_t get_total_index_size() const { return _total_size; }

private:
mutable SpinLock _lock;
mutable std::mutex _lock;
std::unordered_map<int /* seg_id */, InvertedIndexFileWriterPtr> _inverted_index_file_writers;
int64_t _total_size = 0;
};
Expand Down
19 changes: 3 additions & 16 deletions be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,20 @@

#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>
#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <map>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <roaring/roaring.hh>
#include <string>
#include <unordered_set>
#include <vector>

#include "brpc/controller.h"
#include "brpc/stream.h"
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_creator.h"
#include "segment_v2/segment.h"
#include "util/spinlock.h"

namespace doris {
namespace vectorized {
Expand Down Expand Up @@ -120,7 +107,7 @@ class BetaRowsetWriterV2 : public RowsetWriter {
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }

Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const override {
std::lock_guard<SpinLock> l(_lock);
std::lock_guard<std::mutex> l(_lock);
*segment_num_rows = _segment_num_rows;
return Status::OK();
}
Expand All @@ -145,7 +132,7 @@ class BetaRowsetWriterV2 : public RowsetWriter {
}

private:
mutable SpinLock _lock; // protect following vectors.
mutable std::mutex _lock; // protect following vectors.
// record rows number of every segment already written, using for rowid
// conversion when compaction in unique key with MoW model
std::vector<uint32_t> _segment_num_rows;
Expand Down
6 changes: 0 additions & 6 deletions be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <string>
#include <typeinfo>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/rowset/segment_v2/inverted_index_file_writer.h"
#include "olap/tablet_fwd.h"
#include "util/spinlock.h"
#include "vec/core/block.h"

namespace doris {
Expand Down
9 changes: 0 additions & 9 deletions be/src/olap/rowset/vertical_beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,15 @@
#include <fmt/format.h>
#include <gen_cpp/olap_file.pb.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>

#include "cloud/cloud_rowset_writer.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/file_writer.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer_context.h"
#include "util/slice.h"
#include "util/spinlock.h"
#include "vec/core/block.h"

namespace doris {
Expand Down
6 changes: 2 additions & 4 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
#include "olap/memtable_flush_executor.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/olap_meta.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_meta_manager.h"
Expand All @@ -72,7 +71,6 @@
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/spinlock.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/threadpool.h"
Expand Down Expand Up @@ -300,7 +298,7 @@ Status StorageEngine::_open() {

Status StorageEngine::_init_store_map() {
std::vector<std::thread> threads;
SpinLock error_msg_lock;
std::mutex error_msg_lock;
std::string error_msg;
for (auto& path : _options.store_paths) {
auto store = std::make_unique<DataDir>(*this, path.path, path.capacity_bytes,
Expand All @@ -309,7 +307,7 @@ Status StorageEngine::_init_store_map() {
auto st = store->init();
if (!st.ok()) {
{
std::lock_guard<SpinLock> l(error_msg_lock);
std::lock_guard<std::mutex> l(error_msg_lock);
error_msg.append(st.to_string() + ";");
}
LOG(WARNING) << "Store load failed, status=" << st.to_string()
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <vector>

#include "common/status.h"
#include "util/spinlock.h"
#include "vec/core/block.h"

namespace doris::pipeline {
Expand Down Expand Up @@ -105,7 +104,7 @@ class DataQueue {
// data queue is multi sink one source
std::shared_ptr<Dependency> _source_dependency = nullptr;
std::vector<Dependency*> _sink_dependencies;
SpinLock _source_lock;
std::mutex _source_lock;
};

} // namespace doris::pipeline
3 changes: 1 addition & 2 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/tablets_channel.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
Expand Down Expand Up @@ -271,7 +270,7 @@ void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
ThriftSerializer ser(false, 4096);
uint8_t* buf = nullptr;
uint32_t len = 0;
std::lock_guard<SpinLock> l(_profile_serialize_lock);
std::lock_guard<std::mutex> l(_profile_serialize_lock);
_profile->to_thrift(&tprofile);
auto st = ser.serialize(&tprofile, &len, &buf);
if (st.ok()) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include "common/status.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/uid_util.h"

namespace doris {
Expand Down Expand Up @@ -87,7 +86,7 @@ class LoadChannel {
UniqueId _load_id;
int64_t _txn_id = 0;

SpinLock _profile_serialize_lock;
std::mutex _profile_serialize_lock;
std::unique_ptr<RuntimeProfile> _profile;
RuntimeProfile* _self_profile = nullptr;
RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/load_stream_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"
#include "util/spinlock.h"
#include "util/uid_util.h"

namespace doris {
Expand Down
Loading
Loading