Commit

Fix(#14262): Format the ES reindexing logs for better readability (#14849)

* Fix(#14262): Format the ES reindexing logs for better readability

* Refactor formatJsonString function in StringsUtils.ts

* Fix Error Schema for Search Indexing Application

* Refactor AppLogsViewer.component.tsx

* Fix import order and add default value for formatJsonString

* Refactor formatJsonString function in StringsUtils.ts

* Fix Schema

---------

Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
Co-authored-by: mohitdeuex <mohit.y@deuexsolutions.com>
3 people authored Jan 29, 2024
1 parent dcc8739 commit 8d1780c
Showing 25 changed files with 362 additions and 437 deletions.
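
Note: the readability fix spans both sides of the stack. The backend stops pre-serializing failures into strings before storage (see the SearchIndexApp hunks below), while the frontend's formatJsonString in StringsUtils.ts pretty-prints the stored payload before AppLogsViewer.component.tsx renders it. As a rough Java illustration of the formatting idea, assuming Jackson; the class and method names are illustrative and not part of this commit:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public final class LogFormatting {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Re-serialize a payload with indentation so failure details read as
  // structured JSON in the logs instead of one long escaped line.
  public static String prettyJson(Object value) {
    try {
      return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(value);
    } catch (JsonProcessingException e) {
      return String.valueOf(value); // fall back to the raw form
    }
  }
}

The backend already has an equivalent switch: JsonUtils.pojoToJson(details, true) in the deleted handlers below uses a pretty flag for the same purpose.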
@@ -47,6 +47,7 @@
import org.openmetadata.service.events.scheduled.template.DataInsightDescriptionAndOwnerTemplate;
import org.openmetadata.service.events.scheduled.template.DataInsightTotalAssetTemplate;
import org.openmetadata.service.exception.EventSubscriptionJobException;
+import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.KpiRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.search.SearchClient;
@@ -99,7 +100,7 @@ public void execute(JobExecutionContext jobExecutionContext) {

private void sendReportsToTeams(
SearchClient searchClient, Long scheduleTime, Long currentTime, int numberOfDaysChange)
-      throws IOException {
+      throws SearchIndexException {
PaginatedEntitiesSource teamReader =
new PaginatedEntitiesSource(TEAM, 10, List.of("name", "email", "users"));
while (!teamReader.isDone()) {
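
Note: SearchIndexException is introduced elsewhere in this commit and its definition is not shown in these hunks. A minimal sketch consistent with how the catch blocks below use it (rx.getIndexingError()), offered as an assumption rather than the actual class:

package org.openmetadata.service.exception;

import org.openmetadata.schema.system.IndexingError;

// Sketch: a checked exception that carries the structured IndexingError
// so callers can attach it to the job record without parsing log text.
public class SearchIndexException extends Exception {
  private final transient IndexingError indexingError;

  public SearchIndexException(IndexingError indexingError) {
    super(indexingError.getMessage());
    this.indexingError = indexingError;
  }

  public IndexingError getIndexingError() {
    return indexingError;
  }
}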
@@ -6,16 +6,12 @@
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;

-import es.org.elasticsearch.action.bulk.BulkItemResponse;
-import es.org.elasticsearch.action.bulk.BulkRequest;
-import es.org.elasticsearch.action.bulk.BulkResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.Getter;
-import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.common.utils.CommonUtil;
@@ -28,13 +24,11 @@
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob;
-import org.openmetadata.schema.system.Failure;
+import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication;
-import org.openmetadata.service.exception.ProcessorException;
-import org.openmetadata.service.exception.SinkException;
-import org.openmetadata.service.exception.SourceException;
+import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.search.elasticsearch.ElasticSearchDataInsightProcessor;
@@ -90,7 +84,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
"webAnalyticUserActivityReportData",
"domain",
"storedProcedure");
-  private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
private Processor entityProcessor;
@@ -106,8 +99,7 @@ public void init(App app, CollectionDAO dao, SearchRepository searchRepository)
// request for reindexing
EventPublisherJob request =
JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)
-            .withStats(new Stats())
-            .withFailure(new Failure());
+            .withStats(new Stats());
if (request.getEntities().contains(ALL)) {
request.setEntities(ALL_ENTITIES);
}
@@ -171,13 +163,16 @@ public void startApp(JobExecutionContext jobExecutionContext) {
// Mark Job as Completed
updateJobStatus();
} catch (Exception ex) {
-      String error =
-          String.format(
-              "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ",
-              jobData.toString(), ExceptionUtils.getStackTrace(ex));
-      LOG.error(error);
+      IndexingError indexingError =
+          new IndexingError()
+              .withErrorSource(IndexingError.ErrorSource.JOB)
+              .withMessage(
+                  String.format(
+                      "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ",
+                      jobData.toString(), ExceptionUtils.getStackTrace(ex)));
+      LOG.error(indexingError.getMessage());
       jobData.setStatus(EventPublisherJob.Status.FAILED);
-      handleJobError(error, System.currentTimeMillis());
+      jobData.setFailure(indexingError);
} finally {
// store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
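
Note: IndexingError is the type behind the "Fix Error Schema for Search Indexing Application" bullet; it is schema-generated and not part of this diff. A sketch limited to the members this file exercises; every ErrorSource value other than JOB is an assumption modeled on the source/processor/sink stages the deleted string-based handlers covered:

package org.openmetadata.schema.system;

// Sketch of the schema-generated POJO, restricted to what this diff uses.
public class IndexingError {
  // JOB appears in this diff; the remaining values are assumed.
  public enum ErrorSource { JOB, READER, PROCESSOR, SINK }

  private ErrorSource errorSource;
  private String message;

  public IndexingError withErrorSource(ErrorSource errorSource) {
    this.errorSource = errorSource;
    return this;
  }

  public IndexingError withMessage(String message) {
    this.message = message;
    return this;
  }

  public ErrorSource getErrorSource() {
    return errorSource;
  }

  public String getMessage() {
    return message;
  }
}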
@@ -197,8 +192,7 @@ public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
// Update Error
if (jobData.getFailure() != null) {
appRecord.setFailureContext(
-          new FailureContext()
-              .withAdditionalProperty("failure", JsonUtils.pojoToJson(jobData.getFailure())));
+          new FailureContext().withAdditionalProperty("failure", jobData.getFailure()));
}

// Update Stats
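
Note: this hunk is the heart of the stored-failure readability fix. JsonUtils.pojoToJson returns a String, so the persisted FailureContext previously held JSON inside JSON and every quote came back escaped; storing the POJO lets the whole record serialize once. Side by side:

// Old: double encoding; the stored value is an escaped string, e.g.
//   {"failure": "{\"message\":\"...\"}"}
appRecord.setFailureContext(
    new FailureContext()
        .withAdditionalProperty("failure", JsonUtils.pojoToJson(jobData.getFailure())));

// New: the object is stored as-is and serialized once with the record, e.g.
//   {"failure": {"errorSource": "JOB", "message": "..."}}
appRecord.setFailureContext(
    new FailureContext().withAdditionalProperty("failure", jobData.getFailure()));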
@@ -217,58 +211,13 @@ private void entitiesReIndex() {
contextData.put(ENTITY_TYPE_KEY, paginatedEntitiesSource.getEntityType());
ResultList<? extends EntityInterface> resultList;
while (!stopped && !paginatedEntitiesSource.isDone()) {
-        long currentTime = System.currentTimeMillis();
try {
resultList = paginatedEntitiesSource.readNext(null);
if (!resultList.getData().isEmpty()) {
-          if (searchRepository
-              .getSearchType()
-              .equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
-            // process data to build Reindex Request
-            os.org.opensearch.action.bulk.BulkRequest requests =
-                (os.org.opensearch.action.bulk.BulkRequest)
-                    entityProcessor.process(resultList, contextData);
-            // process data to build Reindex Request
-            os.org.opensearch.action.bulk.BulkResponse response =
-                (os.org.opensearch.action.bulk.BulkResponse)
-                    searchIndexSink.write(requests, contextData);
-            // update Status
-            handleErrorsOs(
-                resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
-          } else {
-            // process data to build Reindex Request
-            BulkRequest requests = (BulkRequest) entityProcessor.process(resultList, contextData);
-            // process data to build Reindex Request
-            BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData);
-            // update Status
-            handleErrorsEs(
-                resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
-          }
+          searchIndexSink.write(entityProcessor.process(resultList, contextData), contextData);
}
-      } catch (SourceException rx) {
-        handleSourceError(
-            String.format(
-                ENTITY_TYPE_ERROR_MSG,
-                paginatedEntitiesSource.getEntityType(),
-                rx.getCause(),
-                ""),
-            currentTime);
-      } catch (ProcessorException px) {
-        handleProcessorError(
-            String.format(
-                ENTITY_TYPE_ERROR_MSG,
-                paginatedEntitiesSource.getEntityType(),
-                px.getCause(),
-                ""),
-            currentTime);
-      } catch (SinkException wx) {
-        handleEsSinkError(
-            String.format(
-                ENTITY_TYPE_ERROR_MSG,
-                paginatedEntitiesSource.getEntityType(),
-                wx.getCause(),
-                ""),
-            currentTime);
+      } catch (SearchIndexException rx) {
+        jobData.setFailure(rx.getIndexingError());
}
}
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
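
Note: with the per-backend branches gone, bulk-failure handling moves behind the sink, which now reports problems by throwing SearchIndexException. A sketch of that pattern for the Elasticsearch sink; the method name and the SINK error source are assumptions (the actual sink changes live in other files of this commit), while the BulkResponse accessors mirror the deleted handleEsSinkErrors below:

// Sketch: surface failed bulk items as one structured SearchIndexException.
private void raiseOnBulkFailures(es.org.elasticsearch.action.bulk.BulkResponse response)
    throws SearchIndexException {
  if (!response.hasFailures()) {
    return;
  }
  List<Map<String, Object>> failedItems = new ArrayList<>();
  for (es.org.elasticsearch.action.bulk.BulkItemResponse item : response) {
    if (item.isFailed()) {
      Map<String, Object> detail = new HashMap<>();
      detail.put("id", item.getFailure().getId());
      detail.put("index", item.getFailure().getIndex());
      detail.put("reason", item.getFailure().getMessage());
      failedItems.add(detail);
    }
  }
  throw new SearchIndexException(
      new IndexingError()
          .withErrorSource(IndexingError.ErrorSource.SINK) // assumed value
          .withMessage(JsonUtils.pojoToJson(failedItems, true)));
}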
@@ -283,55 +232,14 @@ private void dataInsightReindex() {
contextData.put(ENTITY_TYPE_KEY, paginatedDataInsightSource.getEntityType());
ResultList<ReportData> resultList;
while (!stopped && !paginatedDataInsightSource.isDone()) {
-        long currentTime = System.currentTimeMillis();
try {
resultList = paginatedDataInsightSource.readNext(null);
if (!resultList.getData().isEmpty()) {
-          if (searchRepository
-              .getSearchType()
-              .equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
-            // process data to build Reindex Request
-            os.org.opensearch.action.bulk.BulkRequest requests =
-                (os.org.opensearch.action.bulk.BulkRequest)
-                    dataInsightProcessor.process(resultList, contextData);
-            // process data to build Reindex Request
-            os.org.opensearch.action.bulk.BulkResponse response =
-                (os.org.opensearch.action.bulk.BulkResponse)
-                    searchIndexSink.write(requests, contextData);
-            handleErrorsOs(resultList, "", response, currentTime);
-          } else {
-            // process data to build Reindex Request
-            BulkRequest requests =
-                (BulkRequest) dataInsightProcessor.process(resultList, contextData);
-            // process data to build Reindex Request
-            BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData);
-            handleErrorsEs(resultList, "", response, currentTime);
-          }
+          searchIndexSink.write(
+              dataInsightProcessor.process(resultList, contextData), contextData);
}
-      } catch (SourceException rx) {
-        handleSourceError(
-            String.format(
-                ENTITY_TYPE_ERROR_MSG,
-                paginatedDataInsightSource.getEntityType(),
-                rx.getCause(),
-                ""),
-            currentTime);
-      } catch (ProcessorException px) {
-        handleProcessorError(
-            String.format(
-                ENTITY_TYPE_ERROR_MSG,
-                paginatedDataInsightSource.getEntityType(),
-                px.getCause(),
-                ""),
-            currentTime);
-      } catch (SinkException wx) {
-        handleEsSinkError(
-            String.format(
-                ENTITY_TYPE_ERROR_MSG,
-                paginatedDataInsightSource.getEntityType(),
-                wx.getCause(),
-                ""),
-            currentTime);
+      } catch (SearchIndexException ex) {
+        jobData.setFailure(ex.getIndexingError());
}
}
updateStats(
@@ -390,132 +298,11 @@ private void reCreateIndexes(String entityType) {
searchRepository.createIndex(indexType);
}

-  private void handleErrorsOs(
-      ResultList<?> data,
-      String lastCursor,
-      os.org.opensearch.action.bulk.BulkResponse response,
-      long time) {
-    handleSourceError(data, lastCursor, time);
-    handleOsSinkErrors(response, time);
-  }
-
-  private void handleErrorsEs(
-      ResultList<?> data, String lastCursor, BulkResponse response, long time) {
-    handleSourceError(data, lastCursor, time);
-    handleEsSinkErrors(response, time);
-  }
-
-  private void handleSourceError(String reason, long time) {
-    handleError("source", reason, time);
-  }
-
-  private void handleProcessorError(String reason, long time) {
-    handleError("processor", reason, time);
-  }
-
-  private void handleError(String errType, String reason, long time) {
-    Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure();
-    failures.withAdditionalProperty("errorFrom", errType);
-    failures.withAdditionalProperty("lastFailedReason", reason);
-    failures.withAdditionalProperty("lastFailedAt", time);
-    jobData.setFailure(failures);
-  }
-
-  private void handleEsSinkError(String reason, long time) {
-    handleError("sink", reason, time);
-  }
-
-  private void handleJobError(String reason, long time) {
-    handleError("job", reason, time);
-  }
-
-  @SneakyThrows
-  private void handleSourceError(ResultList<?> data, String lastCursor, long time) {
-    if (!data.getErrors().isEmpty()) {
-      StringBuilder builder = new StringBuilder();
-      for (String str : data.getErrors()) {
-        builder.append(str);
-        builder.append("%n");
-      }
-      handleSourceError(
-          String.format(
-              "SourceContext: After Cursor : %s, Encountered Error While Reading Data. Following Entities were not fetched Successfully : %s",
-              lastCursor, builder),
-          time);
-    }
-  }
-
-  @SneakyThrows
-  private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse response, long time) {
-    List<Map<String, Object>> details = new ArrayList<>();
-    for (os.org.opensearch.action.bulk.BulkItemResponse bulkItemResponse : response) {
-      if (bulkItemResponse.isFailed()) {
-        Map<String, Object> detailsMap = new HashMap<>();
-        os.org.opensearch.action.bulk.BulkItemResponse.Failure failure =
-            bulkItemResponse.getFailure();
-        detailsMap.put(
-            "context",
-            String.format(
-                "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ",
-                failure.getId()));
-        detailsMap.put(
-            "lastFailedReason",
-            String.format(
-                "Index Type: [%s], Reason: [%s] %n Trace : [%s]",
-                failure.getIndex(),
-                failure.getMessage(),
-                ExceptionUtils.getStackTrace(failure.getCause())));
-        detailsMap.put("lastFailedAt", System.currentTimeMillis());
-        details.add(detailsMap);
-      }
-    }
-    if (!details.isEmpty()) {
-      handleEsSinkError(
-          String.format(
-              "[EsWriter][BulkItemResponse] Got Following Error Responses: %n %s ",
-              JsonUtils.pojoToJson(details, true)),
-          time);
-    }
-  }
-
-  @SneakyThrows
-  private void handleEsSinkErrors(BulkResponse response, long time) {
-    List<Map<String, Object>> details = new ArrayList<>();
-    for (BulkItemResponse bulkItemResponse : response) {
-      if (bulkItemResponse.isFailed()) {
-        Map<String, Object> detailsMap = new HashMap<>();
-        BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
-        detailsMap.put(
-            "context",
-            String.format(
-                "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ",
-                failure.getId()));
-        detailsMap.put(
-            "lastFailedReason",
-            String.format(
-                "Index Type: [%s], Reason: [%s] %n Trace : [%s]",
-                failure.getIndex(),
-                failure.getMessage(),
-                ExceptionUtils.getStackTrace(failure.getCause())));
-        detailsMap.put("lastFailedAt", System.currentTimeMillis());
-        details.add(detailsMap);
-      }
-    }
-    if (!details.isEmpty()) {
-      handleEsSinkError(
-          String.format(
-              "[EsWriter][BulkItemResponse] Got Following Error Responses: %s ",
-              JsonUtils.pojoToJson(details, true)),
-          time);
-    }
-  }
-
private void updateJobStatus() {
if (stopped) {
jobData.setStatus(EventPublisherJob.Status.STOPPED);
} else {
-    if (jobData.getFailure() != null
-        && !jobData.getFailure().getAdditionalProperties().isEmpty()) {
+    if (jobData.getFailure() != null) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
} else {
jobData.setStatus(EventPublisherJob.Status.COMPLETED);
@@ -2,6 +2,8 @@

import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;

+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App;
@@ -87,8 +89,10 @@ public void jobWasExecuted(
context = runRecord.getFailureContext();
}
if (jobException != null) {
context.withAdditionalProperty("message", jobException.getMessage());
context.withAdditionalProperty("jobStackTrace", ExceptionUtils.getStackTrace(jobException));
Map<String, Object> failure = new HashMap<>();
failure.put("message", jobException.getMessage());
failure.put("jobStackTrace", ExceptionUtils.getStackTrace(jobException));
context.withAdditionalProperty("failure", failure);
}
runRecord.setFailureContext(context);
}
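
Note: grouping message and jobStackTrace under a single failure key gives consumers one structured object instead of two loose properties. A hypothetical read path, assuming FailureContext exposes getAdditionalProperties() like the other generated types:

// Sketch: recover the grouped failure from a stored run record.
Object raw = runRecord.getFailureContext().getAdditionalProperties().get("failure");
Map<String, Object> failure = JsonUtils.convertValue(raw, Map.class);
LOG.error("Last run failed: {}", failure.get("message"));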