Skip to content

Commit

Permalink
Introducing Dynamic Model Evaluation (#38)
Browse files Browse the repository at this point in the history
This closes #27
  • Loading branch information
riccardo14 authored and Andrea committed Sep 20, 2017
1 parent 5f4381f commit 391f1b0
Show file tree
Hide file tree
Showing 29 changed files with 1,402 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util
import io.radicalbit.flink.pmml.scala.api.exceptions.{InputValidationException, JPMMLExtractionException}
import io.radicalbit.flink.pmml.scala.api.pipeline.Pipeline
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import io.radicalbit.flink.pmml.scala.models._
import io.radicalbit.flink.pmml.scala.models.prediction.Prediction
import org.apache.flink.ml.math.Vector
import org.dmg.pmml.FieldName
import org.jpmml.evaluator._
Expand Down Expand Up @@ -87,6 +87,8 @@ class PmmlModel(private[api] val evaluator: Evaluator) extends Pipeline {

import io.radicalbit.flink.pmml.scala.api.converter.VectorConverter._

final def modelName: String = evaluator.model.getModel.getModelName

/** Implements the entire prediction pipeline, which can be described as 4 main steps:
*
* - `validateInput` validates the input to be conform to PMML model size
Expand All @@ -97,12 +99,12 @@ class PmmlModel(private[api] val evaluator: Evaluator) extends Pipeline {
*
* - `extractTarget` extracts the target from evaluation result.
*
* As final action the pipelined statement is executed by [[io.radicalbit.flink.pmml.scala.models.Prediction]]
* As final action the pipelined statement is executed by [[Prediction]]
*
* @param inputVector the input event as a [[org.apache.flink.ml.math.Vector]] instance
* @param replaceNan A [[scala.Option]] describing a replace value for not defined vector values
* @tparam V subclass of [[org.apache.flink.ml.math.Vector]]
* @return [[io.radicalbit.flink.pmml.scala.models.Prediction]] instance
* @return [[Prediction]] instance
*/
final def predict[V <: Vector](inputVector: V, replaceNan: Option[Double] = None): Prediction = {
val result = Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,9 @@ package object exceptions {
*/
private[scala] class EmptyEvaluatorException(msg: String) extends NoSuchElementException(msg)

/** Parsing of ModelId has failed
*
*/
private[scala] class WrongModelIdFormat(msg: String) extends ArrayIndexOutOfBoundsException(msg)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.flink.pmml.scala.api.functions

import io.radicalbit.flink.pmml.scala.api.PmmlModel
import io.radicalbit.flink.pmml.scala.api.exceptions.ModelLoadingException
import io.radicalbit.flink.pmml.scala.api.managers.{MetadataManager, ModelsManager}
import io.radicalbit.flink.pmml.scala.api.reader.ModelReader
import io.radicalbit.flink.pmml.scala.logging.LazyLogging
import io.radicalbit.flink.pmml.scala.models.control.{AddMessage, DelMessage, ServingMessage}
import io.radicalbit.flink.pmml.scala.models.core.{ModelId, ModelInfo}
import io.radicalbit.flink.pmml.scala.models.state.CheckpointType.MetadataCheckpoint
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._
import scala.collection.{immutable, mutable}
import scala.util.{Failure, Success, Try}

/** Abstract class extending a [[CoProcessFunction]]; it provides:
* two maps for caching both [[PmmlModel]] and Metadata of each Model
* the open method in order to initialize the Metadata Map
* the processElement2 in order to handle models and metadata against a control stream
*
* Abstract class extends [[CheckpointedFunction]] and provides therefore:
* the snapshotState method in order to checkpoint the current state of the operator
* the initializeState in order to provide an initial state at the operator and/or restore the latest
*
* @tparam EVENT The input Type of the event to predict
* @tparam CTRL The control stream Type. Note: It must extend [[io.radicalbit.flink.pmml.scala.models.control.ServingMessage]]
* @tparam OUT The output Type
*/
private[scala] abstract class EvaluationCoFunction[EVENT, CTRL <: ServingMessage, OUT]
extends CoProcessFunction[EVENT, CTRL, OUT]
with CheckpointedFunction
with LazyLogging {

@transient
private var snapshotMetadata: ListState[MetadataCheckpoint] = _

@transient
final protected var servingMetadata: immutable.Map[ModelId, ModelInfo] = _

final protected lazy val servingModels: mutable.WeakHashMap[Int, PmmlModel] =
mutable.WeakHashMap.empty[Int, PmmlModel]

override def processElement2(control: CTRL,
ctx: CoProcessFunction[EVENT, CTRL, OUT]#Context,
out: Collector[OUT]): Unit = {
manageModels(control)
manageMetadata(control)
}

override def snapshotState(context: FunctionSnapshotContext): Unit = {
snapshotMetadata.clear()
snapshotMetadata.add(new MetadataCheckpoint(servingMetadata.asJava))
}

override def initializeState(context: FunctionInitializationContext): Unit = {
servingMetadata = immutable.Map.empty[ModelId, ModelInfo]

val description = new ListStateDescriptor[MetadataCheckpoint](
"metadata-snapshot",
TypeInformation.of(new TypeHint[MetadataCheckpoint]() {}))

snapshotMetadata = context.getOperatorStateStore.getUnionListState(description)

if (context.isRestored) {
Try(snapshotMetadata.get()) match {
case Success(state) => servingMetadata ++= state.asScala.toSet[MetadataCheckpoint].flatMap(_.asScala).toMap
case Failure(_) => logger.info("Not available state in ListState!")
}
}
}

final def loadModel(path: String): PmmlModel =
Try(PmmlModel.fromReader(ModelReader(path))) match {
case Success(model) =>
logger.info("Model has been successfully loaded, model name: {}", model.modelName)
model
case Failure(e) => throw new ModelLoadingException(e.getMessage, e)
}

final def fromMetadata(modelId: String): PmmlModel = {
val currentModelId: ModelId = ModelId.fromIdentifier(modelId)
if (!servingMetadata.contains(currentModelId)) PmmlModel.empty
else addAndRetrieveModel(modelId, currentModelId)
}

final private def addAndRetrieveModel(modelId: String, currentModelId: ModelId): PmmlModel = {
val currentModelPath = servingMetadata(currentModelId).path
val loadedModel = loadModel(currentModelPath)
servingModels += (modelId.hashCode -> loadedModel)
loadedModel
}

final private def manageMetadata(control: CTRL): Unit =
control match {
case add: AddMessage =>
servingMetadata = MetadataManager(add, servingMetadata)
case del: DelMessage =>
servingMetadata = MetadataManager(del, servingMetadata)
}

final private def manageModels(control: CTRL): Unit =
control match {
case del: DelMessage => servingModels --= ModelsManager(del, servingModels)
case _ =>
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.flink.pmml.scala.api.managers

import io.radicalbit.flink.pmml.scala.logging.LazyLogging
import io.radicalbit.flink.pmml.scala.models.control.{AddMessage, DelMessage}
import io.radicalbit.flink.pmml.scala.models.core.{ModelId, ModelInfo}

import scala.collection.immutable

/**
* Type class in order to enrich Message Protocol ADT with proper methods for acting on metadata.
*
* @tparam T Specific Message ADT subType; it's contro-variant in order to accept super classes, then generics.
*/
sealed trait MetadataManager[T] {

def manageMetadata(command: T, metadata: immutable.Map[ModelId, ModelInfo]): immutable.Map[ModelId, ModelInfo]

}

object MetadataManager extends LazyLogging {

implicit def apply[T: MetadataManager](
command: T,
metadata: immutable.Map[ModelId, ModelInfo]): immutable.Map[ModelId, ModelInfo] =
implicitly[MetadataManager[T]].manageMetadata(command, metadata)

/**
* Implicit value aimed to Adding model information to metadata.
*
* If a new model is coming (where new means a model bind to a previously unknown identifier)
* so the model information is added to metadata.
*
* If a not new model is coming (where not new means the model has an already present identifier)
* so a WARN is logged to the system; indeed, if the user wants to update a model, he should provide
* a newer version for it.
*
* Add messages don't remove elements from metadata for any reason.
*
*/
implicit val addMetadataServing = new MetadataManager[AddMessage] {

def manageMetadata(addMessage: AddMessage,
metadata: immutable.Map[ModelId, ModelInfo]): immutable.Map[ModelId, ModelInfo] = {
if (metadata.contains(addMessage.modelId)) {
logger.warn("ADD action on existing models is not possible (newer version needed). {} given.", addMessage)
metadata
} else metadata + (addMessage.modelId -> addMessage.modelInfo)
}

}

/**
* Implicit object aimed to removing model information from metadata.
*
* Del messages don't add elements to metadata for any reason.
*
* If a metadata element needs to be removed a key Set of the
* to-be-removed elements is returned.
*
*/
implicit val deleteMetadataServing = new MetadataManager[DelMessage] {

def manageMetadata(delMessage: DelMessage,
metadata: immutable.Map[ModelId, ModelInfo]): immutable.Map[ModelId, ModelInfo] =
metadata - delMessage.modelId

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.flink.pmml.scala.api.managers

import io.radicalbit.flink.pmml.scala.api.PmmlModel
import io.radicalbit.flink.pmml.scala.logging.LazyLogging
import io.radicalbit.flink.pmml.scala.models.control.DelMessage

import scala.collection.{immutable, mutable}

/**
* Type class in order to enrich Message Protocol ADT with proper methods for acting on models.
*
* @tparam T Specific Message ADT subType; it's contro-variant in order to accept super classes, then generics.
*/
sealed trait ModelsManager[T] {

def manageModels(command: T, models: mutable.Map[Int, PmmlModel]): immutable.Set[Int]

}

object ModelsManager extends LazyLogging {

def apply[T: ModelsManager](command: T, models: mutable.Map[Int, PmmlModel]): Set[Int] =
implicitly[ModelsManager[T]].manageModels(command, models)

/**
* Implicit value aimed to removing models from internal operator state on DelMessage.
*
* [[deleteModelsServing.manageModels]] returns the elements key set which need to be
* removed from models.
*
*/
implicit val deleteModelsServing = new ModelsManager[DelMessage] {

def manageModels(delMessage: DelMessage, models: mutable.Map[Int, PmmlModel]): immutable.Set[Int] = {
models.keySet.intersect(Set(delMessage.modelId.hashCode)).toSet
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package io.radicalbit.flink.pmml.scala.api.pipeline

import io.radicalbit.flink.pmml.scala.api._
import io.radicalbit.flink.pmml.scala.api.exceptions.InputPreparationException
import io.radicalbit.flink.pmml.scala.models.Prediction
import io.radicalbit.flink.pmml.scala.models.prediction.Prediction
import org.apache.flink.ml.math.Vector
import org.dmg.pmml.FieldName
import org.jpmml.evaluator.{EvaluatorUtil, FieldValue, ModelField}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2017 Radicalbit
*
* This file is part of flink-JPMML
*
* flink-JPMML is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* flink-JPMML is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with flink-JPMML. If not, see <http://www.gnu.org/licenses/>.
*/

package io.radicalbit.flink.pmml.scala.models.control

import io.radicalbit.flink.pmml.scala.models.core.{ModelId, ModelInfo}

/** Defines the mandatory fields that events control stream must implement.
*
*/
sealed trait ServingMessage {

def name: String

def occurredOn: Long

}

/** Defines a event control message in order to add a new model
*
* @param name of the model
* @param version of the model
* @param path of the model
* @param occurredOn represents when the event occurred
*/
final case class AddMessage(name: String, version: Long, path: String, occurredOn: Long) extends ServingMessage {

def modelId: ModelId = ModelId(name, version)

def modelInfo: ModelInfo = ModelInfo(path)

}

/** Defines a event control message in order to delete a model
*
* @param name of the model
* @param version of the model
* @param occurredOn represents when the event occurred
*/
final case class DelMessage(name: String, version: Long, occurredOn: Long) extends ServingMessage {

def modelId: ModelId = ModelId(name, version)

}
Loading

0 comments on commit 391f1b0

Please sign in to comment.