Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce code duplication in integration tests #82

Merged
merged 2 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,82 +5,46 @@

package org.apache.flink.connector.nebula;

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MockData {
private static final Logger LOGGER =
LoggerFactory.getLogger(MockData.class);

public static void mockSchema() {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
List<HostAddress> addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669));
NebulaPool pool = new NebulaPool();
Session session = null;
try {
pool.init(addresses, nebulaPoolConfig);
session = pool.getSession("root", "nebula", true);

ResultSet respStringSpace = session.execute(createStringSpace());
ResultSet respIntSpace = session.execute(createIntSpace());

try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}


if (!respStringSpace.isSucceeded()) {
LOGGER.error("create string vid type space failed, {}",
respStringSpace.getErrorMessage());
assert (false);
}
if (!respIntSpace.isSucceeded()) {
LOGGER.error("create int vid type space failed, {}",
respIntSpace.getErrorMessage());
assert (false);
}
} catch (UnknownHostException | NotValidConnectionException
| IOErrorException | AuthFailedException | ClientServerIncompatibleException e) {
LOGGER.error("create space error, ", e);
assert (false);
} finally {
pool.close();
}
}


private static String createStringSpace() {
String exec = "CREATE SPACE IF NOT EXISTS test_string(partition_num=10,"
public static String createStringSpace() {
return "CREATE SPACE IF NOT EXISTS test_string(partition_num=10,"
+ "vid_type=fixed_string(8));"
+ "USE test_string;"
+ "CREATE TAG IF NOT EXISTS person(col1 fixed_string(8), col2 string, col3 int32,"
+ " col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);"
+ "CREATE EDGE IF NOT EXISTS friend(col1 fixed_string(8), col2 string, col3 "
+ "int32, col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);";
return exec;
}

private static String createIntSpace() {
String exec = "CREATE SPACE IF NOT EXISTS test_int(partition_num=10,vid_type=int64);"
public static String createIntSpace() {
return "CREATE SPACE IF NOT EXISTS test_int(partition_num=10,vid_type=int64);"
+ "USE test_int;"
+ "CREATE TAG IF NOT EXISTS person(col1 fixed_string(8), col2 string, col3 int32,"
+ " col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);"
+ "CREATE EDGE IF NOT EXISTS friend(col1 fixed_string(8), col2 string, col3 "
+ "int32, col4 double, col5 date, col6 datetime, col7 time, col8 timestamp);";
return exec;
}

public static String createFlinkSinkSpace() {
return "CREATE SPACE IF NOT EXISTS flink_sink(partition_num=10,"
+ "vid_type=fixed_string(8));"
+ "USE flink_sink;"
+ "CREATE TAG IF NOT EXISTS player(name string, age int);";
}

public static String createFlinkTestSpace() {
return "CLEAR SPACE IF EXISTS `flink_test`;"
+ " CREATE SPACE IF NOT EXISTS `flink_test` (partition_num = 100,"
+ " charset = utf8, replica_factor = 3, collate = utf8_bin, vid_type = INT64);"
+ " USE `flink_test`;"
+ " CREATE TAG IF NOT EXISTS person (col1 string, col2 fixed_string(8),"
+ " col3 int8, col4 int16, col5 int32, col6 int64,"
+ " col7 date, col8 datetime, col9 timestamp, col10 bool,"
+ " col11 double, col12 float, col13 time, col14 geography);"
+ " CREATE EDGE IF NOT EXISTS friend (col1 string, col2 fixed_string(8),"
+ " col3 int8, col4 int16, col5 int32, col6 int64,"
+ " col7 date, col8 datetime, col9 timestamp, col10 bool,"
+ " col11 double, col12 float, col13 time, col14 geography);";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,18 @@
import java.util.Collections;
import java.util.List;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;

public class NebulaITTestBase {

protected static final String META_ADDRESS = "127.0.0.1:9559";
protected static final String GRAPH_ADDRESS = "127.0.0.1:9669";
protected static final String USER_NAME = "root";
protected static final String USERNAME = "root";
protected static final String PASSWORD = "nebula";

private static final String[] NEBULA_SCHEMA_STATEMENTS = new String[]{
"CLEAR SPACE IF EXISTS `flink_test`;"
+ " CREATE SPACE IF NOT EXISTS `flink_test` (partition_num = 100,"
+ " charset = utf8, replica_factor = 3, collate = utf8_bin, vid_type = INT64);"
+ " USE `flink_test`;",
"CREATE TAG IF NOT EXISTS person (col1 string, col2 fixed_string(8),"
+ " col3 int8, col4 int16, col5 int32, col6 int64,"
+ " col7 date, col8 datetime, col9 timestamp, col10 bool,"
+ " col11 double, col12 float, col13 time, col14 geography);"
+ " CREATE EDGE IF NOT EXISTS friend (col1 string, col2 fixed_string(8),"
+ " col3 int8, col4 int16, col5 int32, col6 int64,"
+ " col7 date, col8 datetime, col9 timestamp, col10 bool,"
+ " col11 double, col12 float, col13 time, col14 geography);"
};
protected static Session session;
protected static NebulaPool pool;
protected static TableEnvironment tableEnvironment;

@BeforeClass
public static void beforeAll() throws IOErrorException {
initializeNebulaSession();
initializeNebulaSchema();
}

@AfterClass
public static void afterAll() {
closeNebulaSession();
}

private static void initializeNebulaSession() throws IOErrorException {
protected static void initializeNebulaSession() {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
String[] addressAndPort = GRAPH_ADDRESS.split(NebulaConstant.COLON);
List<HostAddress> addresses = Collections.singletonList(
Expand All @@ -77,30 +46,29 @@ private static void initializeNebulaSession() throws IOErrorException {
throw new RuntimeException("failed to initialize connection pool");
}
} catch (UnknownHostException e) {
throw new RuntimeException(e);
throw new RuntimeException("init nebula pool error", e);
}
try {
session = pool.getSession(USER_NAME, PASSWORD, true);
session = pool.getSession(USERNAME, PASSWORD, true);
} catch (NotValidConnectionException
| AuthFailedException
| IOErrorException
| ClientServerIncompatibleException e) {
throw new RuntimeException(e);
throw new RuntimeException("init nebula session error", e);
}
}

private static void initializeNebulaSchema() throws IOErrorException {
for (String stmt : NEBULA_SCHEMA_STATEMENTS) {
executeNGql(stmt);
// wait for at least two heartbeat cycles
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
protected static void initializeNebulaSchema(String statement) {
executeNGql(statement);
// wait for at least two heartbeat cycles
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private static void closeNebulaSession() {
protected static void closeNebulaSession() {
if (session != null) {
session.release();
}
Expand All @@ -109,13 +77,13 @@ private static void closeNebulaSession() {
}
}

@Before
public void before() {
tableEnvironment = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
}

protected static ResultSet executeNGql(String stmt) throws IOErrorException {
ResultSet response = session.execute(stmt);
protected static ResultSet executeNGql(String stmt) {
ResultSet response;
try {
response = session.execute(stmt);
} catch (IOErrorException e) {
throw new RuntimeException(String.format("failed to execute statement %s", stmt), e);
}
if (!response.isSucceeded()) {
throw new RuntimeException(String.format(
"failed to execute statement %s with error: %s",
Expand All @@ -125,12 +93,7 @@ protected static ResultSet executeNGql(String stmt) throws IOErrorException {
}

protected static void check(List<Row> expected, String stmt) {
ResultSet response;
try {
response = executeNGql(stmt);
} catch (IOErrorException e) {
throw new RuntimeException(String.format("failed to check result of %s", stmt), e);
}
ResultSet response = executeNGql(stmt);
if (expected == null || expected.isEmpty()) {
assertTrue(response.isEmpty());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,24 @@
import static org.apache.flink.connector.nebula.NebulaValueUtils.timeOf;
import static org.apache.flink.connector.nebula.NebulaValueUtils.valueOf;

import com.vesoft.nebula.client.graph.exception.IOErrorException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.MockData;
import org.apache.flink.connector.nebula.NebulaITTestBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,6 +39,24 @@ public class AbstractNebulaOutputFormatITTest extends NebulaITTestBase {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractNebulaOutputFormatITTest.class);

private static TableEnvironment tableEnvironment;

@BeforeClass
public static void beforeAll() {
initializeNebulaSession();
initializeNebulaSchema(MockData.createFlinkTestSpace());
}

@AfterClass
public static void afterAll() {
closeNebulaSession();
}

@Before
public void before() {
tableEnvironment = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
}

/**
* sink Nebula Graph Vertex Data with default INSERT mode
*/
Expand Down Expand Up @@ -67,7 +92,7 @@ public void testSinkVertexData() throws ExecutionException, InterruptedException
+ GRAPH_ADDRESS
+ "',"
+ "'username'='"
+ USER_NAME
+ USERNAME
+ "',"
+ "'password'='"
+ PASSWORD
Expand Down Expand Up @@ -206,7 +231,7 @@ private void checkVertexChangelog(String sourceTableName, String sinkTableName,
+ GRAPH_ADDRESS
+ "',"
+ "'username'='"
+ USER_NAME
+ USERNAME
+ "',"
+ "'password'='"
+ PASSWORD
Expand Down Expand Up @@ -268,7 +293,7 @@ public void testSinkEdgeData() throws ExecutionException, InterruptedException {
+ GRAPH_ADDRESS
+ "',"
+ "'username'='"
+ USER_NAME
+ USERNAME
+ "',"
+ "'password'='"
+ PASSWORD
Expand Down Expand Up @@ -347,7 +372,7 @@ public void testSinkEdgeDataWithoutRank() throws ExecutionException, Interrupted
+ GRAPH_ADDRESS
+ "',"
+ "'username'='"
+ USER_NAME
+ USERNAME
+ "',"
+ "'password'='"
+ PASSWORD
Expand Down Expand Up @@ -462,7 +487,7 @@ private void checkEdgeChangelog(String sourceTableName, String sinkTableName,
+ GRAPH_ADDRESS
+ "',"
+ "'username'='"
+ USER_NAME
+ USERNAME
+ "',"
+ "'password'='"
+ PASSWORD
Expand Down
Loading