Feature/http sink #67

Merged
merged 8 commits into from
Oct 1, 2023
6 changes: 5 additions & 1 deletion Makefile
@@ -64,7 +64,7 @@ pg-test:
go test -v -race $(shell go list ./... | grep -v 'testsupport' | grep 'tests' | grep -v 'tests/integration') -timeout 40m

.PHONY: integration-test
integration-test: integration-test-aws-kinesis integration-test-aws-sqs integration-test-kafka integration-test-nats integration-test-redis integration-test-redpanda
integration-test: integration-test-aws-kinesis integration-test-aws-sqs integration-test-kafka integration-test-nats integration-test-redis integration-test-redpanda integration-test-http

.PHONY: integration-test-aws-kinesis-test
integration-test-aws-kinesis:
@@ -90,5 +90,9 @@ integration-test-redis:
integration-test-redpanda:
go test -v -race $(shell go list ./... | grep 'tests/integration/redpanda') -timeout 10m

.PHONY: integration-test-http
integration-test-http:
go test -v -race $(shell go list ./... | grep 'tests/integration/http') -timeout 10m

.PHONY: all
all: build test fmt lint
19 changes: 18 additions & 1 deletion README.md
@@ -322,7 +322,7 @@ duplicated (`test.some_value` becomes `TEST_SOME__VALUE`).

| Property | Description | Data Type | Default Value |
|-----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------:|--------------------------:|--------------:|
| `sink.type` | The property defines which sink adapter is to be used. Valid values are `stdout`, `nats`, `kafka`, `redis`. | string | `stdout` |
| `sink.type` | The property defines which sink adapter is to be used. Valid values are `stdout`, `nats`, `kafka`, `redis`, `http`. | string | `stdout` |
| `sink.tombstone` | The property defines if delete events will be followed up with a tombstone event. | boolean | false |
| `sink.filters.<name>.<...>` | The filters definition defines filters to be executed against potentially replicated events. This property is a map with the filter name as its key and a [Sink Filter](#sink-filter-configuration). | map of filter definitions | empty map |

@@ -414,6 +414,23 @@ transaction id (if available), and content of the message.
| `sink.sqs.queue.url` | The URL of the FIFO queue in SQS. | string | empty string |
| `sink.sqs.aws.<...>` | AWS specific content as defined in [AWS service configuration](#aws-service-configuration). | struct | empty struct |

### HTTP Sink Configuration

HTTP specific configuration, which is only used if `sink.type` is set to `http`. This Sink is an HTTP client that `POST`s the events as the payload.

| Property | Description | Data Type | Default Value |
|-------------------------------------------|---------------------------------------------------------------------------------------------------------------:|----------:|----------------------:|
| `sink.http.url` | The url where the requests are sent. You have to include the protocol scheme (`http`/`https`) too. | string | `http://localhost:80` |
| `sink.http.authentication.type` | Type of authentication to use when making the request. Valid values are `none`, `basic`, `header`. | string | none |
| `sink.http.authentication.basic.username` | If the authentication type is set to `basic` then this is the username used when making the request. | string | empty string |
| `sink.http.authentication.basic.password` | If the authentication type is set to `basic` then this is the password used when making the request. | string | empty string |
| `sink.http.authentication.header.name` | If the authentication type is set to `header` then this is the name of the header added to the request. | string | empty string |
| `sink.http.authentication.header.value` | If the authentication type is set to `header` then this is the value of the header added to the request. | string | empty string |
| `sink.http.tls.enabled` | The property defines if TLS is enabled. | bool | false |
| `sink.http.tls.skipverify` | The property defines if verification of TLS certificates is skipped. | bool | false |
| `sink.http.tls.clientauth` | The property defines the client auth value (as defined in [Go](https://pkg.go.dev/crypto/tls#ClientAuthType)). | int | 0 (NoClientCert) |
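
For illustration, a receiver for this sink might look like the minimal sketch below. It assumes `header` authentication with an `x-api-key` header (as in the example configs) and that each request body is a single JSON-encoded event. The names `eventHandler` and `roundTrip`, the key value `secret`, and the `202 Accepted` response are hypothetical and not part of this project.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// eventHandler is a hypothetical receiver: it accepts only POST requests,
// checks the x-api-key header and decodes the body as one JSON event.
func eventHandler(apiKey string) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		if r.Header.Get("x-api-key") != apiKey {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}
		var event map[string]interface{}
		if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
		w.WriteHeader(http.StatusAccepted)
	})
}

// roundTrip starts a throwaway test server expecting serverKey, posts one
// sample event with clientKey and returns the response status code.
func roundTrip(serverKey, clientKey string) (int, error) {
	srv := httptest.NewServer(eventHandler(serverKey))
	defer srv.Close()

	body := strings.NewReader(`{"payload":{"op":"c"}}`)
	req, err := http.NewRequest(http.MethodPost, srv.URL, body)
	if err != nil {
		return 0, err
	}
	req.Header.Set("x-api-key", clientKey)

	resp, err := srv.Client().Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	status, err := roundTrip("secret", "secret")
	if err != nil {
		panic(err)
	}
	fmt.Println("status with matching key:", status)
}
```

The sample event body is made up; the real envelope is whatever the configured encoder produces.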


### AWS Service Configuration

This configuration is the basic configuration for AWS, including the region,
7 changes: 7 additions & 0 deletions config.example.toml
@@ -74,6 +74,13 @@ sink.type = 'stdout'
#sink.sqs.aws.secretaccesskey = '...'
#sink.sqs.aws.sessiontoken = '...'

#sink.http.url = 'http://localhost:8080'
#sink.http.authentication.type = 'basic'
#sink.http.authentication.basic.username = 'test'
#sink.http.authentication.basic.password = '...'
#sink.http.authentication.header.name = 'x-api-key'
#sink.http.authentication.header.value = '...'

topic.namingstrategy.type = 'debezium'
topic.prefix = 'timescaledb'

11 changes: 11 additions & 0 deletions config.example.yml
@@ -113,6 +113,17 @@ sink:
#endpoint: '...'
#accessKeyId: '...'
#secretAccessKey: '...'
#type: 'http'
#http:
#  url: "http://localhost:8080"
#  authentication:
#    type: header
#    basic:
#      username: test
#      password: ...
#    header:
#      name: "x-api-key"
#      value: "..."
#sessionToken: '...'

topic:
131 changes: 131 additions & 0 deletions internal/eventing/sink/http/http.go
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package http

import (
"bytes"
"crypto/tls"
"encoding/base64"
"fmt"
sinkimpl "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink"
config "github.com/noctarius/timescaledb-event-streamer/spi/config"
"github.com/noctarius/timescaledb-event-streamer/spi/encoding"
"github.com/noctarius/timescaledb-event-streamer/spi/schema"
"github.com/noctarius/timescaledb-event-streamer/spi/sink"
"net/http"
"time"
)

func init() {
sinkimpl.RegisterSink(config.Http, newHttpSink)
}

func basicAuth(
username, password string,
) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
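
As a side note, the standard library can produce the same header value. The sketch below compares a copy of the `basicAuth` helper above with `(*http.Request).SetBasicAuth`; the helper `stdlibBasicAuth`, the URL, and the credentials are hypothetical and only serve the comparison.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"net/http"
)

// basicAuth mirrors the helper in the diff: base64("username:password").
func basicAuth(username, password string) string {
	auth := username + ":" + password
	return base64.StdEncoding.EncodeToString([]byte(auth))
}

// stdlibBasicAuth returns the Authorization header value that
// (*http.Request).SetBasicAuth sets for the same credentials.
func stdlibBasicAuth(username, password string) (string, error) {
	req, err := http.NewRequest(http.MethodPost, "http://localhost:8080", nil)
	if err != nil {
		return "", err
	}
	req.SetBasicAuth(username, password)
	return req.Header.Get("Authorization"), nil
}

func main() {
	manual := fmt.Sprintf("Basic %s", basicAuth("test", "secret"))
	std, err := stdlibBasicAuth("test", "secret")
	if err != nil {
		panic(err)
	}
	fmt.Println("identical header values:", manual == std)
}
```

Because the sink builds its headers once in the constructor rather than per request, the manual helper is a reasonable fit there; `SetBasicAuth` would be the alternative if credentials were applied to each request.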

type httpSink struct {
client *http.Client
encoder *encoding.JsonEncoder
address *string
headers *http.Header
}

func newHttpSink(
c *config.Config,
) (sink.Sink, error) {

transport := http.DefaultTransport.(*http.Transport).Clone()
if config.GetOrDefault(c, config.PropertyHttpTlsEnabled, false) {
transport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: config.GetOrDefault(
c, config.PropertyHttpTlsSkipVerify, false,
),
ClientAuth: config.GetOrDefault(
c, config.PropertyHttpTlsClientAuth, tls.NoClientCert,
),
}
}

address := config.GetOrDefault(c, config.PropertyHttpUrl, "http://localhost:80")
Owner

If you have a separate transport, wouldn't it make sense to remove the protocol part here? Alternatively, instead of using the TLS enabled prop, you could just check the protocol and, if it's HTTPS, assume TLS is enabled (and just read the remaining properties).

Contributor Author

You're absolutely right, I was just hacking it together so the hard-coded one stayed in. I've pushed the commit where we infer the usage of TLS by the prefix of the url and if it's present then take the TLS settings into account.
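
The follow-up commit is not part of this view, so the following is only a sketch of what inferring TLS from the URL scheme could look like. The helper names `newTransport` and `skipVerifyApplied` are hypothetical, and only the skip-verify setting is modelled.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"net/url"
)

// newTransport is a hypothetical helper: it clones the default transport
// and applies the TLS setting only when the address uses the https scheme.
func newTransport(address string, skipVerify bool) (*http.Transport, error) {
	u, err := url.Parse(address)
	if err != nil {
		return nil, err
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return nil, fmt.Errorf("unsupported scheme %q in %q", u.Scheme, address)
	}

	transport := http.DefaultTransport.(*http.Transport).Clone()
	if u.Scheme == "https" {
		// Clone copies any existing TLS config, so this does not touch
		// http.DefaultTransport itself.
		if transport.TLSClientConfig == nil {
			transport.TLSClientConfig = &tls.Config{}
		}
		transport.TLSClientConfig.InsecureSkipVerify = skipVerify
	}
	return transport, nil
}

// skipVerifyApplied reports whether a requested skip-verify setting ended
// up on the transport built for address.
func skipVerifyApplied(address string) (bool, error) {
	t, err := newTransport(address, true)
	if err != nil {
		return false, err
	}
	return t.TLSClientConfig != nil && t.TLSClientConfig.InsecureSkipVerify, nil
}

func main() {
	for _, address := range []string{"https://example.com", "http://localhost:8080"} {
		applied, err := skipVerifyApplied(address)
		if err != nil {
			panic(err)
		}
		fmt.Println(address, "skip-verify applied:", applied)
	}
}
```

Rejecting unknown schemes up front also catches a URL given without a protocol, which the README asks users to include.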

headers := make(http.Header)

authenticationType := config.GetOrDefault(c, config.PropertyHttpAuthenticationType, "none")
switch config.HttpAuthenticationType(authenticationType) {
case config.BasicAuthentication:
{
headers.Add("Authorization",
fmt.Sprintf("Basic %s",
basicAuth(config.GetOrDefault(c, config.PropertyHttpBasicAuthenticationUsername, ""),
config.GetOrDefault(c, config.PropertyHttpBasicAuthenticationPassword, ""),
),
),
)
}
case config.HeaderAuthentication:
{
headers.Add(config.GetOrDefault(c, config.PropertyHttpHeaderAuthenticationHeaderName, ""),
config.GetOrDefault(c, config.PropertyHttpHeaderAuthenticationHeaderValue, ""),
)
}
case config.NoneAuthentication:
{
}
default:
{
return nil, fmt.Errorf("http AuthenticationType '%s' doesn't exist", authenticationType)
}
}

return &httpSink{
client: &http.Client{Transport: transport},
encoder: encoding.NewJsonEncoderWithConfig(c),
address: &address,
headers: &headers,
}, nil
}

func (h *httpSink) Start() error {
return nil
}

func (h *httpSink) Stop() error {
h.client.CloseIdleConnections()
return nil
}

func (h *httpSink) Emit(
_ sink.Context, _ time.Time, topicName string, key, envelope schema.Struct,
) error {
delete(envelope, "schema")
Owner

I wonder if there should be a general property to disable schema sending. I think right now it'd be the only sink not including it. WDYT?

Contributor Author

I think this mainly depends on where you'd like to take the project. Would you like to have everything or almost everything as a configurable parameter?
If so then yeah it would make sense to be able to turn on/off the sending of the schema from the config file.

I can also see a world where the package always emits everything and it's up to the receiver to discard whatever is not needed or if one would like to avoid unnecessary traffic then one could transform the payload with a plugin before emitting.

I think you probably have better things to work on in this project than having to make schema.Struct optional throughout all the sinks just because this one sink and the stdout one do not send it.

I have to be honest and tell you that I've started the http sink by copying the stdout one and that one starts the emitting process by deleting the schema.

For now I think it'd be best if all the sinks behaved the same way (aside from maybe the stdout one, see above), not just because of consistency, but because if you decide to make the schema sending optional you'll have an easier time refactoring and not having to wade through code just to find out if a particular sink decides to omit schema by default.

I've pushed a commit that adds back in the schema for this sink as well. I'll use a plugin to transform the payload before sending to the sink anyway, so I'll just omit it there.

Owner

Yeah I agree. I think it could be interesting since the schema creates quite some overhead and traffic. I also think it may be better to provide the option to create a separate stream of schema data, which could be routed to a schema registry, but I never used one before so I wouldn't have enough insight into it.

I think for now we can just keep it in and leave it up to the receiver to discard it. For traffic reasons it may be interesting to eventually provide the option. Tbh, even stdout should support it, since the idea (next to testing) was to pipe the output somewhere else. That's the reason why you can send any log message to stderr 😁

Contributor Author

> [...] since the schema creates quite some overhead and traffic. [...] For traffic reasons it may be interesting to eventually provide the option

Yeah, this is probably the biggest argument for making it optional.

> I think for now we can just keep it in and make it up to the receiver to discard it.

What are your thoughts on discarding it with a plugin before sending it? We're streaming anywhere between 10k and 100k per second with timescaledb-event-streamer. Although that's on a private network, I can see how one would want to exclude the schema from being sent to save on ingress/egress/networking costs, e.g. with a cloud provider.
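
To make the traffic argument concrete, here is a rough sketch of dropping the schema before encoding. It does not use the project's plugin API. The envelope is modelled as a plain map with `schema` and `payload` keys, which is an assumption, and `stripSchema`, `encodedSizes`, and `sampleEnvelope` are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stripSchema returns a shallow copy of the envelope without its "schema"
// key, leaving the caller's map untouched.
func stripSchema(envelope map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(envelope))
	for k, v := range envelope {
		if k != "schema" {
			out[k] = v
		}
	}
	return out
}

// encodedSizes returns the JSON size in bytes with and without the schema.
func encodedSizes(envelope map[string]interface{}) (full, slim int, err error) {
	a, err := json.Marshal(envelope)
	if err != nil {
		return 0, 0, err
	}
	b, err := json.Marshal(stripSchema(envelope))
	if err != nil {
		return 0, 0, err
	}
	return len(a), len(b), nil
}

// sampleEnvelope builds a made-up event; real envelopes will differ.
func sampleEnvelope() map[string]interface{} {
	return map[string]interface{}{
		"schema": map[string]interface{}{
			"type": "struct",
			"fields": []interface{}{
				map[string]interface{}{"field": "id", "type": "int64"},
				map[string]interface{}{"field": "value", "type": "float64"},
			},
		},
		"payload": map[string]interface{}{"id": 1, "value": 2.5},
	}
}

func main() {
	full, slim, err := encodedSizes(sampleEnvelope())
	if err != nil {
		panic(err)
	}
	fmt.Printf("with schema: %d bytes, without schema: %d bytes\n", full, slim)
}
```

Copying instead of calling `delete` on the original keeps the envelope intact for any other consumer of the same event.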

payload, err := h.encoder.Marshal(envelope)
if err != nil {
return err
}
req, err := http.NewRequest("POST", *h.address, bytes.NewBuffer(payload))
if err != nil {
return err
}

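// Clone the shared headers so a request never mutates the sink's copy.
req.Header = h.headers.Clone()
resp, err := h.client.Do(req)
if err != nil {
return err
}
// Close the response body so the underlying connection is released.
return resp.Body.Close()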
}
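
`Emit` as shown treats any completed request as a success, whatever the response status. A stricter variant could look like the sketch below. The helper names `postEvent` and `emitAgainst`, the `Content-Type` header, and the rule that only 2xx counts as success are assumptions, not behavior of this PR.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// postEvent is a hypothetical stricter variant of Emit: it drains and closes
// the response body and reports non-2xx responses as errors.
func postEvent(client *http.Client, address string, headers http.Header, payload []byte) error {
	req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	if headers != nil {
		// Clone so per-request changes never leak into the shared headers.
		req.Header = headers.Clone()
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Drain the body so the connection can be reused by the transport.
	if _, err := io.Copy(io.Discard, resp.Body); err != nil {
		return err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("http sink: unexpected status %s", resp.Status)
	}
	return nil
}

// emitAgainst posts one sample event to a throwaway test server that always
// answers with the given status code.
func emitAgainst(status int) error {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(status)
	}))
	defer srv.Close()
	return postEvent(srv.Client(), srv.URL, http.Header{}, []byte(`{"payload":{}}`))
}

func main() {
	fmt.Println("204:", emitAgainst(http.StatusNoContent))
	fmt.Println("500:", emitAgainst(http.StatusInternalServerError))
}
```

Whether a failed delivery should stop the stream, be retried, or only be logged is a design decision for the sink and is left open here.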
1 change: 1 addition & 0 deletions internal/streamer.go
@@ -37,6 +37,7 @@ import (
// Register built-in sinks
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/awskinesis"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/awssqs"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/http"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/kafka"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/nats"
_ "github.com/noctarius/timescaledb-event-streamer/internal/eventing/sink/redis"
32 changes: 32 additions & 0 deletions spi/config/configuration.go
@@ -41,6 +41,7 @@ const (
Redis SinkType = "redis"
AwsKinesis SinkType = "kinesis"
AwsSQS SinkType = "sqs"
Http SinkType = "http"
)

type NamingStrategyType string
@@ -139,6 +140,7 @@ type SinkConfig struct {
Redis RedisConfig `toml:"redis" yaml:"redis"`
AwsKinesis AwsKinesisConfig `toml:"kinesis" yaml:"kinesis"`
AwsSqs AwsSqsConfig `toml:"sqs" yaml:"sqs"`
Http HttpConfig `toml:"http" yaml:"http"`
}

type EventFilterConfig struct {
@@ -287,6 +289,36 @@ type AwsConnectionConfig struct {
SessionToken string `toml:"sessiontoken" yaml:"sessionToken"`
}

type HttpConfig struct {
Url string `toml:"url" yaml:"url"`
Authentication HttpAuthenticationConfig `toml:"authentication" yaml:"authentication"`
TLS TLSConfig `toml:"tls" yaml:"tls"`
}

type HttpAuthenticationConfig struct {
Type HttpAuthenticationType `toml:"type" yaml:"type"`
Basic HttpBasicAuthenticationConfig `toml:"basic" yaml:"basic"`
Header HttpHeaderAuthenticationConfig `toml:"header" yaml:"header"`
}

type HttpBasicAuthenticationConfig struct {
Username string `toml:"username" yaml:"username"`
Password string `toml:"password" yaml:"password"`
}

type HttpHeaderAuthenticationConfig struct {
Name string `toml:"name" yaml:"name"`
Value string `toml:"value" yaml:"value"`
}

type HttpAuthenticationType string

const (
NoneAuthentication HttpAuthenticationType = "none"
BasicAuthentication HttpAuthenticationType = "basic"
HeaderAuthentication HttpAuthenticationType = "header"
)

type Config struct {
PostgreSQL PostgreSQLConfig `toml:"postgresql" yaml:"postgresql"`
Sink SinkConfig `toml:"sink" yaml:"sink"`
10 changes: 10 additions & 0 deletions spi/config/constants.go
@@ -113,4 +113,14 @@ const (
PropertySqsAwsAccessKeyId = "sink.sqs.aws.accesskeyid"
PropertySqsAwsSecretAccessKey = "sink.sqs.aws.secretaccesskey"
PropertySqsAwsSessionToken = "sink.sqs.aws.sessiontoken"

PropertyHttpUrl = "sink.http.url"
PropertyHttpAuthenticationType = "sink.http.authentication.type"
PropertyHttpBasicAuthenticationUsername = "sink.http.authentication.basic.username"
PropertyHttpBasicAuthenticationPassword = "sink.http.authentication.basic.password"
PropertyHttpHeaderAuthenticationHeaderName = "sink.http.authentication.header.name"
PropertyHttpHeaderAuthenticationHeaderValue = "sink.http.authentication.header.value"
PropertyHttpTlsEnabled = "sink.http.tls.enabled"
PropertyHttpTlsSkipVerify = "sink.http.tls.skipverify"
PropertyHttpTlsClientAuth = "sink.http.tls.clientauth"
)