Skip to content


Adding HCAD data ingestion script to AD (#585)
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <>
  • Loading branch information
amitgalitz authored Jun 24, 2022
1 parent c6f9b20 commit 4d7a8a4
Show file tree
Hide file tree
Showing 3 changed files with 361 additions and 0 deletions.
62 changes: 62 additions & 0 deletions dataGeneration/
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# OpenSearch Anomaly Detection Data Ingestion

## Introduction
The following code in this directory can be used to easily ingest data into an OpenSearch cluster that is fit for AD testing and benchmarking.

## Instal Prerequisites

### Python

Python 3.7 or above is required

### pip

Use pip to install the necessary requirements:

pip install -r requirements.txt

## Usage

### Quick Start

In order to execute the script you must have a running OpenSearch cluster so you can supply an endpoint for the data to be ingested too. The current iteration of this data script creates data in a cosine pattern with anomalies injected with a random seed throughout. The dataset created will have two categorical fields to test a multi-entity AD (of type `keyword`) and two fields that can act as the two features fields (cpuTime and jvmGcTime). These two fields are of type `double`.

### Example Request:

`python3 -ep -i test-index-1 -shards 5 -t 10 -p 30 --security`

- This will start data ingestion to the cluster with the given endpoint, creating an index called `test-index-1`, with 5 shards, utilizing 10 threads, for 30 points in time and with security turned on.
- The rest of the values not given in this example are set to the default explained below.
- To give further context there will be a 1 document created for every unique entity combination for every 'interval' which is defined at 600s (10 minutes) at default for 30 'intervals'.

### Ingestion Parameters

| Parameter Name | Description | Default | Required
| ----------- | ----------- | ----------- | ----------- |
| --endpoint | Endpoint OpenSearch cluster is running on | No default | Yes
| --index-name | Name of index that will be created and ingested too | No default | Yes
| --threads | Number of threads to be used for data ingestion | No deafult | Yes
| --shards | Number of shards for given index | 5 | No
| --bulk-size | Number of documents per bulk request | 3000 | No
| --ingestion-frequency | How often each respective document is indexed (in seconds) | 600 | No
| --points | Total number of points in time ingested | 1008 | No
| --number-of-host | number of 'host' entities | 1000 | No
| --number-of-process | number of 'process' entities | 1000 | No
| --number-of-historical-days | number of day of historical data to ingest | 2 | No
| --username | username for authentication if security is true | admin | No
| --password | password for authentication if security is true | admin | No

### Ingestion Commands

| Command Name | Description | Required
| ----------- | ----------- | ----------- |
| --security | sets security to true for creating client to index to cluster endpoint | NO
| --no-security | sets security to true for creating client to index to cluster endpoint | No

- If no command is given then the default is to set security to true

294 changes: 294 additions & 0 deletions dataGeneration/
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
Python script for ingesting sample data into OpenSearch index

#{"index": {"_index":"host-cloudwatch","_id":"1177"}}
#{"@timestamp":"2017-03-23T13:00:00","cpu":20.3, "memory":13,"host":"host1", "service": "service1"}

import numpy as np
from scipy.stats import uniform
import datetime
import time
import random
from random import Random
from retry import retry
import urllib3
import concurrent.futures
import argparse

from opensearchpy import OpenSearch, RequestsHttpConnection
from opensearchpy import helpers


parser = argparse.ArgumentParser()

parser.add_argument("-ep", "--endpoint", help="cluster endpoint", required=True)
parser.add_argument("-i", "--index-name", help=" ",required=True)
parser.add_argument("-shards", "--shards", type=int, help="The number of shards for the given index", required=True)
parser.add_argument("-t", "--threads", type=int, help="The number of threads to be used for data ingestion, make sure given machine has enough", required=True)
parser.add_argument("-bulk", "--bulk-size", type=int, default=3000, help="Number of documents per bulk request, default to 3000", )
parser.add_argument("-ingest", "--ingestion-frequency", type=int, default=600, help="how often each respective document is indexed, for example the default is 600 seconds which equates to every 10 minutes")
parser.add_argument("-p", "--points", type=int, default=1008, help="total number of points ingested, for example with 1008 points and a frequency of 600s, there will be 7 days of data")

parser.add_argument('--security', action='store_true')
parser.add_argument('--no-security', dest='security', action='store_false')

parser.add_argument("-nh", "--number-of-host", type=int, default=1000, help="number of 'host' entities, deafult is set to 1000, there will be two keyword categories in this index (must be at least 1)")
parser.add_argument("-np", "--number-of-process", type=int, default=1000, help="number of 'process' entities, deafult is set to 1000, there will be two keyword categories in this index (must be at least 1)" )
parser.add_argument("-hd", "--number-of-historical-days", type=int, default=2, help="number of day of historical data to ingest, defaults to 2")
parser.add_argument("-u", "--username", type=str, default="admin", help="username for authentication if security is true")
parser.add_argument("-pass", "--password", type=str, default="admin", help="password for authentication if security is true")

args = parser.parse_args()

URL = args.endpoint
INDEX_NAME = args.index_name
SHARD_NUMBER = args.shards
THREADS = args.threads

#deafult numbers of 1000 host and 1000 process mean a total of 1 million entities
HOST_NUMBER = args.number_of_host
PROCESS_NUMBER = args.number_of_process

#default of 1008 points with ingestion frequency set to 600 means there will basically be 1008 intervals = 7 days * 144 intervals/day
POINTS = args.points
INGESTION_FREQUENCY = args.ingestion_frequency
BULK_SIZE = args.bulk_size
USERNAME = args.username
PASSWORD = args.password
NUMBER_OF_HISTORICAL_DAYS = args.number_of_historical_days

index_name = "_index"
timestamp_name = "@timestamp"
cpu_name = "cpuTime"
mem_name = "jvmGcTime"
host_name = "host"
host_prefix = "host"
process_name = "process"
process_prefix = "process"
client = []

Generate index INDEX_NAME
def create_index(es, INDEX_NAME, shard_number):
# First, delete the index if it exists
print("Deleting index if it exists...")
es.indices.delete(index=INDEX_NAME, ignore=[400, 404])

# Next, create the index
print("Creating index \"{}\"...".format(INDEX_NAME))
request_body = {
"number_of_replicas": 0, # increase this number after indexing
"translog.durability":"async", # default: request
"refresh_interval":-1, # default: 1, remember to change this after finishing indexing process or just _refresh once at least if index wont be changed again

es.indices.create(index=INDEX_NAME, body=request_body)

Posts a document(s) to the index
@retry(delay=1, backoff=2)
def post_log(bulk_payload, thread_index):
global client
helpers.bulk(client[thread_index], bulk_payload)

def generate_val(amp, phase, base_dimension, index, period, noise, noiseprg):
data = np.empty(base_dimension, dtype=float)
for j in range(0, base_dimension):
# cos is [-1, 1], + 1 make it non-negative
data[j] = amp[j] * (np.cos(2 * np.pi * (index + phase[j]) / period) + 1) + noise * noiseprg.random()
if (noiseprg.random() < 0.01 and noiseprg.random() < 0.3):
factor = 5 * (1 + noiseprg.random())
change = factor * noise if noiseprg.random() < 0.5 else -factor * noise
if data[j] + change >= 0:
data[j] += change
return data

Posts all documents to index in stream
def post_log_stream(index_value, time_intervals, sample_per_interval, max_number, min_number, host_number, service_number, batch_size, thread_index, cosine_params):
# For each file, post all the docs
print("Posting logs...")
bulk_payload = list()
# give some data in the history for cold start
dtFormat = "%Y-%m-%dT%H:%M:%S"
startTs = datetime.datetime.utcnow() - datetime.timedelta(days=NUMBER_OF_HISTORICAL_DAYS)
count = 0
totalCount = 0
lastTotalCount = 0

keep_loop = True
j = (int)(min_number / service_number)
index = j * service_number - 1

while keep_loop and j < host_number:
host_str = host_prefix + str(j)
for l in range(service_number):
process_str = process_prefix + str(l)
index += 1
# index can be [min_number, max_number]
if index < min_number:
if index > max_number:
keep_loop = False
nextTs = startTs
prb = Random()
prb.seed(random.randint(0, 100000000))
cosine_p = cosine_params[index]
data_index = 0
for i in range(0, time_intervals):
ts = nextTs.strftime(dtFormat)
for k in range(0, sample_per_interval):
data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
50, 5, prb)
index_name: index_value,
timestamp_name: ts,
cpu_name: data[0],
mem_name: data[1],
host_name: host_str,
process_name: process_str
count += 1
data_index += 1
if count >= batch_size:
post_log(bulk_payload, thread_index)
bulk_payload = list() # reset list
totalCount += count
count = 0
# increment by ingestion_frequency (in seconds) after looping through each host multiple samples
nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
if totalCount - lastTotalCount > 1000000:
# report progress every 1 million inserts
print("totalCount {} thread_index {}".format(totalCount,
lastTotalCount = totalCount
j += 1

if len(bulk_payload) > 0:
post_log(bulk_payload, thread_index)
bulk_payload = list()
except Error as err:
print("error: {0}".format(err))

def split(a, n):
k, m = divmod(len(a), n)
return (a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))

# create an list of array of size total_entities, the inner array has 2 subarrays: phase, amp
def create_cosine(total_entities, base_dimension, period, amplitude):
cosine_param = np.empty(total_entities, dtype=object)
for i in range(0, total_entities):
phase = np.empty(base_dimension, dtype=float)
amp = np.empty(base_dimension, dtype=float)
for j in range(0, base_dimension):
phase[j] = random.randint(0, period)
amp[j] = (1 + 0.2 * random.random()) * amplitude
cosine_param[i] = np.array([phase, amp])
return cosine_param

Main entry method for script
def main():
global client
if SECURITY and URL.strip() == 'localhost':
for i in range(0, THREADS):
for i in range(0, THREADS):
hosts=[{'host': URL, 'port': 443}],
elif URL.strip() == 'localhost':
for i in range(0, THREADS):
hosts=[{'host': URL, 'port': 9200}],
es = OpenSearch(
hosts=[{'host': URL, 'port': 80}],
create_index(client[0], INDEX_NAME, SHARD_NUMBER)

# workload is a list of ranges like [range(0, 10000), range(10000, 20000)]
workload = list(split(range(total_entities), THREADS))
futures = []

# we we have both cpuTime and jvmGcTime field, so 2 features
cosine_params = create_cosine(total_entities, 2, 50, 100)
start = time.monotonic()
with concurrent.futures.ProcessPoolExecutor(max_workers=THREADS) as executor:
futures = []
for i in range(len(workload)):
# Using 1 sample per interval to reason about the result easier.
doc_per_interval = 1
futures.append(executor.submit(post_log_stream, INDEX_NAME, POINTS, doc_per_interval, workload[i][-1], workload[i][0], HOST_NUMBER, PROCESS_NUMBER, BULK_SIZE, i, cosine_params))
_ = concurrent.futures.as_completed(futures)

print('Concurrent took: %.2f minutes.' % ((time.monotonic() - start)/60))

if __name__ == "__main__":

5 changes: 5 additions & 0 deletions dataGeneration/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

0 comments on commit 4d7a8a4

Please sign in to comment.