Skip to content

Commit

Permalink
Verify updated configuration is newer on reload
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Nied <petern@amazon.com>
  • Loading branch information
peternied committed Aug 31, 2023
1 parent 0338cdd commit fa74022
Show file tree
Hide file tree
Showing 20 changed files with 195 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void updateUserConfiguration(List<TestSecurityConfig.User> users) {
private static void triggerConfigurationReload(Client client) {
ConfigUpdateResponse configUpdateResponse = client.execute(
ConfigUpdateAction.INSTANCE,
new ConfigUpdateRequest(CType.lcStringValues().toArray(new String[0]))
new ConfigUpdateRequest(CType.values(), null)
).actionGet();
if (configUpdateResponse.hasFailures()) {
throw new RuntimeException("ConfigUpdateResponse produced failures: " + configUpdateResponse.failures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,54 @@
package org.opensearch.security.action.configupdate;

import java.io.IOException;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.security.securityconf.impl.CType;

public class ConfigUpdateRequest extends BaseNodesRequest<ConfigUpdateRequest> {

private String[] configTypes;
private CType[] configTypes;
private Long[] sequenceIds;

public ConfigUpdateRequest(StreamInput in) throws IOException {
super(in);
this.configTypes = in.readStringArray();
this.configTypes = CType.fromStringValues(in.readStringArray());
if (in.available() != 0) {
this.sequenceIds = ConfigUpdateRequest.longArrayFromStringValues(in.readStringArray());
}
}

public ConfigUpdateRequest() {
super(new String[0]);
}

public ConfigUpdateRequest(String[] configTypes) {
/**
* @param configTypes The types of configuration that should be reloaded
* @param sequenceIds Sequence numbers that should be smaller than the values from reloaded configuration, if null ignored.
*/
public ConfigUpdateRequest(final CType[] configTypes, final Long[] sequenceIds) {
this();
setConfigTypes(configTypes);
this.configTypes = configTypes;
this.sequenceIds = sequenceIds;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(configTypes);
out.writeStringArray(prepToSerialize(configTypes));
out.writeStringArray(prepToSerialize(sequenceIds));
}

public String[] getConfigTypes() {
return configTypes;
}

public void setConfigTypes(final String[] configTypes) {
this.configTypes = configTypes;
public Map<CType, Long> getTypeAndSequenceIdMap() {
return IntStream.range(0, configTypes.length)
.boxed()
.collect(Collectors.toMap(i -> configTypes[i], i -> sequenceIds != null ? sequenceIds[i] : null));
}

@Override
Expand All @@ -72,4 +84,40 @@ public ActionRequestValidationException validate() {
}
return null;
}

private static String[] prepToSerialize(final CType[] ctypes) {
if (ctypes == null) {
return null;
}

final String[] serializedReady = new String[ctypes.length];
for (int i = 0; i < ctypes.length; i++) {
serializedReady[i] = ctypes[i] + "";
}
return serializedReady;
}

private static String[] prepToSerialize(final Long[] longs) {
if (longs == null) {
return null;
}

final String[] serializedReady = new String[longs.length];
for (int i = 0; i < longs.length; i++) {
serializedReady[i] = longs[i] != null ? longs[i] + "" : null;
}
return serializedReady;
}

private static Long[] longArrayFromStringValues(final String[] longsAsStrings) {
if (longsAsStrings == null) {
return null;
}

final Long[] asLongs = new Long[longsAsStrings.length];
for (int i = 0; i < longsAsStrings.length; i++) {
asLongs[i] = longsAsStrings[i] != null ? Long.parseLong(longsAsStrings[i]) : null;
}
return asLongs;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.security.auth.BackendRegistry;
import org.opensearch.security.configuration.ConfigurationRepository;
import org.opensearch.security.securityconf.DynamicConfigFactory;
import org.opensearch.security.securityconf.impl.CType;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
Expand All @@ -58,7 +56,6 @@ public class TransportConfigUpdateAction extends TransportNodesAction<
protected Logger logger = LogManager.getLogger(getClass());
private final Provider<BackendRegistry> backendRegistry;
private final ConfigurationRepository configurationRepository;
private DynamicConfigFactory dynamicConfigFactory;

@Inject
public TransportConfigUpdateAction(
Expand All @@ -68,8 +65,7 @@ public TransportConfigUpdateAction(
final TransportService transportService,
final ConfigurationRepository configurationRepository,
final ActionFilters actionFilters,
Provider<BackendRegistry> backendRegistry,
DynamicConfigFactory dynamicConfigFactory
Provider<BackendRegistry> backendRegistry
) {
super(
ConfigUpdateAction.NAME,
Expand All @@ -85,7 +81,6 @@ public TransportConfigUpdateAction(

this.configurationRepository = configurationRepository;
this.backendRegistry = backendRegistry;
this.dynamicConfigFactory = dynamicConfigFactory;
}

public static class NodeConfigUpdateRequest extends TransportRequest {
Expand Down Expand Up @@ -125,9 +120,9 @@ protected ConfigUpdateResponse newResponse(

@Override
protected ConfigUpdateNodeResponse nodeOperation(final NodeConfigUpdateRequest request) {
configurationRepository.reloadConfiguration(CType.fromStringValues((request.request.getConfigTypes())));
configurationRepository.reloadConfiguration(request.request);
backendRegistry.get().invalidateCache();
return new ConfigUpdateNodeResponse(clusterService.localNode(), request.request.getConfigTypes(), null);
return new ConfigUpdateNodeResponse(clusterService.localNode(), new String[] {}, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -66,6 +66,7 @@
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.env.Environment;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.security.action.configupdate.ConfigUpdateRequest;
import org.opensearch.security.auditlog.AuditLog;
import org.opensearch.security.auditlog.config.AuditConfig;
import org.opensearch.security.securityconf.DynamicConfigFactory;
Expand Down Expand Up @@ -218,7 +219,7 @@ private ConfigurationRepository(
while (!dynamicConfigFactory.isInitialized()) {
try {
LOGGER.debug("Try to load config ...");
reloadConfiguration(Arrays.asList(CType.values()));
reloadConfiguration(new ConfigUpdateRequest(CType.values(), null));
break;
} catch (Exception e) {
LOGGER.debug("Unable to load configuration due to {}", String.valueOf(ExceptionUtils.getRootCause(e)));
Expand Down Expand Up @@ -372,11 +373,11 @@ public SecurityDynamicConfiguration<?> getConfiguration(CType configurationType)

private final Lock LOCK = new ReentrantLock();

public void reloadConfiguration(Collection<CType> configTypes) throws ConfigUpdateAlreadyInProgressException {
public void reloadConfiguration(final ConfigUpdateRequest updateRequest) throws ConfigUpdateAlreadyInProgressException {
try {
if (LOCK.tryLock(60, TimeUnit.SECONDS)) {
try {
reloadConfiguration0(configTypes, this.acceptInvalid);
reloadConfiguration0(updateRequest, this.acceptInvalid);
} finally {
LOCK.unlock();
}
Expand All @@ -389,8 +390,8 @@ public void reloadConfiguration(Collection<CType> configTypes) throws ConfigUpda
}
}

private void reloadConfiguration0(Collection<CType> configTypes, boolean acceptInvalid) {
final Map<CType, SecurityDynamicConfiguration<?>> loaded = getConfigurationsFromIndex(configTypes, false, acceptInvalid);
private void reloadConfiguration0(final ConfigUpdateRequest updateRequest, boolean acceptInvalid) {
final Map<CType, SecurityDynamicConfiguration<?>> loaded = getConfigurationsFromIndex(updateRequest, false, acceptInvalid);
configCache.putAll(loaded);
notifyAboutChanges(loaded);
}
Expand All @@ -417,21 +418,25 @@ private synchronized void notifyAboutChanges(Map<CType, SecurityDynamicConfigura
* @param logComplianceEvent
* @return
*/
public Map<CType, SecurityDynamicConfiguration<?>> getConfigurationsFromIndex(
Collection<CType> configTypes,
boolean logComplianceEvent
) {
return getConfigurationsFromIndex(configTypes, logComplianceEvent, this.acceptInvalid);
public SecurityDynamicConfiguration<?> getConfigurationFromIndex(CType configType, boolean logComplianceEvent) {
return getConfigurationFromIndex(configType, logComplianceEvent, this.acceptInvalid);
}

public SecurityDynamicConfiguration<?> getConfigurationFromIndex(CType configType, boolean logComplianceEvent, boolean acceptInvalid) {
final ConfigUpdateRequest updateRequest = new ConfigUpdateRequest(new CType[] { configType }, null);
return getConfigurationsFromIndex(updateRequest, logComplianceEvent, acceptInvalid).get(configType);
}

public Map<CType, SecurityDynamicConfiguration<?>> getConfigurationsFromIndex(
Collection<CType> configTypes,
boolean logComplianceEvent,
boolean acceptInvalid
final ConfigUpdateRequest updateRequest,
final boolean logComplianceEvent,
final boolean acceptInvalid
) {

final ThreadContext threadContext = threadPool.getThreadContext();
final Map<CType, SecurityDynamicConfiguration<?>> retVal = new HashMap<>();
final Map<CType, Long> typeAndSequenceIdMap = updateRequest.getTypeAndSequenceIdMap();
final CType[] configTypes = typeAndSequenceIdMap.keySet().toArray(new CType[0]);

try (StoredContext ctx = threadContext.stashContext()) {
threadContext.putHeader(ConfigConstants.OPENDISTRO_SECURITY_CONF_REQUEST_HEADER, "true");
Expand All @@ -445,39 +450,63 @@ public Map<CType, SecurityDynamicConfiguration<?>> getConfigurationsFromIndex(
} else {
LOGGER.debug("security index exists and was created with ES 7 (new layout)");
}
retVal.putAll(
validate(cl.load(configTypes.toArray(new CType[0]), 10, TimeUnit.SECONDS, acceptInvalid), configTypes.size())
);
retVal.putAll(validate(cl.load(configTypes, 10, TimeUnit.SECONDS, acceptInvalid), typeAndSequenceIdMap));

} else {
// wait (and use new layout)
LOGGER.debug("security index not exists (yet)");
retVal.putAll(
validate(cl.load(configTypes.toArray(new CType[0]), 10, TimeUnit.SECONDS, acceptInvalid), configTypes.size())
);
retVal.putAll(validate(cl.load(configTypes, 10, TimeUnit.SECONDS, acceptInvalid), typeAndSequenceIdMap));
}

} catch (Exception e) {
throw new OpenSearchException(e);
}

if (logComplianceEvent && auditLog.getComplianceConfig().isEnabled()) {
CType configurationType = configTypes.iterator().next();
Map<String, String> fields = new HashMap<String, String>();
if (typeAndSequenceIdMap.size() != 1) {
throw new RuntimeException("Unexpected configuration log event that is missing critical details");
}
final CType configurationType = typeAndSequenceIdMap.keySet()
.stream()
.findFirst()
.orElseThrow(() -> new RuntimeException("Loaded configuation without expected CType!"));
final Map<String, String> fields = new HashMap<String, String>();
fields.put(configurationType.toLCString(), Strings.toString(MediaTypeRegistry.JSON, retVal.get(configurationType)));
auditLog.logDocumentRead(this.securityIndex, configurationType.toLCString(), null, fields);
}

return retVal;
}

private Map<CType, SecurityDynamicConfiguration<?>> validate(Map<CType, SecurityDynamicConfiguration<?>> conf, int expectedSize)
throws InvalidConfigException {
private Map<CType, SecurityDynamicConfiguration<?>> validate(
Map<CType, SecurityDynamicConfiguration<?>> conf,
Map<CType, Long> expectedSequences
) throws InvalidConfigException {

if (conf == null || conf.size() != expectedSize) {
if (conf == null || conf.size() != expectedSequences.size()) {
throw new InvalidConfigException("Retrieved only partial configuration");
}

final List<String> configurationSeqIssues = expectedSequences.entrySet().stream().map(kvp -> {
if (kvp.getValue() != null) {
final SecurityDynamicConfiguration<?> configEntry = conf.get(kvp.getKey());
if (configEntry.getSeqNo() > kvp.getValue()) {
return "Configuration "
+ kvp.getKey()
+ ", sequence number ("
+ configEntry.getSeqNo()
+ ") was lower than expected ("
+ kvp.getValue()
+ ")";
}
}
return null;
}).filter(Objects::nonNull).collect(Collectors.toList());

if (!configurationSeqIssues.isEmpty()) {
throw new InvalidConfigException(configurationSeqIssues.stream().collect(Collectors.joining("\n")));
}

return conf;
}

Expand Down
Loading

0 comments on commit fa74022

Please sign in to comment.