Skip to content

Commit

Permalink
MINIFICPP-1247 - Enhance logging for CWEL
Browse files Browse the repository at this point in the history
Signed-off-by: Arpad Boda <aboda@apache.org>

This closes civetweb#812
  • Loading branch information
nghiaxlee authored and arpadboda committed Jun 18, 2020
1 parent b107c86 commit 8b73e55
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 8 deletions.
4 changes: 3 additions & 1 deletion extensions/windows-event-log/Bookmark.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// TODO: Where is our License O_O

#include "Bookmark.h"

#include <direct.h>
Expand Down Expand Up @@ -121,8 +123,8 @@ bool Bookmark::getNewBookmarkXml(EVT_HANDLE hEvent, std::wstring& bookmarkXml) {
LOG_LAST_ERROR(EvtUpdateBookmark);
return false;
}

// Render the bookmark as an XML string that can be persisted.
logger_->log_trace("Rendering new bookmark");
DWORD bufferSize{};
DWORD bufferUsed{};
DWORD propertyCount{};
Expand Down
53 changes: 47 additions & 6 deletions extensions/windows-event-log/ConsumeWindowsEventLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,20 @@ ConsumeWindowsEventLog::ConsumeWindowsEventLog(const std::string& name, utils::I
writePlainText_ = false;
}

void ConsumeWindowsEventLog::notifyStop() {
std::lock_guard<std::mutex> lock(onTriggerMutex_);
logger_->log_trace("start notifyStop");
pBookmark_.reset();
if (hMsobjsDll_) {
if (FreeLibrary(hMsobjsDll_)) {
hMsobjsDll_ = nullptr;
} else {
LOG_LAST_ERROR(LoadLibrary);
}
}
logger_->log_trace("finish notifyStop");
}

ConsumeWindowsEventLog::~ConsumeWindowsEventLog() {
if (hMsobjsDll_) {
FreeLibrary(hMsobjsDll_);
Expand Down Expand Up @@ -267,24 +281,26 @@ void ConsumeWindowsEventLog::onSchedule(const std::shared_ptr<core::ProcessConte
context->getProperty(BookmarkRootDirectory.getName(), bookmarkDir);
if (bookmarkDir.empty()) {
logger_->log_error("State Directory is empty");
return;
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "State Directory is empty");
}
pBookmark_ = std::make_unique<Bookmark>(wstrChannel_, wstrQuery_, bookmarkDir, getUUIDStr(), processOldEvents, state_manager_, logger_);
if (!*pBookmark_) {
pBookmark_.reset();
return;
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Bookmark is empty");
}
}

context->getProperty(MaxBufferSize.getName(), maxBufferSize_);
logger_->log_debug("ConsumeWindowsEventLog: maxBufferSize_ %" PRIu64, maxBufferSize_);

provenanceUri_ = "winlog://" + computerName_ + "/" + channel_ + "?" + query;
logger_->log_trace("Successfully configured CWEL");
}


void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
if (!pBookmark_) {
logger_->log_debug("pBookmark_ is null");
context->yield();
return;
}
Expand All @@ -295,6 +311,8 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
return;
}

logger_->log_trace("CWEL onTrigger");

struct TimeDiff {
auto operator()() const {
return int64_t{ std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - time_).count() };
Expand Down Expand Up @@ -332,23 +350,28 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
const auto hEventResults = EvtQuery(0, wstrChannel_.c_str(), wstrQuery_.c_str(), EvtQueryChannelPath);
if (!hEventResults) {
LOG_LAST_ERROR(EvtQuery);
context->yield();
return;
}
const utils::ScopeGuard guard_hEventResults([hEventResults]() { EvtClose(hEventResults); });

logger_->log_trace("Retrieved results in Channel: %ls with Query: %ls", wstrChannel_.c_str(), wstrQuery_.c_str());

auto hBookmark = pBookmark_->getBookmarkHandleFromXML();
if (!hBookmark) {
// Unrecoverable error.
logger_->log_error("hBookmark is null, unrecoverable error!");
pBookmark_.reset();
context->yield();
return;
}

if (!EvtSeek(hEventResults, 1, hBookmark, 0, EvtSeekRelativeToBookmark)) {
LOG_LAST_ERROR(EvtSeek);
context->yield();
return;
}

// Enumerate the events in the result set after the bookmarked event.
logger_->log_trace("Enumerating the events in the result set after the bookmarked event.");
while (true) {
EVT_HANDLE hEvent{};
DWORD dwReturned{};
Expand All @@ -363,7 +386,7 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
break;
}
const utils::ScopeGuard guard_hEvent([hEvent]() { EvtClose(hEvent); });

logger_->log_trace("Succesfully get the next hEvent, performing event rendering");
EventRender eventRender;
std::wstring newBookmarkXml;
if (createEventRender(hEvent, eventRender) && pBookmark_->getNewBookmarkXml(hEvent, newBookmarkXml)) {
Expand All @@ -373,6 +396,7 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex

if (batch_commit_size_ != 0U && (eventCount % batch_commit_size_ == 0)) {
if (!commitAndSaveBookmark(bookmarkXml)) {
context->yield();
return;
}

Expand All @@ -381,23 +405,27 @@ void ConsumeWindowsEventLog::onTrigger(const std::shared_ptr<core::ProcessContex
}
}

logger_->log_trace("Finish enumerating events.");

if (eventCount > commitAndSaveBookmarkCount) {
commitAndSaveBookmark(bookmarkXml);
}
}

wel::WindowsEventLogHandler ConsumeWindowsEventLog::getEventLogHandler(const std::string & name) {
std::lock_guard<std::mutex> lock(cache_mutex_);
logger_->log_trace("Getting Event Log Handler corresponding to %s", name.c_str());
auto provider = providers_.find(name);
if (provider != std::end(providers_)) {
logger_->log_trace("Found the handler");
return provider->second;
}

std::wstring temp_wstring = std::wstring(name.begin(), name.end());
LPCWSTR widechar = temp_wstring.c_str();

providers_[name] = wel::WindowsEventLogHandler(EvtOpenPublisherMetadata(NULL, widechar, NULL, 0, 0));

logger_->log_trace("Not found the handler -> created handler for %s", name.c_str());
return providers_[name];
}

Expand Down Expand Up @@ -491,6 +519,7 @@ void ConsumeWindowsEventLog::substituteXMLPercentageItems(pugi::xml_document& do
}

bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& eventRender) {
logger_->log_trace("Rendering an event");
DWORD size = 0;
DWORD used = 0;
DWORD propertyCount = 0;
Expand All @@ -512,6 +541,8 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e
return false;
}

logger_->log_debug("Event rendered with size %" PRIu32 ". Performing doc traversing...", size);

std::string xml = wel::to_string(&buf[0]);

pugi::xml_document doc;
Expand All @@ -529,7 +560,11 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e
// resolve the event metadata
doc.traverse(walker);

logger_->log_debug("Finish doc traversing, performing writing...");

if (writePlainText_) {
logger_->log_trace("Writing event in plain text");

auto handler = getEventLogHandler(providerName);
auto message = handler.getEventMessage(hEvent);

Expand All @@ -547,10 +582,13 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e
eventRender.rendered_text_ += "Message" + header_delimiter_ + " ";
eventRender.rendered_text_ += message;
}
logger_->log_trace("Finish writing in plain text");
}

if (writeXML_) {
logger_->log_trace("Writing event in XML");
substituteXMLPercentageItems(doc);
logger_->log_trace("Finish substituting \%\% in XML");

if (resolve_as_attributes_) {
eventRender.matched_fields_ = walker.getFieldValues();
Expand All @@ -561,6 +599,7 @@ bool ConsumeWindowsEventLog::createEventRender(EVT_HANDLE hEvent, EventRender& e
xml = writer.xml_;

eventRender.text_ = std::move(xml);
logger_->log_trace("Finish writing in XML");
}

return true;
Expand All @@ -582,6 +621,7 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&

if (writeXML_) {
auto flowFile = session.create();
logger_->log_trace("Writing rendered XML to a flow file");

session.write(flowFile, &WriteCallback(eventRender.text_));
for (const auto &fieldMapping : eventRender.matched_fields_) {
Expand All @@ -596,6 +636,7 @@ void ConsumeWindowsEventLog::putEventRenderFlowFileToSession(const EventRender&

if (writePlainText_) {
auto flowFile = session.create();
logger_->log_trace("Writing rendered plain text to a flow file");

session.write(flowFile, &WriteCallback(eventRender.rendered_text_));
session.putAttribute(flowFile, FlowAttributeKey(MIME_TYPE), "text/plain");
Expand Down
1 change: 1 addition & 0 deletions extensions/windows-event-log/ConsumeWindowsEventLog.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class ConsumeWindowsEventLog : public core::Processor
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
//! Initialize, overwrite by NiFi ConsumeWindowsEventLog
virtual void initialize(void) override;
void notifyStop() override;


protected:
Expand Down
2 changes: 1 addition & 1 deletion extensions/windows-event-log/wel/XMLString.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

/**
* @file ConsumeWindowsEventLog.h
* @file XMLString.h
* ConsumeWindowsEventLog class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
Expand Down

0 comments on commit 8b73e55

Please sign in to comment.