[#29] Update readme to 0.6.0 #41

Merged (11 commits) on Sep 21, 2017
README.md (99 changes: 89 additions & 10 deletions)
@@ -2,7 +2,7 @@

# flink-jpmml

Welcome! `flink-jpmml` is a fresh-made library for **Real-Time** machine learning predictions built on top of
Welcome! `flink-jpmml` is a fresh-made library for dynamic **real time** machine learning predictions built on top of
[PMML](http://dmg.org/pmml/v4-3/GeneralStructure.html) standard models and [Apache Flink](https://flink.apache.org/)
streaming engine.

@@ -13,13 +13,13 @@ scenario.

In order to get started, you only need
* any well-known version of a PMML model (**3.2** or above)
* flink-jpmml is tested with the latest Flink (i.e. 1.3.1), but any working Apache Flink version
* flink-jpmml is tested with the latest Flink (i.e. 1.3.2), but any working Apache Flink version
([repo](https://github.com/apache/flink)) should work properly.

## Adding `flink-jpmml` dependency
* if you employ sbt, add the following dependency to your project:
* Snapshot: `"io.radicalbit" %% "flink-jpmml-scala" % "0.6.0-SNAPSHOT"`
* Stable: `"io.radicalbit" %% "flink-jpmml-scala" % "0.5.1"`
* Snapshot: `"io.radicalbit" %% "flink-jpmml-scala" % "0.7.0-SNAPSHOT"`
> **Review comment (Member):** The project version in `version.sbt` should also be updated to reflect these changes.

* Stable: `"io.radicalbit" %% "flink-jpmml-scala" % "0.6.0"`

* For [maven](https://maven.apache.org/) users instead:
* Snapshot
@@ -28,7 +28,7 @@ In order to getting started, you only need
<dependency>
<groupId>io.radicalbit</groupId>
<artifactId>flink-jpmml-scala</artifactId>
<version>0.6.0-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
</dependency>
</dependencies>
```
@@ -38,7 +38,7 @@ In order to getting started, you only need
<dependency>
<groupId>io.radicalbit</groupId>
<artifactId>flink-jpmml-scala</artifactId>
<version>0.5.1</version>
<version>0.6.0</version>
</dependency>
</dependencies>
```
@@ -65,11 +65,88 @@ Lets start.

## Getting Started

`flink-jpmml` enables Flink users to execute real-time predictions based on machine learning models trained by any
`flink-jpmml` enables Flink users to execute real time predictions based on machine learning models trained by any
system supporting the PMML
standard; this allows efficient streaming model serving along with the **powerful** Flink engine features.

Lets start with a concrete example.
### Dynamic Models Evaluation

Since `0.6.0` the project supports efficient dynamic streaming model serving. For more information,
we suggest watching the [related talk](https://www.youtube.com/watch?v=0rWvMZ6JSD8&t=7s)
presented at Flink Forward 2017 in Berlin.

First of all, models are identified uniquely by the `ModelId` abstraction, made of an *applicationName*
and a *version*.

> **e.g.** Suppose you have two PMML-based _ensemble_ models A and B, where A has depth 10 and width 100
while B has depth 5 and width 200, and you want to compare them: you can use the same `applicationName`
(e.g. SVM) with `versions` A and B.

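As an illustrative sketch (the case class below is a stand-in for the sake of the example, not the library's actual `ModelId` definition), the A/B comparison above could be encoded as:

```
// Hypothetical sketch of the ModelId abstraction described above; the real
// library type may differ. Input events reference a model through the combined
// "<applicationName>_<version>" string form.
final case class ModelId(applicationName: String, version: String) {
  def asString: String = s"${applicationName}_$version"
}

val candidateA = ModelId("SVM", "A") // ensemble A: depth 10, width 100
val candidateB = ModelId("SVM", "B") // ensemble B: depth 5, width 200
```
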
`flink-jpmml` does not store the models themselves within its operator state, only their related metadata.
The operator retrieves models from your distributed backend by exploiting a metadata table.
Your PMML models therefore have to be persisted in a backend system
(see [here](#dist-backend) for supported systems).

If you want to use dynamic model evaluation, you need to define the following two streams (a short sketch follows the list):

- `DataStream[ServingMessage]`: this control stream is the main tool for feeding the operator with the
necessary model information; the user does not send the PMML models themselves through this stream, only their
descriptive metadata, expressed with the `ServingMessage` ADT.
Currently, two messages are available:
  - `AddMessage`: requires an `applicationName` (Java UUID formatted), a `version`, the model source
`path` and a timestamp
  - `DelMessage`: requires an `applicationName` (Java UUID formatted), a `version` and a timestamp
- `DataStream[BaseEvent]`: your input stream must extend the `BaseEvent` trait and define a string `modelId`,
formatted as `"<modelApplication>_<modelVersion>"`, and a timestamp.
> **Review comment (Collaborator):** typo

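Below is a minimal, hedged sketch of how the control stream could be fed; the `AddMessage`/`DelMessage` argument order and the import path are assumptions derived from the description above, not the library's verbatim signatures.

```
import java.util.UUID

// Package path assumed from the ServingMessage import used in the next section.
import io.radicalbit.flink.pmml.scala.models.control.{AddMessage, DelMessage}

val appName = UUID.randomUUID().toString // applicationName, Java UUID formatted
val now = System.currentTimeMillis()

// Announce version 1 of a model already persisted on the distributed backend...
val add = AddMessage(appName, 1L, "hdfs:///models/ensemble-v1.pmml", now)

// ...and, once it is no longer needed, retire it.
val del = DelMessage(appName, 1L, now)
```
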
### The syntax

Given the streams above, you can obtain predictions easily.

```
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.streaming.api.scala._
import io.radicalbit.flink.pmml.scala.models.control.ServingMessage

...

val inputStream: DataStream[_ <: BaseEvent] = yourInputStream
val controlStream: DataStream[ServingMessage] = yourControlStream

val predictions =
  inputStream
    .withSupportStream(controlStream)
    .evaluate { (event, model) =>
      val vector = event.toVector
      val prediction = model.predict(vector)

      prediction
    }
```
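
For a quick check during development you could, for instance, print the resulting stream and run the job (standard Flink API; `env` is assumed to be your `StreamExecutionEnvironment`, elided above):

```
predictions.print()

env.execute("flink-jpmml dynamic evaluation")
```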

The features of the `flink-jpmml` PMML models are discussed in more detail [here](#single-model): you will find several ways to
handle your predictions. All the concepts introduced with the first `flink-jpmml` releases (how the model is built within
the operator, the operator configuration and so forth) have been retained and are well described below.

We also kept the single-model operator, explained later, in case you want to bind a specific model to an operator instance.

### What happens internally

![flink-jpmml-architecture](flink-jpmml-assets/src/main/resources/architecture.png)

When an event **A** arrives, its `modelId` declares which model it must be evaluated against.
If that model has not been loaded into the operator yet, the operator exploits the **metadata** information
to lazily retrieve the targeted model from the underlying distributed backend.

The control stream is the tool the user employs to provide the global picture of the models available to the
platform (this fits well with a **model repository server** concept). **You will use this stream to feed the operator with
the information your input events need in order to easily grab their models.**

If an event is able to find its targeted model, the prediction is computed and a `Prediction` outcome (part of the result ADT) is returned; otherwise
the outcome will be an `EmptyPrediction`.
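
As a hedged illustration of consuming these outcomes (the `Prediction`/`EmptyPrediction` names come from the description above; matching them as below assumes they form an ADT with an extractable value, which may differ from the actual library types):

```
// Hypothetical sketch: extract the raw score when the targeted model was found,
// falling back to a sentinel value for EmptyPrediction outcomes.
val defaultScore = -1.0

val scores = predictions.map {
  case Prediction(value) => value        // model found, emit its score
  case _                 => defaultScore // EmptyPrediction: model not loaded yet
}
```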

### <a name = "single-model"></a> Single Model Operator

Suppose you have your `InputStream` at hand and you want to score the related data
@@ -150,9 +227,9 @@ interesting details which worth a deeper analysis.
## Behind the scenes 
`flink-jpmml`'s main effort is to retain all the streaming concepts:

- since Flink is able to run against several **distributed backend**, users need to specify only the PMML source path:
- <a name="dist-backend"></a>since Flink is able to run against several **distributed backend**, users need to specify only the PMML source path:
the library will take care of loading the model in full compliance with the underlying distributed system
(e.g. HDFS, Alluxio)
(e.g. HDFS, Alluxio, S3, localFS)
- `ModelReader` is the object implementing the previous behavior; it provides the loading methods but reads the model
_lazily_, _i.e._ only when the transformation is applied
- The `PMMLModel` will be loaded by a singleton model factory for each TaskManager running on your architecture; that means if you have an
@@ -218,6 +295,8 @@ here. `flink-jpmml` community is looking for you!
## Authors
* **Andrea Spina** - [andrea.spina@radicalbit.io](mailto:andrea.spina@radicalbit.io) [@spi-x-i](https://github.com/spi-x-i)
* **Francesco Frontera** - [francesco.frontera@radicalbit.io](mailto:francesco.frontera@radicalbit.io) [@francescofrontera](https://github.com/francescofrontera)
* **Riccardo Diomedi** - [riccardo.diomedi@radicalbit.io](mailto:riccardo.diomedi@radicalbit.io) [@riccardo14](https://github.com/riccardo14)
* **Mauro Cortellazzi** - [mauro.cortellazzi@radicalbit.io](mailto:mauro.cortellazzi@radicalbit.io) [@maocorte](https://github.com/maocorte)
* **Simone Robutti** - *Initial prototype* [simone.robutti@gmail.com](mailto:simone.robutti@gmail.com) [@chobeat](https://github.com/chobeat)
* **Stefano Baghino** - *Initial prototype* [@stefanobaghino](https://github.com/stefanobaghino)

@@ -30,7 +30,7 @@ object Evaluator {

/** If the evaluator exists, it returns a [[PmmlEvaluator]], [[EmptyEvaluator]] otherwise.
*
* @param evaluator An instance of [[ModelEvaluator]]
* @param evaluator An instance of [[org.jpmml.evaluator.ModelEvaluator]]
* @return An instance of [[Evaluator]]
*/
def apply(evaluator: ModelEvaluator[_ <: Model]): Evaluator = PmmlEvaluator(evaluator)
@@ -50,7 +50,7 @@ sealed trait Evaluator {

def model: ModelEvaluator[_ <: Model]

/** Returns [[ModelEvaluator]]] if evaluator has value, default value otherwise
/** Returns [[org.jpmml.evaluator.ModelEvaluator]]] if evaluator has value, default value otherwise
*
* @param default the defined default value
* @return the current evaluator if it has value, default otherwise
@@ -84,7 +84,7 @@ final case class PmmlEvaluator(modelEval: ModelEvaluator[_ <: Model]) extends Ev

/**
* Retrieving the evaluator of the JpmmlEvaluator
* @return the [[ModelEvaluator]]
* @return the [[org.jpmml.evaluator.ModelEvaluator]]
*/
override def model: ModelEvaluator[_ <: Model] = modelEval

@@ -23,39 +23,32 @@ package object exceptions {

/** Models conformity failure between PMML model and input [[org.apache.flink.streaming.api.scala.DataStream]]
*
* @param msg
*/
private[scala] class InputValidationException(msg: String) extends RuntimeException(msg)

/** Models [[org.jpmml.evaluator.EvaluatorUtil.prepare()]] method failure
*
* @param msg
*/
private[scala] class InputPreparationException(msg: String) extends RuntimeException(msg)

/** Models empty result from [[org.jpmml.evaluator.ModelEvaluator]] evaluation
*
* @param msg
*/
private[scala] class JPMMLExtractionException(msg: String) extends RuntimeException(msg)

/** Models failure on loading PMML model from distributed system
*
* @param msg
* @param throwable
*/
private[scala] class ModelLoadingException(msg: String, throwable: Throwable)
extends RuntimeException(msg, throwable)

/** Prediction failure due to [[io.radicalbit.flink.pmml.scala.api.EmptyEvaluator]]
*
* @param msg
*/
private[scala] class EmptyEvaluatorException(msg: String) extends NoSuchElementException(msg)

/** Parsing of ModelId has failed
*
* @param msg
*/
private[scala] class WrongModelIdFormat(msg: String) extends ArrayIndexOutOfBoundsException(msg)

@@ -76,7 +76,6 @@ private[scala] abstract class EvaluationCoFunction[EVENT, CTRL <: ServingMessage
override def snapshotState(context: FunctionSnapshotContext): Unit = {
snapshotMetadata.clear()
snapshotMetadata.add(new MetadataCheckpoint(servingMetadata.asJava))

}

override def initializeState(context: FunctionInitializationContext): Unit = {
Expand Down