Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Replace 2x LogUtils by QueryContext #753

Merged
merged 1 commit into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
package org.opensearch.sql.common.utils;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.logging.log4j.ThreadContext;

/**
* Utility class for generating/accessing the request id from logging context.
* Utility class for recording and accessing context for the query being executed.
*/
public class LogUtils {
public class QueryContext {

/**
* The key of the request id in the context map.
Expand All @@ -29,17 +30,22 @@ public class LogUtils {
* call this method twice on the same thread within the lifetime of the request.
* </p>
*/
public static void addRequestId() {
ThreadContext.put(REQUEST_ID_KEY, UUID.randomUUID().toString());
public static String addRequestId() {
var id = UUID.randomUUID().toString();
ThreadContext.put(REQUEST_ID_KEY, id);
return id;
}

/**
* Get RequestID.
* @return the current request id from {@link ThreadContext}.
*/
public static String getRequestId() {
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
return requestId;
var id = ThreadContext.get(REQUEST_ID_KEY);
if (null == id) {
id = addRequestId();
}
return id;
}

/**
Expand All @@ -57,7 +63,7 @@ public static Runnable withCurrentContext(final Runnable task) {
};
}

private LogUtils() {
private QueryContext() {
throw new AssertionError(
getClass().getCanonicalName() + " is a utility class and must not be initialized");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.query.QueryAction;
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transports;

Expand Down Expand Up @@ -73,13 +73,13 @@ public void execute(Client client, Map<String, String> params, QueryAction query
if (isBlockingAction(queryAction) && isRunningInTransportThread()) {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Async blocking query action [{}] for executor [{}] in current thread [{}]",
LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
}
async(client, params, queryAction, channel);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] Continue running query action [{}] for executor [{}] in current thread [{}]",
LogUtils.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
QueryContext.getRequestId(), name(executor), name(queryAction), Thread.currentThread().getName());
}
doExecuteWithTimeMeasured(client, params, queryAction, channel);
}
Expand Down Expand Up @@ -110,18 +110,18 @@ private void async(Client client, Map<String, String> params, QueryAction queryA
doExecuteWithTimeMeasured(client, params, queryAction, channel);
} catch (IOException | SqlParseException | OpenSearchException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(),
e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(),
e.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
"Memory circuit is broken."));
} catch (Throwable t) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(),
t.getMessage());
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
String.valueOf(t.getMessage())));
Expand All @@ -132,7 +132,7 @@ private void async(Client client, Map<String, String> params, QueryAction queryA

// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
threadPool.schedule(
LogUtils.withCurrentContext(runnable),
QueryContext.withCurrentContext(runnable),
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME
);
Expand All @@ -152,7 +152,7 @@ private void doExecuteWithTimeMeasured(Client client,
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
if (elapsed.getSeconds() >= slowLogThreshold) {
LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.query.join.BackOffRetryStrategy;
import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.threadpool.ThreadPool;

public class CursorAsyncRestExecutor {
Expand Down Expand Up @@ -57,20 +57,20 @@ private void async(Client client, Map<String, String> params, RestChannel channe
doExecuteWithTimeMeasured(client, params, channel);
} catch (IOException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", LogUtils.getRequestId(),
LOG.warn("[{}] [MCB] async task got an IO/SQL exception: {}", QueryContext.getRequestId(),
e.getMessage());
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
} catch (IllegalStateException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", LogUtils.getRequestId(),
LOG.warn("[{}] [MCB] async task got a runtime exception: {}", QueryContext.getRequestId(),
e.getMessage());
e.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INSUFFICIENT_STORAGE,
"Memory circuit is broken."));
} catch (Throwable t) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", LogUtils.getRequestId(),
LOG.warn("[{}] [MCB] async task got an unknown throwable: {}", QueryContext.getRequestId(),
t.getMessage());
t.printStackTrace();
channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR,
Expand All @@ -82,7 +82,7 @@ private void async(Client client, Map<String, String> params, RestChannel channe

// Preserve context of calling thread to ensure headers of requests are forwarded when running blocking actions
threadPool.schedule(
LogUtils.withCurrentContext(runnable),
QueryContext.withCurrentContext(runnable),
new TimeValue(0L),
SQL_WORKER_THREAD_POOL_NAME
);
Expand All @@ -101,7 +101,7 @@ private void doExecuteWithTimeMeasured(Client client,
Duration elapsed = Duration.ofNanos(System.nanoTime() - startTime);
int slowLogThreshold = LocalClusterState.state().getSettingValue(Settings.Key.SQL_SLOWLOG);
if (elapsed.getSeconds() >= slowLogThreshold) {
LOG.warn("[{}] Slow query: elapsed={} (ms)", LogUtils.getRequestId(), elapsed.toMillis());
LOG.warn("[{}] Slow query: elapsed={} (ms)", QueryContext.getRequestId(), elapsed.toMillis());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.legacy.antlr.OpenSearchLegacySqlAnalyzer;
Expand All @@ -60,7 +61,6 @@
import org.opensearch.sql.legacy.request.SqlRequestParam;
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;
import org.opensearch.sql.legacy.utils.JsonPrettyFormatter;
import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.sql.legacy.utils.QueryDataAnonymizer;
import org.opensearch.sql.sql.domain.SQLQueryRequest;

Expand Down Expand Up @@ -123,7 +123,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
Metrics.getInstance().getNumericalMetric(MetricName.REQ_TOTAL).increment();
Metrics.getInstance().getNumericalMetric(MetricName.REQ_COUNT_TOTAL).increment();

LogUtils.addRequestId();
QueryContext.addRequestId();

try {
if (!isSQLFeatureEnabled()) {
Expand All @@ -137,12 +137,12 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (isExplainRequest(request)) {
throw new IllegalArgumentException("Invalid request. Cannot explain cursor");
} else {
LOG.info("[{}] Cursor request {}: {}", LogUtils.getRequestId(), request.uri(), sqlRequest.cursor());
LOG.info("[{}] Cursor request {}: {}", QueryContext.getRequestId(), request.uri(), sqlRequest.cursor());
return channel -> handleCursorRequest(request, sqlRequest.cursor(), client, channel);
}
}

LOG.info("[{}] Incoming request {}: {}", LogUtils.getRequestId(), request.uri(),
LOG.info("[{}] Incoming request {}: {}", QueryContext.getRequestId(), request.uri(),
QueryDataAnonymizer.anonymizeData(sqlRequest.getSql()));

Format format = SqlRequestParam.getFormat(request.params());
Expand All @@ -152,11 +152,11 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
sqlRequest.getSql(), request.path(), request.params());
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId());
LOG.info("[{}] Request is handled by new SQL query engine", QueryContext.getRequestId());
return result;
}
LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine",
LogUtils.getRequestId(), newSqlRequest);
QueryContext.getRequestId(), newSqlRequest);

final QueryAction queryAction = explainRequest(client, sqlRequest, format);
return channel -> executeSqlRequest(request, queryAction, client, channel);
Expand All @@ -182,10 +182,10 @@ private void handleCursorRequest(final RestRequest request, final String cursor,

private static void logAndPublishMetrics(final Exception e) {
if (isClientError(e)) {
LOG.error(LogUtils.getRequestId() + " Client side error during query execution", e);
LOG.error(QueryContext.getRequestId() + " Client side error during query execution", e);
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_CUS).increment();
} else {
LOG.error(LogUtils.getRequestId() + " Server side error during query execution", e);
LOG.error(QueryContext.getRequestId() + " Server side error during query execution", e);
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.legacy.executor.format.ErrorMessageFactory;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.utils.LogUtils;

/**
* Currently this interface is for node level.
Expand Down Expand Up @@ -67,7 +67,7 @@ public List<ReplacedRoute> replacedRoutes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {

LogUtils.addRequestId();
QueryContext.addRequestId();

try {
return channel -> channel.sendResponse(new BytesRestResponse(RestStatus.OK,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.apache.logging.log4j.ThreadContext;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.opensearch.sql.legacy.utils.LogUtils;
import org.opensearch.sql.common.utils.QueryContext;

public class LogUtilsTest {
public class QueryContextTest {

private static final String REQUEST_ID_KEY = "request_id";

Expand All @@ -30,32 +30,32 @@ public void cleanUpContext() {
public void addRequestId() {

Assert.assertNull(ThreadContext.get(REQUEST_ID_KEY));
LogUtils.addRequestId();
QueryContext.addRequestId();
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
Assert.assertNotNull(requestId);
}

@Test
public void addRequestId_alreadyExists() {

LogUtils.addRequestId();
QueryContext.addRequestId();
final String requestId = ThreadContext.get(REQUEST_ID_KEY);
LogUtils.addRequestId();
QueryContext.addRequestId();
final String requestId2 = ThreadContext.get(REQUEST_ID_KEY);
Assert.assertThat(requestId2, not(equalTo(requestId)));
}

@Test
public void getRequestId_doesNotExist() {
assertEquals("ID", LogUtils.getRequestId());
assertNotNull(QueryContext.getRequestId());
}

@Test
public void getRequestId() {

final String test_request_id = "test_id_111";
ThreadContext.put(REQUEST_ID_KEY, test_request_id);
final String requestId = LogUtils.getRequestId();
final String requestId = QueryContext.getRequestId();
Assert.assertThat(requestId, equalTo(test_request_id));
}

Expand All @@ -68,6 +68,6 @@ public void withCurrentContext() throws InterruptedException {
};
ThreadContext.put("test11", "value11");
ThreadContext.put("test22", "value11");
new Thread(LogUtils.withCurrentContext(task)).join();
new Thread(QueryContext.withCurrentContext(task)).join();
}
}
Loading