Skip to content

Commit

Permalink
Add source create/update and destination update endpoints (#529)
Browse files Browse the repository at this point in the history
  • Loading branch information
sherifnada authored Oct 9, 2020
1 parent b6e0f13 commit 371962f
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 32 deletions.
80 changes: 78 additions & 2 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,48 @@ paths:
description: Workspace not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/sources/create:
post:
tags:
- source
summary: Creates a source
operationId: createSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceCreate"
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/SourceRead"
"422":
$ref: "#/components/responses/InvalidInput"
/v1/sources/update:
post:
tags:
- source
summary: Update a source
operationId: updateSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceUpdate"
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/SourceRead"
"404":
description: Source not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/sources/list:
post:
tags:
Expand Down Expand Up @@ -337,6 +379,29 @@ paths:
description: Source Implementation not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/destinations/update:
post:
tags:
- destination
summary: Update destination
operationId: updateDestination
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/DestinationUpdate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/DestinationRead"
"404":
description: Destination not found
"422":
$ref: "#/components/responses/InvalidInput"
/v1/destinations/list:
post:
tags:
Expand Down Expand Up @@ -881,13 +946,17 @@ components:
- name
- dockerRepository
- dockerImageTag
- documentationUrl
properties:
name:
type: string
dockerRepository:
type: string
dockerImageTag:
type: string
documentationUrl:
type: string
format: uri
SourceUpdate:
type: object
description: Update the source. Currently, the only allowed attribute to update is the default docker image version.
Expand All @@ -905,16 +974,19 @@ components:
- sourceId
- name
- dockerRepository
- dockerImageVersion
- dockerImageTag
properties:
sourceId:
$ref: "#/components/schemas/SourceId"
name:
type: string
dockerRepository:
type: string
dockerImageVersion:
dockerImageTag:
type: string
documentationUrl:
type: string
format: uri
SourceReadList:
type: object
required:
Expand Down Expand Up @@ -1049,6 +1121,7 @@ components:
- name
- dockerRepository
- dockerImageTag
- documentationUrl
properties:
destinationId:
$ref: "#/components/schemas/DestinationId"
Expand All @@ -1058,6 +1131,9 @@ components:
type: string
dockerImageTag:
type: string
documentationUrl:
type: string
format: uri
DestinationReadList:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public List<StandardSource> listStandardSources()
return persistence.listConfigs(ConfigSchema.STANDARD_SOURCE, StandardSource.class);
}

public void writeStandardSource(final StandardSource source) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.STANDARD_SOURCE,
source.getSourceId().toString(),
source);
}

public StandardDestination getStandardDestination(final UUID destinationId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
Expand All @@ -89,6 +96,13 @@ public List<StandardDestination> listStandardDestinations()
return persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION, StandardDestination.class);
}

public void writeStandardDestination(final StandardDestination destination) throws JsonValidationException, IOException {
persistence.writeConfig(
ConfigSchema.STANDARD_DESTINATION,
destination.getDestinationId().toString(),
destination);
}

public SourceConnectionImplementation getSourceConnectionImplementation(final UUID sourceImplementationId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@
import io.airbyte.api.model.DestinationRead;
import io.airbyte.api.model.DestinationReadList;
import io.airbyte.api.model.DestinationSpecificationRead;
import io.airbyte.api.model.DestinationUpdate;
import io.airbyte.api.model.JobIdRequestBody;
import io.airbyte.api.model.JobInfoRead;
import io.airbyte.api.model.JobListRequestBody;
import io.airbyte.api.model.JobReadList;
import io.airbyte.api.model.SlugRequestBody;
import io.airbyte.api.model.SourceCreate;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceImplementationCreate;
import io.airbyte.api.model.SourceImplementationDiscoverSchemaRead;
Expand All @@ -56,6 +58,7 @@
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.SourceReadList;
import io.airbyte.api.model.SourceSpecificationRead;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.api.model.WbConnectionRead;
import io.airbyte.api.model.WbConnectionReadList;
import io.airbyte.api.model.WorkspaceIdRequestBody;
Expand Down Expand Up @@ -156,14 +159,24 @@ public SourceRead getSource(@Valid SourceIdRequestBody sourceIdRequestBody) {
return execute(() -> sourcesHandler.getSource(sourceIdRequestBody));
}

@Override
public SourceRead createSource(@Valid SourceCreate sourceCreate) {
return execute(() -> sourcesHandler.createSource(sourceCreate));
}

@Override
public SourceRead updateSource(@Valid SourceUpdate sourceUpdate) {
return execute(() -> sourcesHandler.updateSource(sourceUpdate));
}

// SOURCE SPECIFICATION

@Override
public SourceSpecificationRead getSourceSpecification(@Valid SourceIdRequestBody sourceIdRequestBody) {
return execute(() -> schedulerHandler.getSourceSpecification(sourceIdRequestBody));
}

// SOURCE IMPLEMENTATION

@Override
public SourceImplementationRead createSourceImplementation(@Valid SourceImplementationCreate sourceImplementationCreate) {
return execute(() -> sourceImplementationsHandler.createSourceImplementation(sourceImplementationCreate));
Expand Down Expand Up @@ -201,8 +214,8 @@ public CheckConnectionRead checkConnectionToSourceImplementation(@Valid SourceIm
public SourceImplementationDiscoverSchemaRead discoverSchemaForSourceImplementation(@Valid SourceImplementationIdRequestBody sourceImplementationIdRequestBody) {
return execute(() -> schedulerHandler.discoverSchemaForSourceImplementation(sourceImplementationIdRequestBody));
}

// DESTINATION

@Override
public DestinationReadList listDestinations() {
return execute(destinationsHandler::listDestinations);
Expand All @@ -213,7 +226,13 @@ public DestinationRead getDestination(@Valid DestinationIdRequestBody destinatio
return execute(() -> destinationsHandler.getDestination(destinationIdRequestBody));
}

@Override
public DestinationRead updateDestination(@Valid DestinationUpdate destinationUpdate) {
return execute(() -> destinationsHandler.updateDestination(destinationUpdate));
}

// DESTINATION SPECIFICATION

@Override
public DestinationSpecificationRead getDestinationSpecification(@Valid DestinationIdRequestBody destinationIdRequestBody) {
return execute(() -> schedulerHandler.getDestinationSpecification(destinationIdRequestBody));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import io.airbyte.api.model.DestinationIdRequestBody;
import io.airbyte.api.model.DestinationRead;
import io.airbyte.api.model.DestinationReadList;
import io.airbyte.api.model.DestinationUpdate;
import io.airbyte.commons.json.JsonValidationException;
import io.airbyte.config.StandardDestination;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -58,12 +61,30 @@ public DestinationRead getDestination(DestinationIdRequestBody destinationIdRequ
return buildDestinationRead(configRepository.getStandardDestination(destinationIdRequestBody.getDestinationId()));
}

private static DestinationRead buildDestinationRead(StandardDestination standardDestination) {
final DestinationRead destinationRead = new DestinationRead();
destinationRead.setDestinationId(standardDestination.getDestinationId());
destinationRead.setName(standardDestination.getName());
public DestinationRead updateDestination(DestinationUpdate destinationUpdate) throws ConfigNotFoundException, IOException, JsonValidationException {
StandardDestination currentDestination = configRepository.getStandardDestination(destinationUpdate.getDestinationId());
StandardDestination newDestination = new StandardDestination()
.withDestinationId(currentDestination.getDestinationId())
.withDockerImageTag(destinationUpdate.getDockerImageTag())
.withDockerRepository(currentDestination.getDockerRepository())
.withName(currentDestination.getName())
.withDocumentationUrl(currentDestination.getDocumentationUrl());

configRepository.writeStandardDestination(newDestination);
return buildDestinationRead(newDestination);
}

return destinationRead;
private static DestinationRead buildDestinationRead(StandardDestination standardDestination) {
try {
return new DestinationRead()
.destinationId(standardDestination.getDestinationId())
.name(standardDestination.getName())
.dockerRepository(standardDestination.getDockerRepository())
.dockerImageTag(standardDestination.getDockerImageTag())
.documentationUrl(new URI(standardDestination.getDocumentationUrl()));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,89 @@

package io.airbyte.server.handlers;

import io.airbyte.api.model.SourceCreate;
import io.airbyte.api.model.SourceIdRequestBody;
import io.airbyte.api.model.SourceRead;
import io.airbyte.api.model.SourceReadList;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.commons.json.JsonValidationException;
import io.airbyte.config.StandardSource;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class SourcesHandler {

private final ConfigRepository configRepository;
private final Supplier<UUID> uuidSupplier;

public SourcesHandler(final ConfigRepository configRepository) {
this(configRepository, UUID::randomUUID);
}

public SourcesHandler(final ConfigRepository configRepository, Supplier<UUID> uuidSupplier) {
this.configRepository = configRepository;
this.uuidSupplier = uuidSupplier;
}

public SourceReadList listSources() throws ConfigNotFoundException, IOException, JsonValidationException {
final List<SourceRead> reads = configRepository.listStandardSources()
.stream()
.map(SourcesHandler::buildSourceRead)
.collect(Collectors.toList());

return new SourceReadList().sources(reads);
}

public SourceRead getSource(SourceIdRequestBody sourceIdRequestBody) throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSource standardSource = configRepository.getStandardSource(sourceIdRequestBody.getSourceId());
return buildSourceRead(standardSource);
return buildSourceRead(configRepository.getStandardSource(sourceIdRequestBody.getSourceId()));
}

private static SourceRead buildSourceRead(StandardSource standardSource) {
final SourceRead sourceRead = new SourceRead();
sourceRead.setSourceId(standardSource.getSourceId());
sourceRead.setName(standardSource.getName());
public SourceRead createSource(SourceCreate sourceCreate) throws JsonValidationException, IOException, ConfigNotFoundException {
// TODO add validation for the incoming docker image
UUID id = uuidSupplier.get();
StandardSource source = new StandardSource()
.withSourceId(id)
.withDockerRepository(sourceCreate.getDockerRepository())
.withDockerImageTag(sourceCreate.getDockerImageTag())
.withDocumentationUrl(sourceCreate.getDocumentationUrl().toString())
.withName(sourceCreate.getName());

configRepository.writeStandardSource(source);

return sourceRead;
return buildSourceRead(source);
}

public SourceRead updateSource(SourceUpdate sourceUpdate) throws ConfigNotFoundException, IOException, JsonValidationException {
// TODO add validation to ensure the incoming version exists
StandardSource currentSource = configRepository.getStandardSource(sourceUpdate.getSourceId());
StandardSource newSource = new StandardSource()
.withSourceId(currentSource.getSourceId())
.withDockerImageTag(sourceUpdate.getDockerImageTag())
.withDockerRepository(currentSource.getDockerRepository())
.withDocumentationUrl(currentSource.getDocumentationUrl())
.withName(currentSource.getName());

configRepository.writeStandardSource(newSource);
return buildSourceRead(newSource);
}

private static SourceRead buildSourceRead(StandardSource standardSource) {
try {
return new SourceRead()
.sourceId(standardSource.getSourceId())
.name(standardSource.getName())
.dockerRepository(standardSource.getDockerRepository())
.dockerImageTag(standardSource.getDockerImageTag())
.documentationUrl(new URI(standardSource.getDocumentationUrl()));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit 371962f

Please sign in to comment.