Skip to content

Commit

Permalink
Remove unused methods (#21988)
Browse files Browse the repository at this point in the history
* Remove unused methods

* Rename test variable
  • Loading branch information
benmoriceau authored Jan 27, 2023
1 parent c0d3900 commit ddfd82b
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,6 @@ class AttemptCreationInput {

}

@Data
@NoArgsConstructor
@AllArgsConstructor
class AttemptCreationOutput {

private Integer attemptId;

}

/**
* Create a new attempt for a given job ID
*
* @param input POJO containing the jobId
* @return A POJO containing the attemptId
*/
@ActivityMethod
AttemptCreationOutput createNewAttempt(AttemptCreationInput input) throws RetryableException;

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -89,24 +71,6 @@ class AttemptNumberCreationOutput {
@ActivityMethod
AttemptNumberCreationOutput createNewAttemptNumber(AttemptCreationInput input) throws RetryableException;

@Data
@NoArgsConstructor
@AllArgsConstructor
class JobSuccessInput {

private Long jobId;
private Integer attemptId;
private UUID connectionId;
private StandardSyncOutput standardSyncOutput;

}

/**
* Set a job status as successful
*/
@ActivityMethod
void jobSuccess(JobSuccessInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down Expand Up @@ -143,25 +107,6 @@ class JobFailureInput {
@ActivityMethod
void jobFailure(JobFailureInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
class AttemptFailureInput {

private Long jobId;
private Integer attemptId;
private UUID connectionId;
private StandardSyncOutput standardSyncOutput;
private AttemptFailureSummary attemptFailureSummary;

}

/**
* Set an attempt status as failed
*/
@ActivityMethod
void attemptFailure(AttemptFailureInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand All @@ -181,24 +126,6 @@ class AttemptNumberFailureInput {
@ActivityMethod
void attemptFailureWithAttemptNumber(AttemptNumberFailureInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
class JobCancelledInput {

private Long jobId;
private Integer attemptId;
private UUID connectionId;
private AttemptFailureSummary attemptFailureSummary;

}

/**
* Set a job status as cancelled
*/
@ActivityMethod
void jobCancelled(JobCancelledInput input);

@Data
@NoArgsConstructor
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,27 +196,6 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
}
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException {
try {
final long jobId = input.getJobId();
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
final Job job = jobPersistence.getJob(jobId);

final WorkerRun workerRun = temporalWorkerRunFactory.create(job);
final Path logFilePath = workerRun.getJobRoot().resolve(LogClientSingleton.LOG_FILENAME);
final int persistedAttemptNumber = jobPersistence.createAttempt(jobId, logFilePath);
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_CREATED_BY_RELEASE_STAGE, jobId);
emitAttemptCreatedEvent(job, persistedAttemptNumber);

LogClientSingleton.getInstance().setJobMdc(workerEnvironment, logConfigs, workerRun.getJobRoot());
return new AttemptCreationOutput(persistedAttemptNumber);
} catch (final IOException e) {
throw new RetryableException(e);
}
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationInput input) throws RetryableException {
Expand All @@ -240,43 +219,33 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobSuccess(final JobSuccessInput input) {
public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
try {
final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
final int attemptNumber = input.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput);
} else {
log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId);
log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptNumber);
}
jobPersistence.succeedAttempt(jobId, attemptId);
jobPersistence.succeedAttempt(jobId, attemptNumber);
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_SUCCEEDED_BY_RELEASE_STAGE, jobId);
final Job job = jobPersistence.getJob(jobId);

jobNotifier.successJob(job);
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_SUCCEEDED_BY_RELEASE_STAGE, jobId);
trackCompletion(job, JobStatus.SUCCEEDED);
} catch (final IOException e) {
trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptId(), JobStatus.SUCCEEDED, e);
trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptNumber(), JobStatus.SUCCEEDED, e);
throw new RetryableException(e);
}
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobSuccessWithAttemptNumber(final JobSuccessInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
jobSuccess(new JobSuccessInput(
input.getJobId(),
input.getAttemptNumber(),
input.getConnectionId(),
input.getStandardSyncOutput()));
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobFailure(final JobFailureInput input) {
Expand Down Expand Up @@ -305,21 +274,24 @@ public void jobFailure(final JobFailureInput input) {

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void attemptFailure(final AttemptFailureInput input) {
public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));

try {
final int attemptId = input.getAttemptId();
final int attemptNumber = input.getAttemptNumber();
final long jobId = input.getJobId();
final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();

ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));
traceFailures(failureSummary);

jobPersistence.failAttempt(jobId, attemptId);
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);
jobPersistence.failAttempt(jobId, attemptNumber);
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, failureSummary);

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput);
}

emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
Expand All @@ -331,51 +303,28 @@ public void attemptFailure(final AttemptFailureInput input) {

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput input) {
public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));
attemptFailure(new AttemptFailureInput(
input.getJobId(),
input.getAttemptNumber(),
input.getConnectionId(),
input.getStandardSyncOutput(),
input.getAttemptFailureSummary()));
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobCancelled(final JobCancelledInput input) {
try {
final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
jobPersistence.failAttempt(jobId, attemptId);
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary());
final int attemptNumber = input.getAttemptNumber();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptNumber, JOB_ID_KEY, jobId));
jobPersistence.failAttempt(jobId, attemptNumber);
jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, input.getAttemptFailureSummary());
jobPersistence.cancelJob(jobId);

final Job job = jobPersistence.getJob(jobId);
emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_CANCELLED_BY_RELEASE_STAGE, jobId);
jobNotifier.failJob("Job was cancelled", job);
trackCompletion(job, JobStatus.FAILED);
} catch (final IOException e) {
trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptId(), JobStatus.FAILED, e);
trackCompletionForInternalFailure(input.getJobId(), input.getConnectionId(), input.getAttemptNumber(), JobStatus.FAILED, e);
throw new RetryableException(e);
}
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void jobCancelledWithAttemptNumber(final JobCancelledInputWithAttemptNumber input) {
ApmTraceUtils.addTagsToTrace(
Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptNumber(), CONNECTION_ID_KEY, input.getConnectionId(), JOB_ID_KEY, input.getJobId()));

jobCancelled(new JobCancelledInput(
input.getJobId(),
input.getAttemptNumber(),
input.getConnectionId(),
input.getAttemptFailureSummary()));
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public void reportJobStart(final ReportJobStartInput input) {
Expand Down
Loading

0 comments on commit ddfd82b

Please sign in to comment.