Skip to content

Commit

Permalink
Revert "Adds a check to skip serialization-deserialization if request…
Browse files Browse the repository at this point in the history
… is for same node (opensearch-project#2765) (opensearch-project#2973)"

This reverts commit ddbe517.
  • Loading branch information
cwperks committed Jul 21, 2023
1 parent 89df7b4 commit 360396e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
import org.opensearch.action.support.ActionFilter;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.component.Lifecycle.State;
Expand Down Expand Up @@ -208,7 +207,6 @@ public final class OpenSearchSecurityPlugin extends OpenSearchSecuritySSLPlugin
private volatile ConfigurationRepository cr;
private volatile AdminDNs adminDns;
private volatile ClusterService cs;
private static volatile DiscoveryNode localNode;
private volatile AuditLog auditLog;
private volatile BackendRegistry backendRegistry;
private volatile SslExceptionHandler sslExceptionHandler;
Expand Down Expand Up @@ -1792,12 +1790,11 @@ public List<String> getSettingsFilter() {
}

@Override
public void onNodeStarted(DiscoveryNode localNode) {
public void onNodeStarted() {
log.info("Node started");
if (!SSLConfig.isSslOnlyMode() && !client && !disabled) {
cr.initOnNodeStart();
}
this.localNode = localNode;
final Set<ModuleInfo> securityModules = ReflectionHelper.getModulesLoaded();
log.info("{} OpenSearch Security modules loaded so far: {}", securityModules.size(), securityModules);
}
Expand Down Expand Up @@ -1877,14 +1874,6 @@ private static String handleKeyword(final String field) {
return field;
}

public static DiscoveryNode getLocalNode() {
return localNode;
}

public static void setLocalNode(DiscoveryNode node) {
localNode = node;
}

public static class GuiceHolder implements LifecycleComponent {

private static RepositoriesService repositoriesService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -132,6 +131,7 @@ public <T extends TransportResponse> void sendRequestDecorate(
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {

final Map<String, String> origHeaders0 = getThreadContext().getHeaders();
final User user0 = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER);
final String injectedUserString = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER);
Expand All @@ -146,9 +146,6 @@ public <T extends TransportResponse> void sendRequestDecorate(
final String origCCSTransientMf = getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_MASKED_FIELD_CCS);

final boolean isDebugEnabled = log.isDebugEnabled();
final DiscoveryNode localNode = OpenSearchSecurityPlugin.getLocalNode();
boolean isSameNodeRequest = localNode != null && localNode.equals(connection.getNode());

try (ThreadContext.StoredContext stashedContext = getThreadContext().stashContext()) {
final TransportResponseHandler<T> restoringHandler = new RestoringTransportResponseHandler<T>(handler, stashedContext);
getThreadContext().putHeader("_opendistro_security_remotecn", cs.getClusterName().value());
Expand Down Expand Up @@ -226,7 +223,7 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROL

getThreadContext().putHeader(headerMap);

ensureCorrectHeaders(remoteAddress0, user0, origin0, injectedUserString, injectedRolesString, isSameNodeRequest);
ensureCorrectHeaders(remoteAddress0, user0, origin0, injectedUserString, injectedRolesString);

if (isActionTraceEnabled()) {
getThreadContext().putHeader(
Expand All @@ -252,8 +249,7 @@ private void ensureCorrectHeaders(
final User origUser,
final String origin,
final String injectedUserString,
final String injectedRolesString,
boolean isSameNodeRequest
final String injectedRolesString
) {
// keep original address

Expand All @@ -267,49 +263,30 @@ && getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADE
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_ORIGIN_HEADER, Origin.LOCAL.toString());
}

TransportAddress transportAddress = null;
if (remoteAdr != null && remoteAdr instanceof TransportAddress) {

String remoteAddressHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER);

if (remoteAddressHeader == null) {
transportAddress = (TransportAddress) remoteAdr;
getThreadContext().putHeader(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER,
Base64Helper.serializeObject(((TransportAddress) remoteAdr).address())
);
}
}

// we put headers as transient for same node requests
if (isSameNodeRequest) {
if (transportAddress != null) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, transportAddress);
}
String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);

if (userHeader == null) {
if (origUser != null) {
// if request is going to be handled by same node, we directly put transient value as the thread context is not going to be
// stah.
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_USER, origUser);
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(origUser));
} else if (StringUtils.isNotEmpty(injectedRolesString)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES, injectedRolesString);
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER, injectedRolesString);
} else if (StringUtils.isNotEmpty(injectedUserString)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserString);
}
} else {
if (transportAddress != null) {
getThreadContext().putHeader(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER,
Base64Helper.serializeObject(transportAddress.address())
);
}

final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
if (userHeader == null) {
// put as headers for other requests
if (origUser != null) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(origUser));
} else if (StringUtils.isNotEmpty(injectedRolesString)) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER, injectedRolesString);
} else if (StringUtils.isNotEmpty(injectedUserString)) {
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER, injectedUserString);
}
getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER, injectedUserString);
}
}

}

private ThreadContext getThreadContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

package org.opensearch.security.transport;

// CS-SUPPRESS-SINGLE: RegexpSingleline Extensions manager used to allow/disallow TLS connections to extensions
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.Objects;
Expand Down Expand Up @@ -63,7 +62,6 @@
import org.opensearch.transport.TransportRequestHandler;

import static org.opensearch.security.OpenSearchSecurityPlugin.isActionTraceEnabled;
// CS-ENFORCE-SINGLE

public class SecurityRequestHandler<T extends TransportRequest> extends SecuritySSLRequestHandler<T> {

Expand Down Expand Up @@ -142,31 +140,7 @@ protected void messageReceivedDecorate(
}

// bypass non-netty requests
if (getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_USER) != null
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER) != null
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES) != null
|| getThreadContext().getTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS) != null) {

final String rolesValidation = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION_HEADER
);
if (!Strings.isNullOrEmpty(rolesValidation)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation);
}

if (isActionTraceEnabled()) {
getThreadContext().putHeader(
"_opendistro_security_trace" + System.currentTimeMillis() + "#" + UUID.randomUUID().toString(),
Thread.currentThread().getName()
+ " DIR -> "
+ transportChannel.getChannelType()
+ " "
+ getThreadContext().getHeaders()
);
}

putInitialActionClassHeader(initialActionClassValue, resolvedActionClass);
} else {
if (channelType.equals("direct")) {
final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
final String injectedRolesHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER);
final String injectedUserHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER);
Expand All @@ -186,15 +160,15 @@ protected void messageReceivedDecorate(
);
}

String originalRemoteAddress = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER);
final String originalRemoteAddress = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER
);

if (!Strings.isNullOrEmpty(originalRemoteAddress)) {
getThreadContext().putTransient(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS,
new TransportAddress((InetSocketAddress) Base64Helper.deserializeObject(originalRemoteAddress))
);
} else {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, request.remoteAddress());
}

final String rolesValidation = getThreadContext().getHeader(
Expand All @@ -203,9 +177,20 @@ protected void messageReceivedDecorate(
if (!Strings.isNullOrEmpty(rolesValidation)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation);
}
}

if (channelType.equals("direct")) {
if (isActionTraceEnabled()) {
getThreadContext().putHeader(
"_opendistro_security_trace" + System.currentTimeMillis() + "#" + UUID.randomUUID().toString(),
Thread.currentThread().getName()
+ " DIR -> "
+ transportChannel.getChannelType()
+ " "
+ getThreadContext().getHeaders()
);
}

putInitialActionClassHeader(initialActionClassValue, resolvedActionClass);

super.messageReceivedDecorate(request, handler, transportChannel, task);
return;
}
Expand Down Expand Up @@ -238,13 +223,11 @@ protected void messageReceivedDecorate(
// if transport channel is not a netty channel but a direct or local channel (e.g. send via network) then allow it (regardless
// of beeing a internal: or shard request)
// also allow when issued from a remote cluster for cross cluster search
// CS-SUPPRESS-SINGLE: RegexpSingleline Used to allow/disallow TLS connections to extensions
if (!HeaderHelper.isInterClusterRequest(getThreadContext())
&& !HeaderHelper.isTrustedClusterRequest(getThreadContext())
&& !HeaderHelper.isExtensionRequest(getThreadContext())
&& !task.getAction().equals("internal:transport/handshake")
&& (task.getAction().startsWith("internal:") || task.getAction().contains("["))) {
// CS-ENFORCE-SINGLE

auditLog.logMissingPrivileges(task.getAction(), request, task);
log.error(
Expand Down Expand Up @@ -284,11 +267,57 @@ protected void messageReceivedDecorate(
}

// network intercluster request or cross search cluster request
// CS-SUPPRESS-SINGLE: RegexpSingleline Used to allow/disallow TLS connections to extensions
if (!(HeaderHelper.isInterClusterRequest(getThreadContext())
if (HeaderHelper.isInterClusterRequest(getThreadContext())
|| HeaderHelper.isTrustedClusterRequest(getThreadContext())
|| HeaderHelper.isExtensionRequest(getThreadContext()))) {
// CS-ENFORCE-SINGLE
|| HeaderHelper.isExtensionRequest(getThreadContext())) {

final String userHeader = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
final String injectedRolesHeader = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_HEADER
);
final String injectedUserHeader = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER_HEADER
);

if (Strings.isNullOrEmpty(userHeader)) {
// Keeping role injection with higher priority as plugins under OpenSearch will be using this
// on transport layer
if (!Strings.isNullOrEmpty(injectedRolesHeader)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES, injectedRolesHeader);
} else if (!Strings.isNullOrEmpty(injectedUserHeader)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_USER, injectedUserHeader);
}
} else {
getThreadContext().putTransient(
ConfigConstants.OPENDISTRO_SECURITY_USER,
Objects.requireNonNull((User) Base64Helper.deserializeObject(userHeader))
);
}

String originalRemoteAddress = getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS_HEADER);

if (!Strings.isNullOrEmpty(originalRemoteAddress)) {
getThreadContext().putTransient(
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS,
new TransportAddress((InetSocketAddress) Base64Helper.deserializeObject(originalRemoteAddress))
);
} else {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS, request.remoteAddress());
}

final String rolesValidation = getThreadContext().getHeader(
ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION_HEADER
);
if (!Strings.isNullOrEmpty(rolesValidation)) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_INJECTED_ROLES_VALIDATION, rolesValidation);
}

} else {
// this is a netty request from a non-server node (maybe also be internal: or a shard request)
// and therefore issued by a transport client

// since OS 2.0 we do not support this any longer because transport client no longer available

final OpenSearchException exception = ExceptionUtils.createTransportClientNoLongerSupportedException();
log.error(exception.toString());
transportChannel.sendResponse(exception);
Expand All @@ -311,8 +340,9 @@ protected void messageReceivedDecorate(
}

putInitialActionClassHeader(initialActionClassValue, resolvedActionClass);

super.messageReceivedDecorate(request, handler, transportChannel, task);
}
super.messageReceivedDecorate(request, handler, transportChannel, task);
} finally {

if (isActionTraceEnabled()) {
Expand Down Expand Up @@ -374,15 +404,13 @@ protected void addAdditionalContextValues(
}
}

// CS-SUPPRESS-SINGLE: RegexpSingleline Extensions manager used to allow/disallow TLS connections to extensions
String extensionUniqueId = getThreadContext().getHeader("extension_unique_id");
if (extensionUniqueId != null) {
ExtensionsManager extManager = OpenSearchSecurityPlugin.GuiceHolder.getExtensionsManager();
if (extManager.lookupExtensionSettingsById(extensionUniqueId).isPresent()) {
getThreadContext().putTransient(ConfigConstants.OPENDISTRO_SECURITY_SSL_TRANSPORT_EXTENSION_REQUEST, Boolean.TRUE);
}
}
// CS-ENFORCE-SINGLE

super.addAdditionalContextValues(action, request, localCerts, peerCerts, principal);
}
Expand Down
Loading

0 comments on commit 360396e

Please sign in to comment.