Skip to content

Commit

Permalink
#192 add boolean inSink param on getColmeta form different process o…
Browse files Browse the repository at this point in the history
…n Source or Sink end for oracle Date type
  • Loading branch information
baisui1981 committed Apr 18, 2023
1 parent 6a83271 commit af40e65
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private void initSuForm(boolean withUserIdPk) {
cols.add(new ColumnMetaData(0, FILED_USER_ID, new com.qlangtech.tis.plugin.ds.DataType(Types.BIGINT), withUserIdPk));
cols.add(new ColumnMetaData(1, FIELD_USER_NAME, new com.qlangtech.tis.plugin.ds.DataType(Types.VARBINARY), false));
try {
EasyMock.expect(metaPlugin.getTableMetadata(EntityName.parse(id1))).andReturn(cols);
EasyMock.expect(metaPlugin.getTableMetadata(true, EntityName.parse(id1))).andReturn(cols);
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ public TableInDB getTablesInDB() {
throw new UnsupportedOperationException();
}



@Override
public List<ColumnMetaData> getTableMetadata(EntityName table) {
public List<ColumnMetaData> getTableMetadata(boolean inSink,EntityName table) {
if (!StringUtils.equals(dumper.testTableName, table.getTableName())) {
Assert.fail("dumper.testTableName:" + dumper.testTableName + " must equal with:" + table);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ private TargetColumnMeta getTargetColumnMeta(
}
DataSourceFactory dsFactory = (DataSourceFactory) vals.createDescribable(pluginContext).getInstance();
List<ColumnMetaData> tableMetadata = null;
tableMetadata = dsFactory.getTableMetadata(EntityName.parse(targetTable));
tableMetadata = dsFactory.getTableMetadata(false, EntityName.parse(targetTable));

colMetas = tableMetadata.stream().collect(Collectors.toMap((m) -> m.getKey(), (m) -> m));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,7 @@ public void doGetDsTabsVals(Context context) throws IOException {
DescriptorsJSON desc2Json = new DescriptorsJSON(reader.getDescriptor());
mapCols = selectedTabs.stream().collect(Collectors.toMap((tab) -> tab, (tab) -> {
try {
return reader.getTableMetadata(EntityName.parse(tab));
return reader.getTableMetadata(false, EntityName.parse(tab));
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -1354,7 +1354,7 @@ public void doGetDatasourceTableCols(Context context) throws TableNotFoundExcept
String tableName = this.getString("tabName");
com.qlangtech.tis.workflow.pojo.DatasourceDb db = getDsDb();
DataxReader dbDataxReader = OfflineManager.getDBDataxReader(this, db.getName());
List<ColumnMetaData> colsMeta = dbDataxReader.getTableMetadata(EntityName.parse(tableName));
List<ColumnMetaData> colsMeta = dbDataxReader.getTableMetadata(false, EntityName.parse(tableName));
this.setBizResult(context, colsMeta);
}

Expand Down Expand Up @@ -1400,7 +1400,7 @@ public void doReflectTableCols(Context context) throws Exception {
}

DataSourceFactory dsStore = TIS.getDataBasePlugin(PostedDSProp.parse(dumpNode.getDbName()));
sqlCols.setCols(dsStore.getTableMetadata(dumpNode.parseEntityName()));
sqlCols.setCols(dsStore.getTableMetadata(false, dumpNode.parseEntityName()));
// TISTable tisTable = dbPlugin.loadTableMeta(dumpNode.getName());
// if (CollectionUtils.isEmpty(tisTable.getReflectCols())) {
// throw new IllegalStateException("db:" + dumpNode.getDbName() + ",table:" + dumpNode.getName() + " relevant table col reflect cols can not be empty");
Expand Down Expand Up @@ -1545,7 +1545,7 @@ public void doCheckTableLogicNameRepeat(Context context) {

List<ColumnMetaData> cols = null;// offlineManager.getTableMetadata(db.getName(), table);
try {
cols = dbPlugin.getTableMetadata(EntityName.parse(table));
cols = dbPlugin.getTableMetadata(false, EntityName.parse(table));
if (cols.size() < 1) {
this.addErrorMessage(context, "表:[" + table + "]没有定义列");
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ public TableInDB getTablesInDB() {
}

@Override
public List<ColumnMetaData> getTableMetadata(EntityName table) throws TableNotFoundException {
return reader.getTableMetadata(table);
public List<ColumnMetaData> getTableMetadata(boolean inSink, EntityName table) throws TableNotFoundException {
return reader.getTableMetadata(false, table);
}

@Override
public List<ColumnMetaData> getTableMetadata(JDBCConnection conn, EntityName table) throws TableNotFoundException {
return reader.getTableMetadata(conn, table);
public List<ColumnMetaData> getTableMetadata(JDBCConnection conn, boolean inSink, EntityName table) throws TableNotFoundException {
return reader.getTableMetadata(conn, false, table);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,19 @@ public static DataxReader getThreadBingDataXReader() {
}

public static <T extends DataxReader> T getDataxReader(IPropertyType.SubFormFilter filter) {
IPluginStore<?> pluginStore = HeteroEnum.getDataXReaderAndWriterStore(
filter.uploadPluginMeta.getPluginContext(), true, filter.uploadPluginMeta);
DataxReader reader = (DataxReader) pluginStore.getPlugin();
if (reader == null) {
throw new IllegalStateException("dataXReader can not be null:" + filter.uploadPluginMeta.toString());
}
return (T) reader;
IPluginStore<?> pluginStore = HeteroEnum.getDataXReaderAndWriterStore(
filter.uploadPluginMeta.getPluginContext(), true, filter.uploadPluginMeta);
DataxReader reader = (DataxReader) pluginStore.getPlugin();
if (reader == null) {
throw new IllegalStateException("dataXReader can not be null:" + filter.uploadPluginMeta.toString());
}
return (T) reader;
}

@Override
public IStreamTableMeta getStreamTableMeta(String tableName) {
try {
List<ColumnMetaData> cols = this.getTableMetadata(EntityName.parse(tableName));
List<ColumnMetaData> cols = this.getTableMetadata(false, EntityName.parse(tableName));

return new IStreamTableMeta() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public static List<Option> getContextTableCols(Function<List<ColumnMetaData>, St
List<ColumnMetaData> cols
= context.getContextAttr(KEY_TABLE_COLS, (key) -> {
try {
return dsMeta.getTableMetadata(EntityName.parse(context.getSubFormIdentityField()));
return dsMeta.getTableMetadata(false, EntityName.parse(context.getSubFormIdentityField()));
} catch (TableNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public JDBCConnection getConnection(String jdbcUrl, boolean usingPool) throws SQ
// return DriverManager.getConnection(jdbcUrl, username, StringUtils.trimToNull(password));
// }

protected List<ColumnMetaData> parseTableColMeta(String jdbcUrl, JDBCConnection conn, EntityName table)
protected List<ColumnMetaData> parseTableColMeta(boolean inSink, String jdbcUrl, JDBCConnection conn, EntityName table)
throws SQLException, TableNotFoundException {
table = logicTable2PhysicsTable(jdbcUrl, table);

Expand Down Expand Up @@ -201,7 +201,7 @@ protected List<ColumnMetaData> parseTableColMeta(String jdbcUrl, JDBCConnection
pkCols.add(columnName);
}

return wrapColsMeta(columns1, pkCols);
return wrapColsMeta(inSink, columns1, pkCols);
} finally {
closeResultSet(columns1);
closeResultSet(primaryKeys);
Expand All @@ -210,8 +210,8 @@ protected List<ColumnMetaData> parseTableColMeta(String jdbcUrl, JDBCConnection
// return columns;
}

public List<ColumnMetaData> wrapColsMeta(ResultSet columns1) throws SQLException {
return wrapColsMeta(columns1, Collections.emptySet());
public List<ColumnMetaData> wrapColsMeta(boolean inSink, ResultSet columns1) throws SQLException {
return wrapColsMeta(inSink, columns1, Collections.emptySet());
}

public static final String KEY_COLUMN_NAME = "COLUMN_NAME";
Expand All @@ -224,11 +224,11 @@ public List<ColumnMetaData> wrapColsMeta(ResultSet columns1) throws SQLException
public static final String KEY_DATA_TYPE = "DATA_TYPE";
public static final String KEY_COLUMN_SIZE = "COLUMN_SIZE";

public List<ColumnMetaData> wrapColsMeta(ResultSet columns1, Set<String> pkCols) throws SQLException {
return this.wrapColsMeta(columns1, new CreateColumnMeta(pkCols, columns1));
public List<ColumnMetaData> wrapColsMeta(boolean inSink, ResultSet columns1, Set<String> pkCols) throws SQLException {
return this.wrapColsMeta(inSink, columns1, new CreateColumnMeta(pkCols, columns1));
}

public List<ColumnMetaData> wrapColsMeta(ResultSet columns1, CreateColumnMeta columnMetaCreator) throws SQLException {
public List<ColumnMetaData> wrapColsMeta(boolean inSink, ResultSet columns1, CreateColumnMeta columnMetaCreator) throws SQLException {

ColumnMetaData colMeta;
String colName = null;
Expand Down Expand Up @@ -312,11 +312,11 @@ private String getDbSchema() {
return dbSchema;
}

protected List<ColumnMetaData> parseTableColMeta(EntityName table, String jdbcUrl) throws TableNotFoundException {
protected List<ColumnMetaData> parseTableColMeta(boolean inSink, EntityName table, String jdbcUrl) throws TableNotFoundException {

AtomicReference<List<ColumnMetaData>> ref = new AtomicReference<>();
validateConnection(jdbcUrl, (conn) -> {
List<ColumnMetaData> columnMetaData = parseTableColMeta(jdbcUrl, conn, table);
List<ColumnMetaData> columnMetaData = parseTableColMeta(inSink, jdbcUrl, conn, table);
ref.set(columnMetaData);
});
return ref.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,23 +45,28 @@ default TableInDB getTablesInDB() {
throw new UnsupportedOperationException();
}


/**
* Get table column metaData list
*
* @param inSink 是否在处理sink数据源 https://github.com/qlangtech/tis/issues/192,在处理oracle的Date类型时:inSink:true 则要定义成sql.date false: 保持datetime类型
* @param table
* @return
* @throws TableNotFoundException
*/
default List<ColumnMetaData> getTableMetadata(EntityName table) throws TableNotFoundException {
default List<ColumnMetaData> getTableMetadata(boolean inSink, EntityName table) throws TableNotFoundException {
throw new UnsupportedOperationException("invoke from:" + this.getClass().getName());
}


/**
* Get table column metaData list
*
* @param inSink 是否在执行sink流程 https://github.com/qlangtech/tis/issues/192
* @param table
* @return
*/
default List<ColumnMetaData> getTableMetadata(JDBCConnection conn, EntityName table) throws TableNotFoundException {
default List<ColumnMetaData> getTableMetadata(JDBCConnection conn, boolean inSink, EntityName table) throws TableNotFoundException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ public List<ColumnMetaData> getFinalTaskNodeCols() throws Exception {
if (this.isSingleTableModel()) {
DependencyNode dumpNode = this.getDumpNodes().get(0);
DataSourceFactory dbPlugin = TIS.getDataBasePlugin(new PostedDSProp(DBIdentity.parseId(dumpNode.getDbName())));
List<ColumnMetaData> cols = dbPlugin.getTableMetadata(dumpNode.parseEntityName());
List<ColumnMetaData> cols = dbPlugin.getTableMetadata(false, dumpNode.parseEntityName());
return cols; //tisTable.getReflectCols();
// .stream().map((c) -> {
// return new ColName(c.getKey());
Expand All @@ -763,7 +763,7 @@ private SqlTaskNode getFinalTaskNode() throws Exception {
DataSourceFactory dbPlugin = TIS.getDataBasePlugin(new PostedDSProp(DBIdentity.parseId(dumpNode.getDbName())));
// List<ColumnMetaData> tableMetadata = ;
//TISTable tab = dbPlugin.loadTableMeta(dumpNode.getName());
taskNode.setContent(ColumnMetaData.buildExtractSQL(dumpNode.getName(), true, dbPlugin.getTableMetadata(dumpNode.parseEntityName())).toString());
taskNode.setContent(ColumnMetaData.buildExtractSQL(dumpNode.getName(), true, dbPlugin.getTableMetadata(false,dumpNode.parseEntityName())).toString());
return taskNode;
} else {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static void createDefaultErRule(SqlTaskNodeMeta.SqlDataFlowTopology topol
// 还没有定义erRule
DependencyNode dumpNode = topology.getFirstDumpNode();
DataSourceFactory dsStore = TIS.getDataBasePlugin(new PostedDSProp(DBIdentity.parseId(dumpNode.getDbName())));
List<ColumnMetaData> cols = dsStore.getTableMetadata(dumpNode.parseEntityName());
List<ColumnMetaData> cols = dsStore.getTableMetadata(false, dumpNode.parseEntityName());
//String topologyName, DependencyNode node, TargetColumnMeta targetColMetas
Optional<ColumnMetaData> firstPK = cols.stream().filter((col) -> col.isPk()).findFirst();
if (!firstPK.isPresent()) {
Expand Down

0 comments on commit af40e65

Please sign in to comment.