
Introduce external job activation API #11706

Closed
Tracked by #11231
npepinpe opened this issue Feb 16, 2023 · 1 comment · Fixed by #11889
Labels
component/broker · component/engine · component/stream-platform · kind/toil · version:8.2.0

Comments

npepinpe (Member) commented Feb 16, 2023

Description

So that the ZPA and ZDP teams can work independently on the job push epic, we should introduce the job activation/pushing API between the engine and the broker as soon as possible.

Job activation is predicated on workers/streams being available to receive jobs. Since processing in the engine is synchronous, the API should be blocking/synchronous as well.

The engine should be able to query the API for a worker by job type. If there is none, the job is not activated. If there is one, the API must return all the properties required to activate the job, as well as a consumer for the activated job.

The engine should activate the job with the given properties (e.g. deadline, worker). Once the job is activated, the engine collects its variables (based on the worker's properties) and passes the job to the consumer. From that point on, the job should be guaranteed to be activated and locked until its deadline.
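
The flow described above can be sketched with simplified, stdlib-only stand-ins for the engine types. This is a sketch under assumptions, not the engine's implementation: `JobActivationApi`, `Activation`, `ActivatedJob`, and `tryActivate` are hypothetical names, variables are a plain `Map` rather than the engine's variable state, and treating an empty `fetchVariables` list as "fetch everything" is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the engine types; not Zeebe's actual API.
record ActivationProperties(String worker, List<String> fetchVariables, long timeoutMillis) {}

record ActivatedJob(
    long key, String type, String worker, long deadline, Map<String, Object> variables) {}

/** What the API returns when a worker is available: activation properties plus a consumer. */
record Activation(ActivationProperties properties, Consumer<ActivatedJob> consumer) {}

interface JobActivationApi {
  /** Synchronous lookup by job type; empty means no worker/stream can take the job right now. */
  Optional<Activation> activationFor(String jobType);
}

final class JobActivationSketch {
  private JobActivationSketch() {}

  /** Returns true if the job was activated and handed to the consumer, false otherwise. */
  static boolean tryActivate(
      final JobActivationApi api,
      final long key,
      final String type,
      final Map<String, Object> scopeVariables,
      final long nowMillis) {
    final Optional<Activation> activation = api.activationFor(type);
    if (activation.isEmpty()) {
      return false; // no worker available: the job is not activated
    }

    final ActivationProperties props = activation.get().properties();

    // Collect the requested variables; an empty list meaning "fetch all" is an assumption here.
    final Map<String, Object> variables = new HashMap<>();
    if (props.fetchVariables().isEmpty()) {
      variables.putAll(scopeVariables);
    } else {
      for (final String name : props.fetchVariables()) {
        if (scopeVariables.containsKey(name)) {
          variables.put(name, scopeVariables.get(name));
        }
      }
    }

    // The job stays locked to this worker until its deadline (now + timeout).
    final ActivatedJob job =
        new ActivatedJob(
            key,
            type,
            props.worker(),
            nowMillis + props.timeoutMillis(),
            Collections.unmodifiableMap(variables));
    activation.get().consumer().accept(job);
    return true;
  }
}
```

Because the lookup is synchronous, the engine can decide within a single processing step whether the job is activated, which matches the blocking requirement above.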

In the prototype, I didn't differentiate between workers, so I ignored all the activation properties. That said, here's what it looked like:

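import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import java.util.Optional;
import org.agrona.DirectBuffer;

/**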
 * Implementations of this interface help the engine decide whether to immediately activate a job
 * when it's made available. This is primarily used to immediately push jobs out to external workers
 * without going through a poll-request-cycle.
 */
@FunctionalInterface
public interface ExternalJobActivator {

  Optional<Handler> activateJob(final DirectBuffer type);

  @FunctionalInterface
  interface Handler {
    void handle(final long key, final JobRecord job);
  }
}

We would need to adapt it so that it returns a type which can supply both the activation properties and the handler/consumer.

As far as I can tell, the activation properties are:

  • Worker name
  • Fetch variables (i.e. the variables to return with the job)
  • Deadline/timeout

Initially, this interface can replace the existing LongPollingJobNotification and the consumer for job types. The initial implementation can, however, still do exactly the same thing as today:

  • Always return no activation properties
  • Send out the job available notification whenever the engine queries the API
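
A minimal sketch of that initial implementation follows. It assumes a hypothetical `JobActivationApi<A>` in place of the real interface and a plain `Consumer<String>` (`jobAvailableNotifier`, also hypothetical) in place of the existing notification mechanism: every query triggers the "jobs available" notification and returns no activation, so jobs are still picked up by long polling.

```java
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical simplified form of the API: look up an activator by job type. */
interface JobActivationApi<A> {
  Optional<A> activate(String jobType);
}

/**
 * Initial behaviour: never activate anything, but emit the "jobs available" notification on every
 * query, standing in for what LongPollingJobNotification is used for today.
 */
final class NotifyingJobActivator<A> implements JobActivationApi<A> {
  private final Consumer<String> jobAvailableNotifier;

  NotifyingJobActivator(final Consumer<String> jobAvailableNotifier) {
    this.jobAvailableNotifier = jobAvailableNotifier;
  }

  @Override
  public Optional<A> activate(final String jobType) {
    jobAvailableNotifier.accept(jobType); // wake up long-polling requests for this job type
    return Optional.empty(); // no activation properties: the job stays activatable for polling
  }
}
```

Keeping the return type generic here means the same stub works regardless of which activation shape is eventually chosen.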

Activating the job, collecting the variables, and pushing the job can be done by the ZPA team in one or more follow-up issues once this is done.

Regarding observability, I don't think we need any special metrics here just yet.

npepinpe (Member, Author) commented Mar 2, 2023

After some mob programming, we came up with two options: a job-specific one and a more generic one.

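import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import java.util.Collection;
import java.util.Optional;
import org.agrona.DirectBuffer;

public interface ExternalJobActivator {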

  Optional<JobActivator> activate(final DirectBuffer jobType);

  interface JobActivator {
    JobConsumer consumer();

    ActivationProperties properties();
  }

  interface JobConsumer {
    void consume(final long jobKey, final JobRecord record);
  }

  interface ActivationProperties {
    String worker();

    Collection<String> fetchVariables();

    long timeout();
  }
}
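
One possible way to back the job-specific option on the broker side is a registry of open worker streams per job type. The sketch below is only an illustration of that assumption: `StreamRegistryActivator`, its `add` method, and the round-robin choice between streams are hypothetical, and `String` replaces `DirectBuffer`/`JobRecord` to keep it stdlib-only.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified mirrors of the job-specific option; String stands in for JobRecord here.
interface JobConsumer {
  void consume(long jobKey, String jobRecord);
}

record ActivationProperties(String worker, List<String> fetchVariables, long timeout) {}

record JobActivator(JobConsumer consumer, ActivationProperties properties) {}

/** Tracks open worker streams per job type and hands one out per query, round-robin. */
final class StreamRegistryActivator {
  private final Map<String, List<JobActivator>> streams = new ConcurrentHashMap<>();
  private final Map<String, Integer> cursors = new ConcurrentHashMap<>();

  /** Registers a stream, e.g. when a worker opens one for this job type. */
  void add(final String jobType, final JobActivator stream) {
    streams.computeIfAbsent(jobType, type -> new CopyOnWriteArrayList<>()).add(stream);
  }

  /** Called synchronously by the engine; empty means nobody can take the job, so don't activate. */
  Optional<JobActivator> activate(final String jobType) {
    final List<JobActivator> candidates = streams.getOrDefault(jobType, List.of());
    if (candidates.isEmpty()) {
      return Optional.empty();
    }
    // floorMod keeps the index valid even if the counter eventually overflows
    final int turn = cursors.merge(jobType, 1, Integer::sum) - 1;
    return Optional.of(candidates.get(Math.floorMod(turn, candidates.size())));
  }
}
```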

And the more generic one:

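import java.util.Optional;
import org.agrona.DirectBuffer;

// Metadata is qualified in the type parameter bounds: member types are not in scope there.
public interface GatewayStreamer<Meta extends GatewayStreamer.GatewayStream.Metadata, Data> {
  Optional<GatewayStream<Meta, Data>> streamFor(final DirectBuffer streamId);

  interface GatewayStream<Meta extends GatewayStream.Metadata, Data> {
    Meta metadata();

    void push(final Data payload, final ErrorHandler<Data> errorHandler);

    interface Metadata {}
  }

  interface ErrorHandler<T> {
    void handleError(final Throwable error, final T data);
  }
}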

The generic one could be used as follows:

interface ActivatedJob {
  long jobKey();
  JobRecord record();
}

interface JobActivationProperties extends Metadata {
  String worker();
  Collection<String> fetchVariables();
  long timeout();
}

static void activateAndPushJob(
    final DirectBuffer jobType,
    final GatewayStreamer<JobActivationProperties, ActivatedJob> jobStreamer,
    final ProcessingResultBuilder result,
    final ProcessingScheduleService scheduleService) {
  final Optional<GatewayStream<JobActivationProperties, ActivatedJob>> maybeStream =
      jobStreamer.streamFor(jobType);

  if (maybeStream.isPresent()) {
    final GatewayStream<JobActivationProperties, ActivatedJob> stream = maybeStream.get();
    final ActivatedJob activatedJob = activateJob(stream.metadata()); // make sure this is immutable
    // push only once the activation has been committed
    result.appendPostCommitTask(
        () -> {
          stream.push(
              activatedJob,
              // if the push fails, yield the job back to the engine by failing it
              (error, job) ->
                  scheduleService.runDelayed(
                      Duration.ZERO,
                      errorResult -> {
                        errorResult.appendCommandRecord(
                            job.jobKey(), JobIntent.FAIL, job.record());
                        return errorResult.build();
                      }));
          return true;
        });
  }
}

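// Used somewhere in the engine to activate the job
static ActivatedJob activateJob(final JobActivationProperties properties) {
  // placeholder: activate the job with the given properties and return an immutable view of it
  throw new UnsupportedOperationException("not implemented yet");
}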

The advantage of the more generic one is that the ZPA team, which owns the user-facing parts of the feature, can independently change what is pushed to the client and which properties are required for activation, and can also handle errors intelligently (i.e. decide how to yield a job back to the engine).

The obvious downside is an extra layer of abstraction with no clear future use case.
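
To illustrate the error-handling point with the generic shape, here is a stdlib-only sketch under stated assumptions: `InMemoryStreamer`, `open`, and the `Consumer`-based client are hypothetical, and stream ids are `String`s rather than `DirectBuffer`s. A push that throws is handed back to the caller's `ErrorHandler`, which is where the engine-side code would yield the job back (in the usage example above, by appending a `FAIL` command).

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Mirrors GatewayStreamer.ErrorHandler from the generic option. */
interface ErrorHandler<T> {
  void handleError(Throwable error, T data);
}

/** M is the stream metadata (e.g. activation properties), D the pushed payload (e.g. a job). */
final class InMemoryStreamer<M, D> {

  record Stream<M, D>(M metadata, Consumer<D> client) {
    /** Pushes the payload; if the client fails, the payload goes back to the error handler. */
    void push(final D payload, final ErrorHandler<D> errorHandler) {
      try {
        client.accept(payload);
      } catch (final RuntimeException e) {
        errorHandler.handleError(e, payload);
      }
    }
  }

  private final Map<String, Stream<M, D>> streams = new ConcurrentHashMap<>();

  /** Registers a stream, e.g. when a worker opens one for a job type. */
  void open(final String streamId, final M metadata, final Consumer<D> client) {
    streams.put(streamId, new Stream<>(metadata, client));
  }

  Optional<Stream<M, D>> streamFor(final String streamId) {
    return Optional.ofNullable(streams.get(streamId));
  }
}
```

The streamer itself never decides what "yielding back" means; that stays with whoever supplies the error handler, which is the independence the generic option is meant to buy.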

ghost closed this as completed in da8b669 on Mar 3, 2023
npepinpe added the version:8.2.0 label on Apr 5, 2023