diff --git a/examples/batch/elastic_search_simple/README.md b/examples/batch/elastic_search_simple/README.md
new file mode 100644
index 0000000000..1c025a1488
--- /dev/null
+++ b/examples/batch/elastic_search_simple/README.md
@@ -0,0 +1,91 @@
+
+# Seldon Batch Processing Elasticsearch Data Source
+
+This example walks you through leveraging the data ingestor batch functionality in Seldon Core to build your own custom Elasticsearch Data Ingestor, which will fetch documents from an Elasticsearch index, send the data to Seldon and then publish the output back into Elasticsearch. The diagram below provides some intuition on what we'll be deploying:
+
+![](./batch-seldon-elk-stack.png)
+
+In this example we will:
+1. Create an ELK Data Ingestor
+2. Containerise our ELK Data Ingestor
+3. Test the Data Ingestor in Kubernetes Cluster with test model
+
+## 1) Create ELK Data Ingestor
+
+To get started we want to create a simple data ingestor that consumes from and publishes to ELK.
+
+For this, we will create the file `ingestor/ElasticDataIngestor.py` with the following contents:
+
+```python
+from elasticsearch import Elasticsearch
+from elasticsearch_dsl import Search
+
+
+class ElasticDataIngestor:
+    def __init__(self, host, port, index, data_ids):
+        """
+        Stores the connection and query parameters, some of which could be given defaults
+        """
+        self.host = host
+        self.port = port
+        self.index = index
+        self.data_ids = data_ids
+
+    def connection(self):
+        es = Elasticsearch(hosts=[self.host], port=self.port)
+        connection = {"client": es, "index": self.index}
+        return connection
+
+    def fetch(self, connection):
+        # Fetch only the documents whose ids were configured for this ingestor
+        search = (
+            Search(using=connection["client"], index=connection["index"])
+            .query("ids", values=self.data_ids)
+            .sort("@timestamp")
+            .source(["message"])
+            .params(preserve_order=True)
+        )
+
+        result = search.scan()
+        in_data = [log for log in result]
+        # Returns a (flag, data) tuple; a single scan retrieves all matching documents
+        return False, in_data
+
+    @staticmethod
+    def process(seldon_client, connection, in_data):
+        out_data = seldon_client.predict(data=in_data)
+        return out_data
+
+    @staticmethod
+    def publish(out_data, in_data, connection):
+        res = connection["client"].index(index=connection["index"], body=out_data)
+        return res
+```
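+
+Before containerising the ingestor we can sanity-check it with a minimal driver loop. This is only a sketch: the batch runner in Seldon Core calls these hooks itself, the Seldon client is mocked out here, and the `model-input` index and document ids are hypothetical. It assumes an Elasticsearch instance is reachable on `localhost:9200`:
+
+```python
+from unittest.mock import MagicMock
+
+from ElasticDataIngestor import ElasticDataIngestor
+
+# Stand-in for the Seldon client that the batch runner would normally inject
+seldon_client = MagicMock()
+seldon_client.predict.return_value = {"data": {"ndarray": [[1.0]]}}
+
+ingestor = ElasticDataIngestor(
+    host="localhost",             # assumed local Elasticsearch
+    port=9200,
+    index="model-input",          # hypothetical index name
+    data_ids=["doc-1", "doc-2"],  # hypothetical document ids
+)
+
+connection = ingestor.connection()
+_, in_data = ingestor.fetch(connection)
+out_data = ElasticDataIngestor.process(seldon_client, connection, in_data)
+ElasticDataIngestor.publish(out_data, in_data, connection)
+```
+
+The hook sequence exercised here (connection, fetch, process, publish) is the same one the batch runner drives for us once the ingestor is deployed.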
+
+## 2) Containerise the data ingestor
+
+In order to containerise the data ingestor we need to create a new file `ingestor/.s2i/environment` with the following contents:
+
+```bash
+MODEL_NAME=ElasticDataIngestor
+API_TYPE=BATCH
+```
+
+With this in place we just have to containerise our wrapper using the s2i CLI by running:
+
+```
+s2i build ingestor/. seldonio/seldon-core-s2i-python37:0.18-SNAPSHOT elastic_data_ingestor:0.1
+```
+
+## 3) Test the Data Ingestor in Kubernetes Cluster with test model
+
+In order to test the model locally we need to perform the following steps:
+
+
+TODO
+
diff --git a/examples/batch/elastic_search_simple/batch-seldon-elk-stack.png b/examples/batch/elastic_search_simple/batch-seldon-elk-stack.png
new file mode 100755
index 0000000000..266d7abc94
Binary files /dev/null and b/examples/batch/elastic_search_simple/batch-seldon-elk-stack.png differ
diff --git a/examples/batch/elastic_search_simple/ingestor/.s2i/environment b/examples/batch/elastic_search_simple/ingestor/.s2i/environment
new file mode 100644
index 0000000000..9da73c1c79
--- /dev/null
+++ b/examples/batch/elastic_search_simple/ingestor/.s2i/environment
@@ -0,0 +1,3 @@
+MODEL_NAME=ElasticDataIngestor
+API_TYPE=BATCH
+
diff --git a/examples/batch/elastic_search_simple/ingestor/ElasticDataIngestor.py b/examples/batch/elastic_search_simple/ingestor/ElasticDataIngestor.py
new file mode 100644
index 0000000000..da92246100
--- /dev/null
+++ b/examples/batch/elastic_search_simple/ingestor/ElasticDataIngestor.py
@@ -0,0 +1,41 @@
+from elasticsearch import Elasticsearch
+from elasticsearch_dsl import Search
+
+
+class ElasticDataIngestor:
+    def __init__(self, host, port, index, data_ids):
+        """
+        Stores the connection and query parameters, some of which could be given defaults
+        """
+        self.host = host
+        self.port = port
+        self.index = index
+        self.data_ids = data_ids
+
+    def connection(self):
+        es = Elasticsearch(hosts=[self.host], port=self.port)
+        connection = {"client": es, "index": self.index}
+        return connection
+
+    def fetch(self, connection):
+        # Fetch only the documents whose ids were configured for this ingestor
+        search = (
+            Search(using=connection["client"], index=connection["index"])
+            .query("ids", values=self.data_ids)
+            .sort("@timestamp")
+            .source(["message"])
+            .params(preserve_order=True)
+        )
+
+        result = search.scan()
+        in_data = [log for log in result]
+        # Returns a (flag, data) tuple; a single scan retrieves all matching documents
+        return False, in_data
+
+    @staticmethod
+    def process(seldon_client, connection, in_data):
+        out_data = seldon_client.predict(data=in_data)
+        return out_data
+
+    @staticmethod
+    def publish(out_data, in_data, connection):
+        res = connection["client"].index(index=connection["index"], body=out_data)
+        return res
diff --git a/examples/batch/kafka_streaming/README.md b/examples/batch/kafka_streaming/README.md
new file mode 100644
index 0000000000..09c72c6652
--- /dev/null
+++ b/examples/batch/kafka_streaming/README.md
@@ -0,0 +1,246 @@
+
+# Seldon Batch Processing Kafka Data Source
+
+This example walks you through leveraging the data ingestor batch functionality in Seldon Core to build your own custom Kafka Data Ingestor, which will consume from a Kafka topic, send the data to Seldon and then publish the output into another Kafka topic. The diagram below provides some intuition on what we'll be deploying:
+
+![](./batch-seldon-kafka-cluster.png)
+
+In this example we will:
+1. Create a Kafka Data Ingestor
+2. Containerise our Kafka Data Ingestor
+3. Test the Data Ingestor in Kubernetes Cluster with test model
+
+## 1) Create Kafka Data Ingestor
+
+To get started we want to create a simple data ingestor that consumes from and publishes to Kafka.
+
+For this, we will create the file `ingestor/KafkaDataIngestor.py` with the following contents:
+
+```python
+import kafka
+from multiprocessing import get_logger
+
+logger = get_logger()
+
+
+class KafkaDataIngestor:
+
+    def __init__(
+            self,
+            bootstrap_servers,
+            consumer_topic,
+            producer_topic,
+            group_id):
+        """
+        Stores the connection parameters, some of which could be given defaults
+        """
+        self.bootstrap_servers = bootstrap_servers
+        self.consumer_topic = consumer_topic
+        self.producer_topic = producer_topic
+        self.group_id = group_id
+
+    def connection(self):
+        consumer = kafka.KafkaConsumer(
+            self.consumer_topic,
+            group_id=self.group_id,
+            bootstrap_servers=self.bootstrap_servers)
+        producer = kafka.KafkaProducer(
+            bootstrap_servers=self.bootstrap_servers)
+        return {
+            "consumer": consumer,
+            "producer": producer,
+            "producer_topic": self.producer_topic
+        }
+
+    @staticmethod
+    def fetch(connection):
+        """
+        Collects the next message in the queue to process, or waits
+        until there is a new message. It leverages the iterator
+        interface that the KafkaConsumer exposes.
+        """
+        return next(connection["consumer"])
+
+    @staticmethod
+    def process(seldon_client, connection, in_data):
+        out_data = seldon_client.predict(data=in_data)
+        return out_data
+
+    @staticmethod
+    def publish(out_data, in_data, connection):
+        logger.info(f"Publishing with out_data=[{out_data}], in_data=[{in_data}]")
+
+        res = connection["producer"].send(
+            connection["producer_topic"], out_data)
+
+        logger.info(f"Publish result: {res}")
+```
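+
+As with the Elasticsearch ingestor, we can smoke-test the class locally before containerising it. The snippet below is a sketch rather than part of the example: it assumes a broker on `localhost:9092`, hypothetical `test-input`/`test-output` topic names, and mocks out the Seldon client:
+
+```python
+from unittest.mock import MagicMock
+
+from KafkaDataIngestor import KafkaDataIngestor
+
+# Stand-in for the Seldon client that the batch runner would normally inject
+seldon_client = MagicMock()
+seldon_client.predict.return_value = b'{"data": {"ndarray": [[0.9]]}}'
+
+ingestor = KafkaDataIngestor(
+    bootstrap_servers="localhost:9092",  # assumed local broker
+    consumer_topic="test-input",         # hypothetical topic names
+    producer_topic="test-output",
+    group_id="smoke-test",
+)
+
+connection = ingestor.connection()
+
+# Blocks until a message arrives on test-input; produce one from a second
+# shell once this script is running, e.g. with kafka-python:
+#   python -c "import kafka; p = kafka.KafkaProducer(bootstrap_servers='localhost:9092'); p.send('test-input', b'[[1.0, 2.0]]'); p.flush()"
+message = KafkaDataIngestor.fetch(connection)
+
+out_data = KafkaDataIngestor.process(seldon_client, connection, message.value)
+KafkaDataIngestor.publish(out_data, message.value, connection)
+connection["producer"].flush()
+```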
+
+## 2) Containerise the data ingestor
+
+In order to containerise the data ingestor we need to create a new file `ingestor/.s2i/environment` with the following contents:
+
+```bash
+MODEL_NAME=KafkaDataIngestor
+API_TYPE=BATCH
+```
+
+With this in place we just have to containerise our wrapper using the s2i CLI by running:
+
+```
+s2i build ingestor/. seldonio/seldon-core-s2i-python37:0.18-SNAPSHOT kafka_data_ingestor:0.1
+```
+
+## 3) Test the Data Ingestor in Kubernetes Cluster with test model
+
+We now want to test our data ingestor with the existing Seldon Core model samples.
+
+For this you need to make sure you set up a Kubernetes cluster with all Seldon Core dependencies installed [(Operator, ingress, etc)](https://docs.seldon.io/projects/seldon-core/en/latest/workflow/install.html).
+
+Once you have everything installed, we'll do the following steps:
+
+1. Run Kafka in our Kubernetes Cluster
+2. Create Seldon Deployment Config
+3. Deploy our Seldon Deployment to our cluster
+4. Publish messages and see messages coming out from Kafka topics
+
+### 1) Run Kafka in our Kubernetes Cluster
+
+We first need to make sure our Helm client has access to the Bitnami chart repository:
+
+```console
+helm repo add bitnami https://charts.bitnami.com/bitnami
+```
+
+Now we're able to create a simple Kafka deployment:
+
+```console
+helm install my-kafka bitnami/kafka
+```
+
+Once it's running we'll be able to see the pods:
+
+```console
+$ kubectl get pods
+
+NAME                   READY   STATUS    RESTARTS   AGE
+my-kafka-0             1/1     Running   0          2m43s
+my-kafka-1             1/1     Running   0          42s
+my-kafka-zookeeper-0   1/1     Running   0          2m43s
+my-kafka-zookeeper-1   1/1     Running   0          96s
+my-kafka-zookeeper-2   1/1     Running   0          62s
+```
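+
+Optionally, we can verify the brokers are reachable before deploying anything. The sketch below is an extra check rather than a step from the example: it presumes you have port-forwarded a broker locally with `kubectl port-forward svc/my-kafka 9092:9092` and uses kafka-python to list the cluster's topics:
+
+```python
+import kafka
+
+# Connect through the local port-forward to the in-cluster broker
+consumer = kafka.KafkaConsumer(bootstrap_servers="localhost:9092")
+
+# If the connection works, this prints the set of topics known to the cluster
+print(consumer.topics())
+consumer.close()
+```
+
+If the topic list comes back, the brokers are up and we can move on to the deployment.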
+
+### 2) Create a Seldon Deployment that uses our model
+
+Now we want to create a Seldon Deployment configuration file that we'll be able to deploy.
+
+For this we'll use a simple file that just sets the following:
+* Selects the deployment to run without the Engine/Orchestrator
+* Adds the parameters that point it to the Kafka cluster we just deployed
+* Points to the docker image that we just built
+
+The contents of `cluster/kafka_batch_deployment.yaml` are as follows:
+
+```yaml
+apiVersion: machinelearning.seldon.io/v1
+kind: SeldonDeployment
+metadata:
+  name: iris-streaming-deployment
+spec:
+  name: iris-streaming-spec
+  predictors:
+  - name: iris-streaming-predictor
+    batch:
+      streaming: 'true'
+      name: kafka-ingestor
+      parameters:
+      - name: bootstrap_servers
+        type: STRING
+        value: my-kafka.default.svc.cluster.local:9092
+      - name: consumer_topic
+        type: STRING
+        value: iris-streaming-deployment-kafka-ingestor-input
+      - name: producer_topic
+        type: STRING
+        value: iris-streaming-deployment-kafka-ingestor-output
+      - name: group_id
+        type: STRING
+        value: iris-streaming-deployment-kafka-ingestor
+    graph:
+      name: iris-model
+      endpoint:
+        type: REST
+      type: MODEL
+      children: []
+      parameters: []
+    componentSpecs:
+    - spec:
+        containers:
+        - image: seldonio/sklearn-iris:0.1
+          name: iris-model
+        - image: kafka_data_ingestor:0.1
+          name: kafka-ingestor
+```
+
+### 3) Deploy our Seldon Deployment
+
+Now that we've created our deployment config, we just need to launch it:
+
+```console
+kubectl apply -f cluster/kafka_batch_deployment.yaml
+```
+
+Once it's deployed we can see it by running:
+
+```console
+$ kubectl get pods | grep iris-streaming
+
+iris-streaming-deployment-iris-streaming-predictor-0-56986c5d4b-7xvtm   2/2     Running   0          6m28s
+```
+
+### 4) Publish messages in the input topic and see messages coming from the output topic
+
+Now we want to test it by sending some messages.
+
+We can get the name of the pod by running:
+
+```console
+export STREAM_SELDON_POD=`kubectl get pod -l seldon-app=iris-streaming-deployment-iris-streaming-predictor -o jsonpath="{.items[0].metadata.name}"`
+```
+
+First let's run a consumer to see the output:
+
+```python
+kubectl exec -i $STREAM_SELDON_POD python - <
+```
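+
+Such a consumer script, piped into the pod through the heredoc above, could look like the following sketch (it uses kafka-python with the topic and service names from the deployment config; the exact script may differ):
+
+```python
+import kafka
+
+# Subscribe to the output topic and print every prediction as it arrives
+consumer = kafka.KafkaConsumer(
+    "iris-streaming-deployment-kafka-ingestor-output",
+    bootstrap_servers="my-kafka.default.svc.cluster.local:9092",
+    auto_offset_reset="earliest",
+)
+
+for message in consumer:
+    print(message.value)
+```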