Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][E2E] Support windows for the e2e of paimon #7329

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
package org.apache.seatunnel.e2e.connector.paimon;

import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;

import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
Expand Down Expand Up @@ -52,6 +54,7 @@

import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.ArrayList;
Expand All @@ -68,7 +71,8 @@
"Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error")
@Slf4j
public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
private static final String CATALOG_ROOT_DIR = "/tmp/";

private static String CATALOG_ROOT_DIR = "/tmp/";
private static final String NAMESPACE = "paimon";
private static final String NAMESPACE_TAR = "paimon.tar.gz";
private static final String CATALOG_DIR = CATALOG_ROOT_DIR + NAMESPACE + "/";
Expand All @@ -77,10 +81,18 @@ public class PaimonSinkCDCIT extends TestSuiteBase implements TestResource {
private static final String FAKE_DATABASE1 = "FakeDatabase1";
private static final String FAKE_TABLE2 = "FakeTable1";
private static final String FAKE_DATABASE2 = "FakeDatabase2";
private String CATALOG_ROOT_DIR_WIN = "C:/Users/";
private String CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
private boolean isWindows;

@BeforeAll
@Override
public void startUp() throws Exception {}
public void startUp() throws Exception {
this.isWindows =
System.getProperties().getProperty("os.name").toUpperCase().contains("WINDOWS");
CATALOG_ROOT_DIR_WIN = CATALOG_ROOT_DIR_WIN + System.getProperty("user.name") + "/tmp/";
CATALOG_DIR_WIN = CATALOG_ROOT_DIR_WIN + NAMESPACE + "/";
}

@AfterAll
@Override
Expand Down Expand Up @@ -498,8 +510,15 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai

protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
FileUtils.createNewDir(CATALOG_DIR);
if (isWindows) {
FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR);
FileUtils.deleteFile(CATALOG_ROOT_DIR_WIN + "paimon.tar");
FileUtils.createNewDir(CATALOG_ROOT_DIR_WIN);
} else {
FileUtils.deleteFile(CATALOG_ROOT_DIR + NAMESPACE_TAR);
FileUtils.createNewDir(CATALOG_DIR);
}

container.execInContainer(
"sh",
"-c",
Expand All @@ -510,8 +529,13 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai
+ " "
+ NAMESPACE);
container.copyFileFromContainer(
CATALOG_ROOT_DIR + NAMESPACE_TAR, CATALOG_ROOT_DIR + NAMESPACE_TAR);
extractFiles();
CATALOG_ROOT_DIR + NAMESPACE_TAR,
(isWindows ? CATALOG_ROOT_DIR_WIN : CATALOG_ROOT_DIR) + NAMESPACE_TAR);
if (isWindows) {
extractFilesWin();
} else {
extractFiles();
}
};

private void extractFiles() {
Expand All @@ -532,6 +556,17 @@ private void extractFiles() {
}
}

private void extractFilesWin() {
try {
CompressionUtils.unGzip(
new File(CATALOG_ROOT_DIR_WIN + NAMESPACE_TAR), new File(CATALOG_ROOT_DIR_WIN));
CompressionUtils.unTar(
new File(CATALOG_ROOT_DIR_WIN + "paimon.tar"), new File(CATALOG_ROOT_DIR_WIN));
} catch (IOException | ArchiveException e) {
throw new RuntimeException(e);
}
}

private List<PaimonRecord> loadPaimonData(String dbName, String tbName) throws Exception {
Table table = getTable(dbName, tbName);
ReadBuilder readBuilder = table.newReadBuilder();
Expand Down Expand Up @@ -575,7 +610,11 @@ private Identifier getIdentifier(String dbName, String tbName) {

private Catalog getCatalog() {
Options options = new Options();
options.set("warehouse", "file://" + CATALOG_DIR);
if (isWindows) {
options.set("warehouse", "file://" + CATALOG_DIR_WIN);
} else {
options.set("warehouse", "file://" + CATALOG_DIR);
}
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
return catalog;
}
Expand Down
Loading