Skip to content

Commit

Permalink
Rbroughan/retry states api handler (#7484)
Browse files Browse the repository at this point in the history
  • Loading branch information
tryangul committed Jun 27, 2023
1 parent 35d7652 commit 09cec49
Show file tree
Hide file tree
Showing 11 changed files with 595 additions and 3 deletions.
85 changes: 85 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2781,6 +2781,40 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/StreamStatusRead"
/v1/jobs/retry_states/create_or_update:
post:
summary: Creates or updates a retry state for a job.
tags:
- job_retry_states
operationId: createOrUpdate
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/JobRetryStateRequestBody"
responses:
"204":
description: Successfully put retry state.
/v1/jobs/retry_states/get:
post:
summary: Gets a retry state.
tags:
- job_retry_states
operationId: get
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/JobIdRequestBody"
responses:
"200":
description: Successfully retrieved retry state for a job.
content:
application/json:
schema:
$ref: "#/components/schemas/RetryStateRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
components:
securitySchemes:
bearerAuth:
Expand Down Expand Up @@ -6262,6 +6296,57 @@ components:
$ref: "#/components/schemas/SlackNotificationConfiguration"
notificationTrigger:
$ref: "#/components/schemas/NotificationTrigger"
RetryStateRead:
type: object
required:
- id
- jobId
- connectionId
- successiveCompleteFailures
- totalCompleteFailures
- successivePartialFailures
- totalPartialFailures
properties:
id:
type: string
format: uuid
connectionId:
$ref: "#/components/schemas/ConnectionId"
jobId:
$ref: "#/components/schemas/JobId"
successiveCompleteFailures:
type: integer
totalCompleteFailures:
type: integer
successivePartialFailures:
type: integer
totalPartialFailures:
type: integer
JobRetryStateRequestBody:
type: object
required:
- connectionId
- jobId
- successiveCompleteFailures
- totalCompleteFailures
- successivePartialFailures
- totalPartialFailures
properties:
id:
type: string
format: uuid
connectionId:
$ref: "#/components/schemas/ConnectionId"
jobId:
$ref: "#/components/schemas/JobId"
successiveCompleteFailures:
type: integer
totalCompleteFailures:
type: integer
successivePartialFailures:
type: integer
totalPartialFailures:
type: integer
InvalidInputProperty:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public String getId() {
}

/**
* Get additional info about the not found rsource.
* Get additional info about the not found resource.
*
* @return info
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import static io.airbyte.commons.auth.AuthRoleConstants.ADMIN;

import io.airbyte.api.generated.JobRetryStatesApi;
import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobRetryStateRequestBody;
import io.airbyte.api.model.generated.RetryStateRead;
import io.airbyte.commons.server.errors.IdNotFoundKnownException;
import io.airbyte.commons.server.scheduling.AirbyteTaskExecutors;
import io.airbyte.server.handlers.RetryStatesHandler;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.annotation.Status;
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;

@SuppressWarnings("MissingJavadocType")
@Controller("/api/v1/jobs/retry_states")
public class JobRetryStatesApiController implements JobRetryStatesApi {

private final RetryStatesHandler handler;

@SuppressWarnings("MissingJavadocType")
public JobRetryStatesApiController(final RetryStatesHandler handler) {
this.handler = handler;
}

@Secured({ADMIN})
@ExecuteOn(AirbyteTaskExecutors.IO)
@Post(uri = "/get")
@Override
public RetryStateRead get(final JobIdRequestBody req) {
final var found = handler.getByJobId(req);

if (found.isEmpty()) {
throw new IdNotFoundKnownException(String.format("Could not find Retry State for job_id: %d.", req.getId()), String.valueOf(req.getId()));
}

return found.get();
}

@Secured({ADMIN})
@ExecuteOn(AirbyteTaskExecutors.IO)
@Post(uri = "/create_or_update")
@Override
@Status(HttpStatus.NO_CONTENT)
public void createOrUpdate(final JobRetryStateRequestBody req) {
handler.putByJobId(req);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.handlers;

import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobRetryStateRequestBody;
import io.airbyte.api.model.generated.RetryStateRead;
import io.airbyte.server.handlers.api_domain_mapping.RetryStatesMapper;
import io.airbyte.server.repositories.RetryStatesRepository;
import jakarta.inject.Singleton;
import java.util.Optional;

/**
* Interface layer between the API and Persistence layers.
*/
@SuppressWarnings("MissingJavadocMethod")
@Singleton
public class RetryStatesHandler {

final RetryStatesRepository repo;
final RetryStatesMapper mapper;

public RetryStatesHandler(final RetryStatesRepository repo, final RetryStatesMapper mapper) {
this.repo = repo;
this.mapper = mapper;
}

public Optional<RetryStateRead> getByJobId(final JobIdRequestBody req) {
final var found = repo.findByJobId(req.getId());

return found.map(mapper::map);
}

public void putByJobId(final JobRetryStateRequestBody req) {
final var model = mapper.map(req);

repo.createOrUpdateByJobId(model.getJobId(), model);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.handlers.api_domain_mapping;

import io.airbyte.api.model.generated.JobRetryStateRequestBody;
import io.airbyte.api.model.generated.RetryStateRead;
import io.airbyte.server.repositories.domain.RetryState;
import jakarta.inject.Singleton;

/**
* Maps between the API and Persistence layers. It is not static to be injectable and enable easier
* testing in dependents.
*/
@Singleton
@SuppressWarnings("MissingJavadocMethod")
public class RetryStatesMapper {

// API to Domain
public RetryState map(final JobRetryStateRequestBody api) {
return RetryState.builder()
.id(api.getId())
.connectionId(api.getConnectionId())
.jobId(api.getJobId())
.successiveCompleteFailures(api.getSuccessiveCompleteFailures())
.totalCompleteFailures(api.getTotalCompleteFailures())
.successivePartialFailures(api.getSuccessivePartialFailures())
.totalPartialFailures(api.getTotalPartialFailures())
.build();
}

// Domain to API
public RetryStateRead map(final RetryState domain) {
return new RetryStateRead()
.id(domain.getId())
.connectionId(domain.getConnectionId())
.jobId(domain.getJobId())
.successiveCompleteFailures(domain.getSuccessiveCompleteFailures())
.totalCompleteFailures(domain.getTotalCompleteFailures())
.successivePartialFailures(domain.getSuccessivePartialFailures())
.totalPartialFailures(domain.getTotalPartialFailures());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,24 @@
import java.util.Optional;
import java.util.UUID;

@SuppressWarnings("MissingJavadocType")
@SuppressWarnings({"MissingJavadocMethod", "MissingJavadocType"})
@JdbcRepository(dialect = Dialect.POSTGRES)
public interface RetryStatesRepository extends PageableRepository<RetryState, UUID> {

Optional<RetryState> findByJobId(final Long jobId);

void updateByJobId(final Long jobId, final RetryState update);

boolean existsByJobId(final long jobId);

default void createOrUpdateByJobId(final long jobId, final RetryState payload) {
final var exists = existsByJobId(jobId);

if (exists) {
updateByJobId(jobId, payload);
} else {
save(payload);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* DTO for our data access layer.
*/
@Builder
@Builder(toBuilder = true)
@AllArgsConstructor
@Getter
@EqualsAndHashCode(exclude = {"id", "createdAt", "updatedAt"})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.apis;

import static org.mockito.Mockito.when;

import io.airbyte.api.model.generated.JobIdRequestBody;
import io.airbyte.api.model.generated.JobRetryStateRequestBody;
import io.airbyte.api.model.generated.RetryStateRead;
import io.airbyte.commons.json.Jsons;
import io.airbyte.server.handlers.RetryStatesHandler;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpStatus;
import io.micronaut.test.annotation.MockBean;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;

@SuppressWarnings({"PMD.JUnitTestsShouldIncludeAssert", "MissingJavadocType"})
@MicronautTest
@Requires(property = "mockito.test.enabled",
defaultValue = StringUtils.TRUE,
value = StringUtils.TRUE)
@Requires(env = {Environment.TEST})
class JobRetryStatesApiControllerTest extends BaseControllerTest {

@Mock
RetryStatesHandler handler = Mockito.mock(RetryStatesHandler.class);

@MockBean(RetryStatesHandler.class)
@Replaces(RetryStatesHandler.class)
RetryStatesHandler mmStreamStatusesHandler() {
return handler;
}

static String PATH_BASE = "/api/v1/jobs/retry_states";
static String PATH_GET = PATH_BASE + "/get";
static String PATH_PUT = PATH_BASE + "/create_or_update";

@Test
void getForJobFound() throws Exception {
when(handler.getByJobId(Mockito.any()))
.thenReturn(Optional.of(new RetryStateRead()));

testEndpointStatus(
HttpRequest.POST(
PATH_GET,
Jsons.serialize(Fixtures.jobIdReq())),
HttpStatus.OK);
}

@Test
void getForJobNotFound() throws Exception {
when(handler.getByJobId(Mockito.any()))
.thenReturn(Optional.empty());

testErrorEndpointStatus(
HttpRequest.POST(
PATH_GET,
Jsons.serialize(Fixtures.jobIdReq())),
HttpStatus.NOT_FOUND);
}

@Test
void putForJob() throws Exception {
testEndpointStatus(
HttpRequest.POST(
PATH_PUT,
Jsons.serialize(Fixtures.retryPutReq())),
HttpStatus.NO_CONTENT);
}

static class Fixtures {

static long jobId1 = 21891253;

static JobIdRequestBody jobIdReq() {
return new JobIdRequestBody().id(jobId1);
}

static JobRetryStateRequestBody retryPutReq() {
return new JobRetryStateRequestBody()
.id(UUID.randomUUID())
.connectionId(UUID.randomUUID())
.jobId(jobId1)
.successiveCompleteFailures(8)
.totalCompleteFailures(12)
.successivePartialFailures(4)
.totalPartialFailures(42);
}

}

}
Loading

0 comments on commit 09cec49

Please sign in to comment.