Skip to content

Commit

Permalink
feat: cache incomplete line in memory to avoid repeat read from disk (#…
Browse files Browse the repository at this point in the history
…1142)

* read file only once

* feat: cache incomplete log in memory to avoid repeated read from disk

* avoid negative read size theoretically

* fix new pipeline crash when using exacly once

* fix reader's host log path
  • Loading branch information
yyuuttaaoo authored Sep 22, 2023
1 parent 567ec12 commit 42df83e
Show file tree
Hide file tree
Showing 23 changed files with 524 additions and 226 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ your changes, such as:
- [public] [both] [updated] Flusher Kafka V2: support send the message with headers to kafka
- [public] [both] [updated] update gcc version to 9.3.1
- [public] [both] [updated] add make flag WITHOUTGDB
- [public] [both] [updated] cache incomplete line in memory to avoid repeated read system call
- [public] [both] [fixed] Add APSARA\_LOG\_TRACE to solve the problem of not being able to find LOG\_TRACE.
- [public] [both] [fixed] fix multiline is splitted if not flushed to disk together
- [public] [both] [fixed] fix line is truncated if \0 is in the middle of line
Expand Down
2 changes: 1 addition & 1 deletion core/aggregator/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ bool Aggregator::Add(const std::string& projectName,
auto& logPosition = context.mExactlyOnceCheckpoint->positions[logIdx];
auto& cpt = value->mLogGroupContext.mExactlyOnceCheckpoint->data;

// First log, upodate read_offset.
// First log, update read_offset.
if (1 == value->mLogGroup.logs_size()) {
cpt.set_read_offset(logPosition.first);
}
Expand Down
14 changes: 11 additions & 3 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
string realFilePath;
int32_t fileOpenFlag = 0; // default, we close file ptr
int32_t containerStopped = 0;
int32_t lastForceRead = 0;
if (meta.isMember("real_file_name")) {
realFilePath = meta["real_file_name"].asString();
}
Expand Down Expand Up @@ -232,6 +233,9 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
if (meta.isMember("container_stopped")) {
containerStopped = meta["container_stopped"].asInt();
}
if (meta.isMember("last_force_read")) {
lastForceRead = meta["last_force_read"].asInt();
}
// can not get file's dev inode
if (!devInode.IsValid()) {
LOG_WARNING(sLogger, ("can not find check point dev inode, discard it", filePath));
Expand All @@ -250,7 +254,8 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
configName,
realFilePath,
fileOpenFlag != 0,
containerStopped != 0);
containerStopped != 0,
lastForceRead != 0);
ptr->mLastUpdateTime = update_time;
AddCheckPoint(ptr);
} else {
Expand Down Expand Up @@ -281,7 +286,8 @@ void CheckPointManager::LoadFileCheckPoint(const Json::Value& root) {
allConfig[i]->mConfigName,
realFilePath,
fileOpenFlag != 0,
containerStopped != 0);
containerStopped != 0,
lastForceRead != 0);
ptr->mLastUpdateTime = update_time;
AddCheckPoint(ptr);
}
Expand Down Expand Up @@ -325,6 +331,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
leaf["dev"] = Json::Value(Json::UInt64(checkPointPtr->mDevInode.dev));
leaf["file_open"] = Json::Value(checkPointPtr->mFileOpenFlag ? 1 : 0);
leaf["container_stopped"] = Json::Value(checkPointPtr->mContainerStopped ? 1 : 0);
leaf["last_force_read"] = Json::Value(checkPointPtr->mLastForceRead ? 1 : 0);
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
// forward compatible
leaf["sig"] = Json::Value(string(""));
Expand Down Expand Up @@ -353,6 +360,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
leaf["dev"] = Json::Value(Json::UInt64(checkPointPtr->mDevInode.dev));
leaf["file_open"] = Json::Value(checkPointPtr->mFileOpenFlag ? 1 : 0);
leaf["container_stopped"] = Json::Value(checkPointPtr->mContainerStopped ? 1 : 0);
leaf["last_force_read"] = Json::Value(checkPointPtr->mLastForceRead ? 1 : 0);
leaf["config_name"] = Json::Value(checkPointPtr->mConfigName);
// forward compatible
leaf["sig"] = Json::Value(string(""));
Expand Down Expand Up @@ -537,7 +545,7 @@ void CheckPointManager::PrintStatus() {
for (DevInodeCheckPointHashMap::iterator it = mDevInodeCheckPointPtrMap.begin();
it != mDevInodeCheckPointPtrMap.end();
++it) {
printf("File %s\n", it->second.get()->mFileName.c_str());
printf("Inode %lu, File %s\n", it->first.mDevInode.inode, it->second.get()->mFileName.c_str());
}
printf("\n");
for (std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.begin(); it != mDirNameMap.end();
Expand Down
30 changes: 17 additions & 13 deletions core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ namespace logtail {
#define NO_CHECKPOINT_VERSION 0
class CheckPoint {
public:
DevInode mDevInode;
int64_t mOffset = 0;
uint64_t mSignatureHash = 0;
uint32_t mSignatureSize = 0;
int32_t mLastUpdateTime = 0;
bool mFileOpenFlag = false;
bool mContainerStopped = false;
bool mLastForceRead = false;
std::string mCache;
std::string mConfigName;
std::string mFileName;
std::string mRealFileName;
DevInode mDevInode;
int64_t mOffset;
uint64_t mSignatureHash;
uint32_t mSignatureSize;
int32_t mLastUpdateTime;
bool mFileOpenFlag;
bool mContainerStopped;

CheckPoint() {}

Expand All @@ -56,17 +58,19 @@ class CheckPoint {
const std::string& configName,
const std::string& realFileName,
bool fileOpenFlag,
bool containerStopped)
: mConfigName(configName),
mFileName(filename),
mRealFileName(realFileName),
mDevInode(devInode),
bool containerStopped,
bool lastForceRead)
: mDevInode(devInode),
mOffset(offset),
mSignatureHash(signatureHash),
mSignatureSize(signatureSize),
mLastUpdateTime(0),
mFileOpenFlag(fileOpenFlag),
mContainerStopped(containerStopped) {}
mContainerStopped(containerStopped),
mLastForceRead(lastForceRead),
mConfigName(configName),
mFileName(filename),
mRealFileName(realFileName) {}
};

class DirCheckPoint {
Expand Down
2 changes: 1 addition & 1 deletion core/common/EncodingConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ EncodingConverter::~EncodingConverter() {

// TODO: Refactor it, do not use the output params to do calculations, set them before return.
size_t EncodingConverter::ConvertGbk2Utf8(
const char* src, size_t* srcLength, char* desOut, size_t desLength, const std::vector<size_t>& linePosVec) const {
const char* src, size_t* srcLength, char* desOut, size_t desLength, const std::vector<long>& linePosVec) const {
#if defined(__linux__)
if (src == NULL || *srcLength == 0 || mGbk2Utf8Cd == (iconv_t)(-1)) {
LOG_ERROR(sLogger, ("invalid iconv descriptor fail or invalid buffer pointer, cd", mGbk2Utf8Cd));
Expand Down
2 changes: 1 addition & 1 deletion core/common/EncodingConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class EncodingConverter {
// - For Windows, ConvertGbk2Utf8 converts whole @src, if any errors happened,
// 0 will be returned (ignore @linePosVec).
size_t ConvertGbk2Utf8(
const char* src, size_t* srcLength, char* des, size_t desLength, const std::vector<size_t>& linePosVec) const;
const char* src, size_t* srcLength, char* des, size_t desLength, const std::vector<long>& linePosVec) const;

#if defined(_MSC_VER)
// FromUTF8ToACP converts @s encoded in UTF8 to ACP.
Expand Down
8 changes: 8 additions & 0 deletions core/common/FileSystemUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ std::string AbsolutePath(const std::string& path, const std::string& basepath) {
return boost::filesystem::absolute(path, basepath).string();
}

std::string NormalizePath(const std::string& path) {
boost::filesystem::path abs(path);
if (abs.filename_is_dot() || abs.filename_is_dot_dot()) {
abs.remove_filename();
}
return abs.string();
}

int FSeek(FILE* stream, int64_t offset, int origin) {
#if defined(_MSC_VER)
return _fseeki64(stream, offset, origin);
Expand Down
5 changes: 5 additions & 0 deletions core/common/FileSystemUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ inline bool IsEEXIST(int e) {
}

bool IsRelativePath(const std::string& path);
// . -> /usr/local/ilogtail/.
// ./a.txt -> /usr/local/ilogtail/./a.txt
std::string AbsolutePath(const std::string& path, const std::string& basepath);
// /usr/local/ilogtail/. -> /usr/local/ilogtail
// /usr/local/ilogtail/./a.txt -> /usr/local/ilogtail/a.txt
std::string NormalizePath(const std::string& path);

// FSeek, FTell that can handle file large than 2GB on 32bits OS.
int FSeek(FILE* stream, int64_t offset, int origin);
Expand Down
2 changes: 1 addition & 1 deletion core/common/StringTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ std::vector<std::string> StringSpliter(const std::string& str, const std::string
tokens.push_back(token);
prev = pos + delim.length();
} while (pos < str.length() && prev < str.length());
return std::move(tokens);
return tokens;
}

void ReplaceString(std::string& raw, const std::string& src, const std::string& dst) {
Expand Down
4 changes: 4 additions & 0 deletions core/config_manager/ConfigManagerBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,10 @@ void ConfigManagerBase::LoadSingleUserConfig(const std::string& logName, const J
if (!tmpLogPath.empty())
logPath = tmpLogPath;
}
if (IsRelativePath(logPath)) {
logPath = NormalizePath(AbsolutePath(logPath, AppConfig::GetInstance()->GetProcessExecutionDir()));
}

// one may still make mistakes, teminate logPath by '/'
size_t size = logPath.size();
if (size > 0 && PATH_SEPARATOR[0] == logPath[size - 1])
Expand Down
1 change: 1 addition & 0 deletions core/config_manager/ConfigYamlToJson.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ ConfigYamlToJson::ConfigYamlToJson() {
mFileAdvancedConfigMap["PreciseTimestampUnit"] = "precise_timestamp_unit";
mFileAdvancedConfigMap["ForceMultiConfig"] = "force_multiconfig";
mFileAdvancedConfigMap["TailSizeKB"] = "tail_size_kb";
mFileAdvancedConfigMap["ExactlyOnceConcurrency"] = "exactly_once_concurrency";
mFileAdvancedConfigMap["EnableLogPositionMeta"] = "enable_log_position_meta";
mFileAdvancedConfigMap["RawLogTag"] = "raw_log_tag";

Expand Down
1 change: 1 addition & 0 deletions core/controller/EventDispatcherBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ void EventDispatcherBase::AddExistedCheckPointFileEvents() {
cpt.config_name(),
cpt.real_path(),
1,
0,
0);
const auto result = validateCheckpoint(v1Cpt, cachePathDevInodeMap, eventVec);
switch (result) {
Expand Down
4 changes: 3 additions & 1 deletion core/logtail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ void do_worker_process() {
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_STACK_ALARM, backTraceStr);
}

InitCrashBackTrace();
if (BOOL_FLAG(ilogtail_disable_core)) {
InitCrashBackTrace();
}

LogtailMonitor::Instance()->InitMonitor();
LogFilter::Instance()->InitFilter(STRING_FLAG(user_log_config));
Expand Down
4 changes: 3 additions & 1 deletion core/logtail_windows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ void do_worker_process() {
APSARA_LOG_ERROR(sLogger, ("last logtail crash stack", backTraceStr)("stack size", backTraceStr.length()));
LogtailAlarm::GetInstance()->SendAlarm(LOGTAIL_CRASH_STACK_ALARM, backTraceStr);
}
InitCrashBackTrace();
if (BOOL_FLAG(ilogtail_disable_core)) {
InitCrashBackTrace();
}

auto& startupHints = STRING_FLAG(ilogtail_daemon_startup_hints);
if (!startupHints.empty()) {
Expand Down
31 changes: 19 additions & 12 deletions core/processor/LogProcess.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,19 @@ int LogProcess::ProcessBuffer(std::shared_ptr<LogBuffer>& logBuffer,
return 1;
}
FillLogGroupAllNative(eventGroup, logFileReader, resultGroup);
// record log positions for exactly once.
if (logBuffer->exactlyOnceCheckpoint) {
// I think one just record buffer offset and length is enough
// There is no need to record offset and length for each event
std::pair<size_t, size_t> pos(logBuffer->beginOffset, logBuffer->rawBuffer.size());
logBuffer->exactlyOnceCheckpoint->positions.assign(eventGroup.GetEvents().size(), pos);
}
return 0;
}

void LogProcess::FillLogGroupLogs(const PipelineEventGroup& eventGroup, sls_logs::LogGroup& resultGroup, bool enableTimestampNanosecond) {
void LogProcess::FillLogGroupLogs(const PipelineEventGroup& eventGroup,
sls_logs::LogGroup& resultGroup,
bool enableTimestampNanosecond) {
for (auto& event : eventGroup.GetEvents()) {
if (!event.Is<LogEvent>()) {
continue;
Expand Down Expand Up @@ -725,18 +734,16 @@ int LogProcess::ProcessBufferLegacy(std::shared_ptr<LogBuffer>& logBuffer,
}
// record log positions for exactly once.
if (logBuffer->exactlyOnceCheckpoint && logPtr != nullptr) {
if (logBuffer->exactlyOnceCheckpoint) {
int32_t length = 0;
if (1 == lines) {
length = rawBuffer.size() + 1;
} else if (i != lines - 1) {
length = logIndex[i].size() + 1;
} else {
length = rawBuffer.size() - (logIndex[i].data() - rawBuffer.data());
}
logBuffer->exactlyOnceCheckpoint->positions.emplace_back(
std::make_pair(offset, static_cast<size_t>(length)));
int32_t length = 0;
if (1 == lines) {
length = rawBuffer.size();
} else if (i != lines - 1) {
length = logIndex[i + 1].data() - logIndex[i].data();
} else {
length = rawBuffer.size() - (logIndex[i].data() - rawBuffer.data());
}
logBuffer->exactlyOnceCheckpoint->positions.emplace_back(
std::make_pair(offset, static_cast<size_t>(length)));
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions core/processor/ProcessorParseDelimiterNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe
for (size_t i = mColumnKeys.size(); i < columnValues.size(); ++i) {
extraFields[0] = mSeparatorChar;
extraFields++;
strcpy(extraFields, columnValues[i].data());
memcpy(extraFields, columnValues[i].data(), columnValues[i].size());
extraFields += columnValues[i].size();
}
// remove extra fields
Expand Down Expand Up @@ -206,9 +206,8 @@ bool ProcessorParseDelimiterNative::ProcessEvent(const StringView& logPath, Pipe
if (mExtractPartialFields) {
continue;
}
StringBuffer sb = sourceEvent.GetSourceBuffer()->AllocateStringBuffer(10 + idx / 10);
std::string key = "__column" + ToString(idx) + "__";
strcpy(sb.data, key.c_str());
StringBuffer sb = sourceEvent.GetSourceBuffer()->CopyString(key);
AddLog(StringView(sb.data, sb.size),
useQuote ? columnValues[idx] : StringView(buffer.data() + colBegIdxs[idx], colLens[idx]),
sourceEvent);
Expand Down
1 change: 1 addition & 0 deletions core/reader/CommonRegLogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class CommonRegLogFileReader : public LogFileReader {

#ifdef APSARA_UNIT_TEST_MAIN
friend class LogFileReaderUnittest;
friend class LogFileReaderCheckpointUnittest;
friend class CommonRegParseLogLineUnittest;
#endif
};
Expand Down
Loading

0 comments on commit 42df83e

Please sign in to comment.