Skip to content

Commit

Permalink
[Feature][DDL] Support for processing DDL data.
Browse files Browse the repository at this point in the history
  • Loading branch information
FlechazoW committed Oct 9, 2022
1 parent a2f6b22 commit c18e280
Show file tree
Hide file tree
Showing 243 changed files with 20,720 additions and 2,564 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ sh build/build.sh
Error message:

```java
[ERROR]Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check(spotless-check)on project flinkx-core:
[ERROR]Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check(spotless-check)on project chunjun-core:
Execution spotless-check of goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check failed:Unable to resolve dependencies:
Failed to collect dependencies at com.google.googlejavaformat:google-java-format:jar:1.7->com.google.errorprone:javac-shaded:jar:9+181-r4173-1:
Failed to read artifact descriptor for com.google.errorprone:javac-shaded:jar:9+181-r4173-1:Could not transfer artifact
Expand Down
2 changes: 1 addition & 1 deletion README_CH.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ chunjun目前支持tdh和开源hadoop平台,对不同的平台有需要使用
报错信息:

```java
[ERROR]Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check(spotless-check)on project flinkx-core:
[ERROR]Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check(spotless-check)on project chunjun-core:
Execution spotless-check of goal com.diffplug.spotless:spotless-maven-plugin:2.4.2:check failed:Unable to resolve dependencies:
Failed to collect dependencies at com.google.googlejavaformat:google-java-format:jar:1.7->com.google.errorprone:javac-shaded:jar:9+181-r4173-1:
Failed to read artifact descriptor for com.google.errorprone:javac-shaded:jar:9+181-r4173-1:Could not transfer artifact
Expand Down
8 changes: 7 additions & 1 deletion chunjun-connectors/chunjun-connector-binlog/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.parse</artifactId>
Expand Down Expand Up @@ -172,7 +178,7 @@
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>shade.core.com.google.common</shadedPattern>
<shadedPattern>shade.binlog.com.google.common</shadedPattern>
</relocation>
</relocations>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ public class BinlogConf extends ChunJunCommonConf {

public String cat;

/** Whether DDL records are skipped during collection (default: true). */
private boolean ddlSkip = true;

/** Whether to initialize table structures when the job starts (default: false). */
private boolean initialTableStructure = false;

private long offsetLength = 18;

public String filter;

public long period = 1000L;
Expand Down Expand Up @@ -316,6 +324,30 @@ public int getTransactionSize() {
return transactionSize;
}

/** @return {@code true} if DDL records should be skipped */
public boolean isDdlSkip() {
    return this.ddlSkip;
}

/** @param ddlSkip whether DDL records should be skipped */
public void setDdlSkip(boolean ddlSkip) {
    this.ddlSkip = ddlSkip;
}

/** @return {@code true} if table structures are initialized at job start */
public boolean isInitialTableStructure() {
    return this.initialTableStructure;
}

/** @return the zero-padded width used when formatting the binlog file offset */
public long getOffsetLength() {
    return this.offsetLength;
}

/** @param offsetLength zero-padded width used when formatting the binlog file offset */
public void setOffsetLength(long offsetLength) {
    this.offsetLength = offsetLength;
}

/** @param initialTableStructure whether table structures are initialized at job start */
public void setInitialTableStructure(boolean initialTableStructure) {
    this.initialTableStructure = initialTableStructure;
}

@Override
public String toString() {
return "BinlogConf{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import static com.dtstack.chunjun.constants.CDCConstantValue.AFTER_;
import static com.dtstack.chunjun.constants.CDCConstantValue.BEFORE;
import static com.dtstack.chunjun.constants.CDCConstantValue.BEFORE_;
import static com.dtstack.chunjun.constants.CDCConstantValue.DATABASE;
import static com.dtstack.chunjun.constants.CDCConstantValue.LSN;
import static com.dtstack.chunjun.constants.CDCConstantValue.OP_TIME;
import static com.dtstack.chunjun.constants.CDCConstantValue.SCHEMA;
import static com.dtstack.chunjun.constants.CDCConstantValue.TABLE;
Expand Down Expand Up @@ -174,25 +176,32 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
if (pavingData) {
// 7 fixed fields: database, schema, table, ts, lsn, opTime, type
size =
5
7
+ rowData.getAfterColumnsList().size()
+ rowData.getBeforeColumnsList().size();
} else {
// 9 fields: database, schema, table, ts, lsn, opTime, type, before, after
size = 7;
size = 9;
}

ColumnRowData columnRowData = new ColumnRowData(size);
columnRowData.addField(new NullColumn());
columnRowData.addHeader(DATABASE);
columnRowData.addExtHeader(DATABASE);
columnRowData.addField(new StringColumn(schema));
columnRowData.addHeader(SCHEMA);
columnRowData.addExtHeader(CDCConstantValue.SCHEMA);
columnRowData.addExtHeader(SCHEMA);
columnRowData.addField(new StringColumn(table));
columnRowData.addHeader(TABLE);
columnRowData.addExtHeader(CDCConstantValue.TABLE);
columnRowData.addField(new BigDecimalColumn(super.idWorker.nextId()));
columnRowData.addHeader(TS);
columnRowData.addExtHeader(TS);
columnRowData.addField(new TimestampColumn(binlogEventRow.getExecuteTime()));
columnRowData.addField(new StringColumn(binlogEventRow.getLsn()));
columnRowData.addHeader(LSN);
columnRowData.addExtHeader(LSN);
columnRowData.addField(
new StringColumn(String.valueOf(binlogEventRow.getExecuteTime())));
columnRowData.addHeader(OP_TIME);
columnRowData.addExtHeader(CDCConstantValue.OP_TIME);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@ public class BinlogEventRow implements Serializable {
private final CanalEntry.RowChange rowChange;
private final String schema;
private final String table;
private final String lsn;
private final long executeTime;

/**
 * Creates an immutable event wrapper for one binlog row change.
 *
 * @param rowChange the parsed canal row-change payload
 * @param schema database schema the change belongs to
 * @param table table the change belongs to
 * @param executeTime binlog execution timestamp (epoch millis as provided by canal)
 * @param lsn log sequence number string identifying the position of this event
 */
public BinlogEventRow(
        CanalEntry.RowChange rowChange,
        String schema,
        String table,
        long executeTime,
        String lsn) {
    this.schema = schema;
    this.table = table;
    this.rowChange = rowChange;
    this.lsn = lsn;
    this.executeTime = executeTime;
}

public CanalEntry.RowChange getRowChange() {
Expand All @@ -58,6 +64,10 @@ public long getExecuteTime() {
return executeTime;
}

/** @return the log sequence number string of this event */
public String getLsn() {
    return this.lsn;
}

@Override
public String toString() {
return "BinlogEventRow{"
Expand All @@ -71,6 +81,9 @@ public String toString() {
+ '\''
+ ", executeTime="
+ executeTime
+ '\''
+ ", lsn="
+ lsn
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ public class BinlogEventSink extends AbstractCanalLifeCycle
private final LinkedBlockingDeque<RowData> queue;
private final AbstractCDCRowConverter rowConverter;

private final String OFFSET_LENGTH;

public BinlogEventSink(BinlogInputFormat format) {
this.format = format;
this.queue = new LinkedBlockingDeque<>();
this.rowConverter = format.getRowConverter();
this.OFFSET_LENGTH = "%0" + this.format.getBinlogConf().getOffsetLength() + "d";
}

@Override
Expand All @@ -79,8 +82,9 @@ public boolean sink(
String schema = header.getSchemaName();
String table = header.getTableName();
long executeTime = header.getExecuteTime();
String lsn = buildLastPosition(entry);
try {
processRowChange(rowChange, schema, table, executeTime);
processRowChange(rowChange, schema, table, executeTime, lsn);
} catch (WriteRecordException e) {
// todo 脏数据记录
if (LOG.isDebugEnabled()) {
Expand All @@ -104,14 +108,19 @@ public boolean sink(
*/
@SuppressWarnings("unchecked")
private void processRowChange(
CanalEntry.RowChange rowChange, String schema, String table, long executeTime)
CanalEntry.RowChange rowChange,
String schema,
String table,
long executeTime,
String lsn)
throws WriteRecordException {
String eventType = rowChange.getEventType().toString();
List<String> categories = format.getCategories();
if (CollectionUtils.isNotEmpty(categories) && !categories.contains(eventType)) {
return;
}
BinlogEventRow binlogEventRow = new BinlogEventRow(rowChange, schema, table, executeTime);
BinlogEventRow binlogEventRow =
new BinlogEventRow(rowChange, schema, table, executeTime, lsn);
LinkedList<RowData> rowDatalist = null;
try {
rowDatalist = rowConverter.toInternal(binlogEventRow);
Expand Down Expand Up @@ -167,6 +176,11 @@ public void processErrorMsgRowData(ErrorMsgRowData rowData) {
}
}

/**
 * Builds the position string for an entry as {@code "<logfileName>/<zero-padded offset>"},
 * padding the offset with {@code OFFSET_LENGTH} so positions compare lexicographically.
 *
 * @param entry the canal entry whose header carries logfile name and offset
 * @return the formatted position string
 */
protected String buildLastPosition(CanalEntry.Entry entry) {
    CanalEntry.Header header = entry.getHeader();
    String paddedOffset = String.format(OFFSET_LENGTH, header.getLogfileOffset());
    return header.getLogfileName() + "/" + paddedOffset;
}

@Override
public void interrupt() {
LOG.warn("BinlogEventSink is interrupted");
Expand Down
1 change: 0 additions & 1 deletion chunjun-connectors/chunjun-connector-hdfs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.11.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<artifactId>groovy-all</artifactId>
Expand Down
6 changes: 0 additions & 6 deletions chunjun-connectors/chunjun-connector-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,6 @@
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
*/
package com.dtstack.chunjun.connector.jdbc.sink;

import com.dtstack.chunjun.cdc.DdlRowData;
import com.dtstack.chunjun.cdc.DdlRowDataConvented;
import com.dtstack.chunjun.connector.jdbc.conf.JdbcConf;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
Expand Down Expand Up @@ -336,30 +334,6 @@ public void closeInternal() {
JdbcUtil.closeDbResources(null, null, dbConn, true);
}

@Override
protected void executeDdlRwoData(DdlRowData ddlRowData) throws Exception {
if (ddlRowData instanceof DdlRowDataConvented
&& !((DdlRowDataConvented) ddlRowData).conventSuccessful()) {
return;
}
Statement statement = dbConn.createStatement();
statement.execute(ddlRowData.getSql());
}

/**
* write all data and commit transaction before execute ddl sql
*
* @param ddlRowData
* @throws Exception
*/
@Override
protected void preExecuteDdlRwoData(DdlRowData ddlRowData) throws Exception {
while (this.rows.size() > 0) {
this.writeRecordInternal();
}
doCommit();
}

/**
* 获取数据库连接,用于子类覆盖
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@
import java.util.Map;
import java.util.Properties;

/**
* Date: 2021/04/13 Company: www.dtstack.com
*
* @author tudou
*/
public abstract class JdbcSinkFactory extends SinkFactory {

protected JdbcConf jdbcConf;
Expand Down Expand Up @@ -102,12 +97,12 @@ public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
JdbcOutputFormatBuilder builder = getBuilder();
initColumnInfo();
builder.setJdbcConf(jdbcConf);
builder.setDdlConf(ddlConf);
builder.setJdbcDialect(jdbcDialect);
builder.setMonitorConfig(monitor);
builder.setColumnNameList(columnNameList);
builder.setColumnTypeList(columnTypeList);

AbstractRowConverter rowConverter = null;
AbstractRowConverter rowConverter;
final RowType rowType =
TableUtil.createRowType(jdbcConf.getColumn(), getRawTypeConverter());
// 同步任务使用transform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.table.types.logical.LogicalType;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,10 +101,10 @@ public static Connection getConnection(JdbcConf jdbcConf, JdbcDialect jdbcDialec
if (prop == null) {
prop = new Properties();
}
if (org.apache.commons.lang3.StringUtils.isNotBlank(jdbcConf.getUsername())) {
if (StringUtils.isNotBlank(jdbcConf.getUsername())) {
prop.put("user", jdbcConf.getUsername());
}
if (org.apache.commons.lang3.StringUtils.isNotBlank(jdbcConf.getPassword())) {
if (StringUtils.isNotBlank(jdbcConf.getPassword())) {
prop.put("password", jdbcConf.getPassword());
}
Properties finalProp = prop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ public class Vertica11LookupOptions extends JdbcLookupOptions {
ConfigOptions.key("DT_PROVIDER_CLASS")
.stringType()
.defaultValue(
"com.dtstack.flinkx.connector.vertica11.lookup.provider.Vertica11DataSourceProvider")
"com.dtstack.chunjun.connector.vertica11.lookup.provider.Vertica11DataSourceProvider")
.withDescription(" lookup ");
}
7 changes: 7 additions & 0 deletions chunjun-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@
</exclusion>
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/commons-collections/commons-collections -->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
Expand Down
11 changes: 6 additions & 5 deletions chunjun-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,12 @@
<version>3.1</version>
</dependency>

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>

<dependency>
<groupId>com.github.jsqlparser</groupId>
Expand Down
Loading

0 comments on commit c18e280

Please sign in to comment.