[#28] Updated flink jpmml examples #39

Merged
47 changes: 45 additions & 2 deletions flink-jpmml-examples/README.md
@@ -1,4 +1,4 @@
## Running Examples
## Running simple examples
This module contains the examples running simple predictions from an iris Source.
The source emits the following data:
```
@@ -25,5 +25,48 @@ Either you can employ the _quick_ predictor:
./path/to/bin/flink run -c io.radicalbit.examples.QuickEvaluateKmeans <path/to/project/root>/flink-jpmml/flink-jpmml-examples/target/scala-2.x/flink-jpmml-examples-assembly-0.6.0-SNAPSHOT.jar --model path/to/pmml/model.pmml --output /path/to/output
```

## Fault-Tolerance

Both of the jobs above write their predictions to the output path.
_To test the fault-tolerance behaviour of the operator, you can run the `CheckpointEvaluate` example._

To do so:

1) Open a listening socket on your local machine:
```
nc -l -k 9999
```

2) Start the Flink cluster ([Flink 1.3.2](http://flink.apache.org/downloads.html#binaries) is required):
```
./path/to/flink-1.3.2/bin/start-cluster.sh
```

3) Run the Flink job:
```
./path/to/bin/flink run -c io.radicalbit.examples.CheckpointEvaluate <path/to/project/root>/flink-jpmml/flink-jpmml-examples/target/scala-2.x/flink-jpmml-examples-assembly-0.6.0-SNAPSHOT.jar --output /path/to/output
```

4) Send the model path through the socket by typing it into the `nc` session opened in step 1. For example, you can use the models in `flink-jpmml-assets`:
```
<path/to/project/root>/flink-jpmml/flink-jpmml-assets/resources/kmeans.xml
<path/to/project/root>/flink-jpmml/flink-jpmml-assets/resources/kmeans_nooutput.xml
```

5) Stop the task manager:
```
./path/to/flink-1.3.2/bin/taskmanager.sh stop
```
_You can check the job's status in the Flink UI at http://localhost:8081_


6) Restart the task manager:
```
./path/to/flink-1.3.2/bin/taskmanager.sh start
```

_Once the job has restarted, there is no need to re-send the model information through the control stream: the models should be restored from the last checkpoint._
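
As a rough alternative to `nc`, the listening side of the procedure can be sketched in plain Scala. This is only an illustration: `ControlSocketSketch` and `serveOnce` are hypothetical names that are not part of flink-jpmml, and the sketch assumes the job connects to the port as a client and reads one model path per line. Unlike `nc -l -k`, it serves a single connection and then exits.

```scala
import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical stand-in for `nc -l 9999`: waits for one client (the Flink job)
// and sends it each model path on its own line.
object ControlSocketSketch {

  def serveOnce(server: ServerSocket, modelPaths: Seq[String]): Unit = {
    val client = server.accept() // blocks until a client connects
    try {
      // autoFlush = true, so every println is pushed to the client immediately
      val out = new PrintWriter(client.getOutputStream, true)
      modelPaths.foreach(path => out.println(path))
    } finally client.close()
  }

  def main(args: Array[String]): Unit = {
    // The port matches the one used in the steps above;
    // the model paths are taken from the program arguments.
    val server = new ServerSocket(9999)
    try serveOnce(server, args.toSeq)
    finally server.close()
  }
}
```

Start it before submitting the job, passing the model paths as arguments, so that the port is already open when the job tries to connect.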



Note: Both above jobs log out predictions to output path.
@@ -0,0 +1,102 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples

import io.radicalbit.examples.model.Utils
import io.radicalbit.examples.models.Iris
import io.radicalbit.examples.util.DynamicParams
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.PmmlModel
import io.radicalbit.flink.pmml.scala.models.control.AddMessage
import io.radicalbit.flink.pmml.scala.models.core.ModelId
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object CheckpointEvaluate {

  private final lazy val idSet = Set(
    "4897c9f4-5226-43c7-8f2d-f9fd388cf2bc",
    "5f919c52-2ef8-4ff2-94b2-2e64bb85005e"
  )

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)

    val outputPath = parameterTool.getRequired("output")

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val parameters = DynamicParams.fromParameterTool(parameterTool)

    // Enable checkpointing so the control stream state is kept across failures
    env.enableCheckpointing(parameters.ckpInterval, CheckpointingMode.EXACTLY_ONCE)

    // Create the source of Iris events
    val eventStream = env.addSource((sc: SourceContext[Iris]) => {
      val NumberOfParameters = 4
      lazy val RandomGenerator = scala.util.Random
      val RandomMin = 0.2
      val RandomMax = 6.0

      // Keep one decimal digit, rounding down
      @inline def truncateDouble(n: Double) = math.floor(n * 10) / 10

      while (true) {
        def randomVal = RandomMin + (RandomMax - RandomMin) * RandomGenerator.nextDouble()
        val dataForIris = Seq.fill(NumberOfParameters)(truncateDouble(randomVal))
        val iris =
          Iris(idSet.toVector(Random.nextInt(idSet.size)) + ModelId.separatorSymbol + "1",
               dataForIris.head,
               dataForIris(1),
               dataForIris(2),
               dataForIris.last,
               Utils.now())
        sc.collect(iris)
        Thread.sleep(1000)
      }
    })

    // Create the control stream from the socket: each received line is a model path
    val controlStream = env
      .socketTextStream("localhost", 9999)
      .map(path => AddMessage(idSet.toVector(Random.nextInt(idSet.size)), 1L, path, Utils.now()))

    /*
     * Attach the control stream (fed by the socket) as a support stream and
     * evaluate each event against the models uploaded through it.
     */
    val predictions = eventStream
      .withSupportStream(controlStream)
      .evaluate { (event: Iris, model: PmmlModel) =>
        val vectorized = event.toVector
        val prediction = model.predict(vectorized, Some(0.0))
        (event, prediction.value)
      }

    predictions
      .writeAsText(outputPath, WriteMode.OVERWRITE)

    env.execute("Checkpoint Evaluate Example")
  }
}
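
The generation logic used inside the source can be tried outside Flink. The sketch below is a minimal, self-contained illustration: `IrisFeatureSketch`, `randomFeatures`, and `eventId` are hypothetical names, and the `separator` argument stands in for `ModelId.separatorSymbol`, whose actual value is not shown in this diff.

```scala
import scala.util.Random

object IrisFeatureSketch {
  val RandomMin = 0.2
  val RandomMax = 6.0

  // Same arithmetic as `(math floor n * 10) / 10` in the job:
  // keep one decimal digit, rounding down.
  def truncateDouble(n: Double): Double = math.floor(n * 10) / 10

  // Draws `count` values between RandomMin and RandomMax and truncates each one.
  def randomFeatures(count: Int, rng: Random): Seq[Double] =
    Seq.fill(count)(truncateDouble(RandomMin + (RandomMax - RandomMin) * rng.nextDouble()))

  // Builds an identifier of the form <id><separator><version>.
  // `separator` is an assumption standing in for ModelId.separatorSymbol.
  def eventId(ids: Vector[String], version: String, separator: String, rng: Random): String =
    ids(rng.nextInt(ids.size)) + separator + version

  def main(args: Array[String]): Unit = {
    val rng = new Random()
    println(randomFeatures(4, rng))
    println(eventId(Vector("id-a", "id-b"), "1", "_", rng))
  }
}
```

Writing the truncation as `math.floor(n * 10) / 10` makes the precedence explicit: in the infix form, `*` binds tighter than the alphanumeric operator `floor`, so both expressions multiply first and floor second.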
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples

import io.radicalbit.examples.models.Iris
import io.radicalbit.examples.sources.{ControlSource, IrisSource}
import io.radicalbit.examples.util.DynamicParams
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.PmmlModel
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

/**
* Toy Job about Stateful Dynamic Model Serving:
* The job owns two input streams:
* - event stream: The main input events stream
* - control stream: Control messages about model repository server current state
*
*/
object DynamicEvaluateKmeans {

  def main(args: Array[String]): Unit = {

    val parameterTool = ParameterTool.fromArgs(args)

    val parameters = DynamicParams.fromParameterTool(parameterTool)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(parameters.ckpInterval, CheckpointingMode.EXACTLY_ONCE)
Review comment (Collaborator): Please, if you use the parameter here, apply the same practice in CheckpointEvaluate as well.


    val eventStream = IrisSource.irisSource(env, Option(parameters.availableIds))
    val controlStream =
      ControlSource.generateStream(env, parameters.genPolicy, parameters.pathAndIds, parameters.ctrlGenInterval)

    val predictions = eventStream
      .withSupportStream(controlStream)
      .evaluate { (event: Iris, model: PmmlModel) =>
        val vectorized = event.toVector
        val prediction = model.predict(vectorized, Some(0.0))
        (event, prediction.value)
      }

    predictions.writeAsText(parameters.outputPath)

    env.execute("Dynamic Clustering Example")
  }

}
@@ -19,12 +19,12 @@

package io.radicalbit.examples

import org.apache.flink.streaming.api.scala._
import io.radicalbit.examples.model.IrisSource._
import io.radicalbit.examples.sources.IrisSource._
import io.radicalbit.examples.util.EnsureParameters
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object EvaluateKmeans extends EnsureParameters {

@@ -36,19 +36,18 @@ object EvaluateKmeans extends EnsureParameters {
val (inputModel, output) = ensureParams(params)

//Read data from custom iris source
val irisDataStream = irisSource(env)
val irisDataStream = irisSource(env, None)

//Load model
val modelReader = ModelReader(inputModel)

//Using evaluate operator
val prediction = irisDataStream.evaluate(modelReader) {
//Iris data and modelReader instance
case (event, model) => {
case (event, model) =>
val vectorized = event.toVector
val prediction = model.predict(vectorized, Some(0.0))
(event, prediction.value.getOrElse(-1.0))
}
}

prediction.writeAsText(output)
@@ -19,12 +19,12 @@

package io.radicalbit.examples

import org.apache.flink.streaming.api.scala._
import io.radicalbit.examples.model.IrisSource._
import io.radicalbit.examples.sources.IrisSource._
import io.radicalbit.examples.util.EnsureParameters
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object QuickEvaluateKmeans extends EnsureParameters {

@@ -36,7 +36,7 @@ object QuickEvaluateKmeans extends EnsureParameters {
val (inputModel, output) = ensureParams(params)

//Read data from custom iris source
val irisDataStream = irisSource(env)
val irisDataStream = irisSource(env, None)

//Convert iris to DenseVector
val irisToVector = irisDataStream.map(_.toVector)
@@ -17,10 +17,17 @@
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples.model
package io.radicalbit.examples.models

import io.radicalbit.flink.pmml.scala.models.input.BaseEvent
import org.apache.flink.ml.math.DenseVector

case class Iris(sepalLength: Double, sepalWidth: Double, petalLength: Double, petalWidth: Double) {
case class Iris(modelId: String,
sepalLength: Double,
sepalWidth: Double,
petalLength: Double,
petalWidth: Double,
occurredOn: Long)
extends BaseEvent {
def toVector = DenseVector(sepalLength, sepalWidth, petalLength, petalWidth)
}
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples.model

import java.util.UUID

import io.radicalbit.flink.pmml.scala.models.core.ModelId

object Utils {

  final val modelVersion = 1.toString

  def retrieveMappingIdPath(modelPaths: Seq[String]): Map[String, String] =
    modelPaths.map(path => (UUID.randomUUID().toString, path)).toMap

  def retrieveAvailableId(mappingIdPath: Map[String, String]): Seq[String] =
    mappingIdPath.keys.map(name => name + ModelId.separatorSymbol + modelVersion).toSeq

  def now(): Long = System.currentTimeMillis()
}
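
A quick way to see what these helpers produce is to run them on a couple of paths. The sketch below copies the two mapping functions into a standalone object so it runs without the library: `UtilsSketch` is a hypothetical name, the paths are made up, and `Separator` is an assumed placeholder for `ModelId.separatorSymbol`, so the exact identifier format may differ in flink-jpmml.

```scala
import java.util.UUID

object UtilsSketch {
  // Assumption: placeholder for ModelId.separatorSymbol, whose value is not shown here.
  val Separator = "_"

  val modelVersion = 1.toString

  // Assigns a freshly generated UUID to every model path.
  def retrieveMappingIdPath(modelPaths: Seq[String]): Map[String, String] =
    modelPaths.map(path => (UUID.randomUUID().toString, path)).toMap

  // Turns every UUID into an identifier of the form <uuid><separator><version>.
  def retrieveAvailableId(mappingIdPath: Map[String, String]): Seq[String] =
    mappingIdPath.keys.map(name => name + Separator + modelVersion).toSeq

  def main(args: Array[String]): Unit = {
    val mapping = retrieveMappingIdPath(Seq("/models/kmeans.xml", "/models/kmeans_nooutput.xml"))
    retrieveAvailableId(mapping).foreach(println)
  }
}
```

Because the identifiers are random UUIDs, each run prints different values; only the suffix made of the separator and the version stays the same.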