Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CC-14: Add whitelist/blacklist for tables. #18

Merged
merged 1 commit into from
Nov 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.confluent.common.config.ConfigException;
import io.confluent.connect.jdbc.util.StringUtils;
Expand Down Expand Up @@ -75,7 +77,16 @@ public void start(Map<String, String> properties) throws ConnectException {
}

long tablePollMs = config.getLong(JdbcSourceConnectorConfig.TABLE_POLL_INTERVAL_MS_CONFIG);
tableMonitorThread = new TableMonitorThread(db, context, tablePollMs);
List<String> whitelist = config.getList(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG);
Set<String> whitelistSet = whitelist.isEmpty() ? null : new HashSet<>(whitelist);
List<String> blacklist = config.getList(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG);
Set<String> blacklistSet = blacklist.isEmpty() ? null : new HashSet<>(blacklist);
if (whitelistSet != null && blacklistSet != null)
throw new ConnectException(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + " and "
+ JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG+ " are "
+ "exclusive.");
tableMonitorThread = new TableMonitorThread(db, context, tablePollMs, whitelistSet,
blacklistSet);
tableMonitorThread.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
+ "removed tables.";
public static final long TABLE_POLL_INTERVAL_MS_DEFAULT = 60 * 1000;

public static final String TABLE_WHITELIST_CONFIG = "table.whitelist";
private static final String TABLE_WHITELIST_DOC =
"List of tables to include in copying. If specified, table.blacklist may not be set.";
public static final String TABLE_WHITELIST_DEFAULT = "";

public static final String TABLE_BLACKLIST_CONFIG = "table.blacklist";
private static final String TABLE_BLACKLIST_DOC =
"List of tables to exclude from copying. If specified, table.whitelist may not be set.";
public static final String TABLE_BLACKLIST_DEFAULT = "";

static ConfigDef config = new ConfigDef()
.define(CONNECTION_URL_CONFIG, Type.STRING, Importance.HIGH, CONNECTION_URL_DOC)
.define(POLL_INTERVAL_MS_CONFIG, Type.INT, POLL_INTERVAL_MS_DEFAULT, Importance.HIGH,
Expand All @@ -93,7 +103,11 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
.define(TIMESTAMP_COLUMN_NAME_CONFIG, Type.STRING, TIMESTAMP_COLUMN_NAME_DEFAULT,
Importance.MEDIUM, TIMESTAMP_COLUMN_NAME_DOC)
.define(TABLE_POLL_INTERVAL_MS_CONFIG, Type.LONG, TABLE_POLL_INTERVAL_MS_DEFAULT,
Importance.LOW, TABLE_POLL_INTERVAL_MS_DOC);
Importance.LOW, TABLE_POLL_INTERVAL_MS_DOC)
.define(TABLE_WHITELIST_CONFIG, Type.LIST, TABLE_WHITELIST_DEFAULT,
Importance.MEDIUM, TABLE_WHITELIST_DOC)
.define(TABLE_BLACKLIST_CONFIG, Type.LIST, TABLE_BLACKLIST_DEFAULT,
Importance.MEDIUM, TABLE_BLACKLIST_DOC);

JdbcSourceConnectorConfig(Map<String, String> props) {
super(config, props);
Expand Down
32 changes: 28 additions & 4 deletions src/main/java/io/confluent/connect/jdbc/TableMonitorThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -38,13 +40,18 @@ public class TableMonitorThread extends Thread {
private final ConnectorContext context;
private final CountDownLatch shutdownLatch;
private final long pollMs;
private Set<String> whitelist;
private Set<String> blacklist;
private List<String> tables;

public TableMonitorThread(Connection db, ConnectorContext context, long pollMs) {
public TableMonitorThread(Connection db, ConnectorContext context, long pollMs,
Set<String> whitelist, Set<String> blacklist) {
this.db = db;
this.context = context;
this.shutdownLatch = new CountDownLatch(1);
this.pollMs = pollMs;
this.whitelist = whitelist;
this.blacklist = blacklist;
this.tables = null;
}

Expand Down Expand Up @@ -102,11 +109,28 @@ private boolean updateTables() {
return false;
}

// TODO: Any filtering like whitelists/blacklists or regex matches should be applied here.
final List<String> filteredTables;
if (whitelist != null) {
filteredTables = new ArrayList<>(tables.size());
for (String table : tables) {
if (whitelist.contains(table)) {
filteredTables.add(table);
}
}
} else if (blacklist != null) {
filteredTables = new ArrayList<>(tables.size());
for (String table : tables) {
if (!blacklist.contains(table)) {
filteredTables.add(table);
}
}
} else {
filteredTables = tables;
}

if (!tables.equals(this.tables)) {
if (!filteredTables.equals(this.tables)) {
List<String> previousTables = this.tables;
this.tables = tables;
this.tables = filteredTables;
db.notifyAll();
// Only return true if the table list wasn't previously null, i.e. if this was not the
// first table lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

import static org.junit.Assert.assertEquals;
Expand All @@ -45,6 +46,7 @@ public class TableMonitorThreadTest {

private static final List<String> FIRST_TOPIC_LIST = Arrays.asList("foo");
private static final List<String> SECOND_TOPIC_LIST = Arrays.asList("foo", "bar");
private static final List<String> THIRD_TOPIC_LIST = Arrays.asList("foo", "bar", "baz");

private EmbeddedDerby db;
private Connection dbConn;
Expand All @@ -55,7 +57,6 @@ public class TableMonitorThreadTest {
public void setUp() throws SQLException {
db = new EmbeddedDerby();
dbConn = DriverManager.getConnection(db.getUrl());
tableMonitorThread = new TableMonitorThread(dbConn, context, POLL_INTERVAL);

PowerMock.mockStatic(JdbcUtils.class);
}
Expand All @@ -68,6 +69,8 @@ public void tearDown() throws Exception {

@Test
public void testSingleLookup() throws Exception {
tableMonitorThread = new TableMonitorThread(dbConn, context, POLL_INTERVAL, null, null);

EasyMock.expect(JdbcUtils.getTables(dbConn)).andAnswer(new IAnswer<List<String>>() {
@Override
public List<String> answer() throws Throwable {
Expand All @@ -85,8 +88,54 @@ public List<String> answer() throws Throwable {
PowerMock.verifyAll();
}

@Test
public void testWhitelist() throws Exception {
tableMonitorThread = new TableMonitorThread(dbConn, context, POLL_INTERVAL,
new HashSet<>(Arrays.asList("foo", "bar")), null);

EasyMock.expect(JdbcUtils.getTables(dbConn)).andAnswer(new IAnswer<List<String>>() {
@Override
public List<String> answer() throws Throwable {
tableMonitorThread.shutdown();
return THIRD_TOPIC_LIST;
}
});

PowerMock.replayAll();

tableMonitorThread.start();
tableMonitorThread.join();
assertEquals(Arrays.asList("foo", "bar"), tableMonitorThread.tables());

PowerMock.verifyAll();
}

@Test
public void testBlacklist() throws Exception {
tableMonitorThread = new TableMonitorThread(dbConn, context, POLL_INTERVAL,
null, new HashSet<>(Arrays.asList("bar", "baz")));

EasyMock.expect(JdbcUtils.getTables(dbConn)).andAnswer(new IAnswer<List<String>>() {
@Override
public List<String> answer() throws Throwable {
tableMonitorThread.shutdown();
return THIRD_TOPIC_LIST;
}
});

PowerMock.replayAll();

tableMonitorThread.start();
tableMonitorThread.join();
assertEquals(Arrays.asList("foo"), tableMonitorThread.tables());

PowerMock.verifyAll();
}

@Test
public void testReconfigOnUpdate() throws Exception {
tableMonitorThread = new TableMonitorThread(dbConn, context, POLL_INTERVAL, null, null);

EasyMock.expect(JdbcUtils.getTables(dbConn)).andReturn(FIRST_TOPIC_LIST);
// Returning same list should not change results
EasyMock.expect(JdbcUtils.getTables(dbConn)).andAnswer(new IAnswer<List<String>>() {
Expand Down