Skip to content

Commit

Permalink
use lists instead of streams in TestDatabase
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Feb 16, 2024
1 parent 2f31ce7 commit 962841b
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.sql.DataSource;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.exception.DataAccessException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.JdbcDatabaseContainer;
Expand Down Expand Up @@ -87,7 +87,7 @@ public T onClose(String fmtSql, Object... fmtArgs) {
* Executes a SQL statement after calling String.format on the arguments.
*/
public T with(String fmtSql, Object... fmtArgs) {
execSQL(Stream.of(String.format(fmtSql, fmtArgs)));
execSQL(List.of(String.format(fmtSql, fmtArgs)));
return self();
}

Expand All @@ -113,9 +113,9 @@ final public boolean isInitialized() {
return dslContext != null;
}

abstract protected Stream<Stream<String>> inContainerBootstrapCmd();
abstract protected List<List<String>> inContainerBootstrapCmd();

abstract protected Stream<String> inContainerUndoBootstrapCmd();
abstract protected List<String> inContainerUndoBootstrapCmd();

abstract public DatabaseDriver getDatabaseDriver();

Expand Down Expand Up @@ -167,22 +167,17 @@ public Database getDatabase() {
return new Database(getDslContext());
}

protected void execSQL(final Stream<String> sql) {
protected void execSQL(final List<String> sqls) {
try {
getDatabase().query(ctx -> {
sql.forEach(statement -> {
LOGGER.debug("{}", statement);
ctx.execute(statement);
});
return null;
});
} catch (SQLException e) {
for (String sql : sqls) {
getDslContext().execute(sql);
}
} catch (DataAccessException e) {
throw new RuntimeException(e);
}
}

protected void execInContainer(Stream<String> cmds) {
final List<String> cmd = cmds.toList();
protected void execInContainer(List<String> cmd) {
if (cmd.isEmpty()) {
return;
}
Expand Down Expand Up @@ -232,7 +227,7 @@ public B integrationTestConfigBuilder() {

@Override
public void close() {
execSQL(this.cleanupSQL.stream());
execSQL(this.cleanupSQL);
execInContainer(inContainerUndoBootstrapCmd());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteStateMessage.AirbyteStateType;
import java.sql.JDBCType;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -140,24 +141,27 @@ public BareBonesTestDatabase(PostgreSQLContainer<?> container) {
}

@Override
protected Stream<Stream<String>> inContainerBootstrapCmd() {
final var sql = Stream.of(
protected List<List<String>> inContainerBootstrapCmd() {
final var sqls = List.of(
String.format("CREATE DATABASE %s", getDatabaseName()),
String.format("CREATE USER %s PASSWORD '%s'", getUserName(), getPassword()),
String.format("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", getDatabaseName(), getUserName()),
String.format("ALTER USER %s WITH SUPERUSER", getUserName()));
return Stream.of(Stream.concat(
Stream.of("psql",
"-d", getContainer().getDatabaseName(),
"-U", getContainer().getUsername(),
"-v", "ON_ERROR_STOP=1",
"-a"),
sql.flatMap(stmt -> Stream.of("-c", stmt))));
List<String> cmd = Arrays.asList("psql",
"-d", getContainer().getDatabaseName(),
"-U", getContainer().getUsername(),
"-v", "ON_ERROR_STOP=1",
"-a");
for (String sql : sqls) {
cmd.add("-c");
cmd.add(sql);
}
return List.of(cmd);
}

@Override
protected Stream<String> inContainerUndoBootstrapCmd() {
return Stream.empty();
protected List<String> inContainerUndoBootstrapCmd() {
return Collections.emptyList();
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.19.0'
cdkVersionRequired = '0.19.1'
features = ['db-sources']
useLocalCdk = true
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public JsonNode constructInitialDebeziumState(final Properties properties,
assert Objects.nonNull(schemaHistory.schema());

final JsonNode asJson = serialize(offset, schemaHistory);
LOGGER.info("Initial Debezium state constructed: {}", asJson);
//LOGGER.info("Initial Debezium state constructed: {}", asJson);

if (asJson.get(MssqlCdcStateConstants.MSSQL_DB_HISTORY).asText().isBlank()) {
throw new RuntimeException("Schema history snapshot returned empty history.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -157,10 +159,10 @@ public String getJdbcUrl() {
}

@Override
protected Stream<Stream<String>> inContainerBootstrapCmd() {
return Stream.of(
mssqlCmd(Stream.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmd(Stream.of(
protected List<List<String>> inContainerBootstrapCmd() {
return List.of(
mssqlCmd(List.of(String.format("CREATE DATABASE %s", getDatabaseName()))),
mssqlCmd(List.of(
String.format("USE %s", getDatabaseName()),
String.format("CREATE LOGIN %s WITH PASSWORD = '%s', DEFAULT_DATABASE = %s", getUserName(), getPassword(), getDatabaseName()),
String.format("ALTER SERVER ROLE [sysadmin] ADD MEMBER %s", getUserName()),
Expand All @@ -174,22 +176,22 @@ protected Stream<Stream<String>> inContainerBootstrapCmd() {
* aren't really worth it.
*/
@Override
protected Stream<String> inContainerUndoBootstrapCmd() {
return Stream.empty();
protected List<String> inContainerUndoBootstrapCmd() {
return Collections.emptyList();
}

public void dropDatabaseAndUser() {
execInContainer(mssqlCmd(Stream.of(
execInContainer(mssqlCmd(List.of(
String.format("USE master"),
String.format("ALTER DATABASE %s SET single_user WITH ROLLBACK IMMEDIATE", getDatabaseName()),
String.format("DROP DATABASE %s", getDatabaseName()))));
}

public Stream<String> mssqlCmd(final Stream<String> sql) {
return Stream.of("/opt/mssql-tools/bin/sqlcmd",
public List<String> mssqlCmd(final List<String> sql) {
return Arrays.asList("/opt/mssql-tools/bin/sqlcmd",
"-U", getContainer().getUsername(),
"-P", getContainer().getPassword(),
"-Q", sql.collect(Collectors.joining("; ")),
"-Q", StringUtils.join(sql, "; "),
"-b", "-e");
}

Expand Down

0 comments on commit 962841b

Please sign in to comment.