Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into MetalakeAdmin
Browse files Browse the repository at this point in the history
  • Loading branch information
Heng Qin committed Apr 3, 2024
2 parents 7b5ea94 + d2cde88 commit 03d4bc8
Show file tree
Hide file tree
Showing 75 changed files with 2,731 additions and 597 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/backend-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
architecture: [linux/amd64]
java-version: [ 8, 11, 17 ]
test-mode: [ embedded, deploy ]
backend: [ jdbcBackend, kvBackend]
env:
PLATFORM: ${{ matrix.architecture }}
steps:
Expand Down Expand Up @@ -87,7 +88,7 @@ jobs:
- name: Backend Integration Test
id: integrationTest
run: |
./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs
./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }}
- name: Upload integrate tests reports
uses: actions/upload-artifact@v3
Expand Down
5 changes: 4 additions & 1 deletion api/src/main/java/com/datastrato/gravitino/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ enum Type {
FILESET,

/** Catalog Type for Message Queue, like kafka://topic */
MESSAGING
MESSAGING,

/** Catalog Type for test only. */
UNSUPPORTED
}

/**
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/com/datastrato/gravitino/NameIdentifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ public static void checkFileset(NameIdentifier ident) {
Namespace.checkFileset(ident.namespace);
}

/**
* Check the given {@link NameIdentifier} is a topic identifier. Throw an {@link
* IllegalNameIdentifierException} if it's not.
*
* @param ident The topic {@link NameIdentifier} to check.
*/
public static void checkTopic(NameIdentifier ident) {
check(ident != null, "Topic identifier must not be null");
Namespace.checkTopic(ident.namespace);
}

/**
* Create a {@link NameIdentifier} from the given identifier string.
*
Expand Down
13 changes: 13 additions & 0 deletions api/src/main/java/com/datastrato/gravitino/Namespace.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ public static void checkFileset(Namespace namespace) {
namespace);
}

/**
* Check if the given topic namespace is legal, throw an {@link IllegalNamespaceException} if it's
* illegal.
*
* @param namespace The topic namespace
*/
public static void checkTopic(Namespace namespace) {
check(
namespace != null && namespace.length() == 3,
"Topic namespace must be non-null and have 3 levels, the input namespace is %s",
namespace);
}

private Namespace(String[] levels) {
this.levels = levels;
}
Expand Down
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ allprojects {

// Change poll image pause time from 30s to 60s
param.environment("TESTCONTAINERS_PULL_PAUSE_TIMEOUT", "60")
if (project.hasProperty("jdbcBackend")) {
param.environment("jdbcBackend", "true")
}

val testMode = project.properties["testMode"] as? String ?: "embedded"
param.systemProperty("gravitino.log.path", project.buildDir.path + "/${project.name}-integration-test.log")
Expand Down
2 changes: 2 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ dependencies {

testImplementation(libs.bundles.log4j)
testImplementation(libs.mockito.core)
testImplementation(libs.mysql.driver)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
2 changes: 2 additions & 0 deletions catalogs/catalog-hive/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ dependencies {
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.mockito.core)
testImplementation(libs.mysql.driver)

testImplementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion") {
exclude("org.apache.hadoop")
Expand All @@ -106,6 +107,7 @@ dependencies {
}
testImplementation(libs.slf4j.api)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)

testRuntimeOnly(libs.junit.jupiter.engine)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
NameIdentifier schemaIdent = NameIdentifier.of(namespace.levels());
if (!schemaExists(schemaIdent)) {
throw new NoSuchSchemaException("Schema (database) does not exist %s", namespace);
}

try {
ListTablesResponse listTablesResponse =
icebergTableOps.listTable(IcebergTableOpsHelper.getIcebergNamespace(namespace));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import com.datastrato.gravitino.dto.requests.MetalakeUpdateRequest;
import com.datastrato.gravitino.dto.requests.SchemaUpdateRequest;
import com.datastrato.gravitino.dto.requests.TableUpdateRequest;
import com.datastrato.gravitino.dto.requests.TopicUpdateRequest;
import com.datastrato.gravitino.file.FilesetChange;
import com.datastrato.gravitino.messaging.TopicChange;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.TableChange;

Expand Down Expand Up @@ -82,6 +84,15 @@ static Catalog toCatalog(CatalogDTO catalog, RESTClient client) {
.build();

case MESSAGING:
return MessagingCatalog.builder()
.withName(catalog.name())
.withType(catalog.type())
.withProvider(catalog.provider())
.withComment(catalog.comment())
.withProperties(catalog.properties())
.withAudit((AuditDTO) catalog.auditInfo())
.withRestClient(client)
.build();
default:
throw new UnsupportedOperationException("Unsupported catalog type: " + catalog.type());
}
Expand Down Expand Up @@ -183,6 +194,23 @@ static FilesetUpdateRequest toFilesetUpdateRequest(FilesetChange change) {
}
}

static TopicUpdateRequest toTopicUpdateRequest(TopicChange change) {
if (change instanceof TopicChange.UpdateTopicComment) {
return new TopicUpdateRequest.UpdateTopicCommentRequest(
((TopicChange.UpdateTopicComment) change).getNewComment());
} else if (change instanceof TopicChange.SetProperty) {
return new TopicUpdateRequest.SetTopicPropertyRequest(
((TopicChange.SetProperty) change).getProperty(),
((TopicChange.SetProperty) change).getValue());
} else if (change instanceof TopicChange.RemoveProperty) {
return new TopicUpdateRequest.RemoveTopicPropertyRequest(
((TopicChange.RemoveProperty) change).getProperty());
} else {
throw new IllegalArgumentException(
"Unknown change type: " + change.getClass().getSimpleName());
}
}

private static TableUpdateRequest toColumnUpdateRequest(TableChange.ColumnChange change) {
if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
import com.datastrato.gravitino.exceptions.NoSuchPartitionException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NoSuchTopicException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.NotFoundException;
import com.datastrato.gravitino.exceptions.PartitionAlreadyExistsException;
import com.datastrato.gravitino.exceptions.RESTException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TopicAlreadyExistsException;
import com.datastrato.gravitino.exceptions.UnauthorizedException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -112,6 +114,15 @@ public static Consumer<ErrorResponse> filesetErrorHandler() {
return FilesetErrorHandler.INSTANCE;
}

/**
* Creates an error handler specific to Topic operations.
*
* @return A Consumer representing the Topic error handler.
*/
public static Consumer<ErrorResponse> topicErrorHandler() {
return TopicErrorHandler.INSTANCE;
}

private ErrorHandlers() {}

/**
Expand Down Expand Up @@ -410,6 +421,41 @@ public void accept(ErrorResponse errorResponse) {
}
}

/** Error handler specific to Topic operations. */
@SuppressWarnings("FormatStringAnnotation")
private static class TopicErrorHandler extends RestErrorHandler {

private static final TopicErrorHandler INSTANCE = new TopicErrorHandler();

@Override
public void accept(ErrorResponse errorResponse) {
String errorMessage = formatErrorMessage(errorResponse);

switch (errorResponse.getCode()) {
case ErrorConstants.ILLEGAL_ARGUMENTS_CODE:
throw new IllegalArgumentException(errorMessage);

case ErrorConstants.NOT_FOUND_CODE:
if (errorResponse.getType().equals(NoSuchSchemaException.class.getSimpleName())) {
throw new NoSuchSchemaException(errorMessage);
} else if (errorResponse.getType().equals(NoSuchTopicException.class.getSimpleName())) {
throw new NoSuchTopicException(errorMessage);
} else {
throw new NotFoundException(errorMessage);
}

case ErrorConstants.ALREADY_EXISTS_CODE:
throw new TopicAlreadyExistsException(errorMessage);

case ErrorConstants.INTERNAL_ERROR_CODE:
throw new RuntimeException(errorMessage);

default:
super.accept(errorResponse);
}
}
}

/** Generic error handler for REST requests. */
private static class RestErrorHandler extends ErrorHandler {
private static final ErrorHandler INSTANCE = new RestErrorHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
.map(DTOConverters::toFilesetUpdateRequest)
.collect(Collectors.toList());
FilesetUpdatesRequest req = new FilesetUpdatesRequest(updates);
req.validate();

FilesetResponse resp =
restClient.put(
Expand Down
Loading

0 comments on commit 03d4bc8

Please sign in to comment.