Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Centralized Scheduled Job Identity Management using IdentityPlugin #394

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
*/
package org.opensearch.jobscheduler.spi;

import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.identity.tokens.AuthToken;
import org.opensearch.identity.tokens.AuthTokenNamedWriteableRegistry;
import org.opensearch.jobscheduler.spi.utils.LockService;

import java.io.IOException;
Expand All @@ -22,27 +25,32 @@ public class JobExecutionContext implements Writeable {
private final LockService lockService;
private final String jobIndexName;
private final String jobId;
private final AuthToken accessToken;

public JobExecutionContext(
Instant expectedExecutionTime,
JobDocVersion jobVersion,
LockService lockService,
String jobIndexName,
String jobId
String jobId,
AuthToken accessToken
) {
this.expectedExecutionTime = expectedExecutionTime;
this.jobVersion = jobVersion;
this.lockService = lockService;
this.jobIndexName = jobIndexName;
this.jobId = jobId;
this.accessToken = accessToken;
}

public JobExecutionContext(StreamInput in) throws IOException {
this.expectedExecutionTime = in.readInstant();
this.jobVersion = new JobDocVersion(in);
StreamInput nwin = new NamedWriteableAwareStreamInput(in, AuthTokenNamedWriteableRegistry.getNamedWriteableRegistry());
this.expectedExecutionTime = nwin.readInstant();
this.jobVersion = new JobDocVersion(nwin);
this.lockService = null;
this.jobIndexName = in.readString();
this.jobId = in.readString();
this.jobIndexName = nwin.readString();
this.jobId = nwin.readString();
this.accessToken = nwin.readOptionalNamedWriteable(AuthToken.class);
}

@Override
Expand All @@ -51,6 +59,7 @@ public void writeTo(StreamOutput out) throws IOException {
this.jobVersion.writeTo(out);
out.writeString(this.jobIndexName);
out.writeString(this.jobId);
out.writeOptionalNamedWriteable(this.accessToken);
}

public Instant getExpectedExecutionTime() {
Expand All @@ -73,4 +82,8 @@ public String getJobId() {
return this.jobId;
}

public AuthToken getAccessToken() {
return this.accessToken;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ public void testSanity() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand Down Expand Up @@ -136,7 +137,8 @@ public void testSanityWithCustomLockID() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);
Instant testTime = Instant.now();
lockService.setTime(testTime);
Expand Down Expand Up @@ -170,7 +172,8 @@ public void testSecondAcquireLockFail() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -199,7 +202,8 @@ public void testLockReleasedAndAcquired() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -233,7 +237,8 @@ public void testLockExpired() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down Expand Up @@ -297,7 +302,8 @@ public void testMultiThreadCreateLock() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand Down Expand Up @@ -360,7 +366,8 @@ public void testMultiThreadAcquireLock() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);

lockService.createLockIndex(ActionListener.wrap(created -> {
Expand Down Expand Up @@ -427,7 +434,8 @@ public void testRenewLock() throws Exception {
new JobDocVersion(0, 0, 0),
lockService,
JOB_INDEX_NAME + uniqSuffix,
JOB_ID + uniqSuffix
JOB_ID + uniqSuffix,
null
);

lockService.acquireLockWithId(context.getJobIndexName(), LOCK_DURATION_SECONDS, lockID, ActionListener.wrap(lock -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@
package org.opensearch.jobscheduler;

import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.component.Lifecycle;
import org.opensearch.common.component.LifecycleComponent;
import org.opensearch.common.component.LifecycleListener;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.identity.IdentityService;
import org.opensearch.indices.IndicesService;
import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction;
import org.opensearch.jobscheduler.rest.action.RestGetLockAction;
import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction;
Expand Down Expand Up @@ -97,7 +103,7 @@ public Collection<Object> createComponents(
) {
this.lockService = new LockService(client, clusterService);
this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders);
this.scheduler = new JobScheduler(threadPool, this.lockService);
this.scheduler = new JobScheduler(threadPool, this.lockService, this.jobDetailsService);
this.sweeper = initSweeper(
environment.settings(),
client,
Expand Down Expand Up @@ -234,4 +240,54 @@ public List getRestHandlers(
return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction);
}

@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {

final List<Class<? extends LifecycleComponent>> services = new ArrayList<>(1);
services.add(GuiceHolder.class);
return services;
}

public static class GuiceHolder implements LifecycleComponent {

private static IdentityService identityService;

private static IndicesService indicesService;

@Inject
public GuiceHolder(final IdentityService identityService, IndicesService indicesService) {
GuiceHolder.identityService = identityService;
GuiceHolder.indicesService = indicesService;
}

public static IdentityService getIdentityService() {
return identityService;
}

public static IndicesService getIndicesService() {
return indicesService;
}

@Override
public void close() {}

@Override
public Lifecycle.State lifecycleState() {
return null;
}

@Override
public void addLifecycleListener(LifecycleListener listener) {}

@Override
public void removeLifecycleListener(LifecycleListener listener) {}

@Override
public void start() {}

@Override
public void stop() {}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.jobscheduler.rest.action;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.node.NodeClient;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.util.List;
import java.util.Locale;

import com.google.common.collect.ImmutableList;
import static org.opensearch.rest.RestRequest.Method.GET;

import static org.opensearch.jobscheduler.spi.LockModel.GET_LOCK_ACTION;

/**
* This class consists of the REST handler to GET a lock model for extensions
*/
public class RestRefreshTokenAction extends BaseRestHandler {
private final Logger logger = LogManager.getLogger(RestRefreshTokenAction.class);

public LockService lockService;

public RestRefreshTokenAction(final LockService lockService) {
this.lockService = lockService;
}

@Override
public String getName() {
return GET_LOCK_ACTION;
}

@Override
public List<Route> routes() {
return ImmutableList.of(new Route(GET, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_lock")));
}

@VisibleForTesting
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
return channel -> { channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Not implemented yet")); };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
*/
package org.opensearch.jobscheduler.scheduler;

import org.opensearch.identity.tokens.AuthToken;
import org.opensearch.jobscheduler.JobSchedulerPlugin;
import org.opensearch.jobscheduler.model.JobDetails;
import org.opensearch.jobscheduler.spi.JobExecutionContext;
import org.opensearch.jobscheduler.spi.ScheduledJobParameter;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.JobDocVersion;
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.jobscheduler.utils.JobDetailsService;
import org.opensearch.jobscheduler.utils.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -28,6 +31,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;

Expand All @@ -42,11 +46,14 @@ public class JobScheduler {
private Clock clock;
private final LockService lockService;

public JobScheduler(ThreadPool threadPool, final LockService lockService) {
private final JobDetailsService jobDetailsService;

public JobScheduler(ThreadPool threadPool, final LockService lockService, final JobDetailsService jobDetailsService) {
this.threadPool = threadPool;
this.scheduledJobInfo = new ScheduledJobInfo();
this.clock = Clock.systemDefaultZone();
this.lockService = lockService;
this.jobDetailsService = jobDetailsService;
}

@VisibleForTesting
Expand Down Expand Up @@ -182,13 +189,31 @@ boolean reschedule(
// schedule next execution
this.reschedule(jobParameter, jobInfo, jobRunner, version, jitterLimit);

JobDetails entry = jobDetailsService.findJobDetailsByJobIndex(jobInfo.getIndexName());

JobExecutionContext context;
AuthToken accessToken = null;
if (JobSchedulerPlugin.GuiceHolder.getIdentityService() != null
&& JobSchedulerPlugin.GuiceHolder.getIdentityService().getScheduledJobIdentityManager() != null
&& entry != null
&& entry.getExtensionUniqueId() != null) {
accessToken = JobSchedulerPlugin.GuiceHolder.getIdentityService()
.getScheduledJobIdentityManager()
.issueAccessTokenOnBehalfOfOperator(
jobInfo.getJobId(),
jobInfo.getIndexName(),
Optional.of(entry.getExtensionUniqueId())
);
}

// invoke job runner
JobExecutionContext context = new JobExecutionContext(
context = new JobExecutionContext(
jobInfo.getExpectedPreviousExecutionTime(),
version,
lockService,
this.lockService,
jobInfo.getIndexName(),
jobInfo.getJobId()
jobInfo.getJobId(),
accessToken
);

jobRunner.runJob(jobParameter, context);
Expand Down
Loading