Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adding role based filtering for rest of APIs #325

Merged
merged 9 commits into from
Dec 11, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@

package com.amazon.opendistroforelasticsearch.ad.transport;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;

import java.io.IOException;
import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getDetector;
import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getUserContext;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -34,64 +36,104 @@
import org.elasticsearch.transport.TransportService;

import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorFunction;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.commons.authuser.User;

public class AnomalyDetectorJobTransportAction extends HandledTransportAction<AnomalyDetectorJobRequest, AnomalyDetectorJobResponse> {
private final Logger logger = LogManager.getLogger(AnomalyDetectorJobTransportAction.class);

private final Client client;
private final ClusterService clusterService;
private final Settings settings;
private final AnomalyDetectionIndices anomalyDetectionIndices;
private final NamedXContentRegistry xContentRegistry;
private volatile Boolean filterByEnabled;

@Inject
public AnomalyDetectorJobTransportAction(
TransportService transportService,
ActionFilters actionFilters,
Client client,
ClusterService clusterService,
Settings settings,
AnomalyDetectionIndices anomalyDetectionIndices,
NamedXContentRegistry xContentRegistry
) {
super(AnomalyDetectorJobAction.NAME, transportService, actionFilters, AnomalyDetectorJobRequest::new);
this.client = client;
this.clusterService = clusterService;
this.settings = settings;
this.anomalyDetectionIndices = anomalyDetectionIndices;
this.xContentRegistry = xContentRegistry;
filterByEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
}

@Override
protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> listener) {
String detectorId = request.getDetectorID();
// By the time request reaches here, the user permissions are validated by Security plugin.
User user = getUserContext(client);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(user, detectorId, listener, () -> adJobExecute(request, listener));
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

private void adJobExecute(AnomalyDetectorJobRequest request, ActionListener<AnomalyDetectorJobResponse> listener) {
String detectorId = request.getDetectorID();
long seqNo = request.getSeqNo();
long primaryTerm = request.getPrimaryTerm();
String rawPath = request.getRawPath();
TimeValue requestTimeout = REQUEST_TIMEOUT.get(settings);

// By the time request reaches here, the user permissions are validated by Security plugin.
// Since the detectorID is provided, this can only happen if User is part of a role which has access
// to the detector. This is filtered by our Search Detector API.

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler(
client,
listener,
anomalyDetectionIndices,
detectorId,
seqNo,
primaryTerm,
requestTimeout,
xContentRegistry
);
IndexAnomalyDetectorJobActionHandler handler = new IndexAnomalyDetectorJobActionHandler(
client,
listener,
anomalyDetectionIndices,
detectorId,
seqNo,
primaryTerm,
requestTimeout,
xContentRegistry
);
try {
if (rawPath.endsWith(RestHandlerUtils.START_JOB)) {
handler.startAnomalyDetectorJob();
} else if (rawPath.endsWith(RestHandlerUtils.STOP_JOB)) {
handler.stopAnomalyDetectorJob(detectorId);
}
} catch (IOException e) {
} catch (Exception e) {
logger.error(e);
listener.onFailure(e);
}
}

private void resolveUserAndExecute(
User requestedUser,
String detectorId,
ActionListener<AnomalyDetectorJobResponse> listener,
AnomalyDetectorFunction function
) {
if (requestedUser == null) {
// Security is disabled or user is superadmin
function.execute();
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
function.execute();
} else {
// security is enabled and filterby is enabled.
// Get detector and check if the user has permissions to access the detector
try {
getDetector(requestedUser, detectorId, listener, function, client, clusterService, xContentRegistry);
} catch (Exception e) {
listener.onFailure(e);
}
}
}
saratvemulapalli marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
package com.amazon.opendistroforelasticsearch.ad.transport;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getDetector;
import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getUserContext;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
Expand All @@ -35,6 +38,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -47,45 +51,77 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorFunction;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.commons.authuser.User;

public class DeleteAnomalyDetectorTransportAction extends HandledTransportAction<DeleteAnomalyDetectorRequest, DeleteResponse> {

private static final Logger LOG = LogManager.getLogger(DeleteAnomalyDetectorTransportAction.class);
private final Client client;
private final ClusterService clusterService;
private NamedXContentRegistry xContentRegistry;
private volatile Boolean filterByEnabled;

@Inject
public DeleteAnomalyDetectorTransportAction(
TransportService transportService,
ActionFilters actionFilters,
Client client,
ClusterService clusterService,
Settings settings,
NamedXContentRegistry xContentRegistry
) {
super(DeleteAnomalyDetectorAction.NAME, transportService, actionFilters, DeleteAnomalyDetectorRequest::new);
this.client = client;
this.clusterService = clusterService;
this.xContentRegistry = xContentRegistry;
filterByEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
}

@Override
protected void doExecute(Task task, DeleteAnomalyDetectorRequest request, ActionListener<DeleteResponse> listener) {
String detectorId = request.getDetectorID();
LOG.info("Delete anomaly detector job {}", detectorId);

User user = getUserContext(client);
// By the time request reaches here, the user permissions are validated by Security plugin.
// Since the detectorID is provided, this can only happen if User is part of a role which has access
// to the detector. This is filtered by our Search Detector API.
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getDetectorJob(detectorId, listener, () -> deleteAnomalyDetectorJobDoc(detectorId, listener));
resolveUserAndExecute(
user,
detectorId,
listener,
() -> getDetectorJob(detectorId, listener, () -> deleteAnomalyDetectorJobDoc(detectorId, listener))
);
} catch (Exception e) {
LOG.error(e);
listener.onFailure(e);
}
}

private void resolveUserAndExecute(
User requestedUser,
String detectorId,
ActionListener<DeleteResponse> listener,
AnomalyDetectorFunction function
) {
if (requestedUser == null) {
// Security is disabled or user is superadmin
function.execute();
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
function.execute();
} else {
// security is enabled and filterby is enabled.
// Get detector and check if the user has permissions to access the detector
try {
getDetector(requestedUser, detectorId, listener, function, client, clusterService, xContentRegistry);
} catch (Exception e) {
listener.onFailure(e);
}
}
}

private void deleteAnomalyDetectorJobDoc(String detectorId, ActionListener<DeleteResponse> listener) {
LOG.info("Delete anomaly detector job {}", detectorId);
DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES;
import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getDetector;
import static com.amazon.opendistroforelasticsearch.ad.util.ParseUtils.getUserContext;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PROFILE;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

Expand All @@ -37,9 +40,11 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand All @@ -55,15 +60,18 @@
import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfileName;
import com.amazon.opendistroforelasticsearch.ad.model.EntityProfileName;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorFunction;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.commons.authuser.User;
import com.google.common.collect.Sets;

public class GetAnomalyDetectorTransportAction extends HandledTransportAction<GetAnomalyDetectorRequest, GetAnomalyDetectorResponse> {

private static final Logger LOG = LogManager.getLogger(GetAnomalyDetectorTransportAction.class);

private final ClusterService clusterService;
private final Client client;

private final Set<String> allProfileTypeStrs;
Expand All @@ -74,16 +82,20 @@ public class GetAnomalyDetectorTransportAction extends HandledTransportAction<Ge
private final Set<EntityProfileName> defaultEntityProfileTypes;
private final NamedXContentRegistry xContentRegistry;
private final DiscoveryNodeFilterer nodeFilter;
private volatile Boolean filterByEnabled;

@Inject
public GetAnomalyDetectorTransportAction(
TransportService transportService,
DiscoveryNodeFilterer nodeFilter,
ActionFilters actionFilters,
ClusterService clusterService,
Client client,
Settings settings,
NamedXContentRegistry xContentRegistry
) {
super(GetAnomalyDetectorAction.NAME, transportService, actionFilters, GetAnomalyDetectorRequest::new);
this.clusterService = clusterService;
this.client = client;

List<DetectorProfileName> allProfiles = Arrays.asList(DetectorProfileName.values());
Expand All @@ -100,10 +112,46 @@ public GetAnomalyDetectorTransportAction(

this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
filterByEnabled = AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FILTER_BY_BACKEND_ROLES, it -> filterByEnabled = it);
}

@Override
protected void doExecute(Task task, GetAnomalyDetectorRequest request, ActionListener<GetAnomalyDetectorResponse> listener) {
String detectorID = request.getDetectorID();
User user = getUserContext(client);
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
resolveUserAndExecute(user, detectorID, listener, () -> getExecute(request, listener));
} catch (Exception e) {
LOG.error(e);
listener.onFailure(e);
}
}

private void resolveUserAndExecute(
User requestedUser,
String detectorId,
ActionListener<GetAnomalyDetectorResponse> listener,
AnomalyDetectorFunction function
) {
if (requestedUser == null) {
// Security is disabled or user is superadmin
function.execute();
} else if (!filterByEnabled) {
// security is enabled and filterby is disabled.
function.execute();
} else {
// security is enabled and filterby is enabled.
// Get detector and check if the user has permissions to access the detector
try {
getDetector(requestedUser, detectorId, listener, function, client, clusterService, xContentRegistry);
} catch (Exception e) {
listener.onFailure(e);
}
}
}

protected void getExecute(GetAnomalyDetectorRequest request, ActionListener<GetAnomalyDetectorResponse> listener) {
String detectorID = request.getDetectorID();
Long version = request.getVersion();
String typesStr = request.getTypeStr();
Expand All @@ -112,7 +160,7 @@ protected void doExecute(Task task, GetAnomalyDetectorRequest request, ActionLis
boolean all = request.isAll();
boolean returnJob = request.isReturnJob();

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
try {
if (!Strings.isEmpty(typesStr) || rawPath.endsWith(PROFILE) || rawPath.endsWith(PROFILE + "/")) {
if (entityValue != null) {
Set<EntityProfileName> entityProfilesToCollect = getEntityProfilesToCollect(typesStr, all);
Expand Down
Loading