-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#28] Updated flink jpmml examples #39
Changes from 15 commits
3c5e575
c877d86
971d995
b25d59c
a0d441c
358a3cd
f9d31e3
549782c
c8d653c
9dc1035
0e4d616
5cbeb9b
4ddb6de
d7c882e
a89bc51
f844078
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
## Running Examples | ||
## Running simple examples | ||
This module contains the examples running simple predictions from an iris Source. | ||
The source emits the following data: | ||
``` | ||
|
@@ -25,5 +25,48 @@ Either you can employ the _quick_ predictor: | |
./path/to/bin/flink run -c io.radicalbit.examples.QuickEvaluateKmeans <path/to/project/root>/flink-jpmml/flink-jpmml-examples/target/scala-2.x/flink-jpmml-examples-assembly-0.6.0-SNAPSHOT.jar --model path/to/pmml/model.pmml --output /path/to/output | ||
``` | ||
|
||
## Fault-Tolerance | ||
|
||
Both above jobs log out predictions to output path. | ||
_if you like testing a fault-tolerance of the operator you can run a `CheckpointEvaluate` example._ | ||
|
||
In order to do that: | ||
|
||
1) Create a socket in your local machine: | ||
``` | ||
nc -l -k 9999 | ||
``` | ||
|
||
2) Run the flink-cluster( [Flink 1.3.2](http://flink.apache.org/downloads.html#binaries) is required ): | ||
``` | ||
./path/to/flink-1.3.2/start-cluster.sh | ||
``` | ||
|
||
3) run the flink job: | ||
``` | ||
./path/to/bin/flink run -c io.radicalbit.examples.CheckpointEvaluate <path/to/project/root>/flink-jpmml/flink-jpmml-examples/target/scala-2.x/flink-jpmml-examples-assembly-0.6.0-SNAPSHOT.jar --output /path/to/output | ||
``` | ||
|
||
4) Put the model via socket, in this case you can use the models in `flink-jpmml-assets`: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put --> Send |
||
``` | ||
<path/to/project/root>/flink-jpmml/flink-jpmml-assets/resources/kmeans.xml | ||
<path/to/project/root>/flink-jpmml/flink-jpmml-assets/resources/kmeans_nooutput.xml | ||
|
||
``` | ||
|
||
6) Stop the task manager: | ||
``` | ||
./path/to/flink-1.3.2/bin/taskmanager.sh stop | ||
``` | ||
_you can show the job's status in Flink UI on http://localhost:8081_ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can see |
||
|
||
|
||
7) Restart the task manager: | ||
``` | ||
./path/to/flink-1.3.2/bin/taskmanager.sh start | ||
``` | ||
|
||
_At this point, when the job is restarted, there's no need to reload the models in control stream because you should see the models from the last checkpoint_ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there's no need to re-send the models info by control stream |
||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... And then? |
||
|
||
Note: Both above jobs log out predictions to output path. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
/* | ||
* Copyright (C) 2017 Radicalbit | ||
* | ||
* This file is part of flink-JPMML | ||
* | ||
* flink-JPMML is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* flink-JPMML is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
package io.radicalbit.examples | ||
|
||
import io.radicalbit.examples.model.Utils | ||
import io.radicalbit.examples.models.Iris | ||
import io.radicalbit.examples.util.DynamicParams | ||
import io.radicalbit.flink.pmml.scala.api.PmmlModel | ||
import io.radicalbit.flink.pmml.scala.models.control.AddMessage | ||
import io.radicalbit.flink.pmml.scala.models.core.ModelId | ||
import org.apache.flink.api.java.utils.ParameterTool | ||
import org.apache.flink.core.fs.FileSystem.WriteMode | ||
import org.apache.flink.streaming.api.CheckpointingMode | ||
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext | ||
import org.apache.flink.streaming.api.scala._ | ||
|
||
import scala.util.Random | ||
|
||
object CheckpointEvaluate { | ||
|
||
private final lazy val idSet = Set( | ||
"4897c9f4-5226-43c7-8f2d-f9fd388cf2bc", | ||
"5f919c52-2ef8-4ff2-94b2-2e64bb85005e" | ||
) | ||
|
||
def main(args: Array[String]): Unit = { | ||
val parameterTool = ParameterTool.fromArgs(args) | ||
|
||
val outputPath = parameterTool.getRequired("output") | ||
|
||
val env = StreamExecutionEnvironment.getExecutionEnvironment | ||
|
||
val parameters = DynamicParams.fromParameterTool(parameterTool) | ||
|
||
//Enable checkpoint to keep control stream | ||
env.enableCheckpointing(parameters.ckpInterval, CheckpointingMode.EXACTLY_ONCE) | ||
|
||
//Create source for Iris data | ||
val eventStream = env.addSource((sc: SourceContext[Iris]) => { | ||
val NumberOfParameters = 4 | ||
lazy val RandomGenerator = scala.util.Random | ||
val RandomMin = 0.2 | ||
val RandomMax = 6.0 | ||
|
||
@inline def truncateDouble(n: Double) = (math floor n * 10) / 10 | ||
|
||
while (true) { | ||
def randomVal = RandomMin + (RandomMax - RandomMin) * RandomGenerator.nextDouble() | ||
val dataForIris = Seq.fill(NumberOfParameters)(truncateDouble(randomVal)) | ||
val iris = | ||
Iris(idSet.toVector(Random.nextInt(idSet.size)) + ModelId.separatorSymbol + "1", | ||
dataForIris.head, | ||
dataForIris(1), | ||
dataForIris(2), | ||
dataForIris.last, | ||
Utils.now()) | ||
sc.collect(iris) | ||
Thread.sleep(1000) | ||
} | ||
}) | ||
|
||
//Create a stream for socket | ||
val controlStream = env | ||
.socketTextStream("localhost", 9999) | ||
.map(path => AddMessage(idSet.toVector(Random.nextInt(idSet.size)), 1L, path, Utils.now())) | ||
|
||
/* | ||
* Make a prediction withSupportStream that represents the stream from the socket | ||
* evaluate the model with model upload in ControlStream | ||
* */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. breakline for closing comments |
||
val predictions = eventStream | ||
.withSupportStream(controlStream) | ||
.evaluate { (event: Iris, model: PmmlModel) => | ||
val vectorized = event.toVector | ||
val prediction = model.predict(vectorized, Some(0.0)) | ||
(event, prediction.value) | ||
} | ||
|
||
predictions | ||
.writeAsText(outputPath, WriteMode.OVERWRITE) | ||
|
||
env.execute("Checkpoint Evaluate Example") | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
/* | ||
* Copyright (C) 2017 Radicalbit | ||
* | ||
* This file is part of flink-JPMML | ||
* | ||
* flink-JPMML is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* flink-JPMML is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
package io.radicalbit.examples | ||
|
||
import io.radicalbit.examples.models.Iris | ||
import io.radicalbit.examples.sources.{ControlSource, IrisSource} | ||
import io.radicalbit.examples.util.DynamicParams | ||
import io.radicalbit.flink.pmml.scala._ | ||
import io.radicalbit.flink.pmml.scala.api.PmmlModel | ||
import org.apache.flink.api.java.utils.ParameterTool | ||
import org.apache.flink.streaming.api.CheckpointingMode | ||
import org.apache.flink.streaming.api.scala._ | ||
|
||
/** | ||
* Toy Job about Stateful Dynamic Model Serving: | ||
* The job owns two input streams: | ||
* - event stream: The main input events stream | ||
* - control stream: Control messages about model repository server current state | ||
* | ||
*/ | ||
object DynamicEvaluateKmeans { | ||
|
||
def main(args: Array[String]): Unit = { | ||
|
||
val parameterTool = ParameterTool.fromArgs(args) | ||
|
||
val parameters = DynamicParams.fromParameterTool(parameterTool) | ||
|
||
val env = StreamExecutionEnvironment.getExecutionEnvironment | ||
|
||
env.enableCheckpointing(parameters.ckpInterval, CheckpointingMode.EXACTLY_ONCE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please, if you use parameter here, then bring this best practice also on |
||
|
||
val eventStream = IrisSource.irisSource(env, Option(parameters.availableIds)) | ||
val controlStream = | ||
ControlSource.generateStream(env, parameters.genPolicy, parameters.pathAndIds, parameters.ctrlGenInterval) | ||
|
||
val predictions = eventStream | ||
.withSupportStream(controlStream) | ||
.evaluate { (event: Iris, model: PmmlModel) => | ||
val vectorized = event.toVector | ||
val prediction = model.predict(vectorized, Some(0.0)) | ||
(event, prediction.value) | ||
} | ||
|
||
predictions.writeAsText(parameters.outputPath) | ||
|
||
env.execute("Dynamic Clustering Example") | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright (C) 2017 Radicalbit | ||
* | ||
* This file is part of flink-JPMML | ||
* | ||
* flink-JPMML is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU Affero General Public License as | ||
* published by the Free Software Foundation, either version 3 of the | ||
* License, or (at your option) any later version. | ||
* | ||
* flink-JPMML is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU Affero General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU Affero General Public License | ||
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
package io.radicalbit.examples.model | ||
|
||
import java.util.UUID | ||
|
||
import io.radicalbit.flink.pmml.scala.models.core.ModelId | ||
|
||
object Utils { | ||
|
||
final val modelVersion = 1.toString | ||
|
||
def retrieveMappingIdPath(modelPaths: Seq[String]): Map[String, String] = | ||
modelPaths.map(path => (UUID.randomUUID().toString, path)).toMap | ||
|
||
def retrieveAvailableId(mappingIdPath: Map[String, String]): Seq[String] = | ||
mappingIdPath.keys.map(name => name + ModelId.separatorSymbol + modelVersion).toSeq | ||
|
||
def now(): Long = System.currentTimeMillis() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testing the fault-tolerance behaviour of the operator