Skip to content

Commit

Permalink
[#9876] Unified naming convention
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed Apr 24, 2023
1 parent 2fcfaa8 commit a8de915
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 25 deletions.
12 changes: 6 additions & 6 deletions flink/src/main/java/com/navercorp/pinpoint/flink/Bootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.navercorp.pinpoint.collector.receiver.thrift.TCPReceiverBean;
import com.navercorp.pinpoint.common.server.bo.stat.join.JoinStatBo;
import com.navercorp.pinpoint.flink.cluster.FlinkServerRegister;
import com.navercorp.pinpoint.flink.config.FlinkConfiguration;
import com.navercorp.pinpoint.flink.config.FlinkProperties;
import com.navercorp.pinpoint.flink.dao.hbase.ApplicationMetricDao;
import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDao;
import com.navercorp.pinpoint.flink.dao.hbase.StatisticsDaoInterceptor;
Expand Down Expand Up @@ -56,7 +56,7 @@ public class Bootstrap {
private final ClassPathXmlApplicationContext applicationContext;

private final TBaseFlatMapper tbaseFlatMapper;
private final FlinkConfiguration flinkConfiguration;
private final FlinkProperties flinkProperties;
private final TcpDispatchHandler tcpDispatchHandler;
private final TcpSourceFunction tcpSourceFunction;
private final ApplicationCache applicationCache;
Expand All @@ -72,7 +72,7 @@ private Bootstrap() {
applicationContext = new ClassPathXmlApplicationContext("applicationContext-flink.xml");

tbaseFlatMapper = applicationContext.getBean("tbaseFlatMapper", TBaseFlatMapper.class);
flinkConfiguration = applicationContext.getBean("flinkConfiguration", FlinkConfiguration.class);
flinkProperties = applicationContext.getBean("flinkProperties", FlinkProperties.class);
tcpDispatchHandler = applicationContext.getBean("tcpDispatchHandler", TcpDispatchHandler.class);
tcpSourceFunction = applicationContext.getBean("tcpSourceFunction", TcpSourceFunction.class);
applicationCache = applicationContext.getBean("applicationCache", ApplicationCache.class);
Expand Down Expand Up @@ -131,12 +131,12 @@ public ApplicationCache getApplicationCache() {
return applicationCache;
}

public FlinkConfiguration getFlinkConfiguration() {
return flinkConfiguration;
public FlinkProperties getFlinkProperties() {
return flinkProperties;
}

public StreamExecutionEnvironment createStreamExecutionEnvironment() {
if (flinkConfiguration.isLocalforFlinkStreamExecutionEnvironment()) {
if (flinkProperties.isLocalforFlinkStreamExecutionEnvironment()) {
LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
localEnvironment.setParallelism(1);
return localEnvironment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperClient;
import com.navercorp.pinpoint.common.server.cluster.zookeeper.ZookeeperEventWatcher;
import com.navercorp.pinpoint.common.util.NetUtils;
import com.navercorp.pinpoint.flink.config.FlinkConfiguration;
import com.navercorp.pinpoint.flink.config.FlinkProperties;
import com.navercorp.pinpoint.rpc.util.ClassUtils;
import com.navercorp.pinpoint.rpc.util.TimerFactory;
import com.navercorp.pinpoint.web.cluster.zookeeper.PushZnodeJob;
import com.navercorp.pinpoint.web.cluster.zookeeper.ZookeeperClusterDataManagerHelper;

import org.apache.curator.utils.ZKPaths;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -60,18 +59,18 @@ public class FlinkServerRegister implements ZookeeperEventWatcher {

private Timer timer;

public FlinkServerRegister(FlinkConfiguration flinkConfiguration) {
Objects.requireNonNull(flinkConfiguration, "flinkConfiguration");
this.clusterEnable = flinkConfiguration.isFlinkClusterEnable();
this.connectAddress = flinkConfiguration.getFlinkClusterZookeeperAddress();
this.sessionTimeout = flinkConfiguration.getFlinkClusterSessionTimeout();
this.zookeeperPath = flinkConfiguration.getFlinkZNodePath();
public FlinkServerRegister(FlinkProperties flinkProperties) {
Objects.requireNonNull(flinkProperties, "flinkConfiguration");
this.clusterEnable = flinkProperties.isFlinkClusterEnable();
this.connectAddress = flinkProperties.getFlinkClusterZookeeperAddress();
this.sessionTimeout = flinkProperties.getFlinkClusterSessionTimeout();
this.zookeeperPath = flinkProperties.getFlinkZNodePath();

String zNodeName = getRepresentationLocalV4Ip() + ":" + flinkConfiguration.getFlinkClusterTcpPort();
String zNodeName = getRepresentationLocalV4Ip() + ":" + flinkProperties.getFlinkClusterTcpPort();
String zNodeFullPath = ZKPaths.makePath(zookeeperPath, zNodeName);

CreateNodeMessage createNodeMessage = new CreateNodeMessage(zNodeFullPath, new byte[0]);
int retryInterval = flinkConfiguration.getFlinkRetryInterval();
int retryInterval = flinkProperties.getFlinkRetryInterval();
this.pushFlinkNodeJob = new PushFlinkNodeJob(createNodeMessage, retryInterval);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
* @author minwoo.jung
*/
@Component
public class FlinkConfiguration {
private final Logger logger = LogManager.getLogger(FlinkConfiguration.class);
public class FlinkProperties {
private final Logger logger = LogManager.getLogger(FlinkProperties.class);

@Qualifier("flinkClusterProperties")
@Autowired
Expand All @@ -52,7 +52,7 @@ public class FlinkConfiguration {
@Value("${collector.l4.ip:}")
private String[] l4IpList = new String[0];

public FlinkConfiguration() {
public FlinkProperties() {
}

public boolean isFlinkClusterEnable() {
Expand Down
2 changes: 1 addition & 1 deletion flink/src/main/resources/applicationContext-flink.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
com.navercorp.pinpoint.collector.mapper.thrift.stat"/>

<bean class="com.navercorp.pinpoint.common.server.cluster.zookeeper.config.ClusterConfigurationFactory"/>
<bean id="flinkProperties" class="com.navercorp.pinpoint.flink.config.FlinkConfiguration"/>
<bean id="flinkProperties" class="com.navercorp.pinpoint.flink.config.FlinkProperties"/>
<bean class="com.navercorp.pinpoint.flink.dao.hbase.ApplicationDaoConfiguration"/>

<bean id="tcpReceiverProperties" class="com.navercorp.pinpoint.collector.thrift.config.AgentBaseDataReceiverProperties"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
/**
* @author Woonduk Kang(emeroad)
*/
@ContextConfiguration(classes = {FlinkConfiguration.class, ClusterConfigurationFactory.class})
@ContextConfiguration(classes = { FlinkProperties.class, ClusterConfigurationFactory.class})
@ExtendWith(SpringExtension.class)
public class FlinkPropertiesTest {
@Autowired
private FlinkConfiguration flinkConfiguration;
private FlinkProperties flinkProperties;

@Test
public void log() {
flinkConfiguration.log();
flinkProperties.log();
}

}

0 comments on commit a8de915

Please sign in to comment.