From 09cec49b45ac1d918c72473f987b511838ab5051 Mon Sep 17 00:00:00 2001 From: Ryan Br Date: Tue, 27 Jun 2023 14:38:48 -0700 Subject: [PATCH] Rbroughan/retry states api handler (#7484) --- airbyte-api/src/main/openapi/config.yaml | 85 +++++++++++++ .../errors/IdNotFoundKnownException.java | 2 +- .../apis/JobRetryStatesApiController.java | 57 +++++++++ .../server/handlers/RetryStatesHandler.java | 42 +++++++ .../api_domain_mapping/RetryStatesMapper.java | 45 +++++++ .../repositories/RetryStatesRepository.java | 14 ++- .../repositories/domain/RetryState.java | 2 +- .../apis/JobRetryStatesApiControllerTest.java | 103 ++++++++++++++++ .../handlers/RetryStatesHandlerTest.java | 78 ++++++++++++ .../RetryStatesMapperTest.java | 113 ++++++++++++++++++ .../RetryStatesRepositoryTest.java | 57 +++++++++ 11 files changed, 595 insertions(+), 3 deletions(-) create mode 100644 airbyte-server/src/main/java/io/airbyte/server/apis/JobRetryStatesApiController.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/handlers/RetryStatesHandler.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/handlers/api_domain_mapping/RetryStatesMapper.java create mode 100644 airbyte-server/src/test/java/io/airbyte/server/apis/JobRetryStatesApiControllerTest.java create mode 100644 airbyte-server/src/test/java/io/airbyte/server/handlers/RetryStatesHandlerTest.java create mode 100644 airbyte-server/src/test/java/io/airbyte/server/handlers/api_domain_mapping/RetryStatesMapperTest.java diff --git a/airbyte-api/src/main/openapi/config.yaml b/airbyte-api/src/main/openapi/config.yaml index 6a4e24f4533..01440407705 100644 --- a/airbyte-api/src/main/openapi/config.yaml +++ b/airbyte-api/src/main/openapi/config.yaml @@ -2781,6 +2781,40 @@ paths: application/json: schema: $ref: "#/components/schemas/StreamStatusRead" + /v1/jobs/retry_states/create_or_update: + post: + summary: Creates or updates a retry state for a job. + tags: + - job_retry_states + operationId: createOrUpdate + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/JobRetryStateRequestBody" + responses: + "204": + description: Successfully put retry state. + /v1/jobs/retry_states/get: + post: + summary: Gets a retry state. + tags: + - job_retry_states + operationId: get + requestBody: + content: + application/json: + schema: + $ref: "#/components/schemas/JobIdRequestBody" + responses: + "200": + description: Successfully retrieved retry state for a job. + content: + application/json: + schema: + $ref: "#/components/schemas/RetryStateRead" + "404": + $ref: "#/components/responses/NotFoundResponse" components: securitySchemes: bearerAuth: @@ -6262,6 +6296,57 @@ components: $ref: "#/components/schemas/SlackNotificationConfiguration" notificationTrigger: $ref: "#/components/schemas/NotificationTrigger" + RetryStateRead: + type: object + required: + - id + - jobId + - connectionId + - successiveCompleteFailures + - totalCompleteFailures + - successivePartialFailures + - totalPartialFailures + properties: + id: + type: string + format: uuid + connectionId: + $ref: "#/components/schemas/ConnectionId" + jobId: + $ref: "#/components/schemas/JobId" + successiveCompleteFailures: + type: integer + totalCompleteFailures: + type: integer + successivePartialFailures: + type: integer + totalPartialFailures: + type: integer + JobRetryStateRequestBody: + type: object + required: + - connectionId + - jobId + - successiveCompleteFailures + - totalCompleteFailures + - successivePartialFailures + - totalPartialFailures + properties: + id: + type: string + format: uuid + connectionId: + $ref: "#/components/schemas/ConnectionId" + jobId: + $ref: "#/components/schemas/JobId" + successiveCompleteFailures: + type: integer + totalCompleteFailures: + type: integer + successivePartialFailures: + type: integer + totalPartialFailures: + type: integer InvalidInputProperty: type: object required: diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/IdNotFoundKnownException.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/IdNotFoundKnownException.java index c9d58491202..b93270a25d6 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/IdNotFoundKnownException.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/errors/IdNotFoundKnownException.java @@ -38,7 +38,7 @@ public String getId() { } /** - * Get additional info about the not found rsource. + * Get additional info about the not found resource. * * @return info */ diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/JobRetryStatesApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/JobRetryStatesApiController.java new file mode 100644 index 00000000000..f980b7f8c84 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/JobRetryStatesApiController.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.apis; + +import static io.airbyte.commons.auth.AuthRoleConstants.ADMIN; + +import io.airbyte.api.generated.JobRetryStatesApi; +import io.airbyte.api.model.generated.JobIdRequestBody; +import io.airbyte.api.model.generated.JobRetryStateRequestBody; +import io.airbyte.api.model.generated.RetryStateRead; +import io.airbyte.commons.server.errors.IdNotFoundKnownException; +import io.airbyte.commons.server.scheduling.AirbyteTaskExecutors; +import io.airbyte.server.handlers.RetryStatesHandler; +import io.micronaut.http.HttpStatus; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Post; +import io.micronaut.http.annotation.Status; +import io.micronaut.scheduling.annotation.ExecuteOn; +import io.micronaut.security.annotation.Secured; + +@SuppressWarnings("MissingJavadocType") +@Controller("/api/v1/jobs/retry_states") +public class JobRetryStatesApiController implements JobRetryStatesApi { + + private final RetryStatesHandler handler; + + @SuppressWarnings("MissingJavadocType") + public JobRetryStatesApiController(final RetryStatesHandler handler) { + this.handler = handler; + } + + @Secured({ADMIN}) + @ExecuteOn(AirbyteTaskExecutors.IO) + @Post(uri = "/get") + @Override + public RetryStateRead get(final JobIdRequestBody req) { + final var found = handler.getByJobId(req); + + if (found.isEmpty()) { + throw new IdNotFoundKnownException(String.format("Could not find Retry State for job_id: %d.", req.getId()), String.valueOf(req.getId())); + } + + return found.get(); + } + + @Secured({ADMIN}) + @ExecuteOn(AirbyteTaskExecutors.IO) + @Post(uri = "/create_or_update") + @Override + @Status(HttpStatus.NO_CONTENT) + public void createOrUpdate(final JobRetryStateRequestBody req) { + handler.putByJobId(req); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/RetryStatesHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/RetryStatesHandler.java new file mode 100644 index 00000000000..a4537b7d88f --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/RetryStatesHandler.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.handlers; + +import io.airbyte.api.model.generated.JobIdRequestBody; +import io.airbyte.api.model.generated.JobRetryStateRequestBody; +import io.airbyte.api.model.generated.RetryStateRead; +import io.airbyte.server.handlers.api_domain_mapping.RetryStatesMapper; +import io.airbyte.server.repositories.RetryStatesRepository; +import jakarta.inject.Singleton; +import java.util.Optional; + +/** + * Interface layer between the API and Persistence layers. + */ +@SuppressWarnings("MissingJavadocMethod") +@Singleton +public class RetryStatesHandler { + + final RetryStatesRepository repo; + final RetryStatesMapper mapper; + + public RetryStatesHandler(final RetryStatesRepository repo, final RetryStatesMapper mapper) { + this.repo = repo; + this.mapper = mapper; + } + + public Optional getByJobId(final JobIdRequestBody req) { + final var found = repo.findByJobId(req.getId()); + + return found.map(mapper::map); + } + + public void putByJobId(final JobRetryStateRequestBody req) { + final var model = mapper.map(req); + + repo.createOrUpdateByJobId(model.getJobId(), model); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/api_domain_mapping/RetryStatesMapper.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/api_domain_mapping/RetryStatesMapper.java new file mode 100644 index 00000000000..80b3043211e --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/api_domain_mapping/RetryStatesMapper.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.handlers.api_domain_mapping; + +import io.airbyte.api.model.generated.JobRetryStateRequestBody; +import io.airbyte.api.model.generated.RetryStateRead; +import io.airbyte.server.repositories.domain.RetryState; +import jakarta.inject.Singleton; + +/** + * Maps between the API and Persistence layers. It is not static to be injectable and enable easier + * testing in dependents. + */ +@Singleton +@SuppressWarnings("MissingJavadocMethod") +public class RetryStatesMapper { + + // API to Domain + public RetryState map(final JobRetryStateRequestBody api) { + return RetryState.builder() + .id(api.getId()) + .connectionId(api.getConnectionId()) + .jobId(api.getJobId()) + .successiveCompleteFailures(api.getSuccessiveCompleteFailures()) + .totalCompleteFailures(api.getTotalCompleteFailures()) + .successivePartialFailures(api.getSuccessivePartialFailures()) + .totalPartialFailures(api.getTotalPartialFailures()) + .build(); + } + + // Domain to API + public RetryStateRead map(final RetryState domain) { + return new RetryStateRead() + .id(domain.getId()) + .connectionId(domain.getConnectionId()) + .jobId(domain.getJobId()) + .successiveCompleteFailures(domain.getSuccessiveCompleteFailures()) + .totalCompleteFailures(domain.getTotalCompleteFailures()) + .successivePartialFailures(domain.getSuccessivePartialFailures()) + .totalPartialFailures(domain.getTotalPartialFailures()); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/repositories/RetryStatesRepository.java b/airbyte-server/src/main/java/io/airbyte/server/repositories/RetryStatesRepository.java index 7f2b1ed419b..78339d30296 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/repositories/RetryStatesRepository.java +++ b/airbyte-server/src/main/java/io/airbyte/server/repositories/RetryStatesRepository.java @@ -11,7 +11,7 @@ import java.util.Optional; import java.util.UUID; -@SuppressWarnings("MissingJavadocType") +@SuppressWarnings({"MissingJavadocMethod", "MissingJavadocType"}) @JdbcRepository(dialect = Dialect.POSTGRES) public interface RetryStatesRepository extends PageableRepository { @@ -19,4 +19,16 @@ public interface RetryStatesRepository extends PageableRepository retryStateFieldsMatrix() { + return Stream.of( + Arguments.of(Fixtures.retryId1, Fixtures.jobId1, Fixtures.connectionId1, 1, 2, 3, 4), + Arguments.of(Fixtures.retryId2, Fixtures.jobId1, Fixtures.connectionId1, 0, 0, 9, 9), + Arguments.of(Fixtures.retryId3, Fixtures.jobId2, Fixtures.connectionId2, 3, 2, 1, 0), + Arguments.of(Fixtures.retryId2, Fixtures.jobId3, Fixtures.connectionId1, 3, 2, 1, 9), + Arguments.of(Fixtures.retryId1, Fixtures.jobId1, Fixtures.connectionId2, 1, 1, 0, 0)); + } + + private static class Fixtures { + + static UUID retryId1 = UUID.randomUUID(); + static UUID retryId2 = UUID.randomUUID(); + static UUID retryId3 = UUID.randomUUID(); + + static Long jobId1 = ThreadLocalRandom.current().nextLong(); + static Long jobId2 = ThreadLocalRandom.current().nextLong(); + static Long jobId3 = ThreadLocalRandom.current().nextLong(); + + static UUID connectionId1 = UUID.randomUUID(); + static UUID connectionId2 = UUID.randomUUID(); + + } + +} diff --git a/airbyte-server/src/test/java/io/airbyte/server/repositories/RetryStatesRepositoryTest.java b/airbyte-server/src/test/java/io/airbyte/server/repositories/RetryStatesRepositoryTest.java index 7e7cc1bbdc9..235a87f70d2 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/repositories/RetryStatesRepositoryTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/repositories/RetryStatesRepositoryTest.java @@ -155,6 +155,62 @@ void findByJobId() { Assertions.assertEquals(s3, found3.get()); } + @Test + void testExistsByJobId() { + final var s = Fixtures.state() + .jobId(Fixtures.jobId3) + .build(); + + repo.save(s); + + final var exists1 = repo.existsByJobId(Fixtures.jobId3); + final var exists2 = repo.existsByJobId(Fixtures.jobId2); + + Assertions.assertTrue(exists1); + Assertions.assertFalse(exists2); + } + + @Test + void testCreateOrUpdateByJobIdUpdate() { + final var s = Fixtures.state() + .jobId(Fixtures.jobId2) + .build(); + + final var inserted = repo.save(s); + final var id = inserted.getId(); + final var found1 = repo.findById(id); + + final var updated = Fixtures.stateFrom(inserted) + .successiveCompleteFailures(s.getSuccessiveCompleteFailures() + 1) + .totalCompleteFailures(s.getTotalCompleteFailures() + 1) + .successivePartialFailures(0) + .build(); + + repo.createOrUpdateByJobId(Fixtures.jobId2, updated); + + final var found2 = repo.findById(id); + + Assertions.assertTrue(found1.isPresent()); + Assertions.assertEquals(s, found1.get()); + + Assertions.assertTrue(found2.isPresent()); + Assertions.assertEquals(updated, found2.get()); + } + + @Test + void testCreateOrUpdateByJobIdCreate() { + final var s = Fixtures.state() + .jobId(Fixtures.jobId4) + .build(); + + repo.createOrUpdateByJobId(Fixtures.jobId4, s); + + final var found1 = repo.findByJobId(Fixtures.jobId4); + + Assertions.assertTrue(found1.isPresent()); + Assertions.assertEquals(s, found1.get()); + } + private static class Fixtures { static UUID connectionId1 = UUID.randomUUID(); @@ -163,6 +219,7 @@ private static class Fixtures { static Long jobId1 = ThreadLocalRandom.current().nextLong(); static Long jobId2 = ThreadLocalRandom.current().nextLong(); static Long jobId3 = ThreadLocalRandom.current().nextLong(); + static Long jobId4 = ThreadLocalRandom.current().nextLong(); static RetryStateBuilder state() { return RetryState.builder()