From fbe370969824e1af07f1a1accada36293e8315ce Mon Sep 17 00:00:00 2001 From: chennuo Date: Sat, 24 Apr 2021 21:39:28 +0800 Subject: [PATCH 1/4] [FLINK-22442][CEP] Using scala api to change the TimeCharacteristic of the PatternStream is invalid --- .../scala/org/apache/flink/cep/scala/PatternStream.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index 93c4d3023caa5..a082f7b74fefc 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -36,7 +36,7 @@ import scala.collection.Map * @param jPatternStream Underlying pattern stream from Java API * @tparam T Type of the events */ -class PatternStream[T](jPatternStream: JPatternStream[T]) { +class PatternStream[T](var jPatternStream: JPatternStream[T]) { private[flink] def wrappedPatternStream = jPatternStream @@ -447,17 +447,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { } def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = { - jPatternStream.sideOutputLateData(lateDataOutputTag) + jPatternStream = jPatternStream.sideOutputLateData(lateDataOutputTag) this } def inProcessingTime(): PatternStream[T] = { - jPatternStream.inProcessingTime() + jPatternStream = jPatternStream.inProcessingTime() this } def inEventTime(): PatternStream[T] = { - jPatternStream.inEventTime() + jPatternStream = jPatternStream.inEventTime() this } } From 17027e83726f000af37aeee27c05935244528f3e Mon Sep 17 00:00:00 2001 From: chennuo Date: Sun, 25 Apr 2021 08:50:35 +0800 Subject: [PATCH 2/4] [FLINK-22442][CEP] Using scala api to change the TimeCharacteristic of the PatternStream is invalid --- .../scala/CEPScalaApiPatternStreamTest.scala | 67 +++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala new file mode 100644 index 0000000000000..c796c3b5c81a3 --- /dev/null +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala @@ -0,0 +1,67 @@ +package org.apache.flink.cep.scala + +import java.lang.reflect.Field + +import org.apache.flink.cep +import org.apache.flink.cep.pattern.Pattern +import org.apache.flink.cep.pattern.conditions.SimpleCondition +import org.apache.flink.streaming.api.datastream.DataStreamSource +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.junit.Assert.assertEquals +import org.junit.Test + +class CEPScalaApiPatternStreamTest { + /** + * These tests simply check that use the Scala API to update the TimeCharacteristic of the PatternStream . + */ + + @Test + def updateCepTimeCharacteristicByScalaApi(): Unit = { + + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo", 1.0), Event(8, "end", 1.0)) + val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new SimpleCondition[Event]() { + override def filter(value: Event): Boolean = value.name == "start" + }) + + val jestream: cep.PatternStream[Event] = org.apache.flink.cep.CEP.pattern(input, pattern) + + //get org.apache.flink.cep.scala.PatternStream + val sePstream = new PatternStream[Event](jestream) + + //get TimeBehaviour + val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream) + + assertEquals(time1.toString, "EventTime") + + //change TimeCharacteristic use scala api + val sPstream: PatternStream[Event] = sePstream.inProcessingTime() + + //get TimeBehaviour + val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream) + + assertEquals(time2.toString, "ProcessingTime") + + + } + + def getTimeBehaviourFromScalaPatternStream(seStream: org.apache.flink.cep.scala.PatternStream[Event]) = { + val field: Field = seStream.getClass.getDeclaredField("jPatternStream") + field.setAccessible(true) + val JPattern: AnyRef = field.get(seStream) + val stream: cep.PatternStream[Event] = JPattern.asInstanceOf[cep.PatternStream[Event]] + getTimeBehaviourFromJavaPatternStream(stream) + } + + def getTimeBehaviourFromJavaPatternStream(jeStream: org.apache.flink.cep.PatternStream[Event])={ + val builder: Field = jeStream.getClass.getDeclaredField("builder") + builder.setAccessible(true) + val o: AnyRef = builder.get(jeStream) + val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour") + timeBehaviour.setAccessible(true) + timeBehaviour.get(o) + } + + + case class Event(id:Int ,name:String ,price:Double) +} From 984327897d7e0b57ea366ad6ecc8369e94f9ea6f Mon Sep 17 00:00:00 2001 From: chennuo Date: Wed, 28 Apr 2021 00:11:47 +0800 Subject: [PATCH 3/4] [FLINK-22442][CEP] Using scala api to change the TimeCharacteristic of the PatternStream is invalid --- .../flink/cep/scala/PatternStream.scala | 13 ++-- .../scala/CEPScalaApiPatternStreamTest.scala | 65 +++++-------------- 2 files changed, 22 insertions(+), 56 deletions(-) diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index a082f7b74fefc..ebc1c27c8ea1f 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -36,7 +36,7 @@ import scala.collection.Map * @param jPatternStream Underlying pattern stream from Java API * @tparam T Type of the events */ -class PatternStream[T](var jPatternStream: JPatternStream[T]) { +class PatternStream[T](jPatternStream: JPatternStream[T]) { private[flink] def wrappedPatternStream = jPatternStream @@ -447,18 +447,15 @@ class PatternStream[T](var jPatternStream: JPatternStream[T]) { } def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = { - jPatternStream = jPatternStream.sideOutputLateData(lateDataOutputTag) - this - } + PatternStream[T](jPatternStream.sideOutputLateData(lateDataOutputTag)) + } def inProcessingTime(): PatternStream[T] = { - jPatternStream = jPatternStream.inProcessingTime() - this + PatternStream[T](jPatternStream.inProcessingTime()) } def inEventTime(): PatternStream[T] = { - jPatternStream = jPatternStream.inEventTime() - this + PatternStream[T](jPatternStream.inEventTime()) } } diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala index c796c3b5c81a3..36c6887503c35 100644 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala +++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala @@ -1,67 +1,36 @@ package org.apache.flink.cep.scala -import java.lang.reflect.Field - import org.apache.flink.cep -import org.apache.flink.cep.pattern.Pattern -import org.apache.flink.cep.pattern.conditions.SimpleCondition -import org.apache.flink.streaming.api.datastream.DataStreamSource -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.junit.Assert.assertEquals +import org.apache.flink.cep.scala.pattern.Pattern +import org.apache.flink.streaming.api.scala._ +import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.junit.Test class CEPScalaApiPatternStreamTest { /** - * These tests simply check that use the Scala API to update the TimeCharacteristic of the PatternStream . + * These tests simply check that use the Scala API to update the Characteristic of the PatternStream . */ @Test - def updateCepTimeCharacteristicByScalaApi(): Unit = { - + def testUpdatePatternStreamCharacteristicByScalaApi(): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment - val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo", 1.0), Event(8, "end", 1.0)) - val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new SimpleCondition[Event]() { - override def filter(value: Event): Boolean = value.name == "start" - }) - - val jestream: cep.PatternStream[Event] = org.apache.flink.cep.CEP.pattern(input, pattern) + val dummyDataStream: DataStream[(Int, Int)] = env.fromElements() + val pattern: Pattern[(Int, Int), (Int, Int)] = Pattern.begin[(Int, Int)]("dummy") - //get org.apache.flink.cep.scala.PatternStream - val sePstream = new PatternStream[Event](jestream) + val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern) + val jStream: cep.PatternStream[(Int, Int)] = pStream.wrappedPatternStream - //get TimeBehaviour - val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream) + assert(pStream.wrappedPatternStream == jStream) - assertEquals(time1.toString, "EventTime") + //change Characteristic use scala api + val pStream1: PatternStream[(Int, Int)] = pStream.inProcessingTime() + assert(pStream1.wrappedPatternStream != jStream) - //change TimeCharacteristic use scala api - val sPstream: PatternStream[Event] = sePstream.inProcessingTime() - - //get TimeBehaviour - val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream) - - assertEquals(time2.toString, "ProcessingTime") + val pStream2: PatternStream[(Int, Int)] = pStream.inEventTime() + assert(pStream2.wrappedPatternStream != jStream) + val pStream3: PatternStream[(Int, Int)] = pStream.sideOutputLateData(new OutputTag[(Int, Int)]("dummy")) + assert(pStream3.wrappedPatternStream != jStream) } - - def getTimeBehaviourFromScalaPatternStream(seStream: org.apache.flink.cep.scala.PatternStream[Event]) = { - val field: Field = seStream.getClass.getDeclaredField("jPatternStream") - field.setAccessible(true) - val JPattern: AnyRef = field.get(seStream) - val stream: cep.PatternStream[Event] = JPattern.asInstanceOf[cep.PatternStream[Event]] - getTimeBehaviourFromJavaPatternStream(stream) - } - - def getTimeBehaviourFromJavaPatternStream(jeStream: org.apache.flink.cep.PatternStream[Event])={ - val builder: Field = jeStream.getClass.getDeclaredField("builder") - builder.setAccessible(true) - val o: AnyRef = builder.get(jeStream) - val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour") - timeBehaviour.setAccessible(true) - timeBehaviour.get(o) - } - - - case class Event(id:Int ,name:String ,price:Double) } From 2aa2e313fc30f618bba7919d3d15065ab9782909 Mon Sep 17 00:00:00 2001 From: chennuo Date: Sun, 2 May 2021 19:45:01 +0800 Subject: [PATCH 4/4] [FLINK-22442][CEP] Using scala api to change the TimeCharacteristic of the PatternStream is invalid --- .../scala/CEPScalaApiPatternStreamTest.scala | 36 ------------------- 1 file changed, 36 deletions(-) delete mode 100644 flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala deleted file mode 100644 index 36c6887503c35..0000000000000 --- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.flink.cep.scala - -import org.apache.flink.cep -import org.apache.flink.cep.scala.pattern.Pattern -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} -import org.junit.Test - -class CEPScalaApiPatternStreamTest { - /** - * These tests simply check that use the Scala API to update the Characteristic of the PatternStream . - */ - - @Test - def testUpdatePatternStreamCharacteristicByScalaApi(): Unit = { - val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment - val dummyDataStream: DataStream[(Int, Int)] = env.fromElements() - val pattern: Pattern[(Int, Int), (Int, Int)] = Pattern.begin[(Int, Int)]("dummy") - - val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern) - val jStream: cep.PatternStream[(Int, Int)] = pStream.wrappedPatternStream - - assert(pStream.wrappedPatternStream == jStream) - - //change Characteristic use scala api - val pStream1: PatternStream[(Int, Int)] = pStream.inProcessingTime() - assert(pStream1.wrappedPatternStream != jStream) - - val pStream2: PatternStream[(Int, Int)] = pStream.inEventTime() - assert(pStream2.wrappedPatternStream != jStream) - - val pStream3: PatternStream[(Int, Int)] = pStream.sideOutputLateData(new OutputTag[(Int, Int)]("dummy")) - assert(pStream3.wrappedPatternStream != jStream) - - } -}