Skip to content

Commit

Permalink
update: check pipeline queue before scrape, try again after 1 second …
Browse files Browse the repository at this point in the history
…if is not valid to push (alibaba#1757)
  • Loading branch information
catdogpandas authored Oct 8, 2024
1 parent 6526c34 commit 6d9d5f2
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 37 deletions.
5 changes: 4 additions & 1 deletion core/common/timer/Timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ void Timer::Run() {
mCV.wait_for(threadLock, timeout);
break;
} else {
auto e = std::move(const_cast<unique_ptr<TimerEvent>&>(mQueue.top()));
mQueue.pop();
queueLock.unlock();
if (!e->IsValid()) {
LOG_INFO(sLogger, ("invalid timer event", "task is cancelled"));
} else {
e->Execute();
}
mQueue.pop();
queueLock.lock();
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/common/timer/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Timer {

#ifdef APSARA_UNIT_TEST_MAIN
friend class TimerUnittest;
friend class ScrapeSchedulerUnittest;
#endif
};

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputPrometheus.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class InputPrometheus : public Input {
bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override;
bool Start() override;
bool Stop(bool isPipelineRemoving) override;
bool SupportAck() const override { return false; }
bool SupportAck() const override { return true; }

private:
bool CreateInnerProcessors(const Json::Value& inputConfig);
Expand Down
22 changes: 16 additions & 6 deletions core/prometheus/async/PromFuture.cpp
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
#include "prometheus/async/PromFuture.h"

#include "common/Lock.h"
#include "common/http/HttpResponse.h"

namespace logtail {

void PromFuture::Process(const HttpResponse& response, uint64_t timestampMilliSec) {
template <typename... Args>
bool PromFuture<Args...>::Process(Args... args) {
WriteLock lock(mStateRWLock);
if (mState == PromFutureState::New) {
for (auto& callback : mDoneCallbacks) {
callback(response, timestampMilliSec);
if (!callback(std::forward<Args>(args)...)) {
mState = PromFutureState::Done;
return false;
}
}
mState = PromFutureState::Done;
} else {
return;
}

return true;
}

void PromFuture::AddDoneCallback(std::function<void(const HttpResponse&, uint64_t timestampMilliSec)>&& callback) {
template <typename... Args>
void PromFuture<Args...>::AddDoneCallback(CallbackSignature&& callback) {
mDoneCallbacks.emplace_back(std::move(callback));
}

void PromFuture::Cancel() {
template <typename... Args>
void PromFuture<Args...>::Cancel() {
WriteLock lock(mStateRWLock);
mState = PromFutureState::Done;
}

template class PromFuture<const HttpResponse&, uint64_t>;
template class PromFuture<>;

} // namespace logtail
11 changes: 7 additions & 4 deletions core/prometheus/async/PromFuture.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
#pragma once

#include <functional>

#include "common/Lock.h"
#include "common/http/HttpResponse.h"

namespace logtail {

enum class PromFutureState { New, Processing, Done };

template <typename... Args>
class PromFuture {
public:
using CallbackSignature = std::function<bool(Args...)>;
// Process should support oneshot and streaming mode.
void Process(const HttpResponse&, uint64_t timestampMilliSec);
bool Process(Args...);

void AddDoneCallback(std::function<void(const HttpResponse&, uint64_t timestampMilliSec)>&& callback);
void AddDoneCallback(CallbackSignature&&);

void Cancel();

protected:
PromFutureState mState = {PromFutureState::New};
ReadWriteLock mStateRWLock;

std::vector<std::function<void(const HttpResponse&, uint64_t timestampMilliSec)>> mDoneCallbacks;
std::vector<CallbackSignature> mDoneCallbacks;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ScrapeSchedulerUnittest;
Expand Down
15 changes: 11 additions & 4 deletions core/prometheus/async/PromHttpRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,24 @@ PromHttpRequest::PromHttpRequest(const std::string& method,
const std::string& body,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture> future)
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> future,
std::shared_ptr<PromFuture<>> isContextValidFuture)
: AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body, timeout, maxTryCnt),
mFuture(std::move(future)) {
mFuture(std::move(future)),
mIsContextValidFuture(std::move(isContextValidFuture)) {
}

void PromHttpRequest::OnSendDone(const HttpResponse& response) {
mFuture->Process(response,
std::chrono::duration_cast<std::chrono::milliseconds>(mLastSendTime.time_since_epoch()).count());
if (mFuture != nullptr) {
mFuture->Process(
response, std::chrono::duration_cast<std::chrono::milliseconds>(mLastSendTime.time_since_epoch()).count());
}
}

// Reports whether the context this request belongs to is still valid.
// If an mIsContextValidFuture is attached, the result is whatever its Process() returns;
// if none is attached (nullptr), the context is treated as valid.
[[nodiscard]] bool PromHttpRequest::IsContextValid() const {
if (mIsContextValidFuture != nullptr) {
// Process() invokes the callbacks registered on the future, so this call is
// more than a passive query of stored state.
return mIsContextValidFuture->Process();
}
return true;
}

Expand Down
6 changes: 4 additions & 2 deletions core/prometheus/async/PromHttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class PromHttpRequest : public AsynHttpRequest {
const std::string& body,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture> future);
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> future,
std::shared_ptr<PromFuture<>> isContextValidFuture = nullptr);
PromHttpRequest(const PromHttpRequest&) = default;
~PromHttpRequest() override = default;

Expand All @@ -30,7 +31,8 @@ class PromHttpRequest : public AsynHttpRequest {
private:
void SetNextExecTime(std::chrono::steady_clock::time_point execTime);

std::shared_ptr<PromFuture> mFuture;
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;
};

} // namespace logtail
8 changes: 7 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@
namespace logtail {
// Records one completed execution and refreshes the cached next execution time
// to mFirstExecTime + mExecCount * mInterval seconds.
void BaseScheduler::ExecDone() {
mExecCount++;
// Recomputed from mFirstExecTime, so an offset previously added through
// DelayExecTime() is not carried over into the new value.
mLatestExecTime = mFirstExecTime + std::chrono::seconds(mExecCount * mInterval);
}

std::chrono::steady_clock::time_point BaseScheduler::GetNextExecTime() {
return mFirstExecTime + std::chrono::seconds(mExecCount * mInterval);
return mLatestExecTime;
}

// Sets the time point of the first execution and resets mLatestExecTime to that
// same time point.
void BaseScheduler::SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime) {
mFirstExecTime = firstExecTime;
mLatestExecTime = mFirstExecTime;
}

// Moves mLatestExecTime later by delaySeconds whole seconds.
// mFirstExecTime and mExecCount are left untouched.
void BaseScheduler::DelayExecTime(uint64_t delaySeconds) {
mLatestExecTime = mLatestExecTime + std::chrono::seconds(delaySeconds);
}

void BaseScheduler::Cancel() {
Expand Down
6 changes: 5 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <memory>

#include "common/http/HttpResponse.h"
#include "prometheus/async/PromFuture.h"

namespace logtail {
Expand All @@ -18,18 +19,21 @@ class BaseScheduler {
std::chrono::steady_clock::time_point GetNextExecTime();

void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime);
void DelayExecTime(uint64_t delaySeconds);

virtual void Cancel();

protected:
bool IsCancelled();

std::chrono::steady_clock::time_point mFirstExecTime;
std::chrono::steady_clock::time_point mLatestExecTime;
int64_t mExecCount = 0;
int64_t mInterval = 0;

ReadWriteLock mLock;
bool mValidState = true;
std::shared_ptr<PromFuture> mFuture;
std::shared_ptr<PromFuture<const HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;
};
} // namespace logtail
35 changes: 30 additions & 5 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"

using namespace std;
Expand Down Expand Up @@ -99,40 +100,60 @@ void ScrapeScheduler::PushEventGroup(PipelineEventGroup&& eGroup) {
auto item = make_unique<ProcessQueueItem>(std::move(eGroup), mInputIndex);
#ifdef APSARA_UNIT_TEST_MAIN
mItem.push_back(std::move(item));
return;
#endif
ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item));
while (true) {
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) == 0) {
break;
}
usleep(10 * 1000);
}
}

// Returns this scheduler's identifier: a copy of the stored mHash string.
string ScrapeScheduler::GetId() const {
return mHash;
}

void ScrapeScheduler::ScheduleNext() {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
auto isContextValidFuture = std::make_shared<PromFuture<>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) {
this->OnMetricResult(response, timestampMilliSec);
this->ExecDone();
this->ScheduleNext();
return true;
});
isContextValidFuture->AddDoneCallback([this]() -> bool {
if (ProcessQueueManager::GetInstance()->IsValidToPush(mQueueKey)) {
return true;
} else {
this->DelayExecTime(1);
this->ScheduleNext();
return false;
}
});

if (IsCancelled()) {
mFuture->Cancel();
mIsContextValidFuture->Cancel();
return;
}

{
WriteLock lock(mLock);
mFuture = future;
mIsContextValidFuture = isContextValidFuture;
}

auto event = BuildScrapeTimerEvent(GetNextExecTime());
mTimer->PushEvent(std::move(event));
}

void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) {
this->OnMetricResult(response, timestampMilliSec);
return true;
});
mFuture = future;
auto event = BuildScrapeTimerEvent(execTime);
Expand All @@ -153,15 +174,19 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mScrapeTimeoutSeconds,
mScrapeConfigPtr->mScrapeIntervalSeconds
/ mScrapeConfigPtr->mScrapeTimeoutSeconds,
this->mFuture);
this->mFuture,
this->mIsContextValidFuture);
auto timerEvent = std::make_unique<HttpRequestTimerEvent>(execTime, std::move(request));
return timerEvent;
}

void ScrapeScheduler::Cancel() {
if (mFuture) {
if (mFuture != nullptr) {
mFuture->Cancel();
}
if (mIsContextValidFuture != nullptr) {
mIsContextValidFuture->Cancel();
}
{
WriteLock lock(mLock);
mValidState = false;
Expand Down
6 changes: 4 additions & 2 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,12 @@ string TargetSubscriberScheduler::GetId() const {
}

void TargetSubscriberScheduler::ScheduleNext() {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampMilliSec) {
this->OnSubscription(response, timestampMilliSec);
this->ExecDone();
this->ScheduleNext();
return true;
});
if (IsCancelled()) {
mFuture->Cancel();
Expand All @@ -267,9 +268,10 @@ void TargetSubscriberScheduler::Cancel() {
}

void TargetSubscriberScheduler::SubscribeOnce(std::chrono::steady_clock::time_point execTime) {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
future->AddDoneCallback([this](const HttpResponse& response, uint64_t timestampNanoSec) {
this->OnSubscription(response, timestampNanoSec);
return true;
});
mFuture = future;
auto event = BuildSubscriberTimerEvent(execTime);
Expand Down
3 changes: 2 additions & 1 deletion core/unittest/prometheus/PromAsynUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ class PromAsynUnittest : public testing::Test {
};

void PromAsynUnittest::TestExecTime() {
auto future = std::make_shared<PromFuture>();
auto future = std::make_shared<PromFuture<const HttpResponse&, uint64_t>>();
auto now = std::chrono::system_clock::now();
bool exec = false;
future->AddDoneCallback([&exec, now](const HttpResponse&, uint64_t timestampMilliSec) {
APSARA_TEST_EQUAL(timestampMilliSec,
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count());

APSARA_TEST_TRUE(exec);
return true;
});
auto request = std::make_shared<PromHttpRequest>(
"http", false, "127.0.0.1", 8080, "/", "", map<string, string>(), "", 10, 3, future);
Expand Down
Loading

0 comments on commit 6d9d5f2

Please sign in to comment.