Skip to content

Commit a60e3ff

Browse files
authored
Merge pull request #1 from DTStack/master
更新代码
2 parents 41a0c06 + 294277f commit a60e3ff

File tree

70 files changed

+2629
-98
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+2629
-98
lines changed

cassandra/cassandra-side/cassandra-all-side/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
</copy>
7777

7878
<move file="${basedir}/../../../plugins/cassandraallside/${project.artifactId}-${project.version}.jar"
79-
tofile="${basedir}/../../../plugins/cassandraallside/${project.name}.jar" />
79+
tofile="${basedir}/../../../plugins/cassandraallside/${project.name}-${git.branch}.jar" />
8080
</tasks>
8181
</configuration>
8282
</execution>

cassandra/cassandra-side/cassandra-async-side/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@
9292
</copy>
9393

9494
<move file="${basedir}/../../../plugins/cassandraasyncside/${project.artifactId}-${project.version}.jar"
95-
tofile="${basedir}/../../../plugins/cassandraasyncside/${project.name}.jar" />
95+
tofile="${basedir}/../../../plugins/cassandraasyncside/${project.name}-${git.branch}.jar" />
9696
</tasks>
9797
</configuration>
9898
</execution>

core/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@
106106
</fileset>
107107
</copy>
108108
<move file="${basedir}/../plugins/${project.artifactId}-${project.version}.jar"
109-
tofile="${basedir}/../plugins/${project.name}.jar" />
109+
tofile="${basedir}/../plugins/${project.name}-${git.branch}.jar" />
110110
</tasks>
111111
</configuration>
112112
</execution>

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ public static AbsTableParser getSqlParser(String pluginType, String sqlRootDir)
5151
DtClassLoader dtClassLoader = (DtClassLoader) classLoader;
5252

5353
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
54-
5554
PluginUtil.addPluginJar(pluginJarPath, dtClassLoader);
56-
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
55+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
56+
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
5757
Class<?> targetParser = dtClassLoader.loadClass(className);
5858

5959
if(!AbsTableParser.class.isAssignableFrom(targetParser)){
@@ -77,7 +77,8 @@ public static TableSink getTableSink(TargetTableInfo targetTableInfo, String loc
7777

7878
PluginUtil.addPluginJar(pluginJarDirPath, dtClassLoader);
7979

80-
String className = PluginUtil.getGenerClassName(pluginType, CURR_TYPE);
80+
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
81+
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
8182
Class<?> sinkClass = dtClassLoader.loadClass(className);
8283

8384
if(!IStreamSinkGener.class.isAssignableFrom(sinkClass)){

core/src/main/java/com/dtstack/flink/sql/table/AbsTableParser.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected boolean fieldNameNeedsUpperCase() {
5656
return true;
5757
}
5858

59-
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props);
59+
public abstract TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) throws Exception;
6060

6161
public boolean dealKeyPattern(String fieldRow, TableInfo tableInfo){
6262
for(Map.Entry<String, Pattern> keyPattern : keyPatternMap.entrySet()){

core/src/main/java/com/dtstack/flink/sql/table/SourceTableInfo.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.base.Strings;
24+
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2425
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
26+
import org.apache.flink.util.StringUtils;
2527

28+
import java.util.ArrayList;
2629
import java.util.Map;
30+
import java.util.TimeZone;
2731

2832
/**
2933
* Reason:
@@ -36,6 +40,10 @@ public abstract class SourceTableInfo extends TableInfo {
3640

3741
public static final String SOURCE_SUFFIX = "Source";
3842

43+
public static final String TIME_ZONE_KEY="timezone";
44+
45+
private String timeZone=TimeZone.getDefault().getID();
46+
3947
private String eventTimeField;
4048

4149
private Integer maxOutOrderness = 10;
@@ -63,7 +71,6 @@ public void setMaxOutOrderness(Integer maxOutOrderness) {
6371
if(maxOutOrderness == null){
6472
return;
6573
}
66-
6774
this.maxOutOrderness = maxOutOrderness;
6875
}
6976

@@ -101,4 +108,23 @@ public String getAdaptSelectSql(){
101108
public String getAdaptName(){
102109
return getName() + "_adapt";
103110
}
111+
112+
public String getTimeZone() {
113+
return timeZone;
114+
}
115+
116+
public void setTimeZone(String timeZone) {
117+
if (StringUtils.isNullOrWhitespaceOnly(timeZone)){
118+
return;
119+
}
120+
timeZoneCheck(timeZone);
121+
this.timeZone = timeZone;
122+
}
123+
124+
private void timeZoneCheck(String timeZone) {
125+
ArrayList<String> zones = Lists.newArrayList(TimeZone.getAvailableIDs());
126+
if (!zones.contains(timeZone)){
127+
throw new IllegalArgumentException(" timezone is Incorrect!");
128+
}
129+
}
104130
}

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

+17
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
package com.dtstack.flink.sql.table;
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
24+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2425

2526
import java.io.Serializable;
2627
import java.util.List;
28+
import java.util.Map;
2729

2830
/**
2931
* Reason:
@@ -48,6 +50,9 @@ public abstract class TableInfo implements Serializable {
4850

4951
private final List<String> fieldList = Lists.newArrayList();
5052

53+
/**key:别名, value: realField */
54+
private Map<String, String> physicalFields = Maps.newHashMap();
55+
5156
private final List<String> fieldTypeList = Lists.newArrayList();
5257

5358
private final List<Class> fieldClassList = Lists.newArrayList();
@@ -114,6 +119,10 @@ public void addField(String fieldName){
114119
fieldList.add(fieldName);
115120
}
116121

122+
public void addPhysicalMappings(String aliasName, String physicalFieldName){
123+
physicalFields.put(aliasName, physicalFieldName);
124+
}
125+
117126
public void addFieldClass(Class fieldClass){
118127
fieldClassList.add(fieldClass);
119128
}
@@ -146,6 +155,14 @@ public List<Class> getFieldClassList() {
146155
return fieldClassList;
147156
}
148157

158+
public Map<String, String> getPhysicalFields() {
159+
return physicalFields;
160+
}
161+
162+
public void setPhysicalFields(Map<String, String> physicalFields) {
163+
this.physicalFields = physicalFields;
164+
}
165+
149166
public void finish(){
150167
this.fields = fieldList.toArray(new String[fieldList.size()]);
151168
this.fieldClasses = fieldClassList.toArray(new Class[fieldClassList.size()]);

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

+33-4
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
package com.dtstack.flink.sql.util;
2222

2323
import com.dtstack.flink.sql.classloader.DtClassLoader;
24+
import org.apache.commons.lang3.StringUtils;
2425
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerationException;
2526
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
2627
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException;
2728
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
2829

2930
import java.io.ByteArrayInputStream;
3031
import java.io.File;
32+
import java.io.FilenameFilter;
3133
import java.io.IOException;
3234
import java.net.MalformedURLException;
3335
import java.net.URL;
@@ -105,15 +107,19 @@ public static Properties stringToProperties(String str) throws IOException{
105107
return properties;
106108
}
107109

108-
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws MalformedURLException {
110+
public static URL getRemoteJarFilePath(String pluginType, String tableType, String remoteSqlRootDir) throws Exception {
109111
String dirName = pluginType + tableType.toLowerCase();
110-
String jarName = String.format("%s-%s.jar", pluginType, tableType.toLowerCase());
112+
String prefix = String.format("%s-%s", pluginType, tableType.toLowerCase());
113+
String jarPath = remoteSqlRootDir + SP + dirName;
114+
String jarName = getCoreJarFileName(jarPath, prefix);
111115
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
112116
}
113117

114-
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws MalformedURLException {
118+
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir) throws Exception {
115119
String dirName = pluginType + sideOperator + tableType.toLowerCase();
116-
String jarName = String.format("%s-%s-%s.jar", pluginType, sideOperator, tableType.toLowerCase());
120+
String prefix = String.format("%s-%s-%s", pluginType, sideOperator, tableType.toLowerCase());
121+
String jarPath = remoteSqlRootDir + SP + dirName;
122+
String jarName = getCoreJarFileName(jarPath, prefix);
117123
return new URL("file:" + remoteSqlRootDir + SP + dirName + SP + jarName);
118124
}
119125

@@ -138,4 +144,27 @@ public static void addPluginJar(String pluginDir, DtClassLoader classLoader) thr
138144
}
139145
}
140146

147+
public static String getCoreJarFileName (String path, String prefix) throws Exception {
148+
String coreJarFileName = null;
149+
File pluginDir = new File(path);
150+
if (pluginDir.exists() && pluginDir.isDirectory()){
151+
File[] jarFiles = pluginDir.listFiles(new FilenameFilter() {
152+
@Override
153+
public boolean accept(File dir, String name) {
154+
return name.toLowerCase().startsWith(prefix) && name.toLowerCase().endsWith(".jar");
155+
}
156+
});
157+
158+
if (jarFiles != null && jarFiles.length > 0){
159+
coreJarFileName = jarFiles[0].getName();
160+
}
161+
}
162+
163+
if (StringUtils.isEmpty(coreJarFileName)){
164+
throw new Exception("Can not find core jar file in path:" + path);
165+
}
166+
167+
return coreJarFileName;
168+
}
169+
141170
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForLong.java

+15-8
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,18 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.watermarker;
2222

2323
import com.dtstack.flink.sql.util.MathUtil;
24-
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
2524
import org.apache.flink.streaming.api.windowing.time.Time;
2625
import org.apache.flink.types.Row;
2726
import org.slf4j.Logger;
2827
import org.slf4j.LoggerFactory;
2928

29+
import java.util.TimeZone;
30+
3031
/**
3132
* Custom watermark --- for eventtime
3233
* Date: 2017/12/28
@@ -44,23 +45,29 @@ public class CustomerWaterMarkerForLong extends AbsCustomerWaterMarker<Row> {
4445

4546
private long lastTime = 0;
4647

47-
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos) {
48+
private TimeZone timezone;
49+
50+
public CustomerWaterMarkerForLong(Time maxOutOfOrderness, int pos,String timezone) {
4851
super(maxOutOfOrderness);
4952
this.pos = pos;
53+
this.timezone= TimeZone.getTimeZone(timezone);
5054
}
5155

5256
@Override
5357
public long extractTimestamp(Row row) {
5458

5559
try{
56-
Long eveTime = MathUtil.getLongVal(row.getField(pos));
57-
lastTime = eveTime;
58-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - eveTime)/1000));
59-
return eveTime;
60+
Long extractTime = MathUtil.getLongVal(row.getField(pos));
61+
62+
lastTime = extractTime + timezone.getOffset(extractTime);
63+
64+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
65+
66+
return lastTime;
6067
}catch (Exception e){
6168
logger.error("", e);
6269
}
63-
6470
return lastTime;
6571
}
72+
6673
}

core/src/main/java/com/dtstack/flink/sql/watermarker/CustomerWaterMarkerForTimeStamp.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828

2929
import java.sql.Timestamp;
30+
import java.util.TimeZone;
3031

3132
/**
3233
* Custom watermark --- for eventtime
@@ -45,25 +46,30 @@ public class CustomerWaterMarkerForTimeStamp extends AbsCustomerWaterMarker<Row>
4546

4647
private long lastTime = 0;
4748

49+
private TimeZone timezone;
4850

49-
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos) {
51+
public CustomerWaterMarkerForTimeStamp(Time maxOutOfOrderness, int pos,String timezone) {
5052
super(maxOutOfOrderness);
5153
this.pos = pos;
54+
this.timezone= TimeZone.getTimeZone(timezone);
5255
}
5356

5457
@Override
5558
public long extractTimestamp(Row row) {
5659
try {
5760
Timestamp time = (Timestamp) row.getField(pos);
58-
lastTime = time.getTime();
5961

60-
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - time.getTime())/1000));
61-
return time.getTime();
62+
long extractTime=time.getTime();
63+
64+
lastTime = extractTime + timezone.getOffset(extractTime);
65+
66+
eventDelayGauge.setDelayTime(MathUtil.getIntegerVal((System.currentTimeMillis() - extractTime)/1000));
67+
68+
return lastTime;
6269
} catch (RuntimeException e) {
6370
logger.error("", e);
6471
}
6572
return lastTime;
6673
}
6774

68-
6975
}

core/src/main/java/com/dtstack/flink/sql/watermarker/WaterMarkerAssigner.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
5454

5555
int maxOutOrderness = sourceTableInfo.getMaxOutOrderness();
5656

57+
String timeZone=sourceTableInfo.getTimeZone();
58+
5759
String[] fieldNames = typeInfo.getFieldNames();
5860
TypeInformation<?>[] fieldTypes = typeInfo.getFieldTypes();
5961

@@ -75,9 +77,9 @@ public DataStream assignWaterMarker(DataStream<Row> dataStream, RowTypeInfo type
7577

7678
AbsCustomerWaterMarker waterMarker = null;
7779
if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.sql.Timestamp")){
78-
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos);
80+
waterMarker = new CustomerWaterMarkerForTimeStamp(Time.milliseconds(maxOutOrderness), pos,timeZone);
7981
}else if(fieldType.getTypeClass().getTypeName().equalsIgnoreCase("java.lang.Long")){
80-
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos);
82+
waterMarker = new CustomerWaterMarkerForLong(Time.milliseconds(maxOutOrderness), pos,timeZone);
8183
}else{
8284
throw new IllegalArgumentException("not support type of " + fieldType + ", current only support(timestamp, long).");
8385
}

elasticsearch5/elasticsearch5-sink/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
</copy>
8686

8787
<move file="${basedir}/../../plugins/elasticsearchsink/${project.artifactId}-${project.version}.jar"
88-
tofile="${basedir}/../../plugins/elasticsearchsink/${project.name}.jar" />
88+
tofile="${basedir}/../../plugins/elasticsearchsink/${project.name}-${git.branch}.jar" />
8989
</tasks>
9090
</configuration>
9191
</execution>

hbase/hbase-side/hbase-all-side/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@
9393
</copy>
9494

9595
<move file="${basedir}/../../../plugins/hbaseallside/${project.artifactId}-${project.version}.jar"
96-
tofile="${basedir}/../../../plugins/hbaseallside/${project.name}.jar" />
96+
tofile="${basedir}/../../../plugins/hbaseallside/${project.name}-${git.branch}.jar" />
9797
</tasks>
9898
</configuration>
9999
</execution>

hbase/hbase-side/hbase-async-side/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
</copy>
9191

9292
<move file="${basedir}/../../../plugins/hbaseasyncside/${project.artifactId}-${project.version}.jar"
93-
tofile="${basedir}/../../../plugins/hbaseasyncside/${project.name}.jar" />
93+
tofile="${basedir}/../../../plugins/hbaseasyncside/${project.name}-${git.branch}.jar" />
9494
</tasks>
9595
</configuration>
9696
</execution>

0 commit comments

Comments
 (0)