Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28] Updated flink jpmml examples #39

Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples

import io.radicalbit.examples.model.Utils
import io.radicalbit.examples.models.Iris
import io.radicalbit.flink.pmml.scala.api.PmmlModel
import io.radicalbit.flink.pmml.scala.models.control.AddMessage
import io.radicalbit.flink.pmml.scala.models.core.ModelId
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * Toy job evaluating randomly generated Iris events against dynamically served PMML models,
 * with exactly-once checkpointing enabled.
 *
 * The job owns two input streams:
 *  - event stream: one random Iris event per second, addressed to one of the known model ids
 *  - control stream: lines read from a socket on localhost:9999, each one used as the model path
 *    of an AddMessage targeting one of the known model ids
 *
 * Program arguments:
 *  - `--output` (required): path the predictions are written to (overwritten if present)
 *  - `--checkpointInterval` (optional): checkpoint interval in milliseconds, defaults to 1000
 */
object CheckpointEvaluate {

  /** Checkpoint interval (ms) used when `--checkpointInterval` is not supplied. */
  private final val DefaultCheckpointInterval = 1000L

  /** Bounds of the randomly generated Iris measurements. */
  private final val FeatureMin = 0.2
  private final val FeatureMax = 6.0

  /** Model identifiers shared by the generated events and the control messages. */
  private val modelIds: Vector[String] = Vector(
    "4897c9f4-5226-43c7-8f2d-f9fd388cf2bc",
    "5f919c52-2ef8-4ff2-94b2-2e64bb85005e"
  )

  /** Picks one of the known model identifiers at random. */
  private def randomModelId(): String = modelIds(Random.nextInt(modelIds.size))

  /** Keeps a single decimal digit, rounding towards negative infinity. */
  private def truncateDouble(n: Double): Double = math.floor(n * 10) / 10

  /** Draws a measurement in [FeatureMin, FeatureMax), truncated to one decimal digit. */
  private def randomFeature(): Double =
    truncateDouble(FeatureMin + (FeatureMax - FeatureMin) * Random.nextDouble())

  def main(args: Array[String]): Unit = {
    // Stream enrichment providing `withSupportStream` / `evaluate`; the sibling example jobs
    // import it the same way.
    import io.radicalbit.flink.pmml.scala._

    val parameterTool = ParameterTool.fromArgs(args)

    val outputPath = parameterTool.getRequired("output")
    // Explicit Long interval, configurable through the arguments instead of a hard-coded literal.
    val checkpointInterval = parameterTool.getLong("checkpointInterval", DefaultCheckpointInterval)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(checkpointInterval, CheckpointingMode.EXACTLY_ONCE)

    // Endless source: one random Iris event per second, tagged with "<model id><separator>1".
    val eventStream = env.addSource((sc: SourceContext[Iris]) => {
      while (true) {
        val iris =
          Iris(randomModelId() + ModelId.separatorSymbol + "1",
               randomFeature(),
               randomFeature(),
               randomFeature(),
               randomFeature(),
               Utils.now())
        sc.collect(iris)
        Thread.sleep(1000)
      }
    })

    // Every line received on the socket is treated as the path of a model to add (version 1).
    val controlStream = env
      .socketTextStream("localhost", 9999)
      .map(path => AddMessage(randomModelId(), 1L, path, Utils.now()))

    val predictions = eventStream
      .withSupportStream(controlStream)
      .evaluate { (event: Iris, model: PmmlModel) =>
        val vectorized = event.toVector
        val prediction = model.predict(vectorized, Some(0.0))
        (event, prediction.value)
      }

    predictions
      .writeAsText(outputPath, WriteMode.OVERWRITE)
      .setParallelism(2)

    env.execute("Checkpoint Evaluate Example")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples

import io.radicalbit.examples.models.Iris
import io.radicalbit.examples.sources.{ControlSource, IrisSource}
import io.radicalbit.examples.util.DynamicParams
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.PmmlModel
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

/**
 * Example job for stateful, dynamic model serving.
 *
 * Two input streams feed the job:
 *  - event stream: the main stream of Iris events to be evaluated
 *  - control stream: control messages describing the current state of the model repository
 *
 * Every setting (checkpoint interval, available ids, generation policy, model paths,
 * control generation interval, output path) is read from the program arguments
 * through DynamicParams.
 */
object DynamicEvaluateKmeans {

  def main(args: Array[String]): Unit = {
    val params = DynamicParams.fromParameterTool(ParameterTool.fromArgs(args))

    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    // The checkpoint interval comes from the job parameters rather than a literal.
    streamEnv.enableCheckpointing(params.ckpInterval, CheckpointingMode.EXACTLY_ONCE)

    val irisEvents = IrisSource.irisSource(streamEnv, Option(params.availableIds))
    val controlMessages = ControlSource.generateStream(
      streamEnv,
      params.genPolicy,
      params.pathAndIds,
      params.ctrlGenInterval
    )

    // Pair each event with the value predicted by the model it is routed to.
    val scored = irisEvents
      .withSupportStream(controlMessages)
      .evaluate { (event: Iris, model: PmmlModel) =>
        (event, model.predict(event.toVector, Some(0.0)).value)
      }

    scored.writeAsText(params.outputPath)

    streamEnv.execute("Dynamic Clustering Example")
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

package io.radicalbit.examples

import org.apache.flink.streaming.api.scala._
import io.radicalbit.examples.model.IrisSource._
import io.radicalbit.examples.sources.IrisSource._
import io.radicalbit.examples.util.EnsureParameters
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object EvaluateKmeans extends EnsureParameters {

Expand All @@ -36,19 +36,18 @@ object EvaluateKmeans extends EnsureParameters {
val (inputModel, output) = ensureParams(params)

//Read data from custom iris source
val irisDataStream = irisSource(env)
val irisDataStream = irisSource(env, None)

//Load model
val modelReader = ModelReader(inputModel)

//Using evaluate operator
val prediction = irisDataStream.evaluate(modelReader) {
//Iris data and modelReader instance
case (event, model) => {
case (event, model) =>
val vectorized = event.toVector
val prediction = model.predict(vectorized, Some(0.0))
(event, prediction.value.getOrElse(-1.0))
}
}

prediction.writeAsText(output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

package io.radicalbit.examples

import org.apache.flink.streaming.api.scala._
import io.radicalbit.examples.model.IrisSource._
import io.radicalbit.examples.sources.IrisSource._
import io.radicalbit.examples.util.EnsureParameters
import io.radicalbit.flink.pmml.scala._
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object QuickEvaluateKmeans extends EnsureParameters {

Expand All @@ -36,7 +36,7 @@ object QuickEvaluateKmeans extends EnsureParameters {
val (inputModel, output) = ensureParams(params)

//Read data from custom iris source
val irisDataStream = irisSource(env)
val irisDataStream = irisSource(env, None)

//Convert iris to DenseVector
val irisToVector = irisDataStream.map(_.toVector)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples.model
package io.radicalbit.examples.models

import io.radicalbit.flink.pmml.scala.models.input.BaseEvent
import org.apache.flink.ml.math.DenseVector

case class Iris(sepalLength: Double, sepalWidth: Double, petalLength: Double, petalWidth: Double) {
/**
 * Iris measurement event.
 *
 * @param modelId     identifier of the model this event is addressed to
 * @param sepalLength sepal length
 * @param sepalWidth  sepal width
 * @param petalLength petal length
 * @param petalWidth  petal width
 * @param occurredOn  event timestamp
 */
case class Iris(modelId: String,
                sepalLength: Double,
                sepalWidth: Double,
                petalLength: Double,
                petalWidth: Double,
                occurredOn: Long)
    extends BaseEvent {

  /** The four measurements as a dense vector, in declaration order. */
  def toVector: DenseVector = DenseVector(sepalLength, sepalWidth, petalLength, petalWidth)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples.model

import java.util.UUID

import io.radicalbit.flink.pmml.scala.models.core.ModelId

/** Small helpers shared by the example jobs and sources. */
object Utils {

  /** Version suffix appended to every generated model identifier. */
  final val modelVersion: String = "1"

  /**
   * Assigns a freshly generated random UUID to each model path.
   *
   * @param modelPaths paths of the models
   * @return a map from generated identifier to model path
   */
  def retrieveMappingIdPath(modelPaths: Seq[String]): Map[String, String] = {
    val idPathPairs = for (modelPath <- modelPaths) yield UUID.randomUUID().toString -> modelPath
    idPathPairs.toMap
  }

  /**
   * Builds the versioned identifiers for the keys of the given mapping.
   *
   * @param mappingIdPath map from identifier to model path
   * @return one "identifier + separator + version" string per key
   */
  def retrieveAvailableId(mappingIdPath: Map[String, String]): Seq[String] = {
    val versionedIds =
      for (id <- mappingIdPath.keys) yield s"$id${ModelId.separatorSymbol}$modelVersion"
    versionedIds.toSeq
  }

  /** Current wall-clock time in milliseconds. */
  def now(): Long = System.currentTimeMillis()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples.sources

import io.radicalbit.flink.pmml.scala.models.control.ServingMessage
import org.apache.flink.streaming.api.scala._

object ControlSource {

  /**
   * Generation policy of the control stream.
   *
   * Extending `Product with Serializable` keeps the inferred type of collections of modes
   * (such as [[procedures]]) down to plain `Mode`.
   */
  sealed trait Mode extends Product with Serializable

  case object Loop extends Mode
  case object Finite extends Mode
  case object Random extends Mode

  /** All the available generation policies. */
  val procedures: Seq[Mode] = Seq(Loop, Finite, Random)

  /**
   * Generation control stream method: it has three main generation logic (called _policies_):
   * - INFINITE:
   *   - Loop Generation   : the events are generated by infinitely looping over the paths' sequence
   *   - Random Generation : the events are randomly generated by picking from the paths' sequence
   * - FINITE:
   *   - Finite Generation : the events are generated 1-to-1 with the paths' sequence
   *
   * @param env           The Stream execution environment
   * @param mode          Generation policy
   * @param mappingIdPath The ids and paths of the models that users want to employ in the generation
   * @param maxInterval   Max interval between controls' generation
   * @return the stream of control messages emitted by the source selected for `mode`
   */
  def generateStream(env: StreamExecutionEnvironment,
                     mode: Mode,
                     mappingIdPath: Map[String, String],
                     maxInterval: Long): DataStream[ServingMessage] =
    // No catch-all case: `Mode` is sealed, so the compiler checks that every policy is handled
    // and flags this match if a new policy is ever added. A null `mode` is no longer silently
    // mapped to Random and now fails with a MatchError.
    mode match {
      case Loop   => env.addSource(new InfiniteSource(mappingIdPath, Loop, maxInterval))
      case Random => env.addSource(new InfiniteSource(mappingIdPath, Random, maxInterval))
      case Finite => env.addSource(new FiniteSource(mappingIdPath, maxInterval))
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.examples.sources

import io.radicalbit.examples.model.Utils
import io.radicalbit.flink.pmml.scala.models.control.{AddMessage, ServingMessage}
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

/**
 * Finite Control Messages Source.
 *
 * Emits one AddMessage (version 1) per entry of `mappingIdPath`, then terminates.
 * After each emission the source sleeps for a random time in [0, maxInterval) milliseconds.
 * Emission stops early once [[cancel]] has been called.
 *
 * @param mappingIdPath The Id, models path
 * @param maxInterval The Max interval of generation between events
 */
class FiniteSource(mappingIdPath: Map[String, String], maxInterval: Long) extends SourceFunction[ServingMessage] {

  private val rand: Random = scala.util.Random

  // Written by cancel() and read by run(), which may execute on different threads.
  @volatile private var isRunning: Boolean = true

  /** Asks the emission loop to stop before the next message. */
  override def cancel(): Unit = isRunning = false

  override def run(ctx: SourceFunction.SourceContext[ServingMessage]): Unit = {
    val pending = mappingIdPath.iterator

    while (isRunning && pending.hasNext) {
      val (id, path) = pending.next()

      // Emit under the checkpoint lock so that emission does not interleave with a checkpoint.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(AddMessage(id, 1L, path, Utils.now()))
      }

      Thread.sleep((rand.nextDouble() * maxInterval).toLong)
    }
  }

}
Loading