From f464f03be343aedc4a16ef0cb6c6db88db400a60 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Wed, 14 Aug 2024 12:17:54 +0800 Subject: [PATCH 1/7] refactor the grpc client to fix #116 Signed-off-by: wbpcode --- BUILD | 1 + cpp2sky/internal/BUILD | 7 - cpp2sky/internal/async_client.h | 118 +++------------- cpp2sky/internal/stream_builder.h | 56 -------- cpp2sky/tracer.h | 2 +- source/BUILD | 8 +- source/grpc_async_client_impl.cc | 215 ++++++++++++++++-------------- source/grpc_async_client_impl.h | 161 +++++++++++----------- source/tracer_impl.cc | 71 +++------- source/tracer_impl.h | 22 +-- source/utils/BUILD | 2 +- source/utils/buffer.h | 76 +++++++++++ source/utils/circular_buffer.h | 122 ----------------- test/BUILD | 1 - test/buffer_test.cc | 149 +++++---------------- test/grpc_async_client_test.cc | 131 +++++++++++++++--- test/mocks.h | 42 +----- test/tracer_test.cc | 12 +- test/tracing_context_test.cc | 2 +- 19 files changed, 483 insertions(+), 715 deletions(-) delete mode 100644 cpp2sky/internal/stream_builder.h create mode 100644 source/utils/buffer.h delete mode 100644 source/utils/circular_buffer.h diff --git a/BUILD b/BUILD index 7f60adb..3012d2e 100644 --- a/BUILD +++ b/BUILD @@ -9,5 +9,6 @@ refresh_compile_commands( "//cpp2sky/...": "", "//source/...": "", "//test/...": "", + "//example/...": "", }, ) diff --git a/cpp2sky/internal/BUILD b/cpp2sky/internal/BUILD index 397c85c..2769991 100644 --- a/cpp2sky/internal/BUILD +++ b/cpp2sky/internal/BUILD @@ -10,7 +10,6 @@ cc_library( ], visibility = ["//visibility:public"], deps = [ - "@com_github_grpc_grpc//:grpc++", "@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc", ], ) @@ -23,12 +22,6 @@ cc_library( visibility = ["//visibility:public"], ) -cc_library( - name = "stream_builder_interface", - hdrs = ["stream_builder.h"], - visibility = ["//visibility:public"], -) - cc_library( name = "matcher_interface", hdrs = ["matcher.h"], diff --git a/cpp2sky/internal/async_client.h b/cpp2sky/internal/async_client.h index d9c8b59..a06cfa9 100644 --- a/cpp2sky/internal/async_client.h +++ b/cpp2sky/internal/async_client.h @@ -15,130 +15,54 @@ #pragma once #include -#include -#include +#include #include -#include "source/utils/circular_buffer.h" - -using google::protobuf::Message; +#include "language-agent/Tracing.pb.h" namespace cpp2sky { +using TracerRequestType = skywalking::v3::SegmentObject; +using TracerResponseType = skywalking::v3::Commands; + template -class AsyncClient { +class AsyncClientBase { public: - virtual ~AsyncClient() = default; + virtual ~AsyncClientBase() = default; /** * Send the specified protobuf message. */ virtual void sendMessage(RequestType message) = 0; - /** - * Pending message queue reference. - */ - virtual CircularBuffer& pendingMessages() = 0; - - /** - * Start stream if there is no living stream. - */ - virtual void startStream() = 0; - - /** - * Completion queue. - */ - virtual grpc::CompletionQueue& completionQueue() = 0; - - /** - * gRPC Stub - */ - virtual grpc::TemplatedGenericStub& stub() = 0; + virtual void resetClient() = 0; }; -template -using AsyncClientPtr = std::unique_ptr>; +using AsyncClient = AsyncClientBase; +using AsyncClientPtr = std::unique_ptr; template -class AsyncStream { +class AsyncStreamBase { public: - virtual ~AsyncStream() = default; + virtual ~AsyncStreamBase() = default; /** - * Send message. It will move the state from Init to Write. + * Send the specified protobuf message. */ virtual void sendMessage(RequestType message) = 0; }; -enum class StreamState : uint8_t { - Initialized = 0, - Ready = 1, - Idle = 2, - WriteDone = 3, - ReadDone = 4, -}; - -class AsyncStreamCallback { - public: - /** - * Callback when stream ready event occured. - */ - virtual void onReady() = 0; - - /** - * Callback when idle event occured. - */ - virtual void onIdle() = 0; - - /** - * Callback when write done event occured. - */ - virtual void onWriteDone() = 0; - - /** - * Callback when read done event occured. - */ - virtual void onReadDone() = 0; +template +using AsyncStreamBasePtr = + std::unique_ptr>; - /** - * Callback when stream had finished with arbitrary error. - */ - virtual void onStreamFinish() = 0; -}; +using AsyncStream = AsyncStreamBase; +using AsyncStreamSharedPtr = std::shared_ptr; -struct StreamCallbackTag { - public: - void callback(bool stream_finished) { - if (stream_finished) { - callback_->onStreamFinish(); - return; - } - - switch (state_) { - case StreamState::Ready: - callback_->onReady(); - break; - case StreamState::WriteDone: - callback_->onWriteDone(); - break; - case StreamState::Idle: - callback_->onIdle(); - break; - case StreamState::ReadDone: - callback_->onReadDone(); - break; - default: - break; - } - } - - StreamState state_; - AsyncStreamCallback* callback_; +struct AsyncEventTag { + std::function callback; }; - -template -using AsyncStreamSharedPtr = - std::shared_ptr>; +using AsyncEventTagPtr = std::unique_ptr; } // namespace cpp2sky diff --git a/cpp2sky/internal/stream_builder.h b/cpp2sky/internal/stream_builder.h deleted file mode 100644 index da7e26e..0000000 --- a/cpp2sky/internal/stream_builder.h +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright 2021 SkyAPM - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at - -// http://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -#include - -namespace cpp2sky { - -template -class ClientStreamingStreamBuilder { - public: - virtual ~ClientStreamingStreamBuilder() = default; - - /** - * Create async stream entity - */ - virtual AsyncStreamSharedPtr create( - AsyncClient& client, - std::condition_variable& cv) = 0; -}; - -template -using ClientStreamingStreamBuilderPtr = - std::unique_ptr>; - -template -class UnaryStreamBuilder { - public: - virtual ~UnaryStreamBuilder() = default; - - /** - * Create async stream entity - */ - virtual AsyncStreamSharedPtr create( - AsyncClient& client, RequestType request) = 0; -}; - -template -using UnaryStreamBuilderPtr = - std::unique_ptr>; - -} // namespace cpp2sky diff --git a/cpp2sky/tracer.h b/cpp2sky/tracer.h index 3eacd78..4cb43ff 100644 --- a/cpp2sky/tracer.h +++ b/cpp2sky/tracer.h @@ -40,6 +40,6 @@ class Tracer { using TracerPtr = std::unique_ptr; -TracerPtr createInsecureGrpcTracer(TracerConfig& cfg); +TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg); } // namespace cpp2sky diff --git a/source/BUILD b/source/BUILD index c45126e..05b8ef3 100644 --- a/source/BUILD +++ b/source/BUILD @@ -4,22 +4,20 @@ cc_library( name = "cpp2sky_lib", srcs = [ "grpc_async_client_impl.cc", - "propagation_impl.cc", "tracer_impl.cc", - "tracing_context_impl.cc", ], hdrs = [ "grpc_async_client_impl.h", - "propagation_impl.h", "tracer_impl.h", - "tracing_context_impl.h", ], visibility = ["//visibility:public"], deps = [ + ":cpp2sky_data_lib", + "//cpp2sky:config_cc_proto", + "//cpp2sky:cpp2sky_data_interface", "//cpp2sky:cpp2sky_interface", "//cpp2sky/internal:async_client_interface", "//cpp2sky/internal:matcher_interface", - "//cpp2sky/internal:stream_builder_interface", "//source/matchers:suffix_matcher_lib", "//source/utils:util_lib", "@com_github_gabime_spdlog//:spdlog", diff --git a/source/grpc_async_client_impl.cc b/source/grpc_async_client_impl.cc index 42e6e0c..985cab8 100644 --- a/source/grpc_async_client_impl.cc +++ b/source/grpc_async_client_impl.cc @@ -14,146 +14,161 @@ #include "grpc_async_client_impl.h" +#include + #include +#include #include #include "absl/strings/string_view.h" #include "cpp2sky/exception.h" +#include "cpp2sky/internal/async_client.h" +#include "grpcpp/alarm.h" #include "spdlog/spdlog.h" namespace cpp2sky { namespace { -static constexpr absl::string_view authenticationKey = "authentication"; -} -using namespace spdlog; +static constexpr uint32_t MaxPendingMessagesSize = 1024; -GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient( - const std::string& address, grpc::CompletionQueue& cq, - ClientStreamingStreamBuilderPtr - factory, - std::shared_ptr cred) - : factory_(std::move(factory)), - cq_(cq), - stub_(grpc::CreateChannel(address, cred)) { - startStream(); -} +static std::string AuthenticationKey = "authentication"; -GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() { - // It will wait until there is no drained messages with 5 second timeout. - if (stream_) { - std::unique_lock lck(mux_); - while (!pending_messages_.empty()) { - cv_.wait_for(lck, std::chrono::seconds(5)); - pending_messages_.clear(); - } - } +static std::string TraceCollectMethod = "/TraceSegmentReportService/collect"; - resetStream(); -} - -void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) { - pending_messages_.push(message); - - if (!stream_) { - info( - "[Reporter] No active stream, inserted message into pending message " - "queue. " - "pending message size: {}", - pending_messages_.size()); - return; - } +} // namespace - stream_->sendMessage(message); -} +using namespace spdlog; -void GrpcAsyncSegmentReporterClient::startStream() { - resetStream(); +void EventLoopThread::gogo() { + while (true) { + void* got_tag{nullptr}; + bool ok{false}; + + // true if got an event from the queue or false + // if the queue is fully drained and is shutdown. + const bool status = cq_.Next(&got_tag, &ok); + if (!status) { + assert(got_tag == nullptr); + assert(!ok); + info("[Reporter] Completion queue is drained and is shutdown."); + break; + } - stream_ = factory_->create(*this, cv_); - info("[Reporter] Stream {} had created.", fmt::ptr(stream_.get())); -} + assert(got_tag != nullptr); -void GrpcAsyncSegmentReporterClient::resetStream() { - if (stream_) { - info("[Reporter] Stream {} has destroyed.", fmt::ptr(stream_.get())); - stream_.reset(); + // The lifetime of the tag is managed by the caller. + auto* tag = static_cast(got_tag); + tag->callback(ok); } } -GrpcAsyncSegmentReporterStream::GrpcAsyncSegmentReporterStream( - AsyncClient& client, - std::condition_variable& cv, const std::string& token) - : client_(client), cv_(cv) { +GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient( + const std::string& address, const std::string& token, + CredentialsSharedPtr cred) + : stub_(grpc::CreateChannel(address, cred)) { if (!token.empty()) { - ctx_.AddMetadata(authenticationKey.data(), token); + client_ctx_.AddMetadata(AuthenticationKey, token); } - // Ensure pending RPC will complete if connection to the server is not - // established first because of like server is not ready. This will queue - // pending RPCs and when connection has established, Connected tag will be - // sent to CompletionQueue. - ctx_.set_wait_for_ready(true); + basic_event_tag_.reset(new AsyncEventTag{[this](bool ok) { + if (client_reset_) { + return; + } - request_writer_ = client_.stub().PrepareCall( - &ctx_, "/TraceSegmentReportService/collect", &client_.completionQueue()); - request_writer_->StartCall(reinterpret_cast(&ready_)); -} + if (ok) { + trace("[Reporter] Stream event success.", fmt::ptr(this)); -void GrpcAsyncSegmentReporterStream::sendMessage(TracerRequestType message) { - clearPendingMessage(); -} + // Mark event loop as idle because the previous Write() or + // other operations are successful. + markEventLoopIdle(); -bool GrpcAsyncSegmentReporterStream::clearPendingMessage() { - if (state_ != StreamState::Idle || client_.pendingMessages().empty()) { - return false; - } - auto message = client_.pendingMessages().front(); - if (!message.has_value()) { - return false; - } + sendMessageOnce(); + return; + } else { + trace("[Reporter] Stream event failure.", fmt::ptr(this)); + + // Do not mark event loop as idle because the previous Write() + // or other operations are failed. The event loop should keep + // running to process the re-creation of the stream. + assert(event_loop_idle_.load() == false); + // Reset stream and try to create a new one. + startStream(); + } + }}); + + write_event_tag_.reset(new AsyncEventTag{[this](bool ok) { + if (ok) { + trace("[Reporter] Stream {} message sending success.", fmt::ptr(this)); + messages_sent_++; + } else { + trace("[Reporter] Stream {} message sending failure.", fmt::ptr(this)); + messages_dropped_++; + } + // Delegate the event to basic_event_tag_ to trigger the next task or + // reset the stream. + basic_event_tag_->callback(ok); + }}); - request_writer_->Write(message.value(), - reinterpret_cast(&write_done_)); - return true; + startStream(); } -void GrpcAsyncSegmentReporterStream::onReady() { - info("[Reporter] Stream ready"); +void GrpcAsyncSegmentReporterClient::sendMessageOnce() { + bool expect_idle = true; + if (event_loop_idle_.compare_exchange_strong(expect_idle, false)) { + assert(active_stream_ != nullptr); + + auto opt_message = message_buffer_.pop_front(); + if (!opt_message.has_value()) { + // No message to send, mark event loop as idle. + markEventLoopIdle(); + return; + } - state_ = StreamState::Idle; - onIdle(); + active_stream_->sendMessage(std::move(opt_message).value()); + } } -void GrpcAsyncSegmentReporterStream::onIdle() { - info("[Reporter] Stream idleing"); +void GrpcAsyncSegmentReporterClient::startStream() { + resetStream(); // Reset stream before creating a new one. - // Release pending messages which are inserted when stream is not ready - // to write. - if (!clearPendingMessage()) { - cv_.notify_all(); - } + active_stream_.reset(new SegmentReporterStream( + stub_.PrepareCall(&client_ctx_, TraceCollectMethod, &event_loop_.cq_), + basic_event_tag_.get(), write_event_tag_.get())); + + info("[Reporter] Stream {} has created.", fmt::ptr(active_stream_.get())); +} + +void GrpcAsyncSegmentReporterClient::resetStream() { + info("[Reporter] Stream {} has destroyed.", fmt::ptr(active_stream_.get())); + active_stream_.reset(); } -void GrpcAsyncSegmentReporterStream::onWriteDone() { - info("[Reporter] Write finished"); +void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) { + messages_total_++; - // Dequeue message after sending message finished. - // With this, messages which failed to sent never lost even if connection - // was closed. because pending messages with messages which failed to send - // will drained and resend another stream. - client_.pendingMessages().pop(); - state_ = StreamState::Idle; + const size_t pending = message_buffer_.size(); + if (pending > MaxPendingMessagesSize) { + info("[Reporter] pending message overflow and drop message"); + messages_dropped_++; + return; + } + message_buffer_.push_back(std::move(message)); + + sendMessageOnce(); +} - onIdle(); +SegmentReporterStream::SegmentReporterStream( + TraceReaderWriterPtr request_writer, AsyncEventTag* basic_event_tag, + AsyncEventTag* write_event_tag) + : request_writer_(std::move(request_writer)), + basic_event_tag_(basic_event_tag), + write_event_tag_(write_event_tag) { + request_writer_->StartCall(reinterpret_cast(basic_event_tag_)); } -AsyncStreamSharedPtr -GrpcAsyncSegmentReporterStreamBuilder::create( - AsyncClient& client, - std::condition_variable& cv) { - return std::make_shared(client, cv, token_); +void SegmentReporterStream::sendMessage(TracerRequestType message) { + request_writer_->Write(message, reinterpret_cast(write_event_tag_)); } } // namespace cpp2sky diff --git a/source/grpc_async_client_impl.h b/source/grpc_async_client_impl.h index 07a6b73..23537c8 100644 --- a/source/grpc_async_client_impl.h +++ b/source/grpc_async_client_impl.h @@ -18,116 +18,117 @@ #include #include +#include #include #include #include +#include #include "cpp2sky/config.pb.h" #include "cpp2sky/internal/async_client.h" -#include "cpp2sky/internal/stream_builder.h" #include "language-agent/Tracing.grpc.pb.h" -#include "language-agent/Tracing.pb.h" +#include "source/utils/buffer.h" namespace cpp2sky { -namespace { -static constexpr size_t pending_message_buffer_size = 1024; -} - -using TracerRequestType = skywalking::v3::SegmentObject; -using TracerResponseType = skywalking::v3::Commands; - -class GrpcAsyncSegmentReporterStream; - -class GrpcAsyncSegmentReporterClient final - : public AsyncClient { +class EventLoopThread { public: - GrpcAsyncSegmentReporterClient( - const std::string& address, grpc::CompletionQueue& cq, - ClientStreamingStreamBuilderPtr - factory, - std::shared_ptr cred); - ~GrpcAsyncSegmentReporterClient(); + EventLoopThread() : thread_([this] { this->gogo(); }) {} + ~EventLoopThread() { exit(); } - // AsyncClient - void sendMessage(TracerRequestType message) override; - CircularBuffer& pendingMessages() override { - return pending_messages_; - } - void startStream() override; - grpc::TemplatedGenericStub& stub() - override { - return stub_; - } - grpc::CompletionQueue& completionQueue() override { return cq_; } + grpc::CompletionQueue cq_; - size_t numOfMessages() { return pending_messages_.size(); } + void exit() { + if (!exited_) { + exited_ = true; + cq_.Shutdown(); + thread_.join(); + } + } private: - void resetStream(); + bool exited_{false}; + std::thread thread_; - std::string address_; - ClientStreamingStreamBuilderPtr - factory_; - grpc::CompletionQueue& cq_; - grpc::TemplatedGenericStub stub_; - AsyncStreamSharedPtr stream_; - CircularBuffer pending_messages_{ - pending_message_buffer_size}; - - std::mutex mux_; - std::condition_variable cv_; + void gogo(); }; -class GrpcAsyncSegmentReporterStream final - : public AsyncStream, - public AsyncStreamCallback { +using CredentialsSharedPtr = std::shared_ptr; +using TracerReaderWriter = + grpc::ClientAsyncReaderWriter; +using TraceReaderWriterPtr = std::unique_ptr; + +class SegmentReporterStream : public AsyncStream { public: - GrpcAsyncSegmentReporterStream( - AsyncClient& client, - std::condition_variable& cv, const std::string& token); + SegmentReporterStream(TraceReaderWriterPtr request_writer, + AsyncEventTag* basic_event_tag, + AsyncEventTag* write_event_tag); // AsyncStream void sendMessage(TracerRequestType message) override; - // AsyncStreamCallback - void onReady() override; - void onIdle() override; - void onWriteDone() override; - void onReadDone() override {} - void onStreamFinish() override { client_.startStream(); } - private: - bool clearPendingMessage(); - - AsyncClient& client_; - TracerResponseType commands_; - grpc::ClientContext ctx_; - std::unique_ptr< - grpc::ClientAsyncReaderWriter> - request_writer_; - StreamState state_{StreamState::Initialized}; - - StreamCallbackTag ready_{StreamState::Ready, this}; - StreamCallbackTag write_done_{StreamState::WriteDone, this}; + TraceReaderWriterPtr request_writer_; - std::condition_variable& cv_; + AsyncEventTag* basic_event_tag_; + AsyncEventTag* write_event_tag_; }; -class GrpcAsyncSegmentReporterStreamBuilder final - : public ClientStreamingStreamBuilder { +class GrpcAsyncSegmentReporterClient : public AsyncClient { public: - explicit GrpcAsyncSegmentReporterStreamBuilder(const std::string& token) - : token_(token) {} + GrpcAsyncSegmentReporterClient(const std::string& address, + const std::string& token, + CredentialsSharedPtr cred); + ~GrpcAsyncSegmentReporterClient() override { + if (!client_reset_) { + resetClient(); + } + } - // ClientStreamingStreamBuilder - AsyncStreamSharedPtr create( - AsyncClient& client, - std::condition_variable& cv) override; + // AsyncClient + void sendMessage(TracerRequestType message) override; + void resetClient() override { + // After this is called, no more events will be processed. + client_reset_ = true; + message_buffer_.clear(); + event_loop_.exit(); + resetStream(); + } - private: - std::string token_; + protected: + // Start or re-create the stream that used to send messages. + virtual void startStream(); + void resetStream(); + void markEventLoopIdle() { event_loop_idle_.store(true); } + void sendMessageOnce(); + + // This may be operated by multiple threads. + std::atomic messages_total_{0}; + std::atomic messages_dropped_{0}; + std::atomic messages_sent_{0}; + + EventLoopThread event_loop_; + grpc::ClientContext client_ctx_; + std::atomic client_reset_{false}; + + ValueBuffer message_buffer_; + + AsyncEventTagPtr basic_event_tag_; + AsyncEventTagPtr write_event_tag_; + + // The Write() of the stream could only be called once at a time + // until the previous Write() is finished (callback is called). + // Considering the complexity and the thread safety, we make sure + // that all operations on the stream are done one by one. + // This flag is used to indicate whether the event loop is idle + // before we perform the next operation on the stream. + // + // Initially the value is false because the event loop will be + // occupied by the first operation (startStream). + std::atomic event_loop_idle_{false}; + + grpc::TemplatedGenericStub stub_; + AsyncStreamSharedPtr active_stream_; }; } // namespace cpp2sky diff --git a/source/tracer_impl.cc b/source/tracer_impl.cc index 6957f30..3a2eec6 100644 --- a/source/tracer_impl.cc +++ b/source/tracer_impl.cc @@ -20,32 +20,26 @@ #include "cpp2sky/exception.h" #include "language-agent/ConfigurationDiscoveryService.pb.h" #include "matchers/suffix_matcher.h" +#include "source/grpc_async_client_impl.h" #include "spdlog/spdlog.h" namespace cpp2sky { -TracerImpl::TracerImpl(TracerConfig& config, - std::shared_ptr cred) - : config_(config), - evloop_thread_([this] { this->run(); }), - segment_factory_(config) { +using namespace spdlog; + +TracerImpl::TracerImpl(const TracerConfig& config, CredentialsSharedPtr cred) + : segment_factory_(config) { init(config, cred); } -TracerImpl::TracerImpl( - TracerConfig& config, - AsyncClientPtr reporter_client) - : config_(config), - reporter_client_(std::move(reporter_client)), - evloop_thread_([this] { this->run(); }), - segment_factory_(config) { +TracerImpl::TracerImpl(const TracerConfig& config, AsyncClientPtr async_client) + : async_client_(std::move(async_client)), segment_factory_(config) { init(config, nullptr); } TracerImpl::~TracerImpl() { - reporter_client_.reset(); - cq_.Shutdown(); - evloop_thread_.join(); + // Stop the reporter client. + async_client_->resetClient(); } TracingContextSharedPtr TracerImpl::newContext() { @@ -56,63 +50,40 @@ TracingContextSharedPtr TracerImpl::newContext(SpanContextSharedPtr span) { return segment_factory_.create(span); } -bool TracerImpl::report(TracingContextSharedPtr obj) { - if (!obj || !obj->readyToSend()) { +bool TracerImpl::report(TracingContextSharedPtr ctx) { + if (!ctx || !ctx->readyToSend()) { return false; } - for (const auto& op_name_matcher : op_name_matchers_) { - if (!obj->spans().empty() && - op_name_matcher->match(obj->spans().front()->operationName())) { + if (!ctx->spans().empty()) { + if (ignore_matcher_->match(ctx->spans().front()->operationName())) { return false; } } - reporter_client_->sendMessage(obj->createSegmentObject()); + async_client_->sendMessage(ctx->createSegmentObject()); return true; } -void TracerImpl::run() { - void* got_tag; - bool ok = false; - while (true) { - grpc::CompletionQueue::NextStatus status = cq_.AsyncNext( - &got_tag, &ok, gpr_time_from_nanos(0, GPR_CLOCK_REALTIME)); - switch (status) { - case grpc::CompletionQueue::TIMEOUT: - continue; - case grpc::CompletionQueue::SHUTDOWN: - return; - case grpc::CompletionQueue::GOT_EVENT: - break; - } - static_cast(got_tag)->callback(!ok); - } -} - -void TracerImpl::init(TracerConfig& config, - std::shared_ptr cred) { +void TracerImpl::init(const TracerConfig& config, CredentialsSharedPtr cred) { spdlog::set_level(spdlog::level::warn); - if (reporter_client_ == nullptr) { + if (async_client_ == nullptr) { if (config.protocol() == Protocol::GRPC) { - reporter_client_ = absl::make_unique( - config.address(), cq_, - absl::make_unique( - config.token()), - cred); + async_client_.reset(new GrpcAsyncSegmentReporterClient( + config.address(), config.token(), cred)); } else { throw TracerException("REST is not supported."); } } - op_name_matchers_.emplace_back(absl::make_unique( + ignore_matcher_.reset(new SuffixMatcher( std::vector(config.ignore_operation_name_suffix().begin(), config.ignore_operation_name_suffix().end()))); } -TracerPtr createInsecureGrpcTracer(TracerConfig& cfg) { - return absl::make_unique(cfg, grpc::InsecureChannelCredentials()); +TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg) { + return TracerPtr{new TracerImpl(cfg, grpc::InsecureChannelCredentials())}; } } // namespace cpp2sky diff --git a/source/tracer_impl.h b/source/tracer_impl.h index 4f63f1f..2be2c7d 100644 --- a/source/tracer_impl.h +++ b/source/tracer_impl.h @@ -34,29 +34,21 @@ using CdsResponse = skywalking::v3::Commands; class TracerImpl : public Tracer { public: - TracerImpl(TracerConfig& config, - std::shared_ptr cred); - TracerImpl( - TracerConfig& config, - AsyncClientPtr reporter_client); + TracerImpl(const TracerConfig& config, CredentialsSharedPtr cred); + TracerImpl(const TracerConfig& config, AsyncClientPtr async_client); ~TracerImpl(); TracingContextSharedPtr newContext() override; TracingContextSharedPtr newContext(SpanContextSharedPtr span) override; - bool report(TracingContextSharedPtr obj) override; + bool report(TracingContextSharedPtr ctx) override; private: - void init(TracerConfig& config, - std::shared_ptr cred); - void run(); - - TracerConfig config_; - AsyncClientPtr reporter_client_; - grpc::CompletionQueue cq_; - std::thread evloop_thread_; + void init(const TracerConfig& config, CredentialsSharedPtr cred); + + AsyncClientPtr async_client_; TracingContextFactory segment_factory_; - std::list op_name_matchers_; + MatcherPtr ignore_matcher_; }; } // namespace cpp2sky diff --git a/source/utils/BUILD b/source/utils/BUILD index 6305d44..8a07d56 100644 --- a/source/utils/BUILD +++ b/source/utils/BUILD @@ -7,7 +7,7 @@ cc_library( ], hdrs = [ "base64.h", - "circular_buffer.h", + "buffer.h", "random_generator.h", "timer.h", ], diff --git a/source/utils/buffer.h b/source/utils/buffer.h new file mode 100644 index 0000000..e0f76d9 --- /dev/null +++ b/source/utils/buffer.h @@ -0,0 +1,76 @@ +// Copyright 2020 SkyAPM + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +#include "absl/types/optional.h" + +namespace cpp2sky { + +template +class ValueBuffer { + public: + ValueBuffer() = default; + + absl::optional pop_front() { + std::unique_lock lock(mux_); + if (buf_.empty()) { + return absl::nullopt; + } + auto result = std::move(buf_.front()); + buf_.pop_front(); + return result; + } + + /** + * Insert new value. + */ + void push_back(Value value) { + std::unique_lock lock(mux_); + buf_.emplace_back(std::move(value)); + } + + /** + * Check whether buffer is empty or not. + */ + bool empty() const { + std::unique_lock lock(mux_); + return buf_.empty(); + } + + /** + * Get item count + */ + size_t size() const { + std::unique_lock lock(mux_); + return buf_.size(); + } + + /** + * Clear buffer + */ + void clear() { + std::unique_lock lock(mux_); + buf_.clear(); + } + + private: + std::deque buf_; + mutable std::mutex mux_; +}; + +} // namespace cpp2sky diff --git a/source/utils/circular_buffer.h b/source/utils/circular_buffer.h deleted file mode 100644 index 050cd13..0000000 --- a/source/utils/circular_buffer.h +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2020 SkyAPM - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at - -// http://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include - -#include "absl/types/optional.h" - -namespace cpp2sky { - -template -class CircularBuffer { - public: - CircularBuffer(size_t max_capacity) - : back_(max_capacity - 1), max_capacity_(max_capacity) {} - - // disable copy - CircularBuffer(const CircularBuffer&) = delete; - CircularBuffer& operator=(const CircularBuffer&) = delete; - - struct Buffer { - T value; - bool is_destroyed_; - }; - - /** - * Get value which inserted older than any other values. - * It will return nullopt if buffer is empty. - */ - absl::optional front() { - if (empty()) { - return absl::nullopt; - } - return buf_[front_].value; - } - - /** - * Delete oldest value. It won't delete actual data we can treat as logical - * deletion. - */ - void pop() { - std::unique_lock lock(mux_); - popInternal(); - } - - /** - * Insert new value. If the buffer has more than max_capacity, it will delete - * the oldest value. - */ - void push(T value) { - std::unique_lock lock(mux_); - if (buf_.size() < max_capacity_) { - buf_.emplace_back(Buffer{value, false}); - back_ = (back_ + 1) % max_capacity_; - ++item_count_; - return; - } - - back_ = (back_ + 1) % max_capacity_; - if (!buf_[back_].is_destroyed_) { - popInternal(); - } - buf_[back_] = Buffer{value, false}; - ++item_count_; - } - - /** - * Check whether buffer is empty or not. - */ - bool empty() { return item_count_ == 0; } - - /** - * Get item count - */ - size_t size() const { return item_count_; } - - /** - * Clear buffer - */ - void clear() { - buf_.clear(); - item_count_ = 0; - } - - // Used for test - size_t frontIdx() { return front_; } - size_t backIdx() { return back_; } - - private: - void popInternal() { - if (empty() || buf_[front_].is_destroyed_) { - return; - } - // Not to destroy actual data. - buf_[front_].is_destroyed_ = true; - --item_count_; - front_ = (front_ + 1) % max_capacity_; - } - - size_t front_ = 0; - size_t back_ = 0; - size_t max_capacity_; - size_t item_count_ = 0; - - std::deque buf_; - std::mutex mux_; -}; - -} // namespace cpp2sky diff --git a/test/BUILD b/test/BUILD index 1a3c133..1cba365 100644 --- a/test/BUILD +++ b/test/BUILD @@ -8,7 +8,6 @@ cc_library( deps = [ "//cpp2sky/internal:async_client_interface", "//cpp2sky/internal:random_generator_interface", - "//cpp2sky/internal:stream_builder_interface", "@com_google_googletest//:gtest_main", "@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_proto", ], diff --git a/test/buffer_test.cc b/test/buffer_test.cc index dceab9c..b1770f7 100644 --- a/test/buffer_test.cc +++ b/test/buffer_test.cc @@ -12,131 +12,44 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "source/utils/buffer.h" + #include #include #include "absl/memory/memory.h" -#include "source/utils/circular_buffer.h" namespace cpp2sky { -class CircularBufferTest : public testing::Test { - protected: - void setup(size_t size) { - buf_ = absl::make_unique>(size); - } - - void evaluate(size_t expect_front, size_t expect_back, bool expect_empty) { - EXPECT_EQ(expect_front, buf_->frontIdx()); - EXPECT_EQ(expect_back, buf_->backIdx()); - EXPECT_EQ(expect_empty, buf_->empty()); - } - - void checkFront(int expect_value) { - auto a = buf_->front(); - ASSERT_TRUE(a.has_value()); - EXPECT_EQ(a.value(), expect_value); - } - - std::unique_ptr> buf_; -}; - -TEST_F(CircularBufferTest, Basic) { - setup(3); - for (auto i = 0; i < 1000; ++i) { - buf_->pop(); - } - - buf_->push(1); - buf_->push(2); - buf_->push(3); - evaluate(0, 2, false); - - buf_->push(4); - evaluate(1, 0, false); - - buf_->push(5); - buf_->push(6); - evaluate(0, 2, false); - - checkFront(4); - buf_->pop(); - evaluate(1, 2, false); - - checkFront(5); - buf_->pop(); - evaluate(2, 2, false); - - buf_->push(7); - evaluate(2, 0, false); - - checkFront(6); - buf_->pop(); - evaluate(0, 0, false); - - checkFront(7); - buf_->pop(); - // Return to Empty state - evaluate(1, 0, true); - - buf_->push(8); - evaluate(1, 1, false); - - buf_->push(9); - buf_->push(10); - buf_->push(11); - buf_->push(12); - - checkFront(10); - evaluate(0, 2, false); - - buf_->pop(); - buf_->pop(); - buf_->pop(); - - evaluate(0, 2, true); - - for (auto i = 0; i < 1000; ++i) { - buf_->pop(); - } - - evaluate(0, 2, true); -} - -TEST_F(CircularBufferTest, Basic2) { - setup(3); - - buf_->push(1); - buf_->pop(); - evaluate(1, 0, true); - - buf_->push(2); - evaluate(1, 1, false); - - buf_->push(3); - buf_->push(4); - - buf_->pop(); - - checkFront(3); - buf_->pop(); - checkFront(4); - buf_->pop(); - - buf_->pop(); -} - -TEST_F(CircularBufferTest, Basic3) { - setup(3); - - buf_->push(1); - buf_->push(2); - buf_->push(3); - buf_->pop(); - buf_->push(4); - buf_->push(5); - - evaluate(2, 1, false); +TEST(BufferTest, Basic) { + ValueBuffer buffer; + EXPECT_TRUE(buffer.empty()); + EXPECT_EQ(buffer.size(), 0); + + buffer.push_back(1); + EXPECT_FALSE(buffer.empty()); + EXPECT_EQ(buffer.size(), 1); + + buffer.push_back(2); + EXPECT_FALSE(buffer.empty()); + EXPECT_EQ(buffer.size(), 2); + + auto value = buffer.pop_front(); + EXPECT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), 1); + EXPECT_FALSE(buffer.empty()); + EXPECT_EQ(buffer.size(), 1); + + value = buffer.pop_front(); + EXPECT_TRUE(value.has_value()); + EXPECT_EQ(value.value(), 2); + EXPECT_TRUE(buffer.empty()); + EXPECT_EQ(buffer.size(), 0); + + value = buffer.pop_front(); + EXPECT_FALSE(value.has_value()); + EXPECT_TRUE(buffer.empty()); + EXPECT_EQ(buffer.size(), 0); } } // namespace cpp2sky diff --git a/test/grpc_async_client_test.cc b/test/grpc_async_client_test.cc index 806f7c7..c3bae59 100644 --- a/test/grpc_async_client_test.cc +++ b/test/grpc_async_client_test.cc @@ -14,10 +14,13 @@ #include #include +#include +#include #include #include "absl/memory/memory.h" +#include "cpp2sky/internal/async_client.h" #include "language-agent/Tracing.pb.h" #include "source/grpc_async_client_impl.h" #include "test/mocks.h" @@ -26,36 +29,132 @@ namespace cpp2sky { using testing::_; +struct TestStats { + TestStats(uint64_t total, uint64_t dropped, uint64_t sent) + : total_(total), dropped_(dropped), sent_(sent) { + pending_ = total_ - dropped_ - sent_; + } + + uint64_t total_{}; + uint64_t dropped_{}; + uint64_t sent_{}; + uint64_t pending_{}; +}; + +class TestGrpcAsyncSegmentReporterClient + : public GrpcAsyncSegmentReporterClient { + public: + using GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient; + + TestStats getTestStats() const { + TestStats stats(messages_total_.load(), messages_dropped_.load(), + messages_sent_.load()); + return stats; + } + + void notifyWriteEvent(bool success) { write_event_tag_->callback(success); } + void notifyStartEvent(bool success) { basic_event_tag_->callback(success); } + + uint64_t bufferSize() const { return message_buffer_.size(); } + + void startStream() override { + resetStream(); + active_stream_ = mock_stream_; + } + + std::shared_ptr mock_stream_ = + std::make_shared(); +}; + class GrpcAsyncSegmentReporterClientTest : public testing::Test { public: GrpcAsyncSegmentReporterClientTest() { - stream_ = std::make_shared< - MockAsyncStream>(); - factory_ = absl::make_unique>(stream_); - EXPECT_CALL(*factory_, create(_, _)); - - client_ = absl::make_unique( - address_, cq_, std::move(factory_), grpc::InsecureChannelCredentials()); + client_.reset(new TestGrpcAsyncSegmentReporterClient( + address_, token_, grpc::InsecureChannelCredentials())); + } + + ~GrpcAsyncSegmentReporterClientTest() { + client_->resetClient(); + client_.reset(); } protected: - grpc::CompletionQueue cq_; std::string address_{"localhost:50051"}; std::string token_{"token"}; - std::shared_ptr> - stream_; - std::unique_ptr< - MockClientStreamingStreamBuilder> - factory_; - std::unique_ptr client_; + std::unique_ptr client_; }; TEST_F(GrpcAsyncSegmentReporterClientTest, SendMessageTest) { skywalking::v3::SegmentObject fake_message; - EXPECT_CALL(*stream_, sendMessage(_)); + EXPECT_CALL(*client_->mock_stream_, sendMessage(_)).Times(0); client_->sendMessage(fake_message); + + auto stats = client_->getTestStats(); + EXPECT_EQ(stats.total_, 1); + EXPECT_EQ(stats.dropped_, 0); + EXPECT_EQ(stats.sent_, 0); + EXPECT_EQ(stats.pending_, 1); + EXPECT_EQ(client_->bufferSize(), 1); + + client_->notifyStartEvent(false); + + sleep(1); // wait for the event loop to process the event. + + // The stream is not ready, the message still in the buffer. + stats = client_->getTestStats(); + EXPECT_EQ(stats.total_, 1); + EXPECT_EQ(stats.dropped_, 0); + EXPECT_EQ(stats.sent_, 0); + EXPECT_EQ(stats.pending_, 1); + EXPECT_EQ(client_->bufferSize(), 1); + + EXPECT_CALL(*client_->mock_stream_, sendMessage(_)); + client_->notifyStartEvent(true); + sleep(1); // wait for the event loop to process the event. + + // The stream is ready, the message is popped and sent. + // But before the collback is called, the stats is not updated. + + stats = client_->getTestStats(); + EXPECT_EQ(stats.total_, 1); + EXPECT_EQ(stats.dropped_, 0); + EXPECT_EQ(stats.sent_, 0); + EXPECT_EQ(stats.pending_, 1); + EXPECT_EQ(client_->bufferSize(), 0); + + client_->notifyWriteEvent(true); + sleep(1); // wait for the event loop to process the event. + + // The message is sent successfully. + stats = client_->getTestStats(); + EXPECT_EQ(stats.total_, 1); + EXPECT_EQ(stats.dropped_, 0); + EXPECT_EQ(stats.sent_, 1); + EXPECT_EQ(stats.pending_, 0); + EXPECT_EQ(client_->bufferSize(), 0); + + // Send another message. This time the stream is ready and + // previous message is sent successfully. So the new message + // should be sent immediately. + EXPECT_CALL(*client_->mock_stream_, sendMessage(_)); + client_->sendMessage(fake_message); + sleep(1); // wait for the event loop to process the event. + + stats = client_->getTestStats(); + EXPECT_EQ(stats.total_, 2); + EXPECT_EQ(stats.dropped_, 0); + EXPECT_EQ(stats.sent_, 1); + EXPECT_EQ(stats.pending_, 1); + + client_->notifyWriteEvent(true); + sleep(1); // wait for the event loop to process the event. + + stats = client_->getTestStats(); + EXPECT_EQ(stats.total_, 2); + EXPECT_EQ(stats.dropped_, 0); + EXPECT_EQ(stats.sent_, 2); + EXPECT_EQ(stats.pending_, 0); } } // namespace cpp2sky diff --git a/test/mocks.h b/test/mocks.h index 5c48e4a..bcaaf07 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -21,7 +21,6 @@ #include "cpp2sky/internal/async_client.h" #include "cpp2sky/internal/random_generator.h" -#include "cpp2sky/internal/stream_builder.h" using testing::_; using testing::Return; @@ -34,46 +33,15 @@ class MockRandomGenerator : public RandomGenerator { MOCK_METHOD(std::string, uuid, ()); }; -template -class MockAsyncStream : public AsyncStream { +class MockAsyncStream : public AsyncStream { public: - MOCK_METHOD(void, sendMessage, (RequestType)); - MOCK_METHOD(void, onIdle, ()); - MOCK_METHOD(void, onWriteDone, ()); - MOCK_METHOD(void, onReady, ()); + MOCK_METHOD(void, sendMessage, (TracerRequestType)); }; -template -class MockAsyncClient : public AsyncClient { +class MockAsyncClient : public AsyncClient { public: - using GenericStub = grpc::TemplatedGenericStub; - - MOCK_METHOD(void, sendMessage, (RequestType)); - MOCK_METHOD(GenericStub&, stub, ()); - MOCK_METHOD(CircularBuffer&, pendingMessages, ()); - MOCK_METHOD(void, startStream, ()); - MOCK_METHOD(grpc::CompletionQueue&, completionQueue, ()); -}; - -template -class MockClientStreamingStreamBuilder final - : public ClientStreamingStreamBuilder { - public: - using AsyncClientType = AsyncClient; - using AsyncStreamSharedPtrType = - AsyncStreamSharedPtr; - - MockClientStreamingStreamBuilder( - std::shared_ptr> stream) - : stream_(stream) { - ON_CALL(*this, create(_, _)).WillByDefault(Return(stream_)); - } - - MOCK_METHOD(AsyncStreamSharedPtrType, create, - (AsyncClientType&, std::condition_variable&)); - - private: - std::shared_ptr> stream_; + MOCK_METHOD(void, sendMessage, (TracerRequestType)); + MOCK_METHOD(void, resetClient, ()); }; } // namespace cpp2sky diff --git a/test/tracer_test.cc b/test/tracer_test.cc index bab1f91..907f80f 100644 --- a/test/tracer_test.cc +++ b/test/tracer_test.cc @@ -17,6 +17,7 @@ #include #include "cpp2sky/config.pb.h" +#include "cpp2sky/internal/async_client.h" #include "mocks.h" #include "source/tracer_impl.h" @@ -26,9 +27,7 @@ TEST(TracerTest, MatchedOpShouldIgnored) { TracerConfig config; *config.add_ignore_operation_name_suffix() = "/ignored"; - TracerImpl tracer( - config, absl::make_unique< - MockAsyncClient>()); + TracerImpl tracer(config, AsyncClientPtr{new MockAsyncClient()}); auto context = tracer.newContext(); auto span = context->createEntrySpan(); @@ -41,9 +40,7 @@ TEST(TracerTest, MatchedOpShouldIgnored) { TEST(TracerTest, NotClosedSpanExists) { TracerConfig config; - TracerImpl tracer( - config, absl::make_unique< - MockAsyncClient>()); + TracerImpl tracer(config, AsyncClientPtr{new MockAsyncClient()}); auto context = tracer.newContext(); auto span = context->createEntrySpan(); @@ -55,8 +52,7 @@ TEST(TracerTest, NotClosedSpanExists) { TEST(TracerTest, Success) { TracerConfig config; - auto mock_reporter = absl::make_unique< - MockAsyncClient>(); + auto mock_reporter = std::unique_ptr{new MockAsyncClient()}; EXPECT_CALL(*mock_reporter, sendMessage(_)); TracerImpl tracer(config, std::move(mock_reporter)); diff --git a/test/tracing_context_test.cc b/test/tracing_context_test.cc index 9114670..b802b23 100644 --- a/test/tracing_context_test.cc +++ b/test/tracing_context_test.cc @@ -44,7 +44,7 @@ class TracingContextTest : public testing::Test { span_ctx_ = std::make_shared(sample_ctx); span_ext_ctx_ = std::make_shared("1"); - factory_ = absl::make_unique(config_); + factory_.reset(new TracingContextFactory(config_)); } protected: From e8a646f3b0d08f996ecb03563a346172c52bc6c2 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Wed, 14 Aug 2024 14:55:23 +0800 Subject: [PATCH 2/7] add log to CI Signed-off-by: wbpcode --- .github/workflows/main.yml | 8 ++++---- test/e2e/requirements.txt | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index efef18d..454ba06 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -27,16 +27,16 @@ jobs: - uses: actions/checkout@v3 - name: Run bazel test with GCC c++11 run: | - bazel test --cxxopt=-std=c++0x //... + bazel test --test_output=all -c dbg --cxxopt=-std=c++0x //... - name: Run bazel test with GCC c++17 run: | - bazel test --cxxopt=-std=c++17 //... + bazel test --test_output=all -c opt --cxxopt=-std=c++17 //... - name: Run bazel test with CLANG c++11 run: | - bazel test --config=clang --cxxopt=-std=c++0x //... + bazel test --test_output=all -c dbg --config=clang --cxxopt=-std=c++0x //... - name: Run bazel test with CLANG c++17 run: | - bazel test --config=clang --cxxopt=-std=c++17 //... + bazel test --test_output=all -c opt --config=clang --cxxopt=-std=c++17 //... - name: Install cmake dependencies and run cmake compile run: | sudo apt update diff --git a/test/e2e/requirements.txt b/test/e2e/requirements.txt index b9612dd..3a31ceb 100644 --- a/test/e2e/requirements.txt +++ b/test/e2e/requirements.txt @@ -1,7 +1,6 @@ PyYAML==6.0 requests==2.26.0 apache-skywalking==0.6.0 -protobuf==3.18.3 tornado==6.1 sanic==21.12.2 Flask==2.0.2 From c156ab622acc7236e8d912669b3d1557934b8856 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Wed, 14 Aug 2024 18:34:21 +0800 Subject: [PATCH 3/7] minor update Signed-off-by: wbpcode --- source/grpc_async_client_impl.cc | 7 +++---- test/tracer_test.cc | 9 ++++++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/source/grpc_async_client_impl.cc b/source/grpc_async_client_impl.cc index 985cab8..78faca7 100644 --- a/source/grpc_async_client_impl.cc +++ b/source/grpc_async_client_impl.cc @@ -23,7 +23,6 @@ #include "absl/strings/string_view.h" #include "cpp2sky/exception.h" #include "cpp2sky/internal/async_client.h" -#include "grpcpp/alarm.h" #include "spdlog/spdlog.h" namespace cpp2sky { @@ -132,15 +131,15 @@ void GrpcAsyncSegmentReporterClient::sendMessageOnce() { void GrpcAsyncSegmentReporterClient::startStream() { resetStream(); // Reset stream before creating a new one. - active_stream_.reset(new SegmentReporterStream( + active_stream_ = std::make_shared( stub_.PrepareCall(&client_ctx_, TraceCollectMethod, &event_loop_.cq_), - basic_event_tag_.get(), write_event_tag_.get())); + basic_event_tag_.get(), write_event_tag_.get()); info("[Reporter] Stream {} has created.", fmt::ptr(active_stream_.get())); } void GrpcAsyncSegmentReporterClient::resetStream() { - info("[Reporter] Stream {} has destroyed.", fmt::ptr(active_stream_.get())); + info("[Reporter] Stream {} has deleted.", fmt::ptr(active_stream_.get())); active_stream_.reset(); } diff --git a/test/tracer_test.cc b/test/tracer_test.cc index 907f80f..9ba8ed7 100644 --- a/test/tracer_test.cc +++ b/test/tracer_test.cc @@ -27,7 +27,8 @@ TEST(TracerTest, MatchedOpShouldIgnored) { TracerConfig config; *config.add_ignore_operation_name_suffix() = "/ignored"; - TracerImpl tracer(config, AsyncClientPtr{new MockAsyncClient()}); + TracerImpl tracer(config, + AsyncClientPtr{new testing::NiceMock()}); auto context = tracer.newContext(); auto span = context->createEntrySpan(); @@ -40,7 +41,8 @@ TEST(TracerTest, MatchedOpShouldIgnored) { TEST(TracerTest, NotClosedSpanExists) { TracerConfig config; - TracerImpl tracer(config, AsyncClientPtr{new MockAsyncClient()}); + TracerImpl tracer(config, + AsyncClientPtr{new testing::NiceMock()}); auto context = tracer.newContext(); auto span = context->createEntrySpan(); @@ -52,7 +54,8 @@ TEST(TracerTest, NotClosedSpanExists) { TEST(TracerTest, Success) { TracerConfig config; - auto mock_reporter = std::unique_ptr{new MockAsyncClient()}; + auto mock_reporter = std::unique_ptr{ + new testing::NiceMock()}; EXPECT_CALL(*mock_reporter, sendMessage(_)); TracerImpl tracer(config, std::move(mock_reporter)); From 7b73036b90d6cbe7e0703e190d2cee9336445a44 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Wed, 14 Aug 2024 18:51:41 +0800 Subject: [PATCH 4/7] minor update Signed-off-by: wbpcode --- .github/workflows/main.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 454ba06..c50548a 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -27,10 +27,10 @@ jobs: - uses: actions/checkout@v3 - name: Run bazel test with GCC c++11 run: | - bazel test --test_output=all -c dbg --cxxopt=-std=c++0x //... + bazel test --test_output=all --cxxopt=-std=c++0x //... - name: Run bazel test with GCC c++17 run: | - bazel test --test_output=all -c opt --cxxopt=-std=c++17 //... + bazel test --test_output=all --cxxopt=-std=c++17 //... - name: Run bazel test with CLANG c++11 run: | bazel test --test_output=all -c dbg --config=clang --cxxopt=-std=c++0x //... From c94a4d92ced082759c74c0cf1b5b37053b848bfc Mon Sep 17 00:00:00 2001 From: wbpcode Date: Thu, 15 Aug 2024 11:47:41 +0800 Subject: [PATCH 5/7] fix unit test Signed-off-by: wbpcode --- cpp2sky/internal/BUILD | 1 + cpp2sky/internal/async_client.h | 70 ++++++++++++++++++--- source/grpc_async_client_impl.cc | 102 +++++++++++++++++++++---------- source/grpc_async_client_impl.h | 90 ++++++++++++++++++--------- source/tracer_impl.cc | 12 ++-- source/tracer_impl.h | 6 +- test/grpc_async_client_test.cc | 65 ++++++++++++++------ test/mocks.h | 10 ++- test/tracer_test.cc | 12 ++-- 9 files changed, 255 insertions(+), 113 deletions(-) diff --git a/cpp2sky/internal/BUILD b/cpp2sky/internal/BUILD index 2769991..b68a5f4 100644 --- a/cpp2sky/internal/BUILD +++ b/cpp2sky/internal/BUILD @@ -10,6 +10,7 @@ cc_library( ], visibility = ["//visibility:public"], deps = [ + "@com_github_grpc_grpc//:grpc++", "@skywalking_data_collect_protocol//language-agent:tracing_protocol_cc_grpc", ], ) diff --git a/cpp2sky/internal/async_client.h b/cpp2sky/internal/async_client.h index a06cfa9..a2f88a7 100644 --- a/cpp2sky/internal/async_client.h +++ b/cpp2sky/internal/async_client.h @@ -14,18 +14,19 @@ #pragma once -#include - #include #include +#include "google/protobuf/message.h" +#include "grpcpp/generic/generic_stub.h" +#include "grpcpp/grpcpp.h" #include "language-agent/Tracing.pb.h" namespace cpp2sky { -using TracerRequestType = skywalking::v3::SegmentObject; -using TracerResponseType = skywalking::v3::Commands; - +/** + * Template base class for gRPC async client. + */ template class AsyncClientBase { public: @@ -36,12 +37,21 @@ class AsyncClientBase { */ virtual void sendMessage(RequestType message) = 0; + /** + * Reset the client. This should be called when the client is no longer + * needed. + */ virtual void resetClient() = 0; }; -using AsyncClient = AsyncClientBase; -using AsyncClientPtr = std::unique_ptr; +template +using AsyncClientBasePtr = + std::unique_ptr>; +/** + * Template base class for gRPC async stream. The stream is used to represent + * a single gRPC stream/request. + */ template class AsyncStreamBase { public: @@ -57,12 +67,52 @@ template using AsyncStreamBasePtr = std::unique_ptr>; -using AsyncStream = AsyncStreamBase; -using AsyncStreamSharedPtr = std::shared_ptr; - +/** + * Tag for async operation. The callback should be called when the operation is + * done. + */ struct AsyncEventTag { std::function callback; }; using AsyncEventTagPtr = std::unique_ptr; +using GrpcClientContextPtr = std::unique_ptr; +using GrpcCompletionQueue = grpc::CompletionQueue; + +/** + * Factory for creating async stream. + */ +template +class AsyncStreamFactoryBase { + public: + virtual ~AsyncStreamFactoryBase() = default; + + using AsyncStreamPtr = AsyncStreamBasePtr; + using GrpcStub = grpc::TemplatedGenericStub; + + virtual AsyncStreamPtr createStream(GrpcClientContextPtr client_ctx, + GrpcStub& stub, GrpcCompletionQueue& cq, + AsyncEventTag& basic_event_tag, + AsyncEventTag& write_event_tag) = 0; +}; + +template +using AsyncStreamFactoryBasePtr = + std::unique_ptr>; + +using TraceRequestType = skywalking::v3::SegmentObject; +using TraceResponseType = skywalking::v3::Commands; + +using TraceAsyncStream = AsyncStreamBase; +using TraceAsyncStreamPtr = + AsyncStreamBasePtr; + +using TraceAsyncStreamFactory = + AsyncStreamFactoryBase; +using TraceAsyncStreamFactoryPtr = + AsyncStreamFactoryBasePtr; + +using TraceAsyncClient = AsyncClientBase; +using TraceAsyncClientPtr = std::unique_ptr; + } // namespace cpp2sky diff --git a/source/grpc_async_client_impl.cc b/source/grpc_async_client_impl.cc index 78faca7..f4f7d6b 100644 --- a/source/grpc_async_client_impl.cc +++ b/source/grpc_async_client_impl.cc @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "grpc_async_client_impl.h" +#include "source/grpc_async_client_impl.h" #include @@ -62,15 +62,50 @@ void EventLoopThread::gogo() { } } -GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient( - const std::string& address, const std::string& token, - CredentialsSharedPtr cred) - : stub_(grpc::CreateChannel(address, cred)) { - if (!token.empty()) { - client_ctx_.AddMetadata(AuthenticationKey, token); +TraceAsyncStreamImpl::TraceAsyncStreamImpl(GrpcClientContextPtr client_ctx, + TraceGrpcStub& stub, + GrpcCompletionQueue& cq, + AsyncEventTag& basic_event_tag, + AsyncEventTag& write_event_tag) + : client_ctx_(std::move(client_ctx)), + basic_event_tag_(basic_event_tag), + write_event_tag_(write_event_tag) { + if (client_ctx_ == nullptr) { + client_ctx_.reset(new grpc::ClientContext()); } - basic_event_tag_.reset(new AsyncEventTag{[this](bool ok) { + request_writer_ = + stub.PrepareCall(client_ctx_.get(), TraceCollectMethod, &cq); + request_writer_->StartCall(reinterpret_cast(&basic_event_tag_)); +} + +void TraceAsyncStreamImpl::sendMessage(TraceRequestType message) { + request_writer_->Write(message, reinterpret_cast(&write_event_tag_)); +} + +TraceAsyncStreamPtr TraceAsyncStreamFactoryImpl::createStream( + GrpcClientContextPtr client_ctx, TraceGrpcStub& stub, + GrpcCompletionQueue& cq, AsyncEventTag& basic_event_tag, + AsyncEventTag& write_event_tag) { + return TraceAsyncStreamPtr{new TraceAsyncStreamImpl( + std::move(client_ctx), stub, cq, basic_event_tag, write_event_tag)}; +} + +std::unique_ptr TraceAsyncClientImpl::createClient( + const std::string& address, const std::string& token, + TraceAsyncStreamFactoryPtr factory, CredentialsSharedPtr cred) { + return std::unique_ptr{new TraceAsyncClientImpl( + address, token, std::move(factory), std::move(cred))}; +} + +TraceAsyncClientImpl::TraceAsyncClientImpl(const std::string& address, + const std::string& token, + TraceAsyncStreamFactoryPtr factory, + CredentialsSharedPtr cred) + : token_(token), + stream_factory_(std::move(factory)), + stub_(grpc::CreateChannel(address, cred)) { + basic_event_tag_.callback = [this](bool ok) { if (client_reset_) { return; } @@ -94,9 +129,9 @@ GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient( // Reset stream and try to create a new one. startStream(); } - }}); + }; - write_event_tag_.reset(new AsyncEventTag{[this](bool ok) { + write_event_tag_.callback = [this](bool ok) { if (ok) { trace("[Reporter] Stream {} message sending success.", fmt::ptr(this)); messages_sent_++; @@ -106,13 +141,18 @@ GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient( } // Delegate the event to basic_event_tag_ to trigger the next task or // reset the stream. - basic_event_tag_->callback(ok); - }}); + basic_event_tag_.callback(ok); + }; + + // If the factory is not provided, use the default one. + if (stream_factory_ == nullptr) { + stream_factory_.reset(new TraceAsyncStreamFactoryImpl()); + } startStream(); } -void GrpcAsyncSegmentReporterClient::sendMessageOnce() { +void TraceAsyncClientImpl::sendMessageOnce() { bool expect_idle = true; if (event_loop_idle_.compare_exchange_strong(expect_idle, false)) { assert(active_stream_ != nullptr); @@ -128,22 +168,31 @@ void GrpcAsyncSegmentReporterClient::sendMessageOnce() { } } -void GrpcAsyncSegmentReporterClient::startStream() { - resetStream(); // Reset stream before creating a new one. +void TraceAsyncClientImpl::startStream() { + if (active_stream_ != nullptr) { + resetStream(); // Reset stream before creating a new one. + } + + // Create the unique client context for the new stream. + // Each stream should have its own context. + auto client_ctx = GrpcClientContextPtr{new grpc::ClientContext()}; + if (!token_.empty()) { + client_ctx->AddMetadata(AuthenticationKey, token_); + } - active_stream_ = std::make_shared( - stub_.PrepareCall(&client_ctx_, TraceCollectMethod, &event_loop_.cq_), - basic_event_tag_.get(), write_event_tag_.get()); + active_stream_ = stream_factory_->createStream( + std::move(client_ctx), stub_, event_loop_.cq_, basic_event_tag_, + write_event_tag_); info("[Reporter] Stream {} has created.", fmt::ptr(active_stream_.get())); } -void GrpcAsyncSegmentReporterClient::resetStream() { +void TraceAsyncClientImpl::resetStream() { info("[Reporter] Stream {} has deleted.", fmt::ptr(active_stream_.get())); active_stream_.reset(); } -void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) { +void TraceAsyncClientImpl::sendMessage(TraceRequestType message) { messages_total_++; const size_t pending = message_buffer_.size(); @@ -157,17 +206,4 @@ void GrpcAsyncSegmentReporterClient::sendMessage(TracerRequestType message) { sendMessageOnce(); } -SegmentReporterStream::SegmentReporterStream( - TraceReaderWriterPtr request_writer, AsyncEventTag* basic_event_tag, - AsyncEventTag* write_event_tag) - : request_writer_(std::move(request_writer)), - basic_event_tag_(basic_event_tag), - write_event_tag_(write_event_tag) { - request_writer_->StartCall(reinterpret_cast(basic_event_tag_)); -} - -void SegmentReporterStream::sendMessage(TracerRequestType message) { - request_writer_->Write(message, reinterpret_cast(write_event_tag_)); -} - } // namespace cpp2sky diff --git a/source/grpc_async_client_impl.h b/source/grpc_async_client_impl.h index 23537c8..251727b 100644 --- a/source/grpc_async_client_impl.h +++ b/source/grpc_async_client_impl.h @@ -14,14 +14,11 @@ #pragma once -#include -#include - -#include #include #include #include #include +#include #include #include "cpp2sky/config.pb.h" @@ -31,6 +28,14 @@ namespace cpp2sky { +using CredentialsSharedPtr = std::shared_ptr; + +using TraceGrpcStub = + grpc::TemplatedGenericStub; +using TraceReaderWriter = + grpc::ClientAsyncReaderWriter; +using TraceReaderWriterPtr = std::unique_ptr; + class EventLoopThread { public: EventLoopThread() : thread_([this] { this->gogo(); }) {} @@ -53,40 +58,60 @@ class EventLoopThread { void gogo(); }; -using CredentialsSharedPtr = std::shared_ptr; -using TracerReaderWriter = - grpc::ClientAsyncReaderWriter; -using TraceReaderWriterPtr = std::unique_ptr; - -class SegmentReporterStream : public AsyncStream { +class TraceAsyncStreamImpl : public TraceAsyncStream { public: - SegmentReporterStream(TraceReaderWriterPtr request_writer, - AsyncEventTag* basic_event_tag, - AsyncEventTag* write_event_tag); + TraceAsyncStreamImpl(GrpcClientContextPtr client_ctx, TraceGrpcStub& stub, + GrpcCompletionQueue& cq, AsyncEventTag& basic_event_tag, + AsyncEventTag& write_event_tag); // AsyncStream - void sendMessage(TracerRequestType message) override; + void sendMessage(TraceRequestType message) override; private: + GrpcClientContextPtr client_ctx_; TraceReaderWriterPtr request_writer_; - AsyncEventTag* basic_event_tag_; - AsyncEventTag* write_event_tag_; + AsyncEventTag& basic_event_tag_; + AsyncEventTag& write_event_tag_; }; -class GrpcAsyncSegmentReporterClient : public AsyncClient { +class TraceAsyncStreamFactoryImpl : public TraceAsyncStreamFactory { public: - GrpcAsyncSegmentReporterClient(const std::string& address, - const std::string& token, - CredentialsSharedPtr cred); - ~GrpcAsyncSegmentReporterClient() override { + TraceAsyncStreamFactoryImpl() = default; + + TraceAsyncStreamPtr createStream(GrpcClientContextPtr client_ctx, + GrpcStub& stub, GrpcCompletionQueue& cq, + AsyncEventTag& basic_event_tag, + AsyncEventTag& write_event_tag) override; +}; + +class TraceAsyncClientImpl : public TraceAsyncClient { + public: + /** + * Create a new GrpcAsyncSegmentReporterClient. + * + * @param address The address of the server. + * @param token The optional token used to authenticate the client. + * If non-empty token is provided, the client will send the token + * to the server in the metadata. + * @param cred The credentials for creating the channel. + * @param factory The factory function to create the stream from the + * request writer and event tags. In most cases, the default factory + * should be used. + */ + static std::unique_ptr createClient( + const std::string& address, const std::string& token, + TraceAsyncStreamFactoryPtr factory = nullptr, + CredentialsSharedPtr cred = grpc::InsecureChannelCredentials()); + + ~TraceAsyncClientImpl() override { if (!client_reset_) { resetClient(); } } // AsyncClient - void sendMessage(TracerRequestType message) override; + void sendMessage(TraceRequestType message) override; void resetClient() override { // After this is called, no more events will be processed. client_reset_ = true; @@ -96,25 +121,33 @@ class GrpcAsyncSegmentReporterClient : public AsyncClient { } protected: + TraceAsyncClientImpl( + const std::string& address, const std::string& token, + TraceAsyncStreamFactoryPtr factory = nullptr, + CredentialsSharedPtr cred = grpc::InsecureChannelCredentials()); + // Start or re-create the stream that used to send messages. - virtual void startStream(); + void startStream(); void resetStream(); void markEventLoopIdle() { event_loop_idle_.store(true); } void sendMessageOnce(); + const std::string token_; + TraceAsyncStreamFactoryPtr stream_factory_; + TraceGrpcStub stub_; + // This may be operated by multiple threads. std::atomic messages_total_{0}; std::atomic messages_dropped_{0}; std::atomic messages_sent_{0}; EventLoopThread event_loop_; - grpc::ClientContext client_ctx_; std::atomic client_reset_{false}; - ValueBuffer message_buffer_; + ValueBuffer message_buffer_; - AsyncEventTagPtr basic_event_tag_; - AsyncEventTagPtr write_event_tag_; + AsyncEventTag basic_event_tag_; + AsyncEventTag write_event_tag_; // The Write() of the stream could only be called once at a time // until the previous Write() is finished (callback is called). @@ -127,8 +160,7 @@ class GrpcAsyncSegmentReporterClient : public AsyncClient { // occupied by the first operation (startStream). std::atomic event_loop_idle_{false}; - grpc::TemplatedGenericStub stub_; - AsyncStreamSharedPtr active_stream_; + TraceAsyncStreamPtr active_stream_; }; } // namespace cpp2sky diff --git a/source/tracer_impl.cc b/source/tracer_impl.cc index 3a2eec6..e7320c1 100644 --- a/source/tracer_impl.cc +++ b/source/tracer_impl.cc @@ -32,7 +32,8 @@ TracerImpl::TracerImpl(const TracerConfig& config, CredentialsSharedPtr cred) init(config, cred); } -TracerImpl::TracerImpl(const TracerConfig& config, AsyncClientPtr async_client) +TracerImpl::TracerImpl(const TracerConfig& config, + TraceAsyncClientPtr async_client) : async_client_(std::move(async_client)), segment_factory_(config) { init(config, nullptr); } @@ -69,12 +70,11 @@ void TracerImpl::init(const TracerConfig& config, CredentialsSharedPtr cred) { spdlog::set_level(spdlog::level::warn); if (async_client_ == nullptr) { - if (config.protocol() == Protocol::GRPC) { - async_client_.reset(new GrpcAsyncSegmentReporterClient( - config.address(), config.token(), cred)); - } else { - throw TracerException("REST is not supported."); + if (config.protocol() != Protocol::GRPC) { + throw TracerException("Only GRPC is supported."); } + async_client_ = TraceAsyncClientImpl::createClient( + config.address(), config.token(), nullptr, std::move(cred)); } ignore_matcher_.reset(new SuffixMatcher( diff --git a/source/tracer_impl.h b/source/tracer_impl.h index 2be2c7d..febd577 100644 --- a/source/tracer_impl.h +++ b/source/tracer_impl.h @@ -34,8 +34,8 @@ using CdsResponse = skywalking::v3::Commands; class TracerImpl : public Tracer { public: - TracerImpl(const TracerConfig& config, CredentialsSharedPtr cred); - TracerImpl(const TracerConfig& config, AsyncClientPtr async_client); + TracerImpl(const TracerConfig& config, CredentialsSharedPtr credentials); + TracerImpl(const TracerConfig& config, TraceAsyncClientPtr async_client); ~TracerImpl(); TracingContextSharedPtr newContext() override; @@ -46,7 +46,7 @@ class TracerImpl : public Tracer { private: void init(const TracerConfig& config, CredentialsSharedPtr cred); - AsyncClientPtr async_client_; + TraceAsyncClientPtr async_client_; TracingContextFactory segment_factory_; MatcherPtr ignore_matcher_; }; diff --git a/test/grpc_async_client_test.cc b/test/grpc_async_client_test.cc index c3bae59..4016d6d 100644 --- a/test/grpc_async_client_test.cc +++ b/test/grpc_async_client_test.cc @@ -41,10 +41,13 @@ struct TestStats { uint64_t pending_{}; }; -class TestGrpcAsyncSegmentReporterClient - : public GrpcAsyncSegmentReporterClient { +class TestTraceAsyncClient : public TraceAsyncClientImpl { public: - using GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient; + TestTraceAsyncClient(const std::string& address, const std::string& token, + TraceAsyncStreamFactoryPtr stream_factory, + CredentialsSharedPtr credentials) + : TraceAsyncClientImpl(address, token, std::move(stream_factory), + std::move(credentials)) {} TestStats getTestStats() const { TestStats stats(messages_total_.load(), messages_dropped_.load(), @@ -52,28 +55,47 @@ class TestGrpcAsyncSegmentReporterClient return stats; } - void notifyWriteEvent(bool success) { write_event_tag_->callback(success); } - void notifyStartEvent(bool success) { basic_event_tag_->callback(success); } + void notifyWriteEvent(bool success) { write_event_tag_.callback(success); } + void notifyStartEvent(bool success) { basic_event_tag_.callback(success); } uint64_t bufferSize() const { return message_buffer_.size(); } +}; - void startStream() override { - resetStream(); - active_stream_ = mock_stream_; +class TestTraceAsyncStreamFactory : public TraceAsyncStreamFactory { + public: + TestTraceAsyncStreamFactory(std::shared_ptr mock_stream) + : mock_stream_(mock_stream) {} + + class TestTraceAsyncStream : public TraceAsyncStream { + public: + TestTraceAsyncStream(std::shared_ptr mock_stream) + : mock_stream_(mock_stream) {} + void sendMessage(TraceRequestType message) override { + mock_stream_->sendMessage(std::move(message)); + } + std::shared_ptr mock_stream_; + }; + + TraceAsyncStreamPtr createStream(GrpcClientContextPtr, GrpcStub&, + GrpcCompletionQueue&, AsyncEventTag&, + AsyncEventTag&) override { + return TraceAsyncStreamPtr{new TestTraceAsyncStream(mock_stream_)}; } - std::shared_ptr mock_stream_ = - std::make_shared(); + std::shared_ptr mock_stream_; }; -class GrpcAsyncSegmentReporterClientTest : public testing::Test { +class TraceAsyncClientImplTest : public testing::Test { public: - GrpcAsyncSegmentReporterClientTest() { - client_.reset(new TestGrpcAsyncSegmentReporterClient( - address_, token_, grpc::InsecureChannelCredentials())); + TraceAsyncClientImplTest() { + client_.reset(new TestTraceAsyncClient( + address_, token_, + TraceAsyncStreamFactoryPtr{ + new TestTraceAsyncStreamFactory(mock_stream_)}, + grpc::InsecureChannelCredentials())); } - ~GrpcAsyncSegmentReporterClientTest() { + ~TraceAsyncClientImplTest() { client_->resetClient(); client_.reset(); } @@ -82,12 +104,15 @@ class GrpcAsyncSegmentReporterClientTest : public testing::Test { std::string address_{"localhost:50051"}; std::string token_{"token"}; - std::unique_ptr client_; + std::shared_ptr mock_stream_ = + std::make_shared(); + + std::unique_ptr client_; }; -TEST_F(GrpcAsyncSegmentReporterClientTest, SendMessageTest) { +TEST_F(TraceAsyncClientImplTest, SendMessageTest) { skywalking::v3::SegmentObject fake_message; - EXPECT_CALL(*client_->mock_stream_, sendMessage(_)).Times(0); + EXPECT_CALL(*mock_stream_, sendMessage(_)).Times(0); client_->sendMessage(fake_message); auto stats = client_->getTestStats(); @@ -109,7 +134,7 @@ TEST_F(GrpcAsyncSegmentReporterClientTest, SendMessageTest) { EXPECT_EQ(stats.pending_, 1); EXPECT_EQ(client_->bufferSize(), 1); - EXPECT_CALL(*client_->mock_stream_, sendMessage(_)); + EXPECT_CALL(*mock_stream_, sendMessage(_)); client_->notifyStartEvent(true); sleep(1); // wait for the event loop to process the event. @@ -137,7 +162,7 @@ TEST_F(GrpcAsyncSegmentReporterClientTest, SendMessageTest) { // Send another message. This time the stream is ready and // previous message is sent successfully. So the new message // should be sent immediately. - EXPECT_CALL(*client_->mock_stream_, sendMessage(_)); + EXPECT_CALL(*mock_stream_, sendMessage(_)); client_->sendMessage(fake_message); sleep(1); // wait for the event loop to process the event. diff --git a/test/mocks.h b/test/mocks.h index bcaaf07..d9f81c1 100644 --- a/test/mocks.h +++ b/test/mocks.h @@ -17,8 +17,6 @@ #include #include -#include - #include "cpp2sky/internal/async_client.h" #include "cpp2sky/internal/random_generator.h" @@ -33,14 +31,14 @@ class MockRandomGenerator : public RandomGenerator { MOCK_METHOD(std::string, uuid, ()); }; -class MockAsyncStream : public AsyncStream { +class MockTraceAsyncStream : public TraceAsyncStream { public: - MOCK_METHOD(void, sendMessage, (TracerRequestType)); + MOCK_METHOD(void, sendMessage, (TraceRequestType)); }; -class MockAsyncClient : public AsyncClient { +class MockTraceAsyncClient : public TraceAsyncClient { public: - MOCK_METHOD(void, sendMessage, (TracerRequestType)); + MOCK_METHOD(void, sendMessage, (TraceRequestType)); MOCK_METHOD(void, resetClient, ()); }; diff --git a/test/tracer_test.cc b/test/tracer_test.cc index 9ba8ed7..462eb07 100644 --- a/test/tracer_test.cc +++ b/test/tracer_test.cc @@ -27,8 +27,8 @@ TEST(TracerTest, MatchedOpShouldIgnored) { TracerConfig config; *config.add_ignore_operation_name_suffix() = "/ignored"; - TracerImpl tracer(config, - AsyncClientPtr{new testing::NiceMock()}); + TracerImpl tracer(config, TraceAsyncClientPtr{ + new testing::NiceMock()}); auto context = tracer.newContext(); auto span = context->createEntrySpan(); @@ -41,8 +41,8 @@ TEST(TracerTest, MatchedOpShouldIgnored) { TEST(TracerTest, NotClosedSpanExists) { TracerConfig config; - TracerImpl tracer(config, - AsyncClientPtr{new testing::NiceMock()}); + TracerImpl tracer(config, TraceAsyncClientPtr{ + new testing::NiceMock()}); auto context = tracer.newContext(); auto span = context->createEntrySpan(); @@ -54,8 +54,8 @@ TEST(TracerTest, NotClosedSpanExists) { TEST(TracerTest, Success) { TracerConfig config; - auto mock_reporter = std::unique_ptr{ - new testing::NiceMock()}; + auto mock_reporter = std::unique_ptr{ + new testing::NiceMock()}; EXPECT_CALL(*mock_reporter, sendMessage(_)); TracerImpl tracer(config, std::move(mock_reporter)); From fc2a928850d496c88c85b438033b84b00096fe37 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Thu, 15 Aug 2024 12:36:36 +0800 Subject: [PATCH 6/7] revert parts unnecessary change and minor update Signed-off-by: wbpcode --- cpp2sky/internal/async_client.h | 35 +++++++++++++++------------------ test/e2e/main.py | 5 ++++- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/cpp2sky/internal/async_client.h b/cpp2sky/internal/async_client.h index a2f88a7..a97f9e2 100644 --- a/cpp2sky/internal/async_client.h +++ b/cpp2sky/internal/async_client.h @@ -28,9 +28,9 @@ namespace cpp2sky { * Template base class for gRPC async client. */ template -class AsyncClientBase { +class AsyncClient { public: - virtual ~AsyncClientBase() = default; + virtual ~AsyncClient() = default; /** * Send the specified protobuf message. @@ -45,17 +45,16 @@ class AsyncClientBase { }; template -using AsyncClientBasePtr = - std::unique_ptr>; +using AsyncClientPtr = std::unique_ptr>; /** * Template base class for gRPC async stream. The stream is used to represent * a single gRPC stream/request. */ template -class AsyncStreamBase { +class AsyncStream { public: - virtual ~AsyncStreamBase() = default; + virtual ~AsyncStream() = default; /** * Send the specified protobuf message. @@ -64,8 +63,7 @@ class AsyncStreamBase { }; template -using AsyncStreamBasePtr = - std::unique_ptr>; +using AsyncStreamPtr = std::unique_ptr>; /** * Tag for async operation. The callback should be called when the operation is @@ -83,11 +81,11 @@ using GrpcCompletionQueue = grpc::CompletionQueue; * Factory for creating async stream. */ template -class AsyncStreamFactoryBase { +class AsyncStreamFactory { public: - virtual ~AsyncStreamFactoryBase() = default; + virtual ~AsyncStreamFactory() = default; - using AsyncStreamPtr = AsyncStreamBasePtr; + using AsyncStreamPtr = AsyncStreamPtr; using GrpcStub = grpc::TemplatedGenericStub; virtual AsyncStreamPtr createStream(GrpcClientContextPtr client_ctx, @@ -97,22 +95,21 @@ class AsyncStreamFactoryBase { }; template -using AsyncStreamFactoryBasePtr = - std::unique_ptr>; +using AsyncStreamFactoryPtr = + std::unique_ptr>; using TraceRequestType = skywalking::v3::SegmentObject; using TraceResponseType = skywalking::v3::Commands; -using TraceAsyncStream = AsyncStreamBase; -using TraceAsyncStreamPtr = - AsyncStreamBasePtr; +using TraceAsyncStream = AsyncStream; +using TraceAsyncStreamPtr = AsyncStreamPtr; using TraceAsyncStreamFactory = - AsyncStreamFactoryBase; + AsyncStreamFactory; using TraceAsyncStreamFactoryPtr = - AsyncStreamFactoryBasePtr; + AsyncStreamFactoryPtr; -using TraceAsyncClient = AsyncClientBase; +using TraceAsyncClient = AsyncClient; using TraceAsyncClientPtr = std::unique_ptr; } // namespace cpp2sky diff --git a/test/e2e/main.py b/test/e2e/main.py index 5463b2d..91210a1 100644 --- a/test/e2e/main.py +++ b/test/e2e/main.py @@ -26,6 +26,9 @@ def validate(expected_file_name): res = requests.get('http://0.0.0.0:12800/receiveData') actual_data = yaml.dump(yaml.load(res.content, Loader=Loader)) + print('actual data: ') + print(actual_data) + differ = Differ() diff_list = list(differ.compare( actual_data.splitlines(keepends=True), @@ -42,7 +45,7 @@ def validate(expected_file_name): parser.add_argument('--expected_file', help='File name which includes expected reported value') parser.add_argument('--max_retry_times', help='Max retry times', type=int) parser.add_argument('--target_path', help='Specify target path') - + args = parser.parse_args() retry_times = 0 From 6817f8723c4cb95b55c83eb0bcd8b6273524ccc4 Mon Sep 17 00:00:00 2001 From: wbpcode Date: Thu, 15 Aug 2024 13:23:02 +0800 Subject: [PATCH 7/7] fix naming] Signed-off-by: wbpcode --- cpp2sky/internal/async_client.h | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp2sky/internal/async_client.h b/cpp2sky/internal/async_client.h index a97f9e2..c71c1c2 100644 --- a/cpp2sky/internal/async_client.h +++ b/cpp2sky/internal/async_client.h @@ -85,13 +85,13 @@ class AsyncStreamFactory { public: virtual ~AsyncStreamFactory() = default; - using AsyncStreamPtr = AsyncStreamPtr; + using StreamPtr = AsyncStreamPtr; using GrpcStub = grpc::TemplatedGenericStub; - virtual AsyncStreamPtr createStream(GrpcClientContextPtr client_ctx, - GrpcStub& stub, GrpcCompletionQueue& cq, - AsyncEventTag& basic_event_tag, - AsyncEventTag& write_event_tag) = 0; + virtual StreamPtr createStream(GrpcClientContextPtr client_ctx, + GrpcStub& stub, GrpcCompletionQueue& cq, + AsyncEventTag& basic_event_tag, + AsyncEventTag& write_event_tag) = 0; }; template