Skip to content

Commit

Permalink
[hotfix-#1179][lookup] When lookup-type is 'NONE', each data will tri…
Browse files Browse the repository at this point in the history
…gger the logic of querying data.
  • Loading branch information
FlechazoW committed Aug 24, 2022
1 parent 0278622 commit cb9c8b6
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// 通过该参数得到类型转换器,将数据库中的字段转成对应的类型
final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();

if (cassandraLookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
return ParallelAsyncTableFunctionProvider.of(
new CassandraLruTableFunction(
if (cassandraLookupConf.getCache().equalsIgnoreCase(CacheType.ALL.toString())) {
return ParallelTableFunctionProvider.of(
new CassandraAllTableFunction(
cassandraLookupConf,
new CassandraRowConverter(
rowType, Arrays.asList(tableSchema.getFieldNames())),
tableSchema.getFieldNames(),
keyNames),
cassandraLookupConf.getParallelism());
}
return ParallelTableFunctionProvider.of(
new CassandraAllTableFunction(
return ParallelAsyncTableFunctionProvider.of(
new CassandraLruTableFunction(
cassandraLookupConf,
new CassandraRowConverter(
rowType, Arrays.asList(tableSchema.getFieldNames())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
}
fillKerberosConf();
hbaseSchema.setTableName(hBaseConf.getTable());
if (lookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
return ParallelAsyncTableFunctionProvider.of(
getAbstractLruTableFunction(), lookupConf.getParallelism());
if (lookupConf.getCache().equalsIgnoreCase(CacheType.ALL.toString())) {
return ParallelTableFunctionProvider.of(
getAbstractAllTableFunction(), lookupConf.getParallelism());
}
return ParallelTableFunctionProvider.of(
getAbstractAllTableFunction(), lookupConf.getParallelism());
return ParallelAsyncTableFunctionProvider.of(
getAbstractLruTableFunction(), lookupConf.getParallelism());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// 通过该参数得到类型转换器,将数据库中的字段转成对应的类型
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

if (lookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
return ParallelAsyncTableFunctionProvider.of(
new JdbcLruTableFunction(
if (lookupConf.getCache().equalsIgnoreCase(CacheType.ALL.toString())) {
return ParallelTableFunctionProvider.of(
new JdbcAllTableFunction(
jdbcConf,
jdbcDialect,
lookupConf,
Expand All @@ -97,8 +97,8 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
rowType),
lookupConf.getParallelism());
}
return ParallelTableFunctionProvider.of(
new JdbcAllTableFunction(
return ParallelAsyncTableFunctionProvider.of(
new JdbcLruTableFunction(
jdbcConf,
jdbcDialect,
lookupConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// 通过该参数得到类型转换器,将数据库中的字段转成对应的类型
final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();

if (kuduLookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
return ParallelAsyncTableFunctionProvider.of(
new KuduLruTableFunction(
if (kuduLookupConf.getCache().equalsIgnoreCase(CacheType.ALL.toString())) {
return ParallelTableFunctionProvider.of(
new KuduAllTableFunction(
kuduLookupConf,
new KuduRowConverter(
rowType, Arrays.asList(tableSchema.getFieldNames())),
tableSchema.getFieldNames(),
keyNames),
kuduLookupConf.getParallelism());
}
return ParallelTableFunctionProvider.of(
new KuduAllTableFunction(
return ParallelAsyncTableFunctionProvider.of(
new KuduLruTableFunction(
kuduLookupConf,
new KuduRowConverter(rowType, Arrays.asList(tableSchema.getFieldNames())),
tableSchema.getFieldNames(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,18 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
}
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

if (lookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
return ParallelAsyncTableFunctionProvider.of(
new RedisLruTableFunction(
redisConf, lookupConf, new RedisRowConverter(rowType)),
if (lookupConf.getCache().equalsIgnoreCase(CacheType.ALL.toString())) {
return ParallelTableFunctionProvider.of(
new RedisAllTableFunction(
redisConf,
lookupConf,
physicalSchema.getFieldNames(),
keyNames,
new RedisRowConverter(rowType)),
lookupConf.getParallelism());
}
return ParallelTableFunctionProvider.of(
new RedisAllTableFunction(
redisConf,
lookupConf,
physicalSchema.getFieldNames(),
keyNames,
new RedisRowConverter(rowType)),
return ParallelAsyncTableFunctionProvider.of(
new RedisLruTableFunction(redisConf, lookupConf, new RedisRowConverter(rowType)),
lookupConf.getParallelism());
}

Expand Down

0 comments on commit cb9c8b6

Please sign in to comment.