Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2896] improve(IT): Add e2e test case for verifying fileset schema level and kafka topic level #2937

Merged
merged 13 commits into from
Apr 19, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public void startKafkaContainer() {
if (kafkaContainer == null) {
synchronized (ContainerSuite.class) {
if (kafkaContainer == null) {
KafkaContainer container = closer.register(KafkaContainer.builder().build());
KafkaContainer.Builder builder = KafkaContainer.builder().withNetwork(network);
KafkaContainer container = closer.register(builder.build());
try {
container.start();
} catch (Exception e) {
Expand Down
1 change: 1 addition & 0 deletions integration-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ tasks.test {
// Gravitino CI Docker image
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.10")
environment("GRAVITINO_CI_TRINO_DOCKER_IMAGE", "datastrato/gravitino-ci-trino:0.1.5")
environment("GRAVITINO_CI_KAFKA_DOCKER_IMAGE", "apache/kafka:3.7.0")

copy {
from("${project.rootDir}/dev/docker/trino/conf")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.integration.test.web.ui;

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.client.GravitinoAdminClient;
import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.web.ui.pages.CatalogsPage;
import com.datastrato.gravitino.integration.test.web.ui.pages.MetalakePage;
import com.datastrato.gravitino.integration.test.web.ui.utils.AbstractWebIT;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

@Tag("gravitino-docker-it")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class CatalogsPageKafkaTest extends AbstractWebIT {
// Page objects driving the Web UI under test.
MetalakePage metalakePage = new MetalakePage();
CatalogsPage catalogsPage = new CatalogsPage();

// Shared container suite; Kafka is started lazily in before().
private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected static GravitinoAdminClient gravitinoClient;
// Metalake handle loaded after the UI creates it in testCreateKafkaCatalog().
private static GravitinoMetalake metalake;

// Defaults; both are overwritten in before() with the real server/broker addresses.
protected static String gravitinoUri = "http://127.0.0.1:8090";
protected static String kafkaUri = "http://127.0.0.1:9092";

// Table titles shown by the UI at the catalog and schema levels.
private static final String CATALOG_TABLE_TITLE = "Schemas";
private static final String SCHEMA_TOPIC_TITLE = "Topics";
private static final String METALAKE_NAME = "test";
private static final String CATALOG_TYPE_MESSAGING = "messaging";
// Catalog names created by sibling UI tests; listed here so tree-node
// verification can assert the full expected tree.
private static final String HIVE_CATALOG_NAME = "catalog_hive";
private static final String MODIFIED_HIVE_CATALOG_NAME = HIVE_CATALOG_NAME + "_edited";
private static final String ICEBERG_CATALOG_NAME = "catalog_iceberg";
private static final String FILESET_CATALOG_NAME = "catalog_fileset";
private static final String KAFKA_CATALOG_NAME = "catalog_kafka";
private static final String SCHEMA_NAME = "default";
private static final String TOPIC_NAME = "topic1";

private static final String MYSQL_CATALOG_NAME = "catalog_mysql";

private static final String PG_CATALOG_NAME = "catalog_pg";

@BeforeAll
public static void before() throws Exception {
  // Bind a client to the already-running Gravitino server and record its URI.
  gravitinoClient = AbstractIT.getGravitinoClient();
  gravitinoUri = "http://127.0.0.1:" + AbstractIT.getGravitinoServerPort();

  // Spin up the Kafka container, then derive the broker address the
  // catalog will use as its bootstrap server.
  containerSuite.startKafkaContainer();
  String kafkaHost = containerSuite.getKafkaContainer().getContainerIpAddress();
  kafkaUri = kafkaHost + ":9092";
}

/**
 * Creates a Kafka topic under the given Metalake, Catalog and Schema.
 *
 * @param metalakeName The name of the Metalake.
 * @param catalogName The name of the Catalog.
 * @param schemaName The name of the Schema.
 * @param topicName The name of the Kafka topic to create.
 */
void createTopic(String metalakeName, String catalogName, String schemaName, String topicName) {
  NameIdentifier catalogIdent = NameIdentifier.ofCatalog(metalakeName, catalogName);
  NameIdentifier topicIdent =
      NameIdentifier.of(metalakeName, catalogName, schemaName, topicName);
  Catalog catalog = metalake.loadCatalog(catalogIdent);
  // Create with a fixed comment, default data layout (null) and no extra properties.
  catalog.asTopicCatalog().createTopic(topicIdent, "comment", null, Collections.emptyMap());
}

/**
 * Drops a Kafka topic from the given Metalake, Catalog and Schema.
 *
 * @param metalakeName The name of the Metalake where the topic resides.
 * @param catalogName The name of the Catalog that contains the topic.
 * @param schemaName The name of the Schema under which the topic exists.
 * @param topicName The name of the Kafka topic to be dropped.
 */
void dropTopic(String metalakeName, String catalogName, String schemaName, String topicName) {
  NameIdentifier topicIdent =
      NameIdentifier.of(metalakeName, catalogName, schemaName, topicName);
  metalake
      .loadCatalog(NameIdentifier.ofCatalog(metalakeName, catalogName))
      .asTopicCatalog()
      .dropTopic(topicIdent);
}

@Test
@Order(0)
public void testCreateKafkaCatalog() throws InterruptedException {
  // create metalake via the UI
  clickAndWait(metalakePage.createMetalakeBtn);
  metalakePage.setMetalakeNameField(METALAKE_NAME);
  clickAndWait(metalakePage.submitHandleMetalakeBtn);
  // load metalake through the client so createTopic/dropTopic can use it later
  metalake = gravitinoClient.loadMetalake(NameIdentifier.of(METALAKE_NAME));
  metalakePage.clickMetalakeLink(METALAKE_NAME);
  // create kafka catalog actions
  clickAndWait(catalogsPage.createCatalogBtn);
  catalogsPage.setCatalogNameField(KAFKA_CATALOG_NAME);
  clickAndWait(catalogsPage.catalogTypeSelector);
  // Use the shared constant instead of a "messaging" literal, consistent
  // with how the catalog type is referenced everywhere else in this class.
  catalogsPage.clickSelectType(CATALOG_TYPE_MESSAGING);
  catalogsPage.setCatalogCommentField("kafka catalog comment");
  // set kafka catalog props: point the catalog at the test broker
  catalogsPage.setCatalogFixedProp("bootstrap.servers", kafkaUri);
  clickAndWait(catalogsPage.handleSubmitCatalogBtn);
  Assertions.assertTrue(catalogsPage.verifyGetCatalog(KAFKA_CATALOG_NAME));
}

@Test
@Order(1)
public void testKafkaTopicTreeNode() throws InterruptedException {
  // Expand the kafka catalog tree node.
  String catalogNode =
      String.format(
          "{{%s}}{{%s}}{{%s}}", METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING);
  catalogsPage.clickTreeNode(catalogNode);
  // Verify the table title, the schema row, and the full expected tree.
  Assertions.assertTrue(catalogsPage.verifyShowTableTitle(CATALOG_TABLE_TITLE));
  Assertions.assertTrue(catalogsPage.verifyShowDataItemInList(SCHEMA_NAME, false));
  List<String> expectedNodes =
      Arrays.asList(
          MODIFIED_HIVE_CATALOG_NAME,
          ICEBERG_CATALOG_NAME,
          MYSQL_CATALOG_NAME,
          PG_CATALOG_NAME,
          FILESET_CATALOG_NAME,
          KAFKA_CATALOG_NAME,
          SCHEMA_NAME);
  Assertions.assertTrue(catalogsPage.verifyTreeNodes(expectedNodes));
}

@Test
@Order(2)
public void testKafkaTopicTreeNode() throws InterruptedException {
  // 1. create a topic in the kafka catalog via the client API
  createTopic(METALAKE_NAME, KAFKA_CATALOG_NAME, SCHEMA_NAME, TOPIC_NAME);
  // 2. expand the schema tree node
  String schemaNode =
      String.format(
          "{{%s}}{{%s}}{{%s}}{{%s}}",
          METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING, SCHEMA_NAME);
  catalogsPage.clickTreeNode(schemaNode);
  // 3. verify the table title, the topic row, and the full expected tree
  Assertions.assertTrue(catalogsPage.verifyShowTableTitle(SCHEMA_TOPIC_TITLE));
  Assertions.assertTrue(catalogsPage.verifyShowDataItemInList(TOPIC_NAME, false));
  List<String> expectedNodes =
      Arrays.asList(
          MODIFIED_HIVE_CATALOG_NAME,
          ICEBERG_CATALOG_NAME,
          MYSQL_CATALOG_NAME,
          PG_CATALOG_NAME,
          FILESET_CATALOG_NAME,
          KAFKA_CATALOG_NAME,
          SCHEMA_NAME,
          TOPIC_NAME);
  Assertions.assertTrue(catalogsPage.verifyTreeNodes(expectedNodes));
}

@Test
@Order(3)
public void testKafkaTopicDetail() throws InterruptedException {
// 1. click toptic tree node
String topticNode =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo topticNode and // 1. click toptic tree node

String.format(
"{{%s}}{{%s}}{{%s}}{{%s}}{{%s}}",
METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING, SCHEMA_NAME, TOPIC_NAME);
catalogsPage.clickTreeNode(topticNode);
// 2. verify show tab details
Assertions.assertTrue(catalogsPage.verifyShowDetailsContent());
// 3. verify show highlight properties
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList(
"key", "partition-count", "partition-count", true));
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList("value", "partition-count", "1", true));
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList(
"key", "replication-factor", "replication-factor", true));
Assertions.assertTrue(
catalogsPage.verifyShowPropertiesItemInList("value", "replication-factor", "1", true));
}

@Test
@Order(4)
public void testDropKafkaTopic() throws InterruptedException {
  // Remove the topic through the client API, then refresh the schema view.
  dropTopic(METALAKE_NAME, KAFKA_CATALOG_NAME, SCHEMA_NAME, TOPIC_NAME);
  String schemaNode =
      String.format(
          "{{%s}}{{%s}}{{%s}}{{%s}}",
          METALAKE_NAME, KAFKA_CATALOG_NAME, CATALOG_TYPE_MESSAGING, SCHEMA_NAME);
  catalogsPage.clickTreeNode(schemaNode);
  // The topic table should now be empty.
  Assertions.assertTrue(catalogsPage.verifyEmptyTableData());
}

@Test
@Order(5)
public void testBackHomePage() throws InterruptedException {
  // Navigate back to the home page and confirm we landed there.
  clickAndWait(catalogsPage.backHomeBtn);
  Assertions.assertTrue(catalogsPage.verifyBackHomePage());
}
}
Loading
Loading