Skip to content

rewardStyle/kinetic

Repository files navigation

GoDoc Circle CI

kinetic

Kinetic is an MIT-licensed high-performance AWS Kinesis Client for Go

Kinetic wraps sendgridlabs go-kinesis library to provide maximum throughput for AWS Kinesis producers and consumers. An instance of a Kinetic listener/producer is meant to be used for each shard, so please use it accordingly. If you use more than one instance per-shard then you will hit the AWS Kinesis throughput limits.

Getting Started

Before using kinetic, you should make sure you have a created a Kinesis stream and your configuration file has the credentails necessary to read and write to the stream. Once this stream exists in AWS, kinetic will ensure it is in the "ACTIVE" state before running.

Testing

Tests are written using goconvey and kinesalite. Make sure you have kinesalite running locally before attempting to run the tests. They can be run either via the comamnd line:

$ go test -v -cover -race

or via web interface:

$ goconvey

Running

Kinetic can be used to interface with kinesis like so:

import "github.com/rewardStyle/kinetic"

// Use configuration in /etc/kinetic.conf
listener, _ := new(kinetic.Listener).Init()

// Use custom configuration
producer, _ := new(kinetic.Producer).InitC("your-stream", "0", "shard-type", "accesskey", "secretkey", "region", 10)

producer.Send(new(kinetic.Message).Init([]byte(`{"foo":"bar"}`), "test"))

// Using Retrieve
msg, err := listener.Retrieve()
if err != nil {
    println(err)
}

println(string(msg))

// Using Listen - will block unless sent in goroutine
go listener.Listen(func(msg []byte, wg *sync.WaitGroup) {
    println(string(msg))
    wg.Done()
})

producer.Send(new(KinesisMessage).Init([]byte(`{"foo":"bar"}`), "test"))

listener.Close()
producer.Close()

// Or with Kinesis Firehose
firehose, err := new(kinetic.Producer).Firehose()
if err != nil {
    println(err)
}

firehose.Send(new(KinesisMessage).Init([]byte(`{"foo":"bar"}`), "test"))

firehose.Close()

For more examples take a look at the tests. API documentation can be found here.