diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml
old mode 100644
new mode 100755
diff --git a/.gitignore b/.gitignore
old mode 100644
new mode 100755
index 3dddfa35..cb105483
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,7 @@ temp/
resources/test/
demo/ScalaDemo/.DS_Store
dependency-reduced-pom.xml
+test/
+.vscode/
+.metals/
+.bloop/
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
old mode 100644
new mode 100755
diff --git a/LICENSE b/LICENSE
old mode 100644
new mode 100755
diff --git a/README.md b/README.md
old mode 100644
new mode 100755
diff --git a/bin/run.sh b/bin/run.sh
old mode 100644
new mode 100755
diff --git a/codecov.yml b/codecov.yml
old mode 100644
new mode 100755
diff --git a/conf/test.conf b/conf/test.conf
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/pom.xml b/demo/DataLoader/pom.xml
old mode 100644
new mode 100755
index c49916d3..ac87c1ac
--- a/demo/DataLoader/pom.xml
+++ b/demo/DataLoader/pom.xml
@@ -161,21 +161,6 @@
scala-logging_${scala.binary.version}
-
- org.slf4j
- slf4j-api
-
-
-
- ch.qos.logback
- logback-core
-
-
-
- ch.qos.logback
- logback-classic
-
-
com.typesafe.akka
akka-actor_${scala.binary.version}
@@ -216,13 +201,6 @@
ganymed-ssh2
-
-
- org.slf4j
- slf4j-log4j12
-
-
-
org.apache.spark
spark-core_${scala.binary.version}
diff --git a/demo/DataLoader/src/main/java/com/hackerforfuture/codeprototypes/dataloader/DeveloperApi.java b/demo/DataLoader/src/main/java/com/hackerforfuture/codeprototypes/dataloader/DeveloperApi.java
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/java/com/hackerforfuture/codeprototypes/dataloader/common/LoggingSignalHandler.java b/demo/DataLoader/src/main/java/com/hackerforfuture/codeprototypes/dataloader/common/LoggingSignalHandler.java
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/resources/log4j2.properties b/demo/DataLoader/src/main/resources/log4j2.properties
new file mode 100755
index 00000000..d4118a96
--- /dev/null
+++ b/demo/DataLoader/src/main/resources/log4j2.properties
@@ -0,0 +1,12 @@
+# Console logger
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+appender.console.filter.threshold.type=ThresholdFilter
+appender.console.filter.threshold.level=DEBUG
+
+# configure logger
+rootLogger=INFO,STDOUT
+
+
diff --git a/demo/DataLoader/src/main/resources/logback.xml b/demo/DataLoader/src/main/resources/logback.xml
deleted file mode 100644
index 1fef4563..00000000
--- a/demo/DataLoader/src/main/resources/logback.xml
+++ /dev/null
@@ -1,103 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
- %d{HH:mm:ss.SSS} |-%-5level in%replace(%caller{1}){'\t|Caller.{1}0|\r\n|at\s', ''} - %msg%n
-
-
-
-
-
- WARN
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/DataLoader.warn.log
- ${maxHistory}
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
- INFO
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/DataLoader.info.log
- ${maxHistory}
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
- DEBUG
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/DataLoader.debug.log
- ${maxHistory}
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
- ERROR
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/DataLoader.error.log
- ${maxHistory}
-
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/DataLoader.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/DataLoader.scala
old mode 100644
new mode 100755
index 436b9233..aad6be7d
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/DataLoader.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/DataLoader.scala
@@ -31,8 +31,9 @@ object DataLoader extends LogSupport {
}
} catch {
case e: ReflectiveOperationException =>
- log.warn("Failed to register optional signal handler that logs a message when the process is terminated " +
- s"by a signal. Reason for registration failure is: $e", e)
+ logger.warn("Failed to register optional signal handler that logs a message " +
+ "when the process is terminated by a signal. " +
+ "Reason for registration failure is: $e", e)
}
// attach shutdown handler to catch terminating signals as well as normal termination
@@ -40,11 +41,12 @@ object DataLoader extends LogSupport {
override def run(): Unit = DataLoaderServer.shutdown()
})
+ logger.info("Starting DataLoaderServer...")
DataLoaderServer.startup()
DataLoaderServer.awaitShutdown()
} catch {
case NonFatal(e) =>
- log.error("Failed to run DataLoader", e)
+ logger.error("Failed to run DataLoader", e)
System.exit(1)
}
System.exit(0)
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/EventHandler.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/EventHandler.scala
new file mode 100755
index 00000000..68a0db15
--- /dev/null
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/EventHandler.scala
@@ -0,0 +1,19 @@
+package com.hackerforfuture.codeprototypes.dataloader.clusters
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/11/2 11:30
+ * Description:
+ */
+trait EventHandler {
+
+ def handleHeartbeatEvent(): Unit
+
+ def handleRegisterEvent(): Unit
+
+ def handleStopEvent(): Unit
+
+ def handleRegisterTimeout(): Unit
+
+ def handleCheckHeartbeatEvent(): Unit
+}
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/Message.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/Message.scala
new file mode 100755
index 00000000..dd89e83a
--- /dev/null
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/Message.scala
@@ -0,0 +1,25 @@
+package com.hackerforfuture.codeprototypes.dataloader.clusters
+
+import akka.actor.ActorPath
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/11/1 19:06
+ * Description:
+ */
+sealed trait Message
+
+case object Heartbeat extends Message
+
+case object CheckHeartbeat extends Message
+
+case object Register extends Message
+
+case object RegisterTimeout extends Message
+
+// Custom message type that carries a unique identifier (the sender's ActorPath)
+case class CustomMessage(id: ActorPath, content: Any) extends Message
+
+case class SlaveActorTerminated(id: ActorPath, reason: String) extends Message
+
+case object StopActor extends Message
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/Master.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/Master.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterActor.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterActor.scala
new file mode 100755
index 00000000..042d9677
--- /dev/null
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterActor.scala
@@ -0,0 +1,105 @@
+package com.hackerforfuture.codeprototypes.dataloader.clusters.master
+
+import akka.actor.{Actor, ActorLogging, ActorPath, Props, Terminated}
+import com.hackerforfuture.codeprototypes.dataloader.clusters._
+import com.hackerforfuture.codeprototypes.dataloader.common.Using
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/11/1 17:15
+ * Description:
+ */
+object MasterActor {
+ def props: Props = Props[MasterActor]
+}
+
+class MasterActor extends Actor with ActorLogging with EventHandler with Using {
+ // Maps each registered SlaveActor's unique ID to the timestamp of its last heartbeat
+ private final val registeredSlaves = mutable.HashMap.empty[ActorPath, Long]
+
+ // Heartbeat timeout
+ private final val heartbeatTimeout: FiniteDuration = 10.seconds
+
+ private final val initialTimeout: FiniteDuration = 5.seconds
+
+ // Set up the periodic heartbeat-check timer on startup
+ override def preStart(): Unit = {
+ // context.setReceiveTimeout(initialTimeout)
+
+ import context.dispatcher
+ context
+ .system
+ .scheduler
+ .schedule(initialTimeout, heartbeatTimeout, self, CheckHeartbeat)
+ }
+
+ override def handleHeartbeatEvent(): Unit = {
+ syncableBlock {
+ val senderId: ActorPath = sender().path
+ val ts: Long = System.currentTimeMillis()
+ if (registeredSlaves.contains(senderId)) {
+ registeredSlaves.put(senderId, ts)
+ log.info(s"[$ts] Received heartbeat from slave: ${senderId.name}")
+ } else {
+ registeredSlaves.put(senderId, ts)
+ log.info(s"[$ts] Registered slave: ${senderId.name}")
+ context.watch(sender())
+ }
+ }
+ }
+
+ override def handleRegisterEvent(): Unit = handleHeartbeatEvent()
+
+ override def handleCheckHeartbeatEvent(): Unit = {
+ val currentTime: Long = System.currentTimeMillis()
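+ // a slave is only considered timed out after roughly ten heartbeat intervals (heartbeatTimeout * 10) without a beat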
+ val timedOutSlaves = registeredSlaves.filter {
+ case (_, lastHeartbeatTime) =>
+ val elapsed = currentTime - lastHeartbeatTime
+ elapsed > (heartbeatTimeout.toMillis * 10)
+ }
+
+ timedOutSlaves.keys.foreach { id =>
+ registeredSlaves -= id
+ log.warning(s"Slave $id timed out and unregistered.")
+ self ! SlaveActorTerminated(id, "No heartbeat received")
+ }
+ }
+
+ override def handleStopEvent(): Unit = {
+ log.info("received StopActor message, shutting down ...")
+ registeredSlaves.foreach {
+ case (id, _) =>
+ log.info("try to stop %s".format(id.name))
+ context.system.actorSelection(id) ! StopActor
+ }
+ registeredSlaves.clear()
+ context.stop(self)
+ }
+
+ def receive: Receive = {
+ case Heartbeat => handleHeartbeatEvent()
+ case Register => handleRegisterEvent()
+ case CheckHeartbeat => handleCheckHeartbeatEvent()
+ case StopActor => handleStopEvent()
+ case CustomMessage(id, content) =>
+ if (registeredSlaves.contains(id)) {
+ log.info(s"Processing message from slave $id: $content")
+ // message-handling logic goes here
+ } else {
+ log.warning(s"Received message from unregistered slave: $id")
+ context.system.actorSelection(id) ! RegisterTimeout
+ }
+ case SlaveActorTerminated(id, reason) =>
+ log.warning(s"Slave $id terminated and unregistered. Reason: $reason")
+ registeredSlaves.remove(id)
+ case Terminated(slave) =>
+ val slaveId = slave.path
+ registeredSlaves.remove(slaveId)
+ log.warning(s"Slave ${slaveId.name} terminated.")
+ }
+
+ override def handleRegisterTimeout(): Unit = {}
+}
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterAnt.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterAnt.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/StateManager.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/StateManager.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/WorkerDetailInfo.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/WorkerDetailInfo.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/worker/SlaveActor.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/worker/SlaveActor.scala
new file mode 100755
index 00000000..f17c4081
--- /dev/null
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/worker/SlaveActor.scala
@@ -0,0 +1,70 @@
+package com.hackerforfuture.codeprototypes.dataloader.clusters.worker
+
+import akka.actor.{Actor, ActorLogging, ActorPath, Props, Terminated}
+import com.hackerforfuture.codeprototypes.dataloader.clusters._
+
+import scala.concurrent.duration.DurationInt
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/11/1 17:16
+ * Description:
+ */
+
+object SlaveActor {
+ def props(master: akka.actor.ActorRef): Props = Props(new SlaveActor(master))
+}
+
+class SlaveActor(master: akka.actor.ActorRef) extends Actor with ActorLogging with EventHandler {
+ // Send a registration message to the master on startup
+ override def preStart(): Unit = {
+ import context.dispatcher
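+ // schedule a Heartbeat tick to self every 10 seconds, after an initial 5-second delay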
+ context
+ .system
+ .scheduler
+ .schedule(5.seconds, 10.seconds, self, Heartbeat)
+ self ! Register
+ }
+
+ // Unique identifier for this actor (its ActorPath)
+ private final val uniqueID: ActorPath = self.path
+
+ def receive: Receive = {
+ // Send a heartbeat message to the master
+ case Heartbeat => handleHeartbeatEvent()
+ // Send a registration message to the master
+ case Register => handleRegisterEvent()
+ // Registration timed out; re-send the registration message
+ case RegisterTimeout => handleRegisterEvent()
+ case StopActor => handleStopEvent()
+ case Terminated(actorRef) =>
+ log.warning("Master %s terminated. %s is shutting down ...".format(
+ actorRef.path.name, self.path.name))
+ self ! StopActor
+ case message =>
+ // Wrap any other message in a CustomMessage carrying this actor's unique ID and forward it to the master
+ val customMessage = CustomMessage(uniqueID, message)
+ master ! customMessage
+ }
+
+ override def handleRegisterEvent(): Unit = {
+ master ! Register
+ log.info("try to register with master.")
+ context.watch(master)
+ }
+
+ override def handleHeartbeatEvent(): Unit = {
+ master ! Heartbeat
+ log.info("Sent heartbeat to master.")
+ }
+
+ override def handleStopEvent(): Unit = {
+ log.info("received StopActor message, shutting down ...")
+ context.stop(self)
+ }
+
+ override def handleRegisterTimeout(): Unit = {}
+
+ override def handleCheckHeartbeatEvent(): Unit = {}
+}
+
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/Configure.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/Configure.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/FileContextV1.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/FileContextV1.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/LogSupport.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/LogSupport.scala
old mode 100644
new mode 100755
index e5bdc3ed..8743b0f8
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/LogSupport.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/LogSupport.scala
@@ -8,11 +8,9 @@
package com.hackerforfuture.codeprototypes.dataloader.common
-import org.slf4j.{Logger, LoggerFactory}
+import com.typesafe.scalalogging.LazyLogging
/**
- * Created by wallace on 2018/1/20.
- */
-trait LogSupport {
- protected val log: Logger = LoggerFactory.getLogger(this.getClass.getName.stripSuffix("$"))
-}
+ * Created by wallace on 2018/1/20.
+ */
+trait LogSupport extends LazyLogging
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/PersistMode.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/PersistMode.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/Using.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/Using.scala
old mode 100644
new mode 100755
index 5f52d28d..2d5d7421
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/Using.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/Using.scala
@@ -8,18 +8,21 @@
package com.hackerforfuture.codeprototypes.dataloader.common
+import java.util.concurrent.locks.ReentrantLock
+import scala.language.reflectiveCalls
import scala.util.control.NonFatal
/**
* Created by wallace on 2018/1/20.
*/
trait Using extends LogSupport {
+ private val lock: ReentrantLock = new ReentrantLock()
protected def usingWithErrMsg[A <: {def close() : Unit}, B](param: A, errMsg: String)(f: A => B): Unit = {
try {
f(param)
} catch {
case NonFatal(e) =>
- log.error(s"$errMsg: ", e)
+ logger.error(s"$errMsg: ", e)
} finally {
param.close()
}
@@ -32,4 +35,13 @@ trait Using extends LogSupport {
param.close()
}
}
+
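+ // run the given block while holding this trait's ReentrantLock, giving callers simple mutual exclusion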
+ protected def syncableBlock[R](body: => R): R = {
+ lock.lock()
+ try {
+ body
+ } finally {
+ lock.unlock()
+ }
+ }
}
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/config/DataLoaderConfig.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/config/DataLoaderConfig.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/persist/PersistWriter.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/persist/PersistWriter.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/source/SourceReader.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/source/SourceReader.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/state/State.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/common/state/State.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/metadata/EventType.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/metadata/EventType.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/metadata/message/LocalMessage.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/metadata/message/LocalMessage.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/metadata/message/RemoteMessage.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/metadata/message/RemoteMessage.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/persist/FtpPersistWriter.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/persist/FtpPersistWriter.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/AntScheduler.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/AntScheduler.scala
old mode 100644
new mode 100755
index 87311dea..f131278e
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/AntScheduler.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/AntScheduler.scala
@@ -63,7 +63,7 @@ class AntScheduler(threadNum: Int,
* Initialize this scheduler so it is ready to accept scheduling of tasks
*/
override def startup(): Unit = {
- log.debug("Initializing task scheduler.")
+ logger.debug("Initializing task scheduler.")
this synchronized {
if (isStarted) throw new IllegalStateException("This scheduler has already been started!")
executor = Some(new ScheduledThreadPoolExecutor(threadNum))
@@ -81,7 +81,7 @@ class AntScheduler(threadNum: Int,
* This includes tasks scheduled with a delayed execution.
*/
override def shutdown(): Unit = {
- log.debug("Shutting down task scheduler.")
+ logger.debug("Shutting down task scheduler.")
// We use the local variable to avoid NullPointerException if another thread shuts down scheduler at same time.
val cachedExecutor: Option[ScheduledThreadPoolExecutor] = this.executor
if (cachedExecutor.isDefined) {
@@ -110,19 +110,19 @@ class AntScheduler(threadNum: Int,
* @param unit The unit for the preceding times.
*/
override def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit): Unit = {
- log.debug("Scheduling task %s with initial delay %d ms and period %d ms."
+ logger.debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
this synchronized {
ensureRunning()
val runnable = new Runnable {
override def run(): Unit = {
try {
- log.debug("Beginning execution of scheduled task '%s'.".format(name))
+ logger.debug("Beginning execution of scheduled task '%s'.".format(name))
fun()
} catch {
- case t: Throwable => log.error("Uncaught exception in scheduled task '" + name + "'", t)
+ case t: Throwable => logger.error("Uncaught exception in scheduled task '" + name + "'", t)
} finally {
- log.debug("Completed execution of scheduled task '%s'.".format(name))
+ logger.debug("Completed execution of scheduled task '%s'.".format(name))
}
}
}
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/TaskGenerator.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/TaskGenerator.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/TaskScheduler.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/schedule/TaskScheduler.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/DataLoaderServer.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/DataLoaderServer.scala
old mode 100644
new mode 100755
index 32234905..1eb8a41c
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/DataLoaderServer.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/DataLoaderServer.scala
@@ -47,7 +47,7 @@ object DataLoaderServer extends LogSupport {
}
} catch {
case NonFatal(e) =>
- log.error("Failed to execute service thread", e)
+ logger.error("Failed to execute service thread", e)
isStartingUp.set(false)
shutdown()
throw e
@@ -65,11 +65,11 @@ object DataLoaderServer extends LogSupport {
startupComplete.set(false)
isShuttingDown.set(false)
shutdownLatch.countDown()
+ logger.info("Succeed to shutdown DataLoader.")
}
-
} catch {
case NonFatal(e) =>
- log.error("Fatal error during DataLoaderServer shutdown.", e)
+ logger.error("Fatal error during DataLoaderServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/download/DataDownLoadProcess.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/download/DataDownLoadProcess.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/download/DataDownLoadService.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/download/DataDownLoadService.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/DataScanConfigBase.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/DataScanConfigBase.scala
old mode 100644
new mode 100755
index b792c7a2..9389b5a2
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/DataScanConfigBase.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/DataScanConfigBase.scala
@@ -29,10 +29,10 @@ trait DataScanConfigBase extends LogSupport {
val resourceFile = file
val configFile = new File(makePath(file))
if (configFile.exists()) {
- log.debug(s"Loading file [${configFile.getPath}] and resource [$resourceFile]")
+ logger.debug(s"Loading file [${configFile.getPath}] and resource [$resourceFile]")
ConfigFactory.parseFile(configFile).withFallback(ConfigFactory.load(resourceFile))
} else {
- log.debug(s"Loading resource [$resourceFile]")
+ logger.debug(s"Loading resource [$resourceFile]")
ConfigFactory.load(resourceFile)
}
}
@@ -62,7 +62,7 @@ trait DataScanConfigBase extends LogSupport {
(fileName, lastModified)
}.toMap
} else {
- log.warn("Does any upload config file exist?")
+ logger.warn("Does any upload config file exist?")
Map.empty
}
}
@@ -75,7 +75,7 @@ trait DataScanConfigBase extends LogSupport {
val newLastModified: Long = file.lastModified()
if (newLastModified == oldLastModified) {
- log.debug(s"$fileName never changed.")
+ logger.debug(s"$fileName never changed.")
null
} else {
lastModifiedMap.updated(fileName, newLastModified)
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/DataScanService.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/DataScanService.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/UploadDataConfig.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/dynamicscan/UploadDataConfig.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/uniqueid/UniqueID.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/uniqueid/UniqueID.scala
old mode 100644
new mode 100755
index a7959011..978e46e0
--- a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/uniqueid/UniqueID.scala
+++ b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/uniqueid/UniqueID.scala
@@ -11,8 +11,8 @@ package com.hackerforfuture.codeprototypes.dataloader.server.uniqueid
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}
/**
- * Created by wallace on 2019/4/4.
- */
+ * Created by wallace on 2019/4/4.
+ */
class UniqueID {
// SnowFlake - 64bit
@@ -32,7 +32,7 @@ class UniqueID {
def genUniqueID(): Option[Long] = {
val timestamp: Long = System.currentTimeMillis()
if (initFalg.compareAndSet(false, true)) {
- val wid: Int = workerId.get() << workerIdShift
+ val wid: Int = workerId.get() << workerIdShift.toInt
if (lastTimeStamp.compareAndSet(timestamp, timestamp)) {
val ts: Long = (timestamp - twepoch) << timeStampLeftShit
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/upload/DataUpLoadService.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/upload/DataUpLoadService.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/upload/DataUploadProcess.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/server/upload/DataUploadProcess.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/source/FtpSourceReader.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/source/FtpSourceReader.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/states/State.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/states/State.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/utils/AntThreadFactory.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/utils/AntThreadFactory.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/utils/LoaderUtils.scala b/demo/DataLoader/src/main/scala/com/hackerforfuture/codeprototypes/dataloader/utils/LoaderUtils.scala
old mode 100644
new mode 100755
diff --git a/demo/DataLoader/src/test/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterActorUnitSpec.scala b/demo/DataLoader/src/test/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterActorUnitSpec.scala
new file mode 100755
index 00000000..d827f1de
--- /dev/null
+++ b/demo/DataLoader/src/test/scala/com/hackerforfuture/codeprototypes/dataloader/clusters/master/MasterActorUnitSpec.scala
@@ -0,0 +1,54 @@
+package com.hackerforfuture.codeprototypes.dataloader.clusters.master
+
+import akka.actor.{ActorRef, ActorSystem, Terminated}
+import com.hackerforfuture.codeprototypes.dataloader.clusters.StopActor
+import com.hackerforfuture.codeprototypes.dataloader.clusters.worker.SlaveActor
+import com.hackerforfuture.codeprototypes.dataloader.common.LogSupport
+import com.typesafe.config.{Config, ConfigFactory}
+import org.scalatest.flatspec.AnyFlatSpec
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.{Duration, DurationInt}
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/11/1 18:19
+ * Description:
+ */
+class MasterActorUnitSpec extends AnyFlatSpec with LogSupport {
+ "Dev" should "run Akka cluster" in {
+ // Create a custom Akka configuration
+ val customConfig: Config = ConfigFactory.parseString(
+ """
+ akka {
+ log-dead-letters-during-shutdown = off
+ log-dead-letters = off
+ }
+ """)
+ val system: ActorSystem = ActorSystem("Akka-Cluster-System", customConfig)
+
+ // Create the MasterActor
+ val master: ActorRef = system.actorOf(MasterActor.props, "master")
+
+ // Create several SlaveActors, passing the MasterActor to each of them
+ val actor1 = system.actorOf(SlaveActor.props(master), "slave1")
+ val actor2 = system.actorOf(SlaveActor.props(master), "slave2")
+ val actor3 = system.actorOf(SlaveActor.props(master), "slave3")
+
+ // Example shutdown logic; adjust as needed.
+ // In this example, one slave is stopped after 10 seconds and the whole system terminates after 55 seconds.
+ import system.dispatcher
+ akka.pattern.after(10.seconds, system.scheduler) {
+ actor2 ! StopActor
+ Future.unit
+ }
+ // actor2 ! StopActor
+ val future: Future[Unit] = akka.pattern.after(55.seconds, system.scheduler) {
+ system.actorSelection("/user/*") ! StopActor
+ system.terminate()
+ Future.unit
+ }
+ Await.result(future, Duration(60, TimeUnit.SECONDS))
+ }
+}
diff --git a/demo/FlinkDemo/README.md b/demo/FlinkDemo/README.md
new file mode 100755
index 00000000..d5d210d3
--- /dev/null
+++ b/demo/FlinkDemo/README.md
@@ -0,0 +1,4 @@
+# How to build the project?
+```bash
+mvn clean package -DskipTests -pl ./demo/FlinkDemo -am
+```
\ No newline at end of file
diff --git a/demo/FlinkDemo/pom.xml b/demo/FlinkDemo/pom.xml
new file mode 100755
index 00000000..88c346d5
--- /dev/null
+++ b/demo/FlinkDemo/pom.xml
@@ -0,0 +1,275 @@
+
+
+
+
+ 4.0.0
+
+ com.wallace.demo
+ CodePrototypesDemo
+ 0.1.0-SNAPSHOT
+ ../../pom.xml
+
+
+
+ 1.8
+
+
+ flinkdemo
+ 0.1.0-SNAPSHOT
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 3.1.0
+
+ true
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+ incremental
+
+
+
+ compile-scala
+ compile
+
+ add-source
+ compile
+ testCompile
+
+
+
+ test-compile-first
+ test-compile
+
+ add-source
+ testCompile
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ ${java.version}
+ ${java.version}
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+ shade-package
+ package
+
+ shade
+
+
+ ${jar.file.name}
+ true
+ jar-with-dependencies
+
+
+
+ reference.conf
+
+
+
+
+ org.apache.flink:force-shading
+ com.google.code.findbugs:jsr305
+
+
+
+
+ *:*
+
+ module-info.class
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+ true
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+
+ ${project.build.directory}/unitspecs-reports
+
+ .
+ WDF UnitSpec.txt
+ ${project.build.directory}/site/scalatest
+ true
+
+
+
+ test
+ test
+
+ test
+
+
+
+
+
+
+ org.scalastyle
+ scalastyle-maven-plugin
+ 1.0.0
+
+ false
+ false
+ false
+ false
+ ${basedir}/src/main/scala
+ ${basedir}/src/test/scala
+ ${basedir}/../../scalastyle-config.xml
+ ${basedir}/target/scalastyle-output.xml
+ ${project.build.sourceEncoding}
+ ${project.reporting.outputEncoding}
+
+
+
+ scalastyle-check
+ package
+
+ check
+
+
+
+
+
+
+
+
+
+
+ org.apache.flink
+ flink-table-api-scala-bridge_${scala.binary.version}
+ ${flink.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+ org.apache.flink
+ flink-cep-scala_${scala.binary.version}
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+
+ com.typesafe.scala-logging
+ scala-logging_${scala.binary.version}
+ 3.9.5
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+
+
+ org.scalatest
+ scalatest_${scala.binary.version}
+ 3.2.15
+
+
+
+
+ com.vladsch.flexmark
+ flexmark-all
+ 0.64.6
+
+
+
diff --git a/demo/FlinkDemo/src/main/resources/log4j.properties b/demo/FlinkDemo/src/main/resources/log4j.properties
new file mode 100755
index 00000000..cb00b143
--- /dev/null
+++ b/demo/FlinkDemo/src/main/resources/log4j.properties
@@ -0,0 +1,9 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %l: %m%n
+
+# Settings to quiet third party logs that are too verbose
+log4j.logger.org.apache.flink=ERROR
diff --git a/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/FlinkCEPDemo.scala b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/FlinkCEPDemo.scala
new file mode 100755
index 00000000..f6f0911f
--- /dev/null
+++ b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/FlinkCEPDemo.scala
@@ -0,0 +1,97 @@
+package com.notalk.flink.demo
+
+import com.notalk.flink.demo.common.LogSupport
+import com.notalk.flink.demo.event.LoginEvent
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.cep.scala.{CEP, PatternStream}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.{KeyedStream, StreamExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.windowing.time.Time
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/7/27 14:38
+ * Description: A demo of Flink CEP to detect patterns in a stream of login events
+ */
+object FlinkCEPDemo extends LogSupport {
+ def main(args: Array[String]): Unit = {
+ logger.info("Start running Flink CEP demo...")
+
+ // Set up the Flink execution environment
+ val scalaEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+ scalaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ scalaEnv.setParallelism(1)
+
+ // Define the stream of login events
+ val stream: KeyedStream[LoginEvent, String] = scalaEnv
+ .fromElements(
+ LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
+ LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
+ LoginEvent("user_1", "192.168.0.3", "fail", 4000L),
+ LoginEvent("user_2", "192.168.10.10", "fail", 5000L),
+ LoginEvent("user_2", "192.168.10.11", "fail", 6000L),
+ LoginEvent("user_2", "192.168.10.12", "fail", 9000L),
+ LoginEvent("user_3", "192.168.19.3", "fail", 10000L),
+ LoginEvent("user_3", "192.168.19.4", "fail", 30000L),
+ LoginEvent("user_3", "192.168.19.5", "success", 35000L),
+ LoginEvent("user_4", "192.168.19.15", "success", 50000L),
+ LoginEvent("user_5", "192.168.21.112", "fail", 51000L),
+ LoginEvent("user_5", "192.168.23.13", "fail", 52000L),
+ LoginEvent("user_5", "192.168.34.12", "fail", 53000L),
+ LoginEvent("user_5", "192.168.44.11", "fail", 54000L),
+ )
+ .assignAscendingTimestamps(_.eventTime)
+ .keyBy(_.userId)
+
+ // Print the login events
+ stream.print("login_event")
+
+ // Define the pattern for three consecutive failed login attempts
+ val threeTimesFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
+ .begin[LoginEvent]("first")
+ .where(_.eventType == "fail")
+ .next("second")
+ .where(_.eventType == "fail")
+ .next("third")
+ .where(_.eventType == "fail")
+ .within(Time.seconds(5))
+
+ // Apply the pattern to the stream and select the matching events
+ val failedStream: PatternStream[LoginEvent] = CEP.pattern(stream, threeTimesFailPattern)
+ failedStream
+ .select((pattern: scala.collection.Map[String, Iterable[LoginEvent]]) => {
+ val first = pattern("first").iterator.next()
+ val second = pattern("second").iterator.next()
+ val third = pattern("third").iterator.next()
+
+ (first.userId, first.ip, second.ip, third.ip)
+ })
+ .printToErr("fail_result")
+
+ // Define the pattern for a successful login following a failed attempt
+ val successPattern: Pattern[LoginEvent, LoginEvent] = Pattern
+ .begin[LoginEvent]("fail", AfterMatchSkipStrategy.skipPastLastEvent())
+ .optional
+ .where(_.eventType == "fail")
+ .followedBy("success")
+ .where(_.eventType == "success")
+ .within(Time.seconds(30))
+
+ // Apply the pattern to the stream and select the matching events
+ val successStream = CEP.pattern(stream, successPattern)
+ successStream.select((pattern: scala.collection.Map[String, Iterable[LoginEvent]]) => {
+ val iterator: Iterator[LoginEvent] = pattern.getOrElse("fail", Iterable.empty).iterator
+ val fail: LoginEvent = if (iterator.hasNext) iterator.next() else LoginEvent("", "", "", 0L)
+ val success: LoginEvent = pattern("success").iterator.next()
+
+ (success.userId, fail.ip, success.ip)
+ })
+ .printToErr("success_result")
+
+ // Execute the Flink job
+ scalaEnv.execute()
+
+ logger.info("Stop running Flink CEP demo")
+ }
+}
\ No newline at end of file
diff --git a/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/common/LogLevel.scala b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/common/LogLevel.scala
new file mode 100755
index 00000000..735ae053
--- /dev/null
+++ b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/common/LogLevel.scala
@@ -0,0 +1,11 @@
+package com.notalk.flink.demo.common
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/7/27 12:09
+ * Description:
+ */
+object LogLevel extends Enumeration {
+ type LogLevel = Value
+ val DEBUG, INFO, WARN, ERROR, TRACE = Value
+}
diff --git a/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/common/LogSupport.scala b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/common/LogSupport.scala
new file mode 100755
index 00000000..d4ecf787
--- /dev/null
+++ b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/common/LogSupport.scala
@@ -0,0 +1,24 @@
+package com.notalk.flink.demo.common
+
+import com.notalk.flink.demo.common.LogLevel.LogLevel
+import com.typesafe.scalalogging.LazyLogging
+
+import scala.reflect.ClassTag
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/7/27 12:08
+ * Description:
+ */
+trait LogSupport extends LazyLogging {
+ protected def logRecord[T: ClassTag](msg: T, level: LogLevel = LogLevel.INFO): Unit = {
+ level match {
+ case LogLevel.DEBUG => logger.debug(s"$msg")
+ case LogLevel.INFO => logger.info(s"$msg")
+ case LogLevel.WARN => logger.warn(s"$msg")
+ case LogLevel.ERROR => logger.error(s"$msg")
+ case LogLevel.TRACE => logger.trace(s"$msg")
+ case _ => logger.info(s"$msg")
+ }
+ }
+}
diff --git a/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/event/LoginEvent.scala b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/event/LoginEvent.scala
new file mode 100755
index 00000000..7bd31098
--- /dev/null
+++ b/demo/FlinkDemo/src/main/scala/com/notalk/flink/demo/event/LoginEvent.scala
@@ -0,0 +1,12 @@
+package com.notalk.flink.demo.event
+
+/**
+ * Author: biyu.huang
+ * Date: 2023/7/27 14:39
+ * Description:
+ */
+case class LoginEvent(
+ userId: String,
+ ip: String,
+ eventType: String,
+ eventTime: Long)
diff --git a/demo/FlinkDemo/src/test/resources/log4j.properties b/demo/FlinkDemo/src/test/resources/log4j.properties
new file mode 100755
index 00000000..0f1249cc
--- /dev/null
+++ b/demo/FlinkDemo/src/test/resources/log4j.properties
@@ -0,0 +1,6 @@
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %l: %m%n
diff --git a/demo/FlinkDemo/src/test/resources/log4j2.properties b/demo/FlinkDemo/src/test/resources/log4j2.properties
new file mode 100755
index 00000000..af6a7e83
--- /dev/null
+++ b/demo/FlinkDemo/src/test/resources/log4j2.properties
@@ -0,0 +1,8 @@
+#appender.console.type=Console
+#appender.console.name=STDOUT
+#appender.console.layout.type=PatternLayout
+#appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss} %p %l: %m%n
+#appender.console.filter.threshold.type=ThresholdFilter
+#appender.console.filter.threshold.level=debug
+#
+#rootLogger=ERROR,STDOUT
diff --git a/demo/GccDemo/GccDemo.xcodeproj/project.pbxproj b/demo/GccDemo/GccDemo.xcodeproj/project.pbxproj
old mode 100644
new mode 100755
diff --git a/demo/GccDemo/GccDemo.xcodeproj/project.xcworkspace/contents.xcworkspacedata b/demo/GccDemo/GccDemo.xcodeproj/project.xcworkspace/contents.xcworkspacedata
old mode 100644
new mode 100755
diff --git a/demo/GccDemo/GccDemo.xcodeproj/project.xcworkspace/xcshareddata/IDEWorkspaceChecks.plist b/demo/GccDemo/GccDemo.xcodeproj/project.xcworkspace/xcshareddata/IDEWorkspaceChecks.plist
old mode 100644
new mode 100755
diff --git a/demo/GccDemo/GccDemo.xcodeproj/project.xcworkspace/xcuserdata/wallace.xcuserdatad/UserInterfaceState.xcuserstate b/demo/GccDemo/GccDemo.xcodeproj/project.xcworkspace/xcuserdata/wallace.xcuserdatad/UserInterfaceState.xcuserstate
old mode 100644
new mode 100755
diff --git a/demo/GccDemo/GccDemo.xcodeproj/xcuserdata/wallace.xcuserdatad/xcschemes/xcschememanagement.plist b/demo/GccDemo/GccDemo.xcodeproj/xcuserdata/wallace.xcuserdatad/xcschemes/xcschememanagement.plist
old mode 100644
new mode 100755
diff --git a/demo/GccDemo/GccDemo/main.cpp b/demo/GccDemo/GccDemo/main.cpp
old mode 100644
new mode 100755
diff --git a/demo/GccDemo/cdemo/main.c b/demo/GccDemo/cdemo/main.c
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/.kms_conf b/demo/ScalaDemo/.kms_conf
new file mode 100755
index 00000000..de4d9463
Binary files /dev/null and b/demo/ScalaDemo/.kms_conf differ
diff --git a/demo/ScalaDemo/pom.xml b/demo/ScalaDemo/pom.xml
old mode 100644
new mode 100755
index 36fb0acf..f3dc01aa
--- a/demo/ScalaDemo/pom.xml
+++ b/demo/ScalaDemo/pom.xml
@@ -115,7 +115,8 @@
org.scalatest
scalatest-maven-plugin
- ${project.build.directory}/unitspecs-reports
+ ${project.build.directory}/unitspecs-reports
+
.
WDF UnitSpec.txt
${project.build.directory}/site/scalatest
@@ -158,6 +159,16 @@
org.apache.spark
spark-core_${scala.binary.version}
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
@@ -267,6 +278,14 @@
com.google.code.gson
gson
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
@@ -338,6 +357,18 @@
log4j
log4j
+
+ org.slf4j
+ slf4j-reload4j
+
+
+ reload4j
+ ch.qos.reload4j
+
+
+ curator-client
+ org.apache.curator
+
@@ -361,6 +392,24 @@
scala-logging_${scala.binary.version}
+
+
+ org.apache.logging.log4j
+ log4j-api
+
+
+ org.apache.logging.log4j
+ log4j-core
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+
+
+ org.slf4j
+ slf4j-api
+
+
com.jcraft
jsch
@@ -383,17 +432,6 @@
better-files_${scala.binary.version}
-
- ch.qos.logback
- logback-core
-
-
-
- ch.qos.logback
- logback-classic
-
-
-
com.typesafe.akka
akka-actor_${scala.binary.version}
@@ -423,17 +461,6 @@
ch.ethz.ganymed
ganymed-ssh2
-
-
- org.slf4j
- slf4j-api
-
-
-
- org.slf4j
- slf4j-log4j12
-
-
@@ -484,17 +511,23 @@
flexmark-all
test
-
-
-
-
-
redis.clients
jedis
+
+ org.apache.curator
+ curator-framework
+ 2.13.0
+
+
+ org.apache.curator
+ curator-recipes
+ 2.13.0
+
+
com.zte.hadoop.loader
hadoop-loader
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/DateUtilTest.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/DateUtilTest.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/InitClassDemo.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/InitClassDemo.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/InitializationDemo.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/InitializationDemo.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/RegexDemo.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/RegexDemo.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/ReloadThread.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/ReloadThread.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/common/JavaLogSupport.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/common/JavaLogSupport.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/ASmrPlr.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/ASmrPlr.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/HW_ASmr.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/HW_ASmr.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/HW_Field.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/HW_Field.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/MRO.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/MRO.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/MROSax.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/MROSax.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/NorthMRInfoHW.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/NorthMRInfoHW.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/SaxHandler.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/parsexml/SaxHandler.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ConfigMapStorage.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ConfigMapStorage.java
new file mode 100755
index 00000000..f8e77bc6
--- /dev/null
+++ b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ConfigMapStorage.java
@@ -0,0 +1,149 @@
+package com.wallace.demo.app.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.crypto.Cipher;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.Key;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Author: biyu.huang
+ * Date: 2024/7/24 10:08
+ * Description:
+ */
+public class ConfigMapStorage {
+ private static final Logger logger = LoggerFactory.getLogger(ConfigMapStorage.class);
+ private static final String ALGORITHM = "AES/GCM/PKCS5Padding";
+ private static final int KEY_LENGTH = 32;
+
+ // AES-GCM needs 96-bit(12 bytes) IV, refer to GaloisCounterMode.DEFAULT_IV_LEN
+ private static final int IV_LENGTH = 12;
+ private Key encryptionKey;
+ private String SAVE_FILE;
+ private Map<String, String> configMap;
+
+ private Key generateKey() throws Exception {
+ KeyGenerator keyGen = KeyGenerator.getInstance("AES");
+ keyGen.init(256);
+ SecretKey secretKey = keyGen.generateKey();
+ return new SecretKeySpec(secretKey.getEncoded(), "AES");
+ }
+
+ public ConfigMapStorage(@Nullable String savePath) {
+ String envKey = System.getenv("ENCRYPTION_KEY");
+ if (envKey == null) {
+ throw new IllegalArgumentException("Environment variable ENCRYPTION_KEY not set");
+ }
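+ // derive a fixed 32-byte key (AES-256): decode the Base64 value and pad or truncate it to KEY_LENGTH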
+ byte[] keyBytes = Base64.getDecoder().decode(envKey);
+ byte[] newKeyBytes = new byte[KEY_LENGTH];
+ System.arraycopy(keyBytes, 0, newKeyBytes, 0, Math.min(keyBytes.length, KEY_LENGTH));
+ this.encryptionKey = new SecretKeySpec(newKeyBytes, "AES");
+
+ String SAVE_FILE_NAME = "/.kms_kv";
+ if (savePath == null || savePath.trim().isEmpty()) {
+ this.SAVE_FILE = "/tmp" + SAVE_FILE_NAME;
+ } else {
+ String fixedSavePath = savePath.trim();
+ if (fixedSavePath.endsWith("/")) {
+ this.SAVE_FILE = fixedSavePath.substring(0, fixedSavePath.length() - 1) + SAVE_FILE_NAME;
+ } else {
+ this.SAVE_FILE = fixedSavePath + SAVE_FILE_NAME;
+ }
+ }
+
+ this.configMap = new HashMap<>();
+ logger.info("SAVE_PATH -> " + this.SAVE_FILE);
+ }
+
+ public static ConfigMapStorage getInstance() {
+ return new ConfigMapStorage(null);
+ }
+
+ public static ConfigMapStorage getInstance(String savePath) {
+ return new ConfigMapStorage(savePath);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadConfigMap() {
+ try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(SAVE_FILE))) {
+ this.configMap = (Map<String, String>) ois.readObject();
+ } catch (FileNotFoundException e) {
+ logger.error(this.SAVE_FILE + " not found. A new one will be created on save.");
+ } catch (IOException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void saveConfigMap() {
+ try (ObjectOutputStream oos = new ObjectOutputStream(Files.newOutputStream(Paths.get(SAVE_FILE)))) {
+ oos.writeObject(this.configMap);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void addConfig(String key, String value) throws Exception {
+ String encryptedValue = encrypt(value);
+ logger.info("[ENCRYPTED] " + key + " ===> " + encryptedValue);
+ configMap.put(key, encryptedValue);
+ saveConfigMap();
+ }
+
+ public String getConfig(String key) throws Exception {
+ String encryptedToken = this.configMap.get(key);
+ return encryptedToken != null ? decrypt(encryptedToken) : null;
+ }
+
+ public int getSize() {
+ return this.configMap.size();
+ }
+
+ private String encrypt(String data) throws Exception {
+ byte[] dataBytes = data.getBytes();
+ Cipher cipher = Cipher.getInstance(ALGORITHM);
+ cipher.init(Cipher.ENCRYPT_MODE, encryptionKey);
+ byte[] encryptedBytes = cipher.doFinal(dataBytes);
+ byte[] ivBytes = cipher.getIV();
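+ // prepend the IV so decrypt() can read it back from the first IV_LENGTH bytes of the payload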
+ byte[] encryptedData = new byte[IV_LENGTH + encryptedBytes.length];
+ System.arraycopy(ivBytes, 0, encryptedData, 0, IV_LENGTH);
+ System.arraycopy(encryptedBytes, 0, encryptedData, IV_LENGTH, encryptedBytes.length);
+ return Base64.getEncoder().encodeToString(encryptedData);
+ }
+
+ private String decrypt(String encryptedData) throws Exception {
+ byte[] cipherBytes = Base64.getDecoder().decode(encryptedData);
+ byte[] ivBytes = new byte[IV_LENGTH];
+ System.arraycopy(cipherBytes, 0, ivBytes, 0, IV_LENGTH);
+ GCMParameterSpec gcmParamSpec = new GCMParameterSpec(128, ivBytes, 0, IV_LENGTH);
+ Cipher cipher = Cipher.getInstance(ALGORITHM);
+ cipher.init(Cipher.DECRYPT_MODE, encryptionKey, gcmParamSpec);
+ byte[] rawBytes = cipher.doFinal(cipherBytes, IV_LENGTH, cipherBytes.length - IV_LENGTH);
+ return new String(rawBytes);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ConfigMapStorage configMapStorage = ConfigMapStorage.getInstance();
+ String dummyToken1 = Base64.getEncoder().encodeToString(configMapStorage.generateKey().getEncoded());
+ String dummyToken2 = "fake_token";
+ logger.info("[RAW] token1 ===> " + dummyToken1);
+ logger.info("[RAW] token2 ===> " + dummyToken2);
+ configMapStorage.addConfig("token1", dummyToken1);
+ configMapStorage.addConfig("token2", dummyToken2);
+
+ configMapStorage.loadConfigMap();
+ logger.info("[LOAD] configMap.size() = " + configMapStorage.getSize());
+ logger.info("[LOAD] token2 ===> " + configMapStorage.getConfig("token2"));
+ logger.info("[LOAD] token1 ===> " + configMapStorage.getConfig("token1"));
+ }
+}
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/DateUtil.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/DateUtil.java
old mode 100644
new mode 100755
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/FuncUtil.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/FuncUtil.java
old mode 100644
new mode 100755
index d585ad09..a89bbd7b
--- a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/FuncUtil.java
+++ b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/FuncUtil.java
@@ -14,25 +14,22 @@ public static String[] split(String str, String sep, String other) {
}
public static String[] split(String str, char split, char other) {
- // if (!str.contains(other + "")) {
- // return str.split(split + "", -1);
- // } else {
ArrayList strList = new ArrayList<>();
- int num = 0;// other干扰符个数
- int off = 0;// 字串的起始位置
- int subStrSize = 0;// 字串的长度
+ int startIndex = 0; // start index of the current substring
+ int subStrSize = 0; // length of the current substring
+ int num = 0; // count of 'other' marker characters seen
for (int i = 0; i < str.length(); i++) {
char c = str.charAt(i);
// 最后一个字符
if (c != split && i == str.length() - 1) {
- strList.add(str.substring(off, off + subStrSize + 1));
+ strList.add(str.substring(startIndex, startIndex + subStrSize + 1));
}
if (c == other) {
num++;
subStrSize++;
} else if (num % 2 == 0 && c == split) {
- strList.add(str.substring(off, off + subStrSize));
- off += subStrSize + 1;
+ strList.add(str.substring(startIndex, startIndex + subStrSize));
+ startIndex += subStrSize + 1;
subStrSize = 0;
} else {
subStrSize++;
@@ -42,6 +39,35 @@ public static String[] split(String str, char split, char other) {
return strList.toArray(fields);
}
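+ /**
+  * Like split(str, split, other), but performs at most {@code limit - 1} splits;
+  * the remaining text is kept in the last element. Separator characters that appear
+  * between an odd number of 'other' marker characters are not treated as separators.
+  */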
+ public static String[] splitWithLimit(String str, char split, char other, int limit) {
+ ArrayList<String> strList = new ArrayList<>(); // collected substrings
+ int startIndex = 0; // start index of the current substring
+ int subStrSize = 0; // length of the current substring
+ int splitCount = 0; // number of splits performed so far
+ int num = 0; // count of 'other' marker characters seen
+
+ for (int i = 0; i < str.length(); i++) {
+ char c = str.charAt(i);
+ // last character
+ if (c != split && i == str.length() - 1) {
+ strList.add(str.substring(startIndex, startIndex + subStrSize + 1));
+ }
+ if (c == other) {
+ num++;
+ subStrSize++;
+ } else if (num % 2 == 0 && c == split && splitCount < limit - 1) {
+ strList.add(str.substring(startIndex, startIndex + subStrSize));
+ startIndex += subStrSize + 1;
+ subStrSize = 0;
+ splitCount++;
+ } else {
+ subStrSize++;
+ }
+ }
+ String fields[] = new String[strList.size()];
+ return strList.toArray(fields);
+ }
+
/**
* 将数组按照指定的字符拆分
*
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/StringToHash.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/StringToHash.java
new file mode 100755
index 00000000..1187c4ce
--- /dev/null
+++ b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/StringToHash.java
@@ -0,0 +1,117 @@
+package com.wallace.demo.app.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.SecretKeySpec;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.lang.Long;
+
+/**
+ * Author: biyu.huang
+ * Date: 2024/5/9 11:50
+ * Description:
+ */
+public class StringToHash {
+ private static final Logger log = LoggerFactory.getLogger(StringToHash.class);
+
+ public static void main(String[] args) throws Exception {
+ String inputStr = "test";
+
+ // Encrypt
+ String kycInfoSecureKey = "o69BYlB9umqDAT3sizrC1Q==";
+
+ byte[] encrypted = aesCbcEncryptPkcs7Iv(kycInfoSecureKey.getBytes(StandardCharsets.UTF_8),
+ inputStr.getBytes(StandardCharsets.UTF_8));
+
+ // Compute the hash
+ String encryptedHex = bytesToHex(encrypted);
+ long hashCode = getHashCode(encryptedHex);
+
+ // Modulo operation (for the table index)
+ // long tableIdx = Long.parseLong(nameHash, 16) % 1000;
+
+ log.info(String.format("input_str -> %s, hash_code -> %s", inputStr,
+ Long.toUnsignedString(hashCode)));
+ }
+
+ private static final byte[] AES_CBC_IV = new byte[16];
+
+ public static long getHashCode(String nameEncrypt) {
+ return getHashCode(nameEncrypt.getBytes(StandardCharsets.UTF_8));
+ }
+
+ public static long getHashCode(byte[] bytes) {
+ Hash64 hash64 = new Hash64();
+ hash64.write(bytes);
+ return hash64.getHashCode();
+ }
+ public static String aesCbcEncryptWithIv(byte[] key, byte[] plainText) throws Exception {
+ SecretKeySpec secretKeySpec = new SecretKeySpec(key, "AES");
+ Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
+ cipher.init(Cipher.ENCRYPT_MODE, secretKeySpec, new IvParameterSpec(AES_CBC_IV));
+ byte[] encrypted = cipher.doFinal(plainText, 0, plainText.length);
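+        // doFinal appends one extra PKCS5 padding block (the input is already block-aligned),
+        // so keep only the bytes covering the caller's pre-padded plaintext.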
+ byte[] result = new byte[plainText.length];
+ System.arraycopy(encrypted, 0, result, 0, plainText.length);
+ return bytesToHex(result);
+ }
+
+ public static byte[] pkcs7Padding(byte[] ciphertext, int blockSize) {
+ int padding = blockSize - (ciphertext.length % blockSize);
+ byte[] padText = new byte[padding];
+ Arrays.fill(padText, (byte) padding);
+ byte[] result = Arrays.copyOf(ciphertext, ciphertext.length + padding);
+ System.arraycopy(padText, 0, result, ciphertext.length, padding);
+ return result;
+ }
+
+ public static byte[] aesCbcEncryptPkcs7Iv(byte[] key, byte[] message) throws Exception {
+ byte[] plainText = pkcs7Padding(message, 16);
+ String encrypted = aesCbcEncryptWithIv(key, plainText);
+ return hexToBytes(encrypted);
+ }
+
+ private static String bytesToHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder();
+ for (byte b : bytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
+ }
+
+ private static byte[] hexToBytes(String hex) {
+ int len = hex.length();
+ byte[] data = new byte[len / 2];
+ for (int i = 0; i < len; i += 2) {
+ data[i / 2] = (byte) ((Character.digit(hex.charAt(i), 16) << 4)
+ + Character.digit(hex.charAt(i + 1), 16));
+ }
+ return data;
+ }
+
+
+ private static class Hash64 {
+ private long hashCode;
+
+ public Hash64() {
+            // fnv.offset64 from Go's hash/fnv (0xcbf29ce484222325 as an unsigned value);
+            // see https://en.wikipedia.org/wiki/Fowler-Noll-Vo_hash_function
+ this.hashCode = -3750763034362895579L;
+ }
+
+ public void write(byte[] data) {
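+            // FNV-1a step: XOR in each byte, then multiply by the 64-bit FNV prime (1099511628211).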
+ for (byte b : data) {
+ hashCode ^= (long) b;
+ hashCode *= 1099511628211L;
+ }
+ }
+
+ public long getHashCode() {
+ return this.hashCode;
+ }
+ }
+}
+
+
diff --git a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ioutils/ParquetIOUtils.java b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ioutils/ParquetIOUtils.java
old mode 100644
new mode 100755
index f3eae52f..fde97885
--- a/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ioutils/ParquetIOUtils.java
+++ b/demo/ScalaDemo/src/main/java/com/wallace/demo/app/utils/ioutils/ParquetIOUtils.java
@@ -10,99 +10,235 @@
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Random;
import static org.apache.parquet.hadoop.ParquetReader.builder;
public class ParquetIOUtils {
- static Logger logger = Logger.getLogger(ParquetIOUtils.class);
+ static Logger logger = Logger.getLogger(ParquetIOUtils.class);
- public static void main(String[] args) throws Exception {
- //parquetWriter("test\\parquet-out2", "input.txt");
- parquetReaderV2();
- }
+ public static void main(String[] args) throws Exception {
+ //parquetWriter("test\\parquet-out2", "input.txt");
+ parquetReaderV2();
+ }
- private static void parquetReaderV2() throws Exception {
- GroupReadSupport readSupport = new GroupReadSupport();
- ParquetReader.Builder reader = builder(readSupport, new Path("test\\parquet-out2"));
- ParquetReader build = reader.build();
- Group line = null;
- while ((line = build.read()) != null) {
- Group time = line.getGroup("time", 0);
-            // fields can be accessed either by index or by field name
- /*System.out.println(line.getString(0, 0)+"\t"+
- line.getString(1, 0)+"\t"+
- time.getInteger(0, 0)+"\t"+
- time.getString(1, 0)+"\t");*/
- System.out.println(line.getString("city", 0) + "\t" +
- line.getString("ip", 0) + "\t" +
- time.getInteger("ttl", 0) + "\t" +
- time.getString("ttl2", 0) + "\t");
-
- //System.out.println(line.toString());
+ private static void parquetReaderV2() {
+ GroupReadSupport readSupport = new GroupReadSupport();
+        ParquetReader.Builder<Group> reader = builder(readSupport, new Path("test\\parquet-out2"));
+        try (ParquetReader<Group> build = reader.build()) {
+ Group row;
+ GroupType schema = null;
+ while ((row = build.read()) != null) {
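+                // Cache the schema from the first row; every record in the file shares the same GroupType.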
+ schema = schema == null ? row.getType() : schema;
+ for (Type field : schema.getFields()) {
+ primitiveFieldReader(field, row);
+ }
+ Group timeField = row.getGroup("time", 0);
+                // fields can be accessed either by index or by field name
+ /*System.out.println(line.getString(0, 0)+"\t"+
+ line.getString(1, 0)+"\t"+
+ time.getInteger(0, 0)+"\t"+
+ time.getString(1, 0)+"\t");*/
+ for (Type field : timeField.getType().asGroupType().getFields()) {
+ switch (field.getOriginalType()) {
+                        case INT_64:
+                            timeField.getLong(field.getName(), 0);
+                            break;
+                        case DECIMAL:
+                            timeField.getDouble(field.getName(), 0);
+                            break;
+                        case UTF8:
+                            timeField.getString(field.getName(), 0);
+                            break;
+ default:
+ break;
+ }
}
-            System.out.println("finished reading");
+ logger.info(row.getString("city", 0) + "\t" +
+ row.getString("ip", 0) + "\t" +
+ timeField.getInteger("ttl", 0) + "\t" +
+ timeField.getString("ttl2", 0) + "\t");
+
+ //System.out.println(line.toString());
+
+ }
+ } catch (IOException e) {
+ logger.error(e);
}
+        logger.info("finished reading");
+ }
-    // In newer versions all constructors of ParquetReader appear to be deprecated; use the builder above to construct the reader
- static void parquetReader(String inPath) throws Exception {
- GroupReadSupport readSupport = new GroupReadSupport();
- ParquetReader.Builder builder = builder(readSupport, new Path(inPath));
- // ParquetReader reader = new ParquetReader(new Path(inPath), readSupport);
- Group line = null;
- while ((line = builder.build().read()) != null) {
- System.out.println(line.toString());
- }
-        System.out.println("finished reading");
+ static Long binaryToUnscaledLong(Binary binary) {
+ // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
+ // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
+ // copying it.
+ ByteBuffer buffer = binary.toByteBuffer();
+ byte[] bytes = buffer.array();
+ int start = buffer.arrayOffset() + buffer.position();
+ int end = buffer.arrayOffset() + buffer.limit();
+
+ long unscaled = 0L;
+ int i = start;
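+        // Accumulate the big-endian two's-complement unscaled value byte by byte.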
+ while (i < end) {
+ unscaled = (unscaled << 8) | (bytes[i] & 0xff);
+ i += 1;
}
- /**
-     * @param outPath output path for the Parquet file
-     * @param inPath  input plain-text file
- * @throws IOException
- */
- static void parquetWriter(String outPath, String inPath) throws IOException {
- MessageType schema = MessageTypeParser.parseMessageType("message Pair {\n" +
- " required binary city (UTF8);\n" +
- " required binary ip (UTF8);\n" +
- " repeated group time {\n" +
- " required int32 ttl;\n" +
- " required binary ttl2;\n" +
- "}\n" +
- "}");
- GroupFactory factory = new SimpleGroupFactory(schema);
- Path path = new Path(outPath);
- Configuration configuration = new Configuration();
- GroupWriteSupport writeSupport = new GroupWriteSupport();
- GroupWriteSupport.setSchema(schema, configuration);
- ParquetWriter writer = new ParquetWriter<>(path, configuration, writeSupport);
-        // read in the local file and use it to generate the Parquet file
- BufferedReader br = new BufferedReader(new FileReader(new File(inPath)));
- String line;
- Random r = new Random();
- while ((line = br.readLine()) != null) {
- String[] strs = line.split("\\s+");
- if (strs.length == 2) {
- Group group = factory.newGroup()
- .append("city", strs[0])
- .append("ip", strs[1]);
- Group tmpG = group.addGroup("time");
- tmpG.append("ttl", r.nextInt(9) + 1);
- tmpG.append("ttl2", r.nextInt(9) + "_a");
- writer.write(group);
- }
+ int bits = 8 * (end - start);
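+        // Sign-extend: shift the significant bits to the top of the long, then arithmetic-shift back.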
+ unscaled = (unscaled << (64 - bits)) >> (64 - bits);
+ return unscaled;
+ }
+
+ static Object readValue(Type field, Integer index, Group row) {
+ Object value = null;
+ if (field.isPrimitive()) {
+ switch (field.asPrimitiveType().getPrimitiveTypeName()) {
+ case FLOAT:
+ value = row.getFloat(field.getName(), 0);
+ break;
+ case INT32:
+ if (field.getOriginalType() == OriginalType.DECIMAL) {
+ DecimalMetadata metadata = field.asPrimitiveType().getDecimalMetadata();
+ int scale = metadata == null ? 0 : metadata.getScale();
+ value = BigDecimal.valueOf(row.getInteger(field.getName(), index), scale);
+ } else {
+ value = row.getInteger(field.getName(), index);
+ }
+ break;
+ case INT64:
+ if (field.getOriginalType() == OriginalType.DECIMAL) {
+ DecimalMetadata metadata = field.asPrimitiveType().getDecimalMetadata();
+ int scale = metadata == null ? 0 : metadata.getScale();
+ value = BigDecimal.valueOf(row.getLong(field.getName(), index), scale);
+ } else {
+ value = row.getLong(field.getName(), index);
+ }
+ break;
+ case BINARY:
+ if (field.getOriginalType() == OriginalType.DECIMAL) {
+ DecimalMetadata metadata = field.asPrimitiveType().getDecimalMetadata();
+ int scale = metadata == null ? 0 : metadata.getScale();
+ value = BigDecimal.valueOf(binaryToUnscaledLong(row.getBinary(field.getName(), index)), scale);
+                    } else if (field.getOriginalType() == OriginalType.UTF8) {
+                        value = row.getString(field.getName(), index);
+                    } else {
+                        value = row.getBinary(field.getName(), index);
+                    }
+ break;
+ case DOUBLE:
+ value = row.getDouble(field.getName(), index);
+ break;
+ case BOOLEAN:
+ value = row.getBoolean(field.getName(), index);
+ break;
+ case INT96:
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ break;
+ default:
+ throw new RuntimeException("unknown primitive type: " +
+ field.asPrimitiveType().getPrimitiveTypeName());
+ }
+ } else {
+ GroupType fieldGroupType = field.asGroupType();
+ Group fieldGroup = row.getGroup(field.getName(), index);
+            HashMap<String, Object> groupValue = new HashMap<>();
+ for (Type singleField : fieldGroupType.getFields()) {
+ Object fieldValue;
+ int elementNum = fieldGroup.getFieldRepetitionCount(singleField.getName());
+ if (elementNum <= 1) {
+ fieldValue = readValue(singleField, 0, fieldGroup);
+ } else {
+ ArrayList
- ch.qos.logback
- logback-core
+ com.typesafe.akka
+ akka-actor_${scala.binary.version}
- ch.qos.logback
- logback-classic
+ com.typesafe.akka
+ akka-remote_${scala.binary.version}
-
com.typesafe.akka
- akka-actor_${scala.binary.version}
+ akka-slf4j_${scala.binary.version}
com.typesafe.akka
- akka-remote_${scala.binary.version}
+ akka-protobuf_${scala.binary.version}
@@ -212,17 +211,6 @@
ch.ethz.ganymed
ganymed-ssh2
-
-
- org.slf4j
- slf4j-api
-
-
-
- org.slf4j
- slf4j-log4j12
-
-
org.apache.spark
@@ -299,6 +287,56 @@
org.apache.flink
flink-streaming-scala_${scala.binary.version}
1.9.1
+
+
+ com.typesafe.akka
+ akka-protobuf
+
+
+ com.typesafe.akka
+ akka-protobuf_${scala.binary.version}
+
+
+ com.typesafe.akka
+ akka-actor_${scala.binary.version}
+
+
+ com.typesafe.akka
+ akka-stream_${scala.binary.version}
+
+
+ com.typesafe.akka
+ akka-slf4j_${scala.binary.version}
+
+
+ org.scala-lang
+ scala-compiler
+
+
+ org.scala-lang
+ scala-library
+
+
+ org.scala-lang
+ scala-reflect
+
+
+ org.slf4j
+ slf4j-api
+
+
+ com.google.code.findbugs
+ jsr305
+
+
+ org.apache.commons
+ commons-lang3
+
+
+ org.apache.commons
+ commons-math3
+
+
@@ -309,5 +347,14 @@
org.scalanlp
nak_2.10
+
+
+ ml.dmlc
+ xgboost4j_${scala.binary.version}
+
+
+ ml.dmlc
+ xgboost4j-spark_${scala.binary.version}
+
diff --git a/demo/SparkDemo/src/at/scala/AccSpec.scala b/demo/SparkDemo/src/at/scala/AccSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/java/com/wallace/demo/EncodingParser.java b/demo/SparkDemo/src/main/java/com/wallace/demo/EncodingParser.java
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/java/com/wallace/demo/socket/client/TalkClient.java b/demo/SparkDemo/src/main/java/com/wallace/demo/socket/client/TalkClient.java
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/java/com/wallace/demo/socket/server/MultiAcceptServer.java b/demo/SparkDemo/src/main/java/com/wallace/demo/socket/server/MultiAcceptServer.java
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/java/com/wallace/demo/socket/server/MultiServer.java b/demo/SparkDemo/src/main/java/com/wallace/demo/socket/server/MultiServer.java
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/java/com/wallace/demo/socket/server/SingleServer.java b/demo/SparkDemo/src/main/java/com/wallace/demo/socket/server/SingleServer.java
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/resources/log4j2.properties b/demo/SparkDemo/src/main/resources/log4j2.properties
new file mode 100755
index 00000000..c76604f0
--- /dev/null
+++ b/demo/SparkDemo/src/main/resources/log4j2.properties
@@ -0,0 +1,14 @@
+# Console logger
+appender.console.type=Console
+appender.console.name=STDOUT
+appender.console.layout.type=PatternLayout
+appender.console.layout.pattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+appender.console.filter.threshold.type=ThresholdFilter
+appender.console.filter.threshold.level=DEBUG
+
+# configure logger
+rootLogger=INFO,STDOUT
+#logger.gson_demo=INFO,specificLogger
+#logger.gson_demo.name=com.wallace.demo.app.GsonDemo
+#logger.gson_demo.additivity=false
+
diff --git a/demo/SparkDemo/src/main/resources/logback.xml b/demo/SparkDemo/src/main/resources/logback.xml
deleted file mode 100644
index 28210719..00000000
--- a/demo/SparkDemo/src/main/resources/logback.xml
+++ /dev/null
@@ -1,105 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
- WARN
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/CodePrototypesDemo.warn.log
- ${maxHistory}
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
- INFO
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/CodePrototypesDemo.info.log
- ${maxHistory}
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
- DEBUG
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/CodePrototypesDemo.debug.log
- ${maxHistory}
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
- ERROR
- ACCEPT
- DENY
-
-
-
- ${log_dir}/%d{yyyy-MM-dd}/CodePrototypesDemo.error.log
- ${maxHistory}
-
-
-
-
- %d{HH:mm:ss.SSS} |-[%thread]-[%level]-[%logger{35}.%method:%line] - %msg%n
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/demo/SparkDemo/src/main/resources/msgproducer.conf b/demo/SparkDemo/src/main/resources/msgproducer.conf
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/resources/sample_1.csv b/demo/SparkDemo/src/main/resources/sample_1.csv
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/resources/spark_sql_data b/demo/SparkDemo/src/main/resources/spark_sql_data
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/resources/trainingData.csv.gz b/demo/SparkDemo/src/main/resources/trainingData.csv.gz
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/Boot.scala b/demo/SparkDemo/src/main/scala/com/wallace/Boot.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/CreateSparkSession.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/CreateSparkSession.scala
old mode 100644
new mode 100755
index d4976938..51d20084
--- a/demo/SparkDemo/src/main/scala/com/wallace/common/CreateSparkSession.scala
+++ b/demo/SparkDemo/src/main/scala/com/wallace/common/CreateSparkSession.scala
@@ -13,6 +13,7 @@ trait CreateSparkSession extends FuncRunDuration with LogSupport {
.master(master)
.appName(appName)
.config("spark.sql.warehouse.dir", warehouseLocation)
+ .config("xgboost.spark.debug", "true")
//.enableHiveSupport()
.getOrCreate()
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/DemoConfig.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/DemoConfig.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/FuncRunDuration.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/FuncRunDuration.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/LogSupport.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/LogSupport.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/ProjConfig.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/ProjConfig.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/ProjLogger.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/ProjLogger.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/UserDefineFunc.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/UserDefineFunc.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/Using.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/Using.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/sshclient/SshClient.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/sshclient/SshClient.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/sshclient/SshClientUserInfo.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/sshclient/SshClientUserInfo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/common/timeformat/TimePara.scala b/demo/SparkDemo/src/main/scala/com/wallace/common/timeformat/TimePara.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/flink/RedisAsyncFunc.scala b/demo/SparkDemo/src/main/scala/com/wallace/flink/RedisAsyncFunc.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/redis/JedisClusterPipeline.scala b/demo/SparkDemo/src/main/scala/com/wallace/redis/JedisClusterPipeline.scala
old mode 100644
new mode 100755
index 1820b79a..76d4b563
--- a/demo/SparkDemo/src/main/scala/com/wallace/redis/JedisClusterPipeline.scala
+++ b/demo/SparkDemo/src/main/scala/com/wallace/redis/JedisClusterPipeline.scala
@@ -16,15 +16,16 @@ import scala.collection.mutable.ArrayBuffer
*/
class JedisClusterPipeline(jedisCluster: JedisCluster) extends PipelineBase with Closeable {
self =>
- private val FIELD_CONNECTION_HANDLER: Field = getField(classOf[BinaryJedisCluster], "connectionHandler")
- private val FIELD_CACHE: Field = getField(classOf[JedisClusterConnectionHandler], "cache")
- private val clients: util.LinkedList[Client] = new util.LinkedList[Client]()
- private val jedisMap: util.HashMap[JedisPool, Jedis] = new util.HashMap[JedisPool, Jedis]()
- private val hasDataInBuf: AtomicBoolean = new AtomicBoolean(false)
- private val connectionHandler: JedisSlotBasedConnectionHandler = getValue(jedisCluster, FIELD_CONNECTION_HANDLER)
- .asInstanceOf[JedisSlotBasedConnectionHandler]
- private val clusterInfoCache: JedisClusterInfoCache = getValue(connectionHandler, FIELD_CACHE)
- .asInstanceOf[JedisClusterInfoCache]
+ private final val FIELD_CONNECTION_HANDLER: Field =
+ getField(classOf[BinaryJedisCluster], "connectionHandler")
+ private final val FIELD_CACHE: Field = getField(classOf[JedisClusterConnectionHandler], "cache")
+ private final val clients: util.LinkedList[Client] = new util.LinkedList[Client]()
+ private final val jedisMap: util.HashMap[JedisPool, Jedis] = new util.HashMap[JedisPool, Jedis]()
+ private final val hasDataInBuf: AtomicBoolean = new AtomicBoolean(false)
+ private final val connectionHandler: JedisSlotBasedConnectionHandler =
+ getValue(jedisCluster, FIELD_CONNECTION_HANDLER)
+ private final val clusterInfoCache: JedisClusterInfoCache =
+ getValue(connectionHandler, FIELD_CACHE)
override def getClient(key: String): Client = {
val binaryKey: Array[Byte] = SafeEncoder.encode(key)
@@ -41,15 +42,15 @@ class JedisClusterPipeline(jedisCluster: JedisCluster) extends PipelineBase with
private def getJedis(slot: Int): Jedis = {
val pool: JedisPool = clusterInfoCache.getSlotPool(slot)
val tryGetJedis: Option[Jedis] = Option(jedisMap.get(pool))
- val jedisCli: Jedis = if (tryGetJedis.isEmpty) {
- val tmp: Jedis = pool.getResource
- jedisMap.put(pool, tmp)
- tmp
+ val jedisClient: Jedis = if (tryGetJedis.isEmpty) {
+ val jedis: Jedis = pool.getResource
+ jedisMap.put(pool, jedis)
+ jedis
} else {
tryGetJedis.get
}
hasDataInBuf.set(true)
- jedisCli
+ jedisClient
}
def pipelineSetEx(data: Array[KVDataEX]): Unit = {
@@ -59,7 +60,7 @@ class JedisClusterPipeline(jedisCluster: JedisCluster) extends PipelineBase with
val expireTime: Int = elem.expireTime
self.setex(elem.key, expireTime, elem.value)
}
- syncAndReturnAll
+ syncAndReturnAll()
} catch {
case e: Exception =>
throw new RuntimeException("[setex] operator error", e)
@@ -70,7 +71,7 @@ class JedisClusterPipeline(jedisCluster: JedisCluster) extends PipelineBase with
val result: ArrayBuffer[KVData] = new ArrayBuffer[KVData]()
try {
keys.foreach(self.hgetAll)
- val res = syncAndReturnAll.map(_.asInstanceOf[util.Map[String, String]])
+ val res = syncAndReturnAll().map(_.asInstanceOf[util.Map[String, String]])
res.foreach {
elem =>
@@ -85,9 +86,9 @@ class JedisClusterPipeline(jedisCluster: JedisCluster) extends PipelineBase with
result.result().toArray[KVData]
}
- def syncAndReturnAll: Array[Any] = innerSync
+ def syncAndReturnAll(): Array[Any] = innerSync()
- private def innerSync: Array[Any] = {
+ private def innerSync(): Array[Any] = {
val responseList: ArrayBuffer[Any] = new ArrayBuffer[Any]()
val clientSet: util.HashSet[Client] = new util.HashSet[Client]()
var isExcept: Boolean = true
@@ -166,9 +167,9 @@ class JedisClusterPipeline(jedisCluster: JedisCluster) extends PipelineBase with
}
}
- private def getValue[T](obj: AnyRef, field: Field): AnyRef = {
+ private def getValue[T](obj: AnyRef, field: Field): T = {
try {
- field.get(obj)
+ field.get(obj).asInstanceOf[T]
} catch {
case e@(_: IllegalAccessException | _: IllegalArgumentException) =>
throw new RuntimeException("failed to get value", e)
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/snmp/SnmpPDUAnalysis.scala b/demo/SparkDemo/src/main/scala/com/wallace/snmp/SnmpPDUAnalysis.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/Boot.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/Boot.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/dataproprocess/AvgValue.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/dataproprocess/AvgValue.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/dataproprocess/DataProducer.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/dataproprocess/DataProducer.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/graphxdemo/GraphXDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/graphxdemo/GraphXDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/DataFrameDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/DataFrameDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/PersonInfo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/PersonInfo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/SpendingInfo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/SpendingInfo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/WindowExprDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/dataframedemo/WindowExprDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/exam/SparkExamDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/exam/SparkExamDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/DBScanDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/DBScanDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/ParquetDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/ParquetDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/RddDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/RddDemo.scala
old mode 100644
new mode 100755
index 8bbc812e..d2da0478
--- a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/RddDemo.scala
+++ b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/RddDemo.scala
@@ -13,7 +13,6 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
@@ -23,7 +22,7 @@ import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
*/
object RddDemo extends CreateSparkSession with Using {
private val _spark: SparkSession = createSparkSession("RddDemo")
- val minPartitions: Int = Math.min(Runtime.getRuntime.availableProcessors(), 10)
+ private val minPartitions: Int = Math.min(Runtime.getRuntime.availableProcessors(), 10)
def readTextFile(filePath: String): Unit = {
val sc: SparkContext = _spark.sparkContext
@@ -61,8 +60,7 @@ object RddDemo extends CreateSparkSession with Using {
def main(args: Array[String]): Unit = {
val sc = _spark.sparkContext
- val hc = new HiveContext(sc)
- import hc.implicits._
+ import _spark.implicits._
val fs: FileSystem = FileSystem.get(sc.hadoopConfiguration)
println(s"Home Directory: ${fs.getHomeDirectory}")
val rdd: RDD[String] = sc.parallelize(Array("hello world", "hello", "world", "hello world world"), 2)
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/RddStaticsDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/rdddemo/RddStaticsDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/structuredstreamingdemo/StructuredStreamingDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/structuredstreamingdemo/StructuredStreamingDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/udfdemo/UdfDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkdemo/udfdemo/UdfDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/ChiSqLearning.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/ChiSqLearning.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/ETS.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/ETS.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/ForecastIndoorAndOutdoorMR.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/ForecastIndoorAndOutdoorMR.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/GBDTModelDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/GBDTModelDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/LightGBMDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/LightGBMDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/MLLibDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/MLLibDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/PipelinesDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/PipelinesDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/XGBoostDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/XGBoostDemo.scala
new file mode 100644
index 00000000..97445fe1
--- /dev/null
+++ b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkmllibdemo/XGBoostDemo.scala
@@ -0,0 +1,195 @@
+package com.wallace.spark.sparkmllibdemo
+
+import com.wallace.common.{CreateSparkSession, Using}
+import ml.dmlc.xgboost4j.scala.Booster
+import ml.dmlc.xgboost4j.scala.spark.{XGBoostClassificationModel, XGBoostClassifier}
+import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
+import org.apache.spark.ml.feature.VectorAssembler
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.functions._
+
+import scala.collection.{immutable, mutable}
+import scala.util.Random
+
+/**
+ * Author: biyu.huang
+ * Date: 2024/12/31 11:39
+ * Description:
+ */
+object XGBoostDemo extends CreateSparkSession with Using {
+ def main(args: Array[String]): Unit = {
+ usingSpark(createSparkSession("XGBoost Demo")) {
+ spark =>
+ // Randomly generate 20,000 samples
+ val random = new Random()
+ val data = (1 to 20000).map { _ =>
+          val randomLabel = random.nextInt(2).toDouble // randomly generate 0 or 1
+          val feature1 = random.nextDouble() * 10 + 1.0 // feature 1, range [1.0, 11.0)
+          val feature2 = random.nextDouble() * 20 + 1.0 // feature 2, range [1.0, 21.0)
+          val feature3 = random.nextDouble() * 30 + 1.0 // feature 3, range [1.0, 31.0)
+
+ val label = if (feature1 <= 5.0) {
+ 1.0
+ } else if (feature2 >= 10.0) {
+ 1.0
+ } else if (feature1 > 5.0 & feature2 < 10.0 & feature3 >= 20.0) {
+ 1.0
+ } else {
+ 0.0
+ }
+
+ val finalLabel: Double = if (label == randomLabel) label else 0.0
+ (finalLabel, feature1, feature2, feature3)
+ }.zipWithIndex.map {
+ case ((label, feature1, feature2, feature3), id) =>
+ (label, feature1, feature2, feature3, id)
+ }
+
+ val df = spark.createDataFrame(data)
+ .toDF("label", "feature1", "feature2", "feature3", "id")
+ // Process features
+ val assembler = new VectorAssembler()
+ .setInputCols(Array("feature1", "feature2", "feature3"))
+ .setOutputCol("features")
+ val transformedDF = assembler.transform(df)
+ transformedDF.show()
+ // Split raw_data into train_data and test_data
+ val Array(trainData, testData) = transformedDF.randomSplit(Array(0.8, 0.2), seed = 42)
+
+ // Split train_data into k-folds
+ val k = 5 // Define K fold
+ val folds: Array[Dataset[Row]] = trainData.randomSplit(Array.fill(k)(1.0 / k), seed = 42)
+ // Verify the data distribution across folds.
+ folds.zipWithIndex.foreach { case (df, idx) => log.info(s"Fold $idx: ${df.count()} rows") }
+
+ // XGBoostClassifier parameters
+ val params = Map(
+ "seed" -> 42,
+ "eta" -> 0.1,
+ "max_depth" -> 4,
+        "objective" -> "binary:logistic", // binary classification; use multi:softprob for multi-class
+        "num_round" -> 200, // number of boosting rounds
+        "num_workers" -> 2, // number of parallel workers
+        // "verbosity" -> 2, // verbose logging
+        "handle_invalid" -> "keep",
+        "use_external_memory" -> "false" // avoid using the Rabit tracker
+ )
+
+ // model and logloss
+ val results: immutable.Seq[(Double, XGBoostClassificationModel, Map[String, Double])] =
+ (0 until k).map { i =>
+          val trainData = folds.filterNot(_ == folds(i)).reduce(_.union(_)) // all data except fold i
+          val testData = folds(i) // fold i serves as the validation set
+
+          // Initialize the XGBoostClassifier
+ val xgbClassifier = new XGBoostClassifier(params)
+ .setFeaturesCol("features")
+ .setLabelCol("label")
+            .setEvalSets(Map("train" -> trainData.toDF(), "eval" -> testData.toDF())) // specify the training and evaluation sets
+ // Model training
+ val model = xgbClassifier.fit(trainData)
+ log.info(s"Model summary: ${model.summary.toString()}")
+ featureImportance(model, Array("feature1", "feature2", "feature3"))
+ // Model transform
+ val predictions = model.transform(testData)
+ // Calculate log-loss
+ val epsilon: Double = 1e-15
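+          // logloss = -[y * log(p) + (1 - y) * log(1 - p)], with p clipped to [eps, 1 - eps] to avoid log(0)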
+ val logLossUDF = udf((y_true: Double, y_pred: Double) => {
+ val clipped = Math.max(epsilon, Math.min(1 - epsilon, y_pred))
+ -y_true * Math.log(clipped) - (1 - y_true) * Math.log(1 - clipped)
+ })
+
+ predictions.show(10, truncate = false)
+ val vectorToArray = udf((v: org.apache.spark.ml.linalg.Vector) => v.toArray)
+ val logLossDF = predictions
+ .withColumn("probability", vectorToArray(col("probability")))
+ .select(
+ col("label").alias("y_true"),
+ col("probability").getItem(1).alias("y_pred")
+ ).withColumn("logloss", logLossUDF(col("y_true"), col("y_pred")))
+ val avgLogLoss = logLossDF.select(avg("logloss")).collect()(0)(0)
+ val metrics: Map[String, Double] = modelEvaluation(predictions)
+ log.info(s"Fold $i Logloss: $avgLogLoss, AUC-ROC: ${metrics("auc-roc")}, " +
+ s"Accuracy: ${metrics("accuracy")}, Precision: ${metrics("precision")}, " +
+ s"Recall: ${metrics("recall")}, F1 Score: ${metrics("f1_score")}")
+ // Confusion Matrix
+ val confusionMatrix = predictions
+ .groupBy("label", "prediction")
+ .count().orderBy("label", "prediction")
+ confusionMatrix.show()
+ (avgLogLoss.asInstanceOf[Double], model, metrics)
+ }
+
+      // Predict by majority voting across the fold models
+ val predictions: DataFrame = results.map(_._2.transform(testData))
+ .map(_.select("id", "label", "features", "rawPrediction", "prediction"))
+ .reduce(_.union(_))
+ .groupBy("id")
+ .agg(
+ expr("mode(prediction)").alias("prediction"),
+ max("rawPrediction").alias("rawPrediction"),
+ max("label").alias("label"),
+ max("features").alias("features")
+ )
+ // results.minBy(_._1)._2.transform(testData)
+ predictions.select("id", "label", "features", "rawPrediction", "prediction").show()
+
+ // AUC
+ val aucArray = results.map(_._3.apply("auc-roc"))
+ val auc = aucArray.sum / aucArray.size
+ val metrics = modelEvaluation(predictions)
+ log.info(s"AUC-ROC: $auc, " +
+ s"Accuracy: ${metrics("accuracy")}, Precision: ${metrics("precision")}, " +
+ s"Recall: ${metrics("recall")}, F1 Score: ${metrics("f1_score")}")
+      // Build the confusion matrix
+ val confusionMatrix =
+ predictions.groupBy("label", "prediction").count().orderBy("label", "prediction")
+ confusionMatrix.show()
+ }
+ }
+
+ private def modelEvaluation(prediction: DataFrame,
+ objective: String = "binary"): Map[String, Double] = {
+ val auc: Double = objective match {
+ case "binary" =>
+ val binaryEvaluator = new BinaryClassificationEvaluator()
+ .setLabelCol("label")
+ .setRawPredictionCol("rawPrediction")
+ .setMetricName("areaUnderROC")
+ // AUC
+ binaryEvaluator.evaluate(prediction)
+ case "multi" =>
+ Double.NaN
+ }
+ val evaluator = new MulticlassClassificationEvaluator()
+ .setLabelCol("label")
+ .setPredictionCol("prediction")
+ // Accuracy
+ val accuracy: Double = evaluator.setMetricName("accuracy").evaluate(prediction)
+ // Precision
+ val precision: Double = evaluator.setMetricName("precisionByLabel").evaluate(prediction)
+ // Recall
+ val recall: Double = evaluator.setMetricName("recallByLabel").evaluate(prediction)
+ // F1 score
+ val f1Score: Double = evaluator.setMetricName("f1").evaluate(prediction)
+
+ Map(
+ "auc-roc" -> auc,
+ "accuracy" -> accuracy,
+ "precision" -> precision,
+ "recall" -> recall,
+ "f1_score" -> f1Score
+ )
+ }
+
+ private def featureImportance(model: XGBoostClassificationModel,
+ featureNames: Array[String]): Unit = {
+ val featureImportance: Map[String, Double] = model.nativeBooster
+ .getScore(featureNames, importanceType = "total_cover")
+ val totalScore = featureImportance.values.sum
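+    // Normalize so the reported importances sum to 1 across features.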
+ featureImportance.toSeq.sortBy(-_._2).foreach {
+ case (feature, importance) =>
+ log.info(s"Feature: $feature, Importance: ${importance * 1.0 / totalScore}")
+ }
+ }
+}
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/MessageConsumer.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/MessageConsumer.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/MessageDetail.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/MessageDetail.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/OffsetsManagedConsumer.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/OffsetsManagedConsumer.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/PIDRateEstimator.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/PIDRateEstimator.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaConsumerDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaConsumerDemo.scala
old mode 100644
new mode 100755
index b69398ab..44ea887d
--- a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaConsumerDemo.scala
+++ b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaConsumerDemo.scala
@@ -1,18 +1,17 @@
package com.wallace.spark.sparkstreaming.kafkademo
-import java.io.{File, FileInputStream}
-import java.nio.ByteBuffer
-import java.util
-import java.util.Properties
-
import com.wallace.common.Using
import kafka.common.OffsetAndMetadata
import kafka.coordinator.{BaseKey, GroupMetadataManager}
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-import scala.collection.JavaConversions._
+import java.nio.ByteBuffer
+import java.util
+import java.util.Properties
+import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
import scala.collection.mutable
+import scala.jdk.CollectionConverters.{asScalaBufferConverter, iterableAsScalaIterableConverter, seqAsJavaListConverter, setAsJavaSetConverter}
/**
* Created by 10192057 on 2017/8/4.
@@ -38,7 +37,7 @@ object KafkaConsumerDemo extends Using {
using(createConsumer[ByteBuffer, ByteBuffer]("org.apache.kafka.common.serialization.ByteBufferDeserializer", "org.apache.kafka.common.serialization.ByteBufferDeserializer")) {
consumer =>
val p: TopicPartition = new TopicPartition(topics.head, "wallace_temp".hashCode % 30)
- consumer.assign(Set(p))
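+      // KafkaConsumer.assign expects a java.util.Collection, hence the explicit asJava conversion.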
+ consumer.assign(Set(p).asJava)
//consumer.seekToBeginning(Set(p))
// consumer.seekToEnd(parts)
// parts.foreach {
@@ -58,11 +57,11 @@ object KafkaConsumerDemo extends Using {
consumer =>
//consumer.assign(List(p0, p1))
val partitions: util.List[PartitionInfo] = consumer.partitionsFor(topics.last)
- val parts: mutable.Seq[TopicPartition] = partitions.map {
+ val parts: mutable.Seq[TopicPartition] = partitions.asScala.map {
p =>
new TopicPartition(p.topic(), p.partition())
}
- consumer.assign(parts)
+ consumer.assign(parts.asJava)
//consumer.seekToBeginning(parts)
// consumer.seekToEnd(parts)
// parts.foreach {
@@ -72,7 +71,7 @@ object KafkaConsumerDemo extends Using {
partitions.map(x => x.toString).foreach(p => log.error("[KafkaConsumerDemo] %s".format(p)))
val record: ConsumerRecords[String, String] = consumer.poll(20480L)
- record.map(x => (x.key(), x.value())).foreach {
+ record.asScala.map(x => (x.key(), x.value())).foreach {
r =>
log.error(s"[KafkaConsumerDemo]\nKey: %s\nValue: %s".format(r._1, r._2))
}
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaProducerDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaProducerDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaSSConsumerDemo.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/sparkstreaming/kafkademo/KafkaSSConsumerDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/statics/LinearFeatureNormalized.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/statics/LinearFeatureNormalized.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/spark/statics/UserStatCounter.scala b/demo/SparkDemo/src/main/scala/com/wallace/spark/statics/UserStatCounter.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/utils/ArgsParser.scala b/demo/SparkDemo/src/main/scala/com/wallace/utils/ArgsParser.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/com/wallace/utils/DateUtils.scala b/demo/SparkDemo/src/main/scala/com/wallace/utils/DateUtils.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/main/scala/org/apache/spark/streaming/flumedemo/SparkStreamingFlumeDemo.scala b/demo/SparkDemo/src/main/scala/org/apache/spark/streaming/flumedemo/SparkStreamingFlumeDemo.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/resources/trainingData.csv.gz b/demo/SparkDemo/src/test/resources/trainingData.csv.gz
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/scala/com/wallace/UnitSpec.scala b/demo/SparkDemo/src/test/scala/com/wallace/UnitSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/scala/com/wallace/common/sshclient/SSHClientUnitSpec.scala b/demo/SparkDemo/src/test/scala/com/wallace/common/sshclient/SSHClientUnitSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/scala/com/wallace/common/timeformat/TimeParaUnitSpec.scala b/demo/SparkDemo/src/test/scala/com/wallace/common/timeformat/TimeParaUnitSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/scala/com/wallace/snmp/SnmpPDUAnalysisUnitSpec.scala b/demo/SparkDemo/src/test/scala/com/wallace/snmp/SnmpPDUAnalysisUnitSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/scala/com/wallace/spark/sparkdemo/rdddemo/RddDemoUnitSpec.scala b/demo/SparkDemo/src/test/scala/com/wallace/spark/sparkdemo/rdddemo/RddDemoUnitSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/SparkDemo/src/test/scala/com/wallace/utils/DateUtilsUnitSpec.scala b/demo/SparkDemo/src/test/scala/com/wallace/utils/DateUtilsUnitSpec.scala
old mode 100644
new mode 100755
diff --git a/demo/demo-common/pom.xml b/demo/demo-common/pom.xml
old mode 100644
new mode 100755
diff --git a/demo/demo-common/src/main/proto/test.proto b/demo/demo-common/src/main/proto/test.proto
old mode 100644
new mode 100755
diff --git a/idf.jserialized b/idf.jserialized
old mode 100644
new mode 100755
diff --git a/lib/loader.jar b/lib/loader.jar
old mode 100644
new mode 100755
diff --git a/pom.xml b/pom.xml
old mode 100644
new mode 100755
index 30f16773..73b39194
--- a/pom.xml
+++ b/pom.xml
@@ -22,17 +22,23 @@
demo/SparkDemo
demo/ScalaDemo
demo/demo-common
+ demo/FlinkDemo
1.8
2.12.8
2.12
- 3.3.3
+ 3.4.4
+ 2.5.23
+ 1.17.0
UTF-8
+ UTF-8
1.8
1.8
${project.basedir}
+ 2.24.3
+ 2.0.7
@@ -74,11 +80,53 @@
${scala.version}
+
com.typesafe.scala-logging
scala-logging_${scala.binary.version}
3.9.2
+
+ org.apache.logging.log4j
+ log4j-api
+ ${log4j2.version}
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${log4j2.version}
+
+
+ org.apache.logging.log4j
+ log4j-slf4j2-impl
+ ${log4j2.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+
+
+ org.slf4j
+ slf4j-reload4j
+ ${slf4j.version}
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
@@ -108,43 +156,37 @@
3.4.0
+
- org.slf4j
- slf4j-api
- 1.7.7
-
-
-
- ch.qos.logback
- logback-core
- 1.2.13
+ com.typesafe.akka
+ akka-actor_${scala.binary.version}
+ ${akka.version}
+
- ch.qos.logback
- logback-classic
- 1.2.13
+ com.typesafe.akka
+ akka-remote_${scala.binary.version}
+ ${akka.version}
-
com.typesafe.akka
- akka-actor_${scala.binary.version}
- 2.5.16
+ akka-slf4j_${scala.binary.version}
+ ${akka.version}
-
com.typesafe.akka
- akka-remote_${scala.binary.version}
- 2.5.16
+ akka-protobuf_${scala.binary.version}
+ ${akka.version}
com.typesafe
config
- 1.2.1
+ 1.4.2
@@ -182,14 +224,6 @@
build210
-
-
- org.slf4j
- slf4j-log4j12
- 1.7.16
-
-
-
org.apache.spark
spark-core_${scala.binary.version}
@@ -337,6 +371,19 @@
1.3
+
+ ml.dmlc
+ xgboost4j_${scala.binary.version}
+ 2.1.3
+
+
+
+
+ ml.dmlc
+ xgboost4j-spark_${scala.binary.version}
+ 2.1.3
+
+
diff --git a/project/Builds.scala b/project/Builds.scala
old mode 100644
new mode 100755
diff --git a/project/Common.scala b/project/Common.scala
old mode 100644
new mode 100755
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
old mode 100644
new mode 100755
diff --git a/project/build.properties b/project/build.properties
old mode 100644
new mode 100755
diff --git a/project/plugins.sbt b/project/plugins.sbt
old mode 100644
new mode 100755
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
old mode 100644
new mode 100755