Merged
PostgresDialect.java
@@ -50,7 +50,6 @@
 import javax.annotation.Nullable;
 
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -171,11 +170,7 @@ public Map<TableId, TableChange> discoverDataCollectionSchemas(JdbcSourceConfig
 
         try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
             // fetch table schemas
-            Map<TableId, TableChange> tableSchemas = new HashMap<>();
-            for (TableId tableId : capturedTableIds) {
-                TableChange tableSchema = queryTableSchema(jdbc, tableId);
-                tableSchemas.put(tableId, tableSchema);
-            }
+            Map<TableId, TableChange> tableSchemas = queryTableSchema(jdbc, capturedTableIds);
             return tableSchemas;
         } catch (Exception e) {
             throw new FlinkRuntimeException(
@@ -196,6 +191,14 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
         return schema.getTableSchema(tableId);
     }
 
+    private Map<TableId, TableChange> queryTableSchema(
+            JdbcConnection jdbc, List<TableId> tableIds) {
+        if (schema == null) {
+            schema = new CustomPostgresSchema((PostgresConnection) jdbc, sourceConfig);
+        }
+        return schema.getTableSchema(tableIds);
+    }
+
     @Override
     public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
         if (sourceSplitBase.isSnapshotSplit()) {
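The point of the change above is round trips: the old loop in discoverDataCollectionSchemas issued one schema query per captured table, while the new private queryTableSchema(JdbcConnection, List<TableId>) hands the whole list to CustomPostgresSchema so the metadata can be fetched in a single pass. A minimal, self-contained JDBC sketch of the same idea, not code from this PR (the connection string, credentials, and "public" schema name are placeholders):

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchedSchemaRead {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust for a real database.
        try (Connection conn =
                DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/db", "user", "secret")) {
            DatabaseMetaData meta = conn.getMetaData();
            // One metadata round trip for every table in the schema: a null
            // table-name pattern matches all tables, analogous to the PR passing
            // the full capturedTableIds list into a single readSchema call
            // instead of querying table by table.
            Map<String, List<String>> columnsByTable = new HashMap<>();
            try (ResultSet rs = meta.getColumns(null, "public", null, null)) {
                while (rs.next()) {
                    columnsByTable
                            .computeIfAbsent(
                                    rs.getString("TABLE_NAME"), k -> new ArrayList<>())
                            .add(rs.getString("COLUMN_NAME"));
                }
            }
            System.out.println(columnsByTable.keySet());
        }
    }
}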
CustomPostgresSchema.java
@@ -34,7 +34,10 @@
 
 import java.sql.SQLException;
 import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -56,53 +59,87 @@ public TableChange getTableSchema(TableId tableId) {
         // read schema from cache first
         if (!schemasByTableId.containsKey(tableId)) {
             try {
-                readTableSchema(tableId);
+                readTableSchema(Collections.singletonList(tableId));
             } catch (SQLException e) {
                 throw new FlinkRuntimeException("Failed to read table schema", e);
             }
         }
         return schemasByTableId.get(tableId);
     }
 
-    private TableChange readTableSchema(TableId tableId) throws SQLException {
+    public Map<TableId, TableChange> getTableSchema(List<TableId> tableIds) {
+        // read schema from cache first
+        Map<TableId, TableChange> tableChanges = new HashMap();
+
+        List<TableId> unMatchTableIds = new ArrayList<>();
+        for (TableId tableId : tableIds) {
+            if (schemasByTableId.containsKey(tableId)) {
+                tableChanges.put(tableId, schemasByTableId.get(tableId));
+            } else {
+                unMatchTableIds.add(tableId);
+            }
+        }
+
+        if (!unMatchTableIds.isEmpty()) {
+            try {
+                readTableSchema(tableIds);
+            } catch (SQLException e) {
+                throw new FlinkRuntimeException("Failed to read table schema", e);
+            }
+            for (TableId tableId : unMatchTableIds) {
+                if (schemasByTableId.containsKey(tableId)) {
+                    tableChanges.put(tableId, schemasByTableId.get(tableId));
+                } else {
+                    throw new FlinkRuntimeException(
+                            String.format("Failed to read table schema of table %s", tableId));
+                }
+            }
+        }
+        return tableChanges;
+    }
+
+    private List<TableChange> readTableSchema(List<TableId> tableIds) throws SQLException {
+        List<TableChange> tableChanges = new ArrayList<>();
 
         final PostgresOffsetContext offsetContext =
                 PostgresOffsetContext.initialContext(dbzConfig, jdbcConnection, Clock.SYSTEM);
 
         PostgresPartition partition = new PostgresPartition(dbzConfig.getLogicalName());
 
-        // set the events to populate proper sourceInfo into offsetContext
-        offsetContext.event(tableId, Instant.now());
-
         Tables tables = new Tables();
         try {
             jdbcConnection.readSchema(
                     tables,
                     dbzConfig.databaseName(),
-                    tableId.schema(),
+                    null,
                     dbzConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
         } catch (SQLException e) {
             throw new FlinkRuntimeException("Failed to read schema", e);
         }
 
-        Table table = Objects.requireNonNull(tables.forTable(tableId));
-
-        // TODO: check whether we always set isFromSnapshot = true
-        SchemaChangeEvent schemaChangeEvent =
-                SchemaChangeEvent.ofCreate(
-                        partition,
-                        offsetContext,
-                        dbzConfig.databaseName(),
-                        tableId.schema(),
-                        null,
-                        table,
-                        true);
-
-        for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
-            this.schemasByTableId.put(tableId, tableChange);
+        for (TableId tableId : tableIds) {
+            Table table = Objects.requireNonNull(tables.forTable(tableId));
+            // set the events to populate proper sourceInfo into offsetContext
+            offsetContext.event(tableId, Instant.now());
+
+            // TODO: check whether we always set isFromSnapshot = true
+            SchemaChangeEvent schemaChangeEvent =
+                    SchemaChangeEvent.ofCreate(
+                            partition,
+                            offsetContext,
+                            dbzConfig.databaseName(),
+                            tableId.schema(),
+                            null,
+                            table,
+                            true);
+
+            for (TableChanges.TableChange tableChange : schemaChangeEvent.getTableChanges()) {
+                this.schemasByTableId.put(tableId, tableChange);
+            }
+            tableChanges.add(this.schemasByTableId.get(tableId));
         }
-        return this.schemasByTableId.get(tableId);
+        return tableChanges;
     }
 }
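Stepping back, the new getTableSchema(List<TableId>) is a cache-first batch lookup: serve what is already cached, collect the misses, read the misses in one readSchema round trip, and fail loudly for any table the batch did not return. A generic sketch of that pattern, with invented names (BatchingCache, batchLoader) rather than the PR's classes:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class BatchingCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    // Loads many keys in one round trip, e.g. a single JDBC metadata query.
    private final Function<List<K>, Map<K, V>> batchLoader;

    BatchingCache(Function<List<K>, Map<K, V>> batchLoader) {
        this.batchLoader = batchLoader;
    }

    Map<K, V> getAll(List<K> keys) {
        Map<K, V> result = new HashMap<>();
        List<K> misses = new ArrayList<>();
        for (K key : keys) { // serve cache hits first
            V hit = cache.get(key);
            if (hit != null) {
                result.put(key, hit);
            } else {
                misses.add(key);
            }
        }
        if (!misses.isEmpty()) { // one batched load covers every miss
            Map<K, V> loaded = batchLoader.apply(misses);
            for (K key : misses) {
                V value = loaded.get(key);
                if (value == null) { // mirrors the PR's "Failed to read table schema of table %s"
                    throw new IllegalStateException("No value loaded for key " + key);
                }
                cache.put(key, value);
                result.put(key, value);
            }
        }
        return result;
    }
}

One difference worth noting: the sketch passes only the misses to the loader, whereas the merged code calls readTableSchema(tableIds) with the full list even when some tables were cached; since readSchema scans the whole filtered catalog in one pass either way, the outcome is the same, and only the previously unmatched tables are re-checked against the cache afterwards.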