Skip to content

Commit

Permalink
Update tests (#2185)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Call InfinityThriftService::ClearSessionMap() in
InfinityContext::StopThriftServers()
Update python/parallel_test/test_index_parallel.py

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
  • Loading branch information
yangzq50 authored Nov 6, 2024
1 parent a63ced2 commit 768b29d
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 14 deletions.
4 changes: 2 additions & 2 deletions python/infinity_sdk/infinity/remote_thrift/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,8 @@ def disconnect(self):
return CommonResponse(ErrorCode.OK, "Already disconnected")
try:
res = self.client.Disconnect(CommonRequest(session_id=self.session_id))
except Exception:
pass
except Exception as e:
res = CommonResponse(ErrorCode.CLIENT_CLOSE, str(e))
self.transport.close()
self._is_connected = False
return res
Expand Down
11 changes: 3 additions & 8 deletions python/parallel_test/test_index_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,9 @@ def create_table_worker(connection_pool: ConnectionPool, table_name_prefix, end_
while time.time() < end_time:
table_name = f"{table_name_prefix}_{thread_id}_{table_counter}"
table_counter += 1
res = db_obj.create_table(table_name, {
"id": {"type": "int"},
"value": {"type": "varchar"}}, ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
print(f"thread {thread_id}: table {table_name} created")
else:
print(f"thread {thread_id}: create table {table_name} failed: {res.error_msg}")
db_obj.create_table(table_name, {"id": {"type": "int"}, "value": {"type": "varchar"}},
ConflictType.Ignore)
print(f"thread {thread_id}: table {table_name} created")
time.sleep(0.5)
res = db_obj.drop_table(table_name, ConflictType.Ignore)
if res.error_code == ErrorCode.OK:
Expand All @@ -402,7 +398,6 @@ def create_table_worker(connection_pool: ConnectionPool, table_name_prefix, end_

connection_pool = get_infinity_connection_pool
infinity_obj = connection_pool.get_conn()
db_obj = infinity_obj.get_database("default_db")
table_name_prefix = "test_table_creation_deletion"

threads = []
Expand Down
4 changes: 4 additions & 0 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import status;
import infinity_exception;
import wal_manager;
import global_resource_usage;
import infinity_thrift_service;

namespace infinity {

Expand Down Expand Up @@ -471,6 +472,9 @@ void InfinityContext::StopThriftServers() {
// start_servers_func_ = nullptr;
// stop_servers_func_ = nullptr;
}
// close all thrift sessions
const auto removed_session_count = InfinityThriftService::ClearSessionMap();
LOG_INFO(fmt::format("Removed {} thrift sessions", removed_session_count));
}

} // namespace infinity
7 changes: 7 additions & 0 deletions src/network/infinity_thrift_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ std::mutex InfinityThriftService::infinity_session_map_mutex_;
HashMap<u64, SharedPtr<Infinity>> InfinityThriftService::infinity_session_map_;
ClientVersions InfinityThriftService::client_version_;

u32 InfinityThriftService::ClearSessionMap() {
std::lock_guard lock(infinity_session_map_mutex_);
const auto session_count = infinity_session_map_.size();
infinity_session_map_.clear();
return session_count;
}

void InfinityThriftService::Connect(infinity_thrift_rpc::CommonResponse &response, const infinity_thrift_rpc::ConnectRequest &request) {
i64 request_client_version = request.client_version;
if (request_client_version != current_version_index_) {
Expand Down
2 changes: 2 additions & 0 deletions src/network/infinity_thrift_service.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ private:
static ClientVersions client_version_;

public:
static u32 ClearSessionMap();

InfinityThriftService() {
#ifdef INFINITY_DEBUG
GlobalResourceUsage::IncrObjectCount("InfinityThriftService");
Expand Down
12 changes: 8 additions & 4 deletions src/storage/column_vector/column_vector.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public:
private:
ColumnVectorType vector_type_{ColumnVectorType::kInvalid};

SharedPtr<DataType> data_type_;
SharedPtr<DataType> data_type_{};

// Only a pointer to the real data in vector buffer
ptr_t data_ptr_{nullptr};
Expand All @@ -99,7 +99,11 @@ private:
SizeT tail_index_{0};

public:
ColumnVector() : vector_type_(ColumnVectorType::kInvalid) {}
ColumnVector() : vector_type_(ColumnVectorType::kInvalid) {
#ifdef INFINITY_DEBUG
GlobalResourceUsage::IncrObjectCount("ColumnVector");
#endif
}

// Construct a column vector without initialization;
explicit ColumnVector(SharedPtr<DataType> data_type) : vector_type_(ColumnVectorType::kInvalid), data_type_(std::move(data_type)) {
Expand All @@ -119,7 +123,7 @@ public:
}

// used in BlockColumnIter, keep ObjectCount correct
ColumnVector(ColumnVector &&right)
ColumnVector(ColumnVector &&right) noexcept
: data_type_size_(right.data_type_size_), buffer_(std::move(right.buffer_)), nulls_ptr_(std::move(right.nulls_ptr_)),
initialized(right.initialized), vector_type_(right.vector_type_), data_type_(std::move(right.data_type_)), data_ptr_(right.data_ptr_),
capacity_(right.capacity_), tail_index_(right.tail_index_) {
Expand All @@ -128,7 +132,7 @@ public:
#endif
}

ColumnVector &operator=(ColumnVector &&right) {
ColumnVector &operator=(ColumnVector &&right) noexcept {
if (this != &right) {
data_type_size_ = right.data_type_size_;
std::swap(buffer_, right.buffer_);
Expand Down
3 changes: 3 additions & 0 deletions test/sql/ddl/schema/use_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ SELECT * FROM sqllogic_db1.t1;
----
7 8 9

statement ok
DROP TABLE default_db.t1;

statement ok
DROP TABLE sqllogic_db1.t1;

Expand Down

0 comments on commit 768b29d

Please sign in to comment.