Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/http sink #67

Merged
merged 8 commits into from
Oct 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pg-test:
go test -v -race $(shell go list ./... | grep -v 'testsupport' | grep 'tests' | grep -v 'tests/integration') -timeout 40m

.PHONY: integration-test
integration-test: integration-test-aws-kinesis integration-test-aws-sqs integration-test-kafka integration-test-nats integration-test-redis integration-test-redpanda
integration-test: integration-test-aws-kinesis integration-test-aws-sqs integration-test-kafka integration-test-nats integration-test-redis integration-test-redpanda integration-test-http

.PHONY: integration-test-aws-kinesis-test
integration-test-aws-kinesis:
Expand All @@ -90,5 +90,9 @@ integration-test-redis:
integration-test-redpanda:
go test -v -race $(shell go list ./... | grep 'tests/integration/redpanda') -timeout 10m

.PHONY: integration-test-http
integration-test-http:
go test -v -race $(shell go list ./... | grep 'tests/integration/http') -timeout 10m

.PHONY: all
all: build test fmt lint
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ duplicated (`test.some_value` becomes `TEST_SOME__VALUE`).

| Property | Description | Data Type | Default Value |
|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|--------------------------:|--------------:|
| `sink.type` | The property defines which sink adapter is to be used. Valid values are `stdout`, `nats`, `kafka`, `redis`. | string | `stdout` |
| `sink.type` | The property defines which sink adapter is to be used. Valid values are `stdout`, `nats`, `kafka`, `redis`, `http`. | string | `stdout` |
| `sink.tombstone` | The property defines if delete events will be followed up with a tombstone event. | boolean | false |
| `sink.filters.<name>.<...>` | The filters definition defines filters to be executed against potentially replicated events. This property is a map with the filter name as its key and a [Sink Filter](#sink-filter-configuration). | map of filter definitions | empty map |

Expand Down Expand Up @@ -414,6 +414,26 @@ transaction id (if available), and content of the message.
| `sink.sqs.queue.url` | The URL of the FIFO queue in SQS. | string | empty string |
| `sink.sqs.aws.<...>` | AWS specific content as defined in [AWS service configuration](#aws-service-configuration). | struct | empty struct |

### HTTP Sink Configuration

HTTP specific configuration, which is only used if `sink.type` is set to `http`.
This Sink is an HTTP client that `POST`s the events as the payload.
Usage of TLS is inferred automatically from the prefix of the `url`, if `url` has
the `https://` prefix, then the respective TLS settings will be set according to
the properties defined in `sink.http.tls`.

| Property | Description | Data Type | Default Value |
|-------------------------------------------|---------------------------------------------------------------------------------------------------------------:|----------:|----------------------:|
| `sink.http.url` | The url where the requests are sent. You have to include the protocol scheme (`http`/`https`) too. | string | `http://localhost:80` |
| `sink.http.authentication.type` | Type of authentication to use when making the request. Valid values are `none`, `basic`, `header`. | string | none |
| `sink.http.authentication.basic.username` | If the authentication type is set to `basic` then this is the username used when making the request. | string | empty string |
| `sink.http.authentication.basic.password` | Maximum number of socket connections. | int | empty string |
| `sink.http.authentication.header.name` | Maximum number of retries before giving up. | int | empty string |
| `sink.http.authentication.header.value` | Minimum backoff between each retry in milliseconds. A value of `-1` disables backoff. | int | empty string |
| `sink.http.tls.skipverify` | The property defines if verification of TLS certificates is skipped. | bool | false |
| `sink.http.tls.clientauth` | The property defines the client auth value (as defined in [Go](https://pkg.go.dev/crypto/tls#ClientAuthType)). | int | 0 (NoClientCert) |


### AWS Service Configuration

This configuration is the basic configuration for AWS, including the region,
Expand Down
9 changes: 9 additions & 0 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ sink.type = 'stdout'
#sink.sqs.aws.secretaccesskey = '...'
#sink.sqs.aws.sessiontoken = '...'

#sink.http.url = 'http://localhost:8080'
#sink.http.authentication.type = 'basic'
#sink.http.authentication.basic.username = 'test'
#sink.http.authentication.basic.password = '...'
#sink.http.authentication.header.name = 'x-api-key'
#sink.http.authentication.header.value = '...'
#sink.http.tls.skipverify = false
#sink.http.tls.clientauth = 0

topic.namingstrategy.type = 'debezium'
topic.prefix = 'timescaledb'

Expand Down
152 changes: 83 additions & 69 deletions config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,77 +43,91 @@ stateStorage:
# parallelsim: 5

sink:
#filters:
#filterName:
#condition: 'value.op == "u" && value.before.id == 2'
#default: true
# filters:
# filterName:
# condition: 'value.op == "u" && value.before.id == 2'
# default: true
tombstone: false
type: 'stdout'
#type: 'nats'
#nats:
#address: 'nats://localhost:4222'
#authorization: 'userinfo'
#userInfo:
#username: 'publisher'
#password: '...'
#type: 'kafka'
#kafka:
#brokers:
#- 'address:1'
#- 'address:2'
#sasl:
#enabled: true
#user: '$ConnectionString'
#mechanism: 'PLAIN'
#tls:
#enabled: true
#skipVerify: true
#clientAuth: 0
#type: 'redis'
#redis:
#network: 'tcp'
#address: 'localhost:6379'
#password: '...'
#database: 0
#poolSize: 0
#retries:
#maxAttempts: 0
#backoff:
#min: 8
#max: 512
#timeouts:
#dial: 0
#read: 0
#write: 0
#pool: 0
#idle: 0
#tls:
#enabled: false
#skipVerify: false
#clientAuth: 0
#type: 'kinesis'
#kinesis:
#stream:
#name: 'stream_name'
#create: true
#shardCount: 10
#mode: '...'
#aws:
#region: '...'
#endpoint: '...'
#accessKeyId: '...'
#secretAccessKey: '...'
#sessionToken: '...'
#type: 'sqs'
#sqs:
#queue:
#url: 'queue_url'
#aws:
#region: '...'
#endpoint: '...'
#accessKeyId: '...'
#secretAccessKey: '...'
#sessionToken: '...'
# type: 'nats'
# nats:
# address: 'nats://localhost:4222'
# authorization: 'userinfo'
# userInfo:
# username: 'publisher'
# password: '...'
# type: 'kafka'
# kafka:
# brokers:
# - 'address:1'
# - 'address:2'
# sasl:
# enabled: true
# user: '$ConnectionString'
# mechanism: 'PLAIN'
# tls:
# enabled: true
# skipVerify: true
# clientAuth: 0
# type: 'redis'
# redis:
# network: 'tcp'
# address: 'localhost:6379'
# password: '...'
# database: 0
# poolSize: 0
# retries:
# maxAttempts: 0
# backoff:
# min: 8
# max: 512
# timeouts:
# dial: 0
# read: 0
# write: 0
# pool: 0
# idle: 0
# tls:
# enabled: false
# skipVerify: false
# clientAuth: 0
# type: 'kinesis'
# kinesis:
# stream:
# name: 'stream_name'
# create: true
# shardCount: 10
# mode: '...'
# aws:
# region: '...'
# endpoint: '...'
# accessKeyId: '...'
# secretAccessKey: '...'
# sessionToken: '...'
# type: 'sqs'
# sqs:
# queue:
# url: 'queue_url'
# aws:
# region: '...'
# endpoint: '...'
# accessKeyId: '...'
# secretAccessKey: '...'
# sessionToken: '...'
# type: 'http'
# http:
# url: "http://localhost:8080"
# authentication:
# type: header
# basic:
# username: test
# password: ...
# header:
# name: "x-api-key"
# value: "..."
# tls:
# skipVerify: false
# clientAuth: 0

topic:
namingStrategy:
Expand Down
133 changes: 133 additions & 0 deletions internal/eventing/sink/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package http

import (
"bytes"
"crypto/tls"
"encoding/base64"
"fmt"
sinkimpl "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink"
config "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/encoding"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"net/http"
"strings"
"time"
)

func init() {
sinkimpl.RegisterSink(config.Http, newHttpSink)
}

func basicAuth(
username, password string,
) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}

type httpSink struct {
client *http.Client
encoder *encoding.JsonEncoder
address *string
headers *http.Header
}

func newHttpSink(
c *config.Config,
) (sink.Sink, error) {

transport := http.DefaultTransport.(*http.Transport).Clone()
address := config.GetOrDefault(c, config.PropertyHttpUrl, "http://localhost:80")
tlsEnabled := strings.HasPrefix(address, "https://")
if tlsEnabled {
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: config.GetOrDefault(
c, config.PropertyHttpTlsSkipVerify, false,
),
ClientAuth: config.GetOrDefault(
c, config.PropertyHttpTlsClientAuth, tls.NoClientCert,
),
}
}

headers := make(http.Header)
headers.Add("Content-Type", "application/json")

authenticationType := config.GetOrDefault(c, config.PropertyHttpAuthenticationType, "none")
switch config.HttpAuthenticationType(authenticationType) {
case config.BasicAuthentication:
{
headers.Add("Authorization",
fmt.Sprintf("Basic %s",
basicAuth(config.GetOrDefault(c, config.PropertyHttpBasicAuthenticationUsername, ""),
config.GetOrDefault(c, config.PropertyHttpBasicAuthenticationPassword, ""),
),
),
)
}
case config.HeaderAuthentication:
{
headers.Add(config.GetOrDefault(c, config.PropertyHttpHeaderAuthenticationHeaderName, ""),
config.GetOrDefault(c, config.PropertyHttpHeaderAuthenticationHeaderValue, ""),
)
}
case config.NoneAuthentication:
{
}
default:
{
return nil, fmt.Errorf("http AuthenticationType '%s' doesn't exist", authenticationType)
}
}

return &httpSink{
client: &http.Client{Transport: transport},
encoder: encoding.NewJsonEncoderWithConfig(c),
address: &address,
headers: &headers,
}, nil
}

func (h *httpSink) Start() error {
return nil
}

func (h *httpSink) Stop() error {
h.client.CloseIdleConnections()
return nil
}

func (h *httpSink) Emit(
_ sink.Context, _ time.Time, topicName string, key, envelope schema.Struct,
) error {
payload, err := h.encoder.Marshal(envelope)
if err != nil {
return err
}
req, err := http.NewRequest("POST", *h.address, bytes.NewBuffer(payload))
if err != nil {
return err
}

req.Header = *h.headers
_, err = h.client.Do(req)
return err
}
1 change: 1 addition & 0 deletions internal/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
// Register built-in sinks
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/awskinesis"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/awssqs"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/http"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/kafka"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/nats"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/redis"
Expand Down
Loading