Barito Flow

A component for handling the flow of logs within a cluster. It supports two modes:

  • Producer, for receiving logs and forwarding them to Kafka
  • Consumer, for consuming logs from Kafka and forwarding them to Elasticsearch

Barito Flow is compatible with both gRPC and a REST API. The REST API is optional and is implemented with gRPC-gateway, which works as a reverse-proxy server that translates a RESTful HTTP API into gRPC. The Barito Flow infrastructure consists of a producer and a consumer.

The Barito Flow producer starts a gRPC server and, optionally, a REST gateway reverse-proxy server. It automatically creates the Kafka topic for a log if the topic does not exist yet. gRPC messages and services are declared in the barito-proto repository.
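
The "create the topic only if it is missing" step can be sketched as follows. This is a minimal illustration, not the code in flow/kafka_admin.go: the TopicAdmin interface, the memoryAdmin type, and ensureTopic are hypothetical names, and an in-memory map stands in for a real Kafka admin client.

```go
package main

import "fmt"

// TopicAdmin is a hypothetical interface for this sketch. It is not the
// interface defined in flow/kafka_admin.go.
type TopicAdmin interface {
	Exists(topic string) bool
	CreateTopic(topic string, partitions, replicationFactor int) error
}

// memoryAdmin is an in-memory stand-in for a Kafka admin client.
type memoryAdmin struct {
	topics map[string]bool
}

func (m *memoryAdmin) Exists(topic string) bool { return m.topics[topic] }

func (m *memoryAdmin) CreateTopic(topic string, partitions, replicationFactor int) error {
	if partitions < 1 || replicationFactor < 1 {
		return fmt.Errorf("invalid settings for topic %q", topic)
	}
	m.topics[topic] = true
	return nil
}

// ensureTopic creates the topic only when it does not exist yet and reports
// whether a creation actually happened.
func ensureTopic(admin TopicAdmin, topic string, partitions, replicationFactor int) (bool, error) {
	if admin.Exists(topic) {
		return false, nil
	}
	if err := admin.CreateTopic(topic, partitions, replicationFactor); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	admin := &memoryAdmin{topics: map[string]bool{}}
	for i := 0; i < 2; i++ {
		created, err := ensureTopic(admin, "kafka_topic", 1, 1)
		fmt.Println(created, err) // first pass: true <nil>, second pass: false <nil>
	}
}
```

Keeping the check behind an interface makes it possible to replace the admin client with a mock in tests.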

The Barito Flow consumer first creates an event topic and generates the workers. Then, based on the logs sent, a separate cluster consumer is created for each topic listed in this event topic. Each cluster consumer stores logs in Elasticsearch by calling either the single-store or the bulk-store Elasticsearch API. If storing fails, all workers are halted and the operation is retried after a backoff period. The halted workers resume only once a retry attempt succeeds.
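
The halt-and-retry behaviour described above can be sketched as a blocking retry loop. This is a simplified illustration under stated assumptions: storeWithRetry, errUnavailable, the doubling delay, and the attempt limit are all hypothetical, since the backoff policy Barito Flow actually uses is not specified here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a placeholder error used by this sketch.
var errUnavailable = errors.New("elasticsearch unavailable")

// storeWithRetry calls store until it succeeds or maxAttempts is reached.
// It sleeps between attempts and doubles the delay up to maxDelay. The
// caller stays blocked while the loop runs, which models the halted workers.
func storeWithRetry(store func() error, initialDelay, maxDelay time.Duration, maxAttempts int) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	delay := initialDelay
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = store(); err == nil {
			return attempt, nil
		}
		if attempt == maxAttempts {
			break
		}
		time.Sleep(delay)
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	return maxAttempts, fmt.Errorf("store failed after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// flaky fails twice and then succeeds, like a store that recovers.
	flaky := func() error {
		calls++
		if calls < 3 {
			return errUnavailable
		}
		return nil
	}
	attempts, err := storeWithRetry(flaky, time.Millisecond, 10*time.Millisecond, 5)
	fmt.Println(attempts, err) // 3 <nil>
}
```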

Development Setup

Fetch and build the project.

git clone
cd barito-flow
go build

Generate mock classes.

mockgen -source=flow/leaky_bucket.go -destination=mock/leaky_bucket.go -package=mock
mockgen -source=flow/kafka_admin.go -destination=mock/kafka_admin.go -package=mock
mockgen -source=flow/Vendor/ -destination=mock/sync_producer.go -package=mock

Running Test Stack using Docker Compose

First, you need to install Docker on your local machine. Then you can run docker-compose:

$ docker-compose -f docker/docker-compose.yml up -d

This pulls the Elasticsearch and Kafka images and builds the producer and consumer images. The ports are mapped as if the services were running on the local machine.

Producer Mode

Responsible for:

  • Receiving logs by exposing an HTTP endpoint
  • Producing messages to the Kafka cluster

After the project is built, run:

./barito-flow producer

# or
./barito-flow p

Endpoints using REST gateway:

POST /produce

{
    "context": {
        "kafka_topic": "kafka_topic",
        "kafka_partition": 1,
        "kafka_replication_factor": 1,
        "es_index_prefix": "es_index_prefix",
        "es_document_type": "es_document_type",
        "app_max_tps": 100,
        "app_secret": "app_secret"
    },
    "timestamp": "optional timestamp here",
    "content": {
        "hello": "world",
        "key": "value",
        "num": 100
    }
}
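
As an illustration, a Go client could build this request body with encoding/json. The produceContext and produceRequest types and the encodeProduceRequest helper are hypothetical names for this sketch; the actual message definitions live in the barito-proto repository.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// produceContext mirrors the "context" object of the request body.
type produceContext struct {
	KafkaTopic             string `json:"kafka_topic"`
	KafkaPartition         int    `json:"kafka_partition"`
	KafkaReplicationFactor int    `json:"kafka_replication_factor"`
	EsIndexPrefix          string `json:"es_index_prefix"`
	EsDocumentType         string `json:"es_document_type"`
	AppMaxTPS              int    `json:"app_max_tps"`
	AppSecret              string `json:"app_secret"`
}

// produceRequest is a hypothetical Go type for the POST /produce body.
// The timestamp is optional, so it is omitted when empty.
type produceRequest struct {
	Context   produceContext         `json:"context"`
	Timestamp string                 `json:"timestamp,omitempty"`
	Content   map[string]interface{} `json:"content"`
}

// encodeProduceRequest serializes the request to a JSON string.
func encodeProduceRequest(req produceRequest) (string, error) {
	body, err := json.Marshal(req)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	req := produceRequest{
		Context: produceContext{
			KafkaTopic:             "kafka_topic",
			KafkaPartition:         1,
			KafkaReplicationFactor: 1,
			EsIndexPrefix:          "es_index_prefix",
			EsDocumentType:         "es_document_type",
			AppMaxTPS:              100,
			AppSecret:              "app_secret",
		},
		Content: map[string]interface{}{"hello": "world", "key": "value", "num": 100},
	}
	body, err := encodeProduceRequest(req)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(body)
}
```

The resulting string can then be sent to /produce on the producer's REST address, for example with http.Post from net/http.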

POST /produce_batch

{
    "context": {
        "kafka_topic": "kafka_topic",
        "kafka_partition": 1,
        "kafka_replication_factor": 1,
        "es_index_prefix": "es_index_prefix",
        "es_document_type": "es_document_type",
        "app_max_tps": 100,
        "app_secret": "app_secret"
    },
    "items": [
        {
            "content": {
                "timber_num": 1
            }
        },
        {
            "content": {
                "timber_num": 2
            }
        }
    ]
}
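
A batch body shares one context across several items. The sketch below shows one way to assemble it in Go. The names batchItem, batchRequest, newBatch, and encodeBatch are hypothetical, and the context is kept as a generic map for brevity.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// batchItem and batchRequest are hypothetical types that mirror the
// POST /produce_batch body.
type batchItem struct {
	Content map[string]interface{} `json:"content"`
}

type batchRequest struct {
	Context map[string]interface{} `json:"context"`
	Items   []batchItem            `json:"items"`
}

// newBatch wraps each log entry in an item; all items share one context.
func newBatch(context map[string]interface{}, contents []map[string]interface{}) batchRequest {
	items := make([]batchItem, 0, len(contents))
	for _, c := range contents {
		items = append(items, batchItem{Content: c})
	}
	return batchRequest{Context: context, Items: items}
}

// encodeBatch serializes the batch to a JSON string.
func encodeBatch(b batchRequest) (string, error) {
	body, err := json.Marshal(b)
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	batch := newBatch(
		map[string]interface{}{"kafka_topic": "kafka_topic", "app_secret": "app_secret"},
		[]map[string]interface{}{{"timber_num": 1}, {"timber_num": 2}},
	)
	body, err := encodeBatch(batch)
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	fmt.Println(body)
}
```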

The following environment variables can be set to customize the producer's behaviour.

| Name | Description | ENV | Default Value |
| --- | --- | --- | --- |
| ConsulKafkaName | Kafka service name in Consul | BARITO_CONSUL_KAFKA_NAME | kafka |
| KafkaBrokers | Kafka broker addresses (CSV). Read from env if not available in Consul | BARITO_KAFKA_BROKERS | localhost:9092 |
| KafkaMaxRetry | Number of retries when connecting to Kafka during startup | BARITO_KAFKA_MAX_RETRY | 0 (unlimited) |
| KafkaRetryInterval | Interval between Kafka connection retries (in seconds) | BARITO_KAFKA_RETRY_INTERVAL | 10 |
| ServeRestApi | Toggle for the REST gateway API | BARITO_PRODUCER_REST_API | true |
| ProducerAddressGrpc | gRPC server address | BARITO_PRODUCER_GRPC | :8082 |
| ProducerAddressRest | REST server address | BARITO_PRODUCER_REST | :8080 |
| ProducerMaxRetry | Kafka producer max retry setting | BARITO_PRODUCER_MAX_RETRY | 10 |
| ProducerMaxTps | Producer rate limit in transactions per second | BARITO_PRODUCER_MAX_TPS | 100 |
| ProducerRateLimitResetInterval | Producer rate limit reset interval (in seconds) | BARITO_PRODUCER_RATE_LIMIT_RESET_INTERVAL | 10 |
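
To make the idea of a rate limit with a reset interval concrete, here is a simplified fixed-window limiter. It is only a stand-in and not the leaky bucket in flow/leaky_bucket.go. The windowLimiter type is hypothetical, and how Barito Flow combines the max TPS with the reset interval is not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// windowLimiter allows at most limit events per window. Times are Unix
// seconds so the behaviour can be tested without waiting.
type windowLimiter struct {
	limit         int
	windowSeconds int64
	used          int
	windowStart   int64
}

func newWindowLimiter(limit int, windowSeconds, nowUnix int64) *windowLimiter {
	return &windowLimiter{limit: limit, windowSeconds: windowSeconds, windowStart: nowUnix}
}

// Allow reports whether one more event fits in the current window. When
// the window has elapsed, the counter is reset before the check.
func (l *windowLimiter) Allow(nowUnix int64) bool {
	if nowUnix-l.windowStart >= l.windowSeconds {
		l.windowStart = nowUnix
		l.used = 0
	}
	if l.used >= l.limit {
		return false
	}
	l.used++
	return true
}

func main() {
	now := time.Now().Unix()
	limiter := newWindowLimiter(2, 10, now)
	fmt.Println(limiter.Allow(now))      // true
	fmt.Println(limiter.Allow(now))      // true
	fmt.Println(limiter.Allow(now))      // false: limit reached in this window
	fmt.Println(limiter.Allow(now + 10)) // true: the window has been reset
}
```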

Consumer Mode

Responsible for:

  • Consuming logs from Kafka
  • Committing logs to Elasticsearch

After the project is built, run:

./barito-flow consumer

# or
./barito-flow c

The following environment variables can be set to customize the consumer's behaviour.

| Name | Description | ENV | Default Value |
| --- | --- | --- | --- |
| ConsulKafkaName | Kafka service name in Consul | BARITO_CONSUL_KAFKA_NAME | kafka |
| ConsulElasticsearchName | Elasticsearch service name in Consul | BARITO_CONSUL_ELASTICSEARCH_NAME | elasticsearch |
| KafkaBrokers | Kafka broker addresses (CSV). Read from env if not available in Consul | BARITO_KAFKA_BROKERS | "," |
| KafkaGroupID | Kafka consumer group ID | BARITO_KAFKA_GROUP_ID | barito-group |
| KafkaMaxRetry | Number of retries when connecting to Kafka during startup | BARITO_KAFKA_MAX_RETRY | 0 (unlimited) |
| KafkaRetryInterval | Interval between Kafka connection retries (in seconds) | BARITO_KAFKA_RETRY_INTERVAL | 10 |
| ElasticsearchUrls | Elasticsearch addresses. Read from env if not available in Consul | BARITO_ELASTICSEARCH_URLS | "," |
| EsIndexMethod | BulkProcessor / SingleInsert | BARITO_ELASTICSEARCH_INDEX_METHOD | BulkProcessor |
| EsBulkSize | BulkProcessor bulk size | BARITO_ELASTICSEARCH_BULK_SIZE | 100 |
| EsFlushIntervalMs | BulkProcessor flush interval (ms) | BARITO_ELASTICSEARCH_FLUSH_INTERVAL_MS | 500 |
| PrintTPS | Print the estimated consumption rate every second | BARITO_PRINT_TPS | false |
| PushMetricUrl | Push metric API URL | BARITO_PUSH_METRIC_URL | |
| PushMetricInterval | Push metric interval | BARITO_PUSH_METRIC_INTERVAL | 30s |
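
Settings like these are typically read from the environment with a fallback to the default. The helpers below are a minimal sketch of that pattern using only the standard library. envString, envInt, and splitCSV are hypothetical names and do not reflect how Barito Flow loads its configuration internally.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// envString returns the value of key, or fallback when it is unset or empty.
func envString(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return fallback
}

// envInt returns the integer value of key, or fallback when the variable
// is unset or cannot be parsed.
func envInt(key string, fallback int) int {
	v, ok := os.LookupEnv(key)
	if !ok {
		return fallback
	}
	n, err := strconv.Atoi(strings.TrimSpace(v))
	if err != nil {
		return fallback
	}
	return n
}

// splitCSV splits a comma-separated value into trimmed, non-empty entries.
func splitCSV(value string) []string {
	var out []string
	for _, part := range strings.Split(value, ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	groupID := envString("BARITO_KAFKA_GROUP_ID", "barito-group")
	bulkSize := envInt("BARITO_ELASTICSEARCH_BULK_SIZE", 100)
	brokers := splitCSV(envString("BARITO_KAFKA_BROKERS", ""))
	fmt.Println(groupID, bulkSize, brokers)
}
```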

The BulkProcessor variables (BARITO_ELASTICSEARCH_BULK_SIZE and BARITO_ELASTICSEARCH_FLUSH_INTERVAL_MS) are ignored if BARITO_ELASTICSEARCH_INDEX_METHOD is set to SingleInsert.
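
The difference between the two index methods can be illustrated with a small buffer that flushes once a bulk size is reached. This is a conceptual sketch only: bulkBuffer is a hypothetical type and not the BulkProcessor that the consumer uses. A size of 1 behaves like a single insert per document.

```go
package main

import "fmt"

// bulkBuffer collects documents and hands them to flush in one call once
// size documents are buffered. Flush can also be called directly, for
// example by a timer that fires every flush interval.
type bulkBuffer struct {
	size  int
	docs  []string
	flush func(batch []string)
}

// Add buffers a document and flushes when the bulk size is reached.
func (b *bulkBuffer) Add(doc string) {
	b.docs = append(b.docs, doc)
	if len(b.docs) >= b.size {
		b.Flush()
	}
}

// Flush sends all buffered documents, if any, and empties the buffer.
func (b *bulkBuffer) Flush() {
	if len(b.docs) == 0 {
		return
	}
	batch := b.docs
	b.docs = nil
	b.flush(batch)
}

func main() {
	buf := &bulkBuffer{
		size:  2,
		flush: func(batch []string) { fmt.Println("flushing", len(batch), "documents") },
	}
	for _, doc := range []string{"a", "b", "c", "d", "e"} {
		buf.Add(doc)
	}
	// Simulates the flush interval elapsing with one document still buffered.
	buf.Flush()
}
```

This sketch is not safe for concurrent use. A version driven by a real timer would need a mutex around the buffer.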







MIT License. See LICENSE.