Skip to content

Commit

Permalink
feat: add ai server proxy to builder server (#13467)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnchrch committed Aug 15, 2024
1 parent c674420 commit 7cdd476
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.connector_builder.api.model.generated.ResolveManifestRequestBody;
import io.airbyte.connector_builder.api.model.generated.StreamRead;
import io.airbyte.connector_builder.api.model.generated.StreamReadRequestBody;
import io.airbyte.connector_builder.handlers.AssistProxyHandler;
import io.airbyte.connector_builder.handlers.ConnectorContributionHandler;
import io.airbyte.connector_builder.handlers.HealthHandler;
import io.airbyte.connector_builder.handlers.ResolveManifestHandler;
Expand All @@ -30,6 +31,7 @@
import io.micronaut.scheduling.annotation.ExecuteOn;
import io.micronaut.security.annotation.Secured;
import io.micronaut.security.rules.SecurityRule;
import java.util.Map;

/**
* Micronaut controller that defines the behavior for all endpoints related to building and testing
Expand All @@ -43,15 +45,18 @@ public class ConnectorBuilderController implements V1Api {
private final StreamHandler streamHandler;
private final ResolveManifestHandler resolveManifestHandler;
private final ConnectorContributionHandler connectorContributionHandler;
private final AssistProxyHandler assistProxyHandler;

public ConnectorBuilderController(final HealthHandler healthHandler,
final ResolveManifestHandler resolveManifestHandler,
final StreamHandler streamHandler,
final ConnectorContributionHandler connectorContributionHandler) {
final ConnectorContributionHandler connectorContributionHandler,
final AssistProxyHandler assistProxyHandler) {
this.healthHandler = healthHandler;
this.streamHandler = streamHandler;
this.resolveManifestHandler = resolveManifestHandler;
this.connectorContributionHandler = connectorContributionHandler;
this.assistProxyHandler = assistProxyHandler;
}

@Override
Expand Down Expand Up @@ -99,4 +104,14 @@ public ResolveManifest resolveManifest(@Body final ResolveManifestRequestBody re
return resolveManifestHandler.resolveManifest(resolveManifestRequestBody);
}

@Override
@Post(uri = "/v1/assist/v1/process",
consumes = MediaType.APPLICATION_JSON,
produces = MediaType.APPLICATION_JSON)
@Secured({AUTHENTICATED_USER})
@ExecuteOn(TaskExecutors.IO)
public Map<String, Object> assistV1Process(@Body final Map<String, Object> requestBody) {
return assistProxyHandler.process(requestBody);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
@file:Suppress("ktlint:standard:package-name")

package io.airbyte.connector_builder.exceptions

import com.fasterxml.jackson.databind.JsonNode
import io.airbyte.commons.server.errors.KnownException

class AssistProxyException(private var responseCode: Int, jsonBody: JsonNode) :
KnownException(getStringFromResponse(jsonBody), getThrowableFromResponse(jsonBody)) {
override fun getHttpCode(): Int {
return responseCode
}
}

fun getStringFromResponse(jsonBody: JsonNode): String {
if (jsonBody.has("message")) {
return jsonBody.get("message").asText()
}
return "Unknown AI Assist error"
}

fun getThrowableFromResponse(jsonBody: JsonNode): Throwable? {
if (jsonBody.has("exceptionStack")) {
val message = getStringFromResponse(jsonBody)
val givenStack = jsonBody.get("exceptionStack")
val givenClassName = jsonBody.get("exceptionClassName")?.asText() ?: "Python"
val stackTrace = convertToStackTrace(givenStack, givenClassName) ?: return null

val throwable = Throwable(message)
throwable.stackTrace = stackTrace
return throwable
}
return null
}

fun convertToStackTrace(
exceptionStack: JsonNode,
exceptionClassName: String,
): Array<StackTraceElement>? {
if (!exceptionStack.isArray) return null

// exceptionStack is an array of strings from python
return exceptionStack.mapIndexed { index, stackLine ->
val stackTraceParts = stackLine.asText().split(":")
val (fileName, lineNumber, functionName) = parseStackTraceParts(stackTraceParts, index)
StackTraceElement(exceptionClassName, functionName, fileName, lineNumber)
}.toTypedArray()
}

private fun parseStackTraceParts(
parts: List<String>,
index: Int,
): Triple<String, Int, String> {
return when (parts.size) {
3 -> Triple(parts[0], parseLineNumber(parts[1], index), parts[2])
2 -> Triple(parts[0], parseLineNumber(parts[1], index), "unknown_function")
1 -> Triple("unknown_file.py", index + 1, parts[0])
else -> Triple("unknown_file.py", index + 1, "unknown_function")
}
}

private fun parseLineNumber(
lineNumber: String,
index: Int,
): Int {
return lineNumber.toIntOrNull() ?: (index + 1)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/
@file:Suppress("ktlint:standard:package-name")

package io.airbyte.connector_builder.handlers

import io.airbyte.commons.json.Jsons
import io.airbyte.connector_builder.exceptions.ConnectorBuilderException
import io.airbyte.connector_builder.requester.assist.AssistConfiguration
import io.airbyte.connector_builder.requester.assist.AssistProxy
import jakarta.inject.Inject
import jakarta.inject.Singleton

/**
* Proxy to the Assist API.
*/
@Singleton
class AssistProxyHandler
@Inject
constructor(private val proxyConfig: AssistConfiguration) {
/**
* Call the Assistant to get connector data
*/
@Throws(ConnectorBuilderException::class)
fun process(requestBody: Map<String, Object>): Map<String, Object> {
val path = "/v1/process"
val proxy = AssistProxy(this.proxyConfig)

val jsonBody = Jsons.jsonNode(requestBody)
val result = proxy.post(path, jsonBody)
return Jsons.`object`(result, Map::class.java) as Map<String, Object>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/
@file:Suppress("ktlint:standard:package-name")

package io.airbyte.connector_builder.requester.assist

import java.io.IOException
import java.net.HttpURLConnection

/**
* Proxy to the Assist Service. Blocks until the job completes.
*/
interface AssistConfiguration {
@Throws(IOException::class)
fun getConnection(path: String): HttpURLConnection
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/
@file:Suppress("ktlint:standard:package-name")

package io.airbyte.connector_builder.requester.assist

import io.micronaut.context.annotation.Value
import jakarta.inject.Singleton
import org.jooq.tools.StringUtils
import java.io.IOException
import java.net.HttpURLConnection
import java.net.MalformedURLException
import java.net.ProtocolException
import java.net.URL

/**
* Construct and send requests to the CDK's Connector Builder handler.
*/
@Singleton
class AssistConfigurationImpl(
@Value("\${airbyte.connector-builder-server.ai-assist.url-base}") private val targetApiBaseUrl: String,
) : AssistConfiguration {
@Throws(IOException::class)
override fun getConnection(path: String): HttpURLConnection {
if (StringUtils.isBlank(targetApiBaseUrl)) {
throw RuntimeException("Assist Service URL is not set.")
}
try {
val url = URL("$targetApiBaseUrl$path")
val connection = url.openConnection() as HttpURLConnection
return connection
} catch (e: ProtocolException) {
throw RuntimeException(e)
} catch (e: MalformedURLException) {
throw RuntimeException(e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved.
*/
@file:Suppress("ktlint:standard:package-name")

package io.airbyte.connector_builder.requester.assist

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.ObjectMapper
import io.airbyte.connector_builder.exceptions.AssistProxyException
import io.airbyte.connector_builder.exceptions.ConnectorBuilderException
import java.io.IOException
import java.io.InputStreamReader

class AssistProxy(private val proxyConfig: AssistConfiguration) {
fun post(
path: String,
jsonBody: JsonNode?,
): JsonNode {
val connection = proxyConfig.getConnection(path)
connection.apply {
requestMethod = "POST"
setRequestProperty("Content-Type", "application/json")
doOutput = true
}

connection.outputStream.use { outputStream ->
objectMapper.writeValue(outputStream, jsonBody)
}
val responseCode: Int
val jsonResponse: JsonNode

try {
responseCode = connection.responseCode
val inputStream =
if (responseCode in 200..299) {
connection.inputStream
} else {
connection.errorStream
}

jsonResponse =
inputStream.use { inputStream ->
InputStreamReader(inputStream, "utf-8").use { reader ->
reader.readText().let {
objectMapper.readTree(it)
}
}
}
} catch (e: IOException) {
throw ConnectorBuilderException("AI Assist processing error", e)
} finally {
connection.disconnect()
}

if (responseCode !in 200..299) {
throw AssistProxyException(responseCode, jsonResponse)
}

return jsonResponse
}

companion object {
private val objectMapper = ObjectMapper()
}
}
31 changes: 29 additions & 2 deletions airbyte-connector-builder-server/src/main/openapi/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,29 @@ paths:
$ref: "#/components/responses/ExceptionResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/assist/v1/process:
post:
tags:
- connectorBuilderServer
summary: Assist server access point
operationId: assistV1Process
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/AssistV1ProcessRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/AssistV1Process"
"400":
$ref: "#/components/responses/ExceptionResponse"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/health:
get:
tags:
Expand All @@ -123,7 +146,6 @@ paths:
$ref: "#/components/schemas/HealthCheckRead"
# This route is unsecured for external monitoring.
security: []

components:
securitySchemes:
bearerAuth:
Expand Down Expand Up @@ -373,6 +395,12 @@ components:
manifest:
type: object
description: The config-based connector manifest contents with $refs and $parameters resolved
AssistV1ProcessRequestBody:
type: object
additionalProperties: true
AssistV1Process:
type: object
additionalProperties: true
HealthCheckRead:
type: object
required:
Expand Down Expand Up @@ -437,7 +465,6 @@ components:
type: array
items:
$ref: "#/components/schemas/InvalidInputProperty"

responses:
InvalidInputResponse:
description: Input failed validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ airbyte:
connector-builder-server:
github:
airbyte-pat-token: ${BUILDER_GITHUB_AIRBYTE_PAT_TOKEN:}
ai-assist:
url-base: ${AI_ASSIST_URL_BASE:}
acceptance:
test:
enabled: ${ACCEPTANCE_TEST_ENABLED:false}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.connector_builder.exceptions.CdkUnknownException;
import io.airbyte.connector_builder.exceptions.ConnectorBuilderException;
import io.airbyte.connector_builder.file_writer.MockAirbyteFileWriterImpl;
import io.airbyte.connector_builder.handlers.AssistProxyHandler;
import io.airbyte.connector_builder.handlers.ConnectorContributionHandler;
import io.airbyte.connector_builder.handlers.HealthHandler;
import io.airbyte.connector_builder.handlers.ResolveManifestHandler;
Expand Down Expand Up @@ -67,13 +68,15 @@ class ConnectorBuilderControllerIntegrationTest {
private MockAirbyteFileWriterImpl writer;
private AirbyteStreamFactory streamFactory;
private ContributionTemplates contributionTemplates;
private AssistProxyHandler assistProxyHandler;

@BeforeEach
void setup() {
this.healthHandler = mock(HealthHandler.class);
this.writer = new MockAirbyteFileWriterImpl();
this.streamFactory = VersionedAirbyteStreamFactory.noMigrationVersionedAirbyteStreamFactory();
this.contributionTemplates = new ContributionTemplates();
this.assistProxyHandler = mock(AssistProxyHandler.class);
}

@BeforeAll
Expand Down Expand Up @@ -101,7 +104,7 @@ ConnectorBuilderController createControllerWithSynchronousRunner(
this.writer, this.streamFactory, shouldThrow, exitCode, inputStream, errorStream, outputStream);
final AirbyteCdkRequesterImpl requester = new AirbyteCdkRequesterImpl(commandRunner);
return new ConnectorBuilderController(this.healthHandler, new ResolveManifestHandler(requester), new StreamHandler(requester),
new ConnectorContributionHandler(contributionTemplates, null));
new ConnectorContributionHandler(contributionTemplates, null), this.assistProxyHandler);
}

@Test
Expand Down
Loading

0 comments on commit 7cdd476

Please sign in to comment.