CSV (File) integration #180

Merged · 8 commits · Jul 30, 2024
38 changes: 38 additions & 0 deletions integrations/observability/csv_file/README.md
@@ -0,0 +1,38 @@
# CSV Upload Integration

> CSV File based Upload Integration

## What is CSV Upload?

CSV Upload is an example of parsing a CSV file and loading it into an OpenSearch index using an ingestion agent.

## What is a CSV Integration?

An integration is a bundle of pre-canned assets that are grouped together in a meaningful manner.

The **_CSV Upload_** integration includes a live Docker example, with getting-started instructions for using Data Prepper or Fluent Bit to upload a CSV file into a dedicated index, using a parser to transform the CSV rows into JSON documents.
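For illustration, the first row of the sample `logs.csv` included in this PR would be transformed into a JSON document along these lines (field names follow the parser definition used throughout this integration; the exact envelope depends on the agent):

```json
{
  "timestamp": "2024-07-16 12:00:00",
  "log_level": "INFO",
  "message": "Application started successfully",
  "application": "App1",
  "host": "host1"
}
```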

## Ingesting CSV Using Data Prepper
...

## Ingesting CSV Using Fluent Bit

...

---
## Loading Integrations via Dashboard Management

To update an integration template, navigate to Dashboard Management, select [savedObjects](https://localhost:5601/_dashboards/app/management/opensearch-dashboards/objects), and import the new artifact:

1) Download the `nginx-1.0.0.ndjson` artifact (used here as an example) from the [catalog release page](https://github.com/opensearch-project/opensearch-catalog/releases/tag/nginx-1.0.0)

2) Go to [Dashboard Management -> savedObjects](https://localhost:5601/_dashboards/app/management/opensearch-dashboards/objects)

![](https://github.com/opensearch-project/opensearch-catalog/assets/48943349/d96e9a78-e3de-4cce-ba66-23f7c084778d)

![](https://github.com/opensearch-project/opensearch-catalog/assets/48943349/a63ae102-706a-4980-b758-fff7f6b24a94)

3) Once there, select Import to load the recently downloaded integration artifact (the `nginx-1.0.0.ndjson` file)

4) Open the [nginx integration](https://localhost:5601/app/integrations#/available/nginx) and install it; a command-line alternative is shown below.
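Alternatively, the bundle can be imported through the Dashboards saved-objects API; a minimal sketch, assuming a local Dashboards instance and placeholder credentials:

```bash
# Import a saved-objects bundle into OpenSearch Dashboards.
# The osd-xsrf header is required by Dashboards for write requests.
curl -k -u 'admin:your-password' \
  -X POST "https://localhost:5601/api/saved_objects/_import?overwrite=true" \
  -H "osd-xsrf: true" \
  --form file=@nginx-1.0.0.ndjson
```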
@@ -0,0 +1,4 @@
{"attributes":{"description":"upload a csv file example using fluent-bit agent","kibanaSavedObjectMeta":{"searchSourceJSON":"{\"query\":{\"query\":\"\",\"language\":\"kuery\"},\"filter\":[]}"},"title":"fluent-bit-csv-upload","uiStateJSON":"{}","version":1,"visState":"{\"title\":\"fluent-bit-csv-upload\",\"type\":\"markdown\",\"aggs\":[],\"params\":{\"fontSize\":12,\"openLinksInNewTab\":false,\"markdown\":\"# Uploading a CSV File into an OpenSearch Index Using Fluent Bit\\n\\nThis tutorial will guide you through the process of setting up Fluent Bit to monitor a directory for CSV files and upload their contents into an OpenSearch index.\\n\\n## Prerequisites\\n\\n- An OpenSearch instance running and accessible.\\n- Fluent Bit installed on your system.\\n- A directory containing your CSV files.\\n\\n## Step 1: Install Fluent Bit\\n\\n### On Linux:\\n\\n```bash\\ncurl -L https://fluentbit.io/releases/1.8/fluent-bit-1.8.11-linux-x86_64.tar.gz -o fluent-bit.tar.gz\\ntar -xvf fluent-bit.tar.gz\\ncd fluent-bit/bin\\n```\\n\\n### On macOS:\\n\\n```bash\\nbrew install fluent-bit\\n```\\n\\n### On Windows:\\n\\nDownload and extract Fluent Bit from [Fluent Bit releases](https://fluentbit.io/download/).\\n\\n## Step 2: Create Fluent Bit Configuration Files\\n\\n#### Create `fluent-bit.conf`\\n\\nThis is the main configuration file for Fluent Bit. It defines the input source, parser, and output destination.\\n\\n```ini\\n[SERVICE]\\n Flush 1\\n Log_Level info\\n Parsers_File parsers.conf\\n\\n[INPUT]\\n Name tail\\n Path /path/to/your/csv/files/*.csv\\n Parser csv\\n Tag csv\\n Refresh_Interval 5\\n Rotate_Wait 30\\n\\n[OUTPUT]\\n Name opensearch\\n Match *\\n Host your-opensearch-host\\n Port 9200\\n Index csv-index\\n HTTP_User your-username\\n HTTP_Passwd your-password\\n tls off\\n Suppress_Type_Name On\\n tls.verify off\\n```\\n\\n### Create `parsers.conf`\\n\\nThis file defines the CSV parser.\\n\\n```ini\\n[PARSER]\\n Name csv\\n Format regex\\n Regex ^(?<timestamp>[^,]+),(?<log_level>[^,]+),(?<message>[^,]+),(?<application>[^,]+),(?<host>[^,]+)$\\n Time_Key timestamp\\n Time_Format %Y-%m-%d %H:%M:%S\\n```\\n\\n### Direct the CSV folder location\\n\\nEnsure this file is in the directory you specified in the `Path` of the `fluent-bit.conf` file.\\n\\n\\n## Step 3: Run Fluent Bit\\n\\nNavigate to the directory containing the Fluent Bit executable and the configuration files. Then, start Fluent Bit with the configuration files.\\n\\n```bash\\n/path/to/fluent-bit/bin/fluent-bit -c /path/to/fluent-bit.conf\\n```\\n\\n## Step 4: Verify Data in OpenSearch\\n\\nAfter starting Fluent Bit, you can verify the data ingestion by accessing OpenSearch and searching for the `csv-index` index.\\n\\nFor example, you can use OpenSearch Dashboards or the OpenSearch API to query the index:\\n\\n### Using OpenSearch Dashboards:\\n\\n1. Open OpenSearch Dashboards in your browser.\\n2. Navigate to the \\\"Discover\\\" tab.\\n3. Select the `csv-index` index pattern.\\n4. 
Verify that the log data from your CSV files is being ingested and displayed.\\n\\n### Using the OpenSearch API:\\n\\n```bash\\ncurl -X GET \\\"http://your-opensearch-host:9200/csv-index/_search?pretty\\\"\\n```\\n\\n---\\n## Live Testing with Docker Compose\\nIf you prefer to test this setup using Docker Compose, you can use the following docker-compose.yml file to quickly set up an OpenSearch instance along with Fluent Bit:\\n\\nUnder the `getting-started` section you can examine a live docker-compose sample:\\n```yaml\\n/csv_file/getting-started/fluent-bit\\n|-- docker-complete.yml\\n|-- data/\\n |-- fluent-bit.conf\\n |-- parsers.conf\\n |-- logs.csv\\n\\n```\\nUse the [docker-compose](../getting-started/fluent-bit/docker-complete.yml) you can find a complete:\\n\\n`docker compose -f docker-complete.yml up -d` would instantiate the services and start sending the csv sample logs into an index. \\n\"}}"},"id":"0fad8910-43d9-11ef-a69e-0549ba61487e","migrationVersion":{"visualization":"7.10.0"},"references":[],"type":"visualization","updated_at":"2024-07-17T16:55:31.713Z","version":"WzEsMV0="}
{"attributes":{"description":"upload a csv file example using fluent-bit agent","hits":0,"kibanaSavedObjectMeta":{"searchSourceJSON":"{\"query\":{\"language\":\"kuery\",\"query\":\"\"},\"filter\":[]}"},"optionsJSON":"{\"hidePanelTitles\":false,\"useMargins\":true}","panelsJSON":"[{\"version\":\"2.15.0\",\"gridData\":{\"x\":0,\"y\":0,\"w\":24,\"h\":15,\"i\":\"22a1a11f-7ecf-46c7-a73d-b6cb5eb07b45\"},\"panelIndex\":\"22a1a11f-7ecf-46c7-a73d-b6cb5eb07b45\",\"embeddableConfig\":{},\"panelRefName\":\"panel_0\"}]","timeRestore":false,"title":"csv-file-upload-fluent-bit-dashboard","version":1},"id":"1e4f1c40-43d9-11ef-a69e-0549ba61487e","migrationVersion":{"dashboard":"7.9.3"},"references":[{"id":"0fad8910-43d9-11ef-a69e-0549ba61487e","name":"panel_0","type":"visualization"}],"type":"dashboard","updated_at":"2024-07-17T16:55:31.713Z","version":"WzIsMV0="}
{"attributes":{"fields":"[{\"count\":0,\"name\":\"@timestamp\",\"type\":\"date\",\"esTypes\":[\"date\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true},{\"count\":0,\"name\":\"_id\",\"type\":\"string\",\"esTypes\":[\"_id\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"count\":0,\"name\":\"_index\",\"type\":\"string\",\"esTypes\":[\"_index\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":false},{\"count\":0,\"name\":\"_score\",\"type\":\"number\",\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"_source\",\"type\":\"_source\",\"esTypes\":[\"_source\"],\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"_type\",\"type\":\"string\",\"scripted\":false,\"searchable\":false,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"application\",\"type\":\"string\",\"esTypes\":[\"text\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"application.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"application\"}}},{\"count\":0,\"name\":\"host\",\"type\":\"string\",\"esTypes\":[\"text\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"host.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"host\"}}},{\"count\":0,\"name\":\"log_level\",\"type\":\"string\",\"esTypes\":[\"text\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"log_level.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"log_level\"}}},{\"count\":0,\"name\":\"message\",\"type\":\"string\",\"esTypes\":[\"text\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"message.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"message\"}}},{\"count\":0,\"name\":\"timestamp\",\"type\":\"string\",\"esTypes\":[\"text\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":false,\"readFromDocValues\":false},{\"count\":0,\"name\":\"timestamp.keyword\",\"type\":\"string\",\"esTypes\":[\"keyword\"],\"scripted\":false,\"searchable\":true,\"aggregatable\":true,\"readFromDocValues\":true,\"subType\":{\"multi\":{\"parent\":\"timestamp\"}}}]","timeFieldName":"@timestamp","title":"logs-index"},"id":"csv-getting-started-tutorial-1.0.0","migrationVersion":{"index-pattern":"7.6.0"},"references":[],"type":"index-pattern","updated_at":"2024-07-17T16:59:06.006Z","version":"WzMsMV0="}
{"exportedCount":3,"missingRefCount":0,"missingReferences":[]}
46 changes: 46 additions & 0 deletions integrations/observability/csv_file/csv_file-1.0.0.json
@@ -0,0 +1,46 @@
{
"name": "csv",
"version": "1.0.0",
"displayName": "csv-file",
"description": "Upload a CSV file",
"license": "Apache-2.0",
"type": "logs",
"labels": ["Logs", "Unstructured"],
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/opensearch-catalog/tree/main/integrations/observability/csv_file",
"workflows": [
{
"name": "dashboards",
"label": "Dashboards & Visualizations",
"description": "Dashboards and indices that enable you to easily visualize important metrics.",
"enabled_by_default": false
}
],
"statics": {
"logo": {
"annotation": "CSV Logo",
"path": "logo.svg"
},
"gallery": [
{
"annotation": "Fluent-Bit getting started tutorial",
"path": "fluent-bit-getting-started-dashboard.png"
},
{
"annotation": "Data-Prepper Logo",
"path": "data-pepper.png"
},
{
"annotation": "Fluent-Bit Logo",
"path": "fluentbit.png"
}
]
},
"components": [],
"assets": [
{ "name": "fluent-bit-csv-upload", "version": "1.0.0", "extension": "ndjson", "type": "savedObjectBundle", "workflows": ["dashboards"] }
],
"sampleData": {
"path": "logs.csv"
}
}
10 changes: 10 additions & 0 deletions integrations/observability/csv_file/data/logs.csv
@@ -0,0 +1,10 @@
2024-07-16 12:00:00,INFO,Application started successfully,App1,host1
2024-07-16 12:01:00,DEBUG,User logged in,App1,host1
2024-07-16 12:01:05,ERROR,Failed to load resource,App1,host1
2024-07-16 12:02:00,WARN,Deprecated API used,App1,host1
2024-07-16 12:03:00,INFO,Background job executed,App1,host1
2024-07-16 12:04:00,DEBUG,Cache cleared,App1,host1
2024-07-16 12:05:00,INFO,User logged out,App1,host1
2024-07-16 12:06:00,ERROR,Database connection failed,App1,host1
2024-07-16 12:07:00,INFO,Application shutdown initiated,App1,host1
2024-07-16 12:08:00,INFO,Application shutdown completed,App1,host1
@@ -0,0 +1,189 @@
# Uploading a CSV File into an OpenSearch Index Using Fluent Bit

This tutorial will guide you through the process of setting up Fluent Bit to monitor a directory for CSV files and upload their contents into an OpenSearch index.

## Prerequisites

- An OpenSearch instance running and accessible.
- Fluent Bit installed on your system.
- A directory containing your CSV files.

## Step 1: Install Fluent Bit

### On Linux:

```bash
curl -L https://fluentbit.io/releases/1.8/fluent-bit-1.8.11-linux-x86_64.tar.gz -o fluent-bit.tar.gz
tar -xvf fluent-bit.tar.gz
cd fluent-bit/bin
```

### On macOS:

```bash
brew install fluent-bit
```

### On Windows:

Download and extract Fluent Bit from [Fluent Bit releases](https://fluentbit.io/download/).

## Step 2: Create Fluent Bit Configuration Files

### Create `fluent-bit.conf`

This is the main configuration file for Fluent Bit. It defines the input source, parser, and output destination.

```ini
[SERVICE]
Flush 1
Log_Level info
Parsers_File parsers.conf

[INPUT]
Name tail
Path /path/to/your/csv/files/*.csv
Parser csv
Tag csv
Refresh_Interval 5
Rotate_Wait 30

[OUTPUT]
Name opensearch
Match *
Host your-opensearch-host
Port 9200
Index csv-index
HTTP_User your-username
HTTP_Passwd your-password
tls off
Suppress_Type_Name On
tls.verify off
```

### Create `parsers.conf`

This file defines the CSV parser.

```ini
[PARSER]
Name csv
Format regex
Regex ^(?<timestamp>[^,]+),(?<log_level>[^,]+),(?<message>[^,]+),(?<application>[^,]+),(?<host>[^,]+)$
Time_Key timestamp
Time_Format %Y-%m-%d %H:%M:%S
```
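One caveat worth noting: every capture group uses `[^,]+`, so this parser assumes no field contains an embedded comma. A quoted message with a comma, as in the hypothetical row below, would be split at the wrong boundary:

```text
2024-07-16 12:09:00,ERROR,"Failed to connect, retrying",App1,host1
```

If your data contains such fields, use a dedicated CSV parser or pre-process the file to escape embedded delimiters.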

### Place your CSV files

Ensure your CSV files are in the directory you specified in the `Path` setting of `fluent-bit.conf`.


## Step 3: Run Fluent Bit

Navigate to the directory containing the Fluent Bit executable and the configuration files. Then, start Fluent Bit with the configuration files.

```bash
/path/to/fluent-bit/bin/fluent-bit -c /path/to/fluent-bit.conf
```

## Step 4: Verify Data in OpenSearch

After starting Fluent Bit, you can verify the data ingestion by accessing OpenSearch and searching for the `csv-index` index.

For example, you can use OpenSearch Dashboards or the OpenSearch API to query the index:

### Using OpenSearch Dashboards:

1. Open OpenSearch Dashboards in your browser.
2. Navigate to the "Discover" tab.
3. Select the `csv-index` index pattern.
4. Verify that the log data from your CSV files is being ingested and displayed.

### Using the OpenSearch API:

```bash
curl -X GET "http://your-opensearch-host:9200/csv-index/_search?pretty"
```
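To confirm that rows are actually arriving, a document count against the same placeholder host is a quick check:

```bash
# Returns a JSON body whose "count" field is the number of ingested rows
curl -X GET "http://your-opensearch-host:9200/csv-index/_count?pretty"
```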

---
## Live Testing with Docker Compose
If you prefer to test this setup using Docker Compose, the `getting-started` directory contains a complete live sample that sets up an OpenSearch instance along with Fluent Bit:
```text
/csv_file/getting-started/fluent-bit
|-- docker-complete.yml
|-- data/
|-- fluent-bit.conf
|-- parsers.conf
|-- logs.csv

```
Using the [docker-compose file](fluent-bit/docker-complete.yml), a single command brings up the complete stack:

`docker compose -f docker-complete.yml up -d` instantiates the services and starts sending the sample CSV logs into an index.
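Once the stack is up, the usual Docker Compose commands inspect and tear it down again (service names are defined in `docker-complete.yml`):

```bash
# List the running services and their status
docker compose -f docker-complete.yml ps

# Stop and remove the containers when finished
docker compose -f docker-complete.yml down
```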

---
# Data Prepper CSV Processor Tutorial

The `csv` processor parses comma-separated values (CSVs) from the event into columns.

## Configuration Options

- **source** (String): The field in the event to parse. Default is `message`.
- **quote_character** (String): The text qualifier for a single column. Default is `"`.
- **delimiter** (String): The character separating each column. Default is `,`.
- **delete_header** (Boolean): Deletes the header field (`column_names_source_key`) from the event after parsing. Default is `true`.
- **column_names_source_key** (String): The event field that specifies the CSV column names.
- **column_names** (List): User-specified column names.

## Usage Examples

### User-specified Column Names

```yaml
csv-pipeline:
source:
file:
path: "/full/path/to/ingest.csv"
record_type: "event"
processor:
- csv:
column_names: ["col1", "col2"]
sink:
- stdout:
```
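As a sketch of the outcome, an input line such as `val1,val2` read from `ingest.csv` would reach the sink with the configured column names (illustrative event shape; whether the original `message` field is kept depends on the processor defaults):

```json
{
  "message": "val1,val2",
  "col1": "val1",
  "col2": "val2"
}
```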

### Automatically Detect Column Names

```yaml
csv-s3-pipeline:
source:
s3:
notification_type: "sqs"
codec:
newline:
skip_lines: 1
header_destination: "header"
compression: none
sqs:
queue_url: "https://sqs.<region>.amazonaws.com/<account id>/<queue name>"
aws:
region: "<region>"
processor:
- csv:
column_names_source_key: "header"
sink:
- stdout:
```
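Here the `newline` codec stores the first line of each S3 object under the `header` key (per `header_destination`), and the processor derives the column names from it. Assuming a hypothetical object whose header line is `timestamp,level,message`, a data row would come out roughly as:

```json
{
  "timestamp": "2024-07-16 12:00:00",
  "level": "INFO",
  "message": "Application started successfully"
}
```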

## Metrics

- **recordsIn**: Number of records entering the processor.
- **recordsOut**: Number of records leaving the processor.
- **timeElapsed**: Time spent executing the processor.
- **csvInvalidEvents**: Number of events that failed to parse.

For more details, visit the [CSV Processor Documentation](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/csv/).
@@ -0,0 +1,20 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

csv-pipeline:
source:
file:
path: "/full/path/to/ingest.csv"
record_type: "event"
processor:
- csv:
column_names: ["col1", "col2", "col3"]
sink:
- opensearch:
hosts: ["https://opensearch-node1:9200"]
username: "admin"
password: "my_%New%_passW0rd!@#"
insecure: true
index_type: custom
index: logs-index
bulk_size: 4
@@ -0,0 +1,18 @@
# Nginx Proxy
NGINX_PORT=90
NGINX_ADDR=nginx:${NGINX_PORT}

# OpenSearch version
OPENSEARCH_VERSION=2.15.0
OPENSEARCH_ADMIN_PASSWORD=my_%New%_passW0rd!@#
OPENSEARCH_INITIAL_ADMIN_PASSWORD=my_%New%_passW0rd!@#

# OpenSearch Node1
OPENSEARCH_PORT=9200
OPENSEARCH_HOST=opensearch
OPENSEARCH_ADDR=${OPENSEARCH_HOST}:${OPENSEARCH_PORT}

# OpenSearch Dashboard
OPENSEARCH_DASHBOARD_PORT=5601
OPENSEARCH_DASHBOARD_HOST=opensearch-dashboards
OPENSEARCH_DASHBOARD_ADDR=${OPENSEARCH_DASHBOARD_HOST}:${OPENSEARCH_DASHBOARD_PORT}
@@ -0,0 +1,28 @@
[SERVICE]
Flush 1
Log_Level info
Parsers_File parsers.conf

[INPUT]
Name tail
Path /fluent-bit/data/*.csv
Parser csv
Tag csv

[INPUT]
Name dummy
Dummy {"timestamp":"2024-07-16 12:09:00", "log_level":"INFO", "message":"Dummy log message", "application":"App2", "host":"host2"}
Tag dummy

[OUTPUT]
Name opensearch
Host opensearch-node1
Match *
Port 9200
Type _doc
Index logs-index
tls On
tls.verify Off
Suppress_Type_Name On
HTTP_User admin
HTTP_Passwd my_%New%_passW0rd!@#
@@ -0,0 +1,10 @@
2024-07-16 12:00:00,INFO,Application started successfully,App1,host1
2024-07-16 12:01:00,DEBUG,User logged in,App1,host1
2024-07-16 12:01:05,ERROR,Failed to load resource,App1,host1
2024-07-16 12:02:00,WARN,Deprecated API used,App1,host1
2024-07-16 12:03:00,INFO,Background job executed,App1,host1
2024-07-16 12:04:00,DEBUG,Cache cleared,App1,host1
2024-07-16 12:05:00,INFO,User logged out,App1,host1
2024-07-16 12:06:00,ERROR,Database connection failed,App1,host1
2024-07-16 12:07:00,INFO,Application shutdown initiated,App1,host1
2024-07-16 12:08:00,INFO,Application shutdown completed,App1,host1
@@ -0,0 +1,6 @@
[PARSER]
Name csv
Format regex
Regex ^(?<timestamp>[^,]+),(?<log_level>[^,]+),(?<message>[^,]+),(?<application>[^,]+),(?<host>[^,]+)$
Time_Key timestamp
Time_Format %Y-%m-%d %H:%M:%S