Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Add TLS option to HTTP transport" #90

Merged
merged 1 commit into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 13 additions & 33 deletions pkg/protocol/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package http

import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
Expand All @@ -24,6 +23,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
httpClient "github.com/cloudevents/sdk-go/v2/client"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
httpP "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/google/uuid"
"github.com/redhat-cne/sdk-go/pkg/channel"
"github.com/redhat-cne/sdk-go/pkg/localmetrics"
Expand All @@ -42,7 +42,7 @@ var (
// Protocol ...
type Protocol struct {
protocol.Binder
Protocol *cehttp.Protocol
Protocol *httpP.Protocol
}
type ServiceResourcePath string

Expand All @@ -61,7 +61,6 @@ type Server struct {
Publishers []*types.URI
ServiceName string
Port int
TLS *tls.Config
DataIn <-chan *channel.DataChan
DataOut chan<- *channel.DataChan
Client httpClient.Client
Expand All @@ -77,14 +76,13 @@ type Server struct {
}

// InitServer initialize http configurations
func InitServer(serviceName string, port int, tls *tls.Config, storePath string, dataIn <-chan *channel.DataChan,
func InitServer(serviceName string, port int, storePath string, dataIn <-chan *channel.DataChan,
dataOut chan<- *channel.DataChan, closeCh <-chan struct{},
onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error,
processEventFn func(e interface{}) error, options ...bool) (*Server, error) {
processEventFn func(e interface{}) error) (*Server, error) {
server := Server{
Sender: map[uuid.UUID]map[ServiceResourcePath]*Protocol{},
Port: port,
TLS: tls,
DataIn: dataIn,
ServiceName: serviceName,
DataOut: dataOut,
Expand All @@ -94,19 +92,12 @@ func InitServer(serviceName string, port int, tls *tls.Config, storePath string,
subscriberAPI: subscriberApi.GetAPIInstance(storePath),
statusReceiveOverrideFn: onStatusReceiveOverrideFn,
processEventFn: processEventFn,
httpServer: &http.Server{
ReadHeaderTimeout: time.Duration(time.Duration.Seconds(10)),
},
clientID: func(serviceName string) uuid.UUID {
var namespace = uuid.NameSpaceURL
var url = []byte(serviceName)
return uuid.NewMD5(namespace, url)
}(serviceName),
}
// If testing, can't use instance API singletone
if len(options) > 0 && options[0] {
server.subscriberAPI = subscriberApi.GetNewAPIInstance(storePath)
}
log.Infof(" registering publishing http service for client id %s", server.clientID.String())
return &server, nil
}
Expand All @@ -119,13 +110,6 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
log.Errorf("failed to create handler: %s", err.Error())
return err
}
if h.TLS != nil {
h.httpServer.TLSConfig = h.TLS.Clone()
transport := http.Transport{
TLSClientConfig: h.TLS.Clone(),
}
p.Client.Transport = &transport
}
subscriptionHandler, err := cloudevents.NewHTTPReceiveHandler(ctx, p, func(e cloudevents.Event) {
eventType := channel.SUBSCRIBER
status := channel.NEW
Expand Down Expand Up @@ -304,17 +288,13 @@ func (h *Server) Start(wg *sync.WaitGroup) error {
wg.Add(1)
log.Infof("starting publisher/subscriber http transporter %d", h.Port)
go wait.Until(func() {
h.httpServer.ReadHeaderTimeout = RequestReadHeaderTimeout
h.httpServer.Addr = fmt.Sprintf(":%d", h.Port)
h.httpServer.Handler = r
h.ReloadSubsFromStore()
if h.TLS != nil {
log.Info("starting TLS server")
err = h.httpServer.ListenAndServeTLS("", "")
} else {
log.Info("starting http server")
err = h.httpServer.ListenAndServe()
h.httpServer = &http.Server{
ReadHeaderTimeout: RequestReadHeaderTimeout,
Addr: fmt.Sprintf(":%d", h.Port),
Handler: r,
}
h.ReloadSubsFromStore()
err := h.httpServer.ListenAndServe()
if err != nil {
log.Errorf("restarting due to error with http messaging server %s\n", err.Error())
}
Expand Down Expand Up @@ -378,7 +358,7 @@ func (h *Server) SetProcessEventFn(fn func(e interface{}) error) {
h.processEventFn = fn
}

// httpProcessor ...
// HTTPProcessor ...
//Server the web Server listens on data and do either create subscribers and acts as publisher
/*
//create a status ping
Expand Down Expand Up @@ -624,7 +604,7 @@ func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, r
}

// NewClient ...
func (h *Server) NewClient(host string, connOption []cehttp.Option) (httpClient.Client, error) {
func (h *Server) NewClient(host string, connOption []httpP.Option) (httpClient.Client, error) {
//--
c, err2 := cloudevents.NewClientHTTP(cloudevents.WithTarget(host))
if err2 != nil {
Expand Down Expand Up @@ -676,7 +656,7 @@ func (h *Server) NewSender(clientID uuid.UUID, address string) error {
h.SetSender(clientID, l)
for _, s := range []ServiceResourcePath{DEFAULT, HEALTH, EVENT} {
l[s] = &Protocol{}
//server.NewClient(host, []cehttp.Option{})
//server.NewClient(host, []httpP.Option{})
targetURL := fmt.Sprintf("%s%s", address, s)
protocol, err := cloudevents.NewHTTP(cloudevents.WithTarget(targetURL))
if err != nil {
Expand Down
33 changes: 16 additions & 17 deletions pkg/protocol/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"
"net/url"
"os"
"path"
"sync"
"testing"
"time"
Expand All @@ -29,15 +28,15 @@ import (
func strptr(s string) *string { return &s }

var (
storePath = "."
storePath = "."

subscriptionOneID = "123e4567-e89b-12d3-a456-426614174001"
subscriptionNotFoundID = "223e4567-e89b-12d3-a456-426614174001"
serverAddress = types.ParseURI("http://localhost:8089")
clientAddress = types.ParseURI("http://localhost:8087")
hostPort = 8089
clientPort = 8087

serverClientID = func(serviceName string) uuid.UUID {
serverClientID = func(serviceName string) uuid.UUID {
var namespace = uuid.NameSpaceURL
var url = []byte(serviceName)
return uuid.NewMD5(namespace, url)
Expand All @@ -53,9 +52,10 @@ var (
ID: subscriptionOneID,
Resource: "/test/test/1",
}

subscriptionNotFound = &pubsub.PubSub{
ID: subscriptionNotFoundID,
Resource: "/test/test/3",
Resource: "/test/test/2",
}
)
var (
Expand Down Expand Up @@ -115,7 +115,7 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, c
in := make(chan *channel.DataChan, 10)
var err error
assert.Nil(t, clientS)
clientS, err = ceHttp.InitServer(clientAddress.String(), clientPort, nil, storePath, in, clientOutChannel, closeCh, nil, nil, true)
clientS, err = ceHttp.InitServer(clientAddress.String(), clientPort, storePath, in, clientOutChannel, closeCh, nil, nil)
assert.Nil(t, err)
clientS.RegisterPublishers(serverAddress)
wg := sync.WaitGroup{}
Expand All @@ -135,17 +135,16 @@ func createClient(t *testing.T, clientS *ceHttp.Server, closeCh chan struct{}, c

<-closeCh
}

func TestSubscribeCreated(t *testing.T) {
in := make(chan *channel.DataChan, 10)
out := make(chan *channel.DataChan, 10)
closeCh := make(chan struct{})
eventChannel := make(chan *channel.DataChan, 10)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, nil, nil, true)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
assert.Nil(t, err)

wg := sync.WaitGroup{}
// Start the server and channel processor
// Start the server and channel proceesor
err = server.Start(&wg)
assert.Nil(t, err)
server.HTTPProcessor(&wg)
Expand All @@ -169,7 +168,7 @@ func TestSendEvent(t *testing.T) {
out := make(chan *channel.DataChan, 10)
clientOutChannel := make(chan *channel.DataChan)
closeCh := make(chan struct{})
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, nil, nil, true)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
assert.Nil(t, err)
wg := sync.WaitGroup{}
// Start the server and channel proceesor
Expand Down Expand Up @@ -223,15 +222,15 @@ func TestSendSuccess(t *testing.T) {
out := make(chan *channel.DataChan)
clientOutChannel := make(chan *channel.DataChan)
closeCh := make(chan struct{})
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, func(e cloudevents.Event, dataChan *channel.DataChan) error {
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, func(e cloudevents.Event, dataChan *channel.DataChan) error {
dataChan.Address = clientAddress.String()
e.SetType(channel.EVENT.String())
if err := ceHttp.Post(fmt.Sprintf("%s/event", clientAddress), e); err != nil {
log.Errorf("error %s sending event %v at %s", err, e, clientAddress)
return err
}
return nil
}, nil, true)
}, nil)
assert.Nil(t, err)
wg := sync.WaitGroup{}
// Start the server and channel processor
Expand All @@ -255,7 +254,7 @@ func TestHealth(t *testing.T) {
closeCh := make(chan struct{})
var status int
var urlErr error
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, nil, nil, true)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
assert.Nil(t, err)

wg := sync.WaitGroup{}
Expand All @@ -275,7 +274,7 @@ func TestSender(t *testing.T) {
out := make(chan *channel.DataChan)
closeCh := make(chan struct{})

server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, nil, nil, true)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, nil, nil)
assert.Nil(t, err)
wg := sync.WaitGroup{}
// Start the server and channel processor
Expand Down Expand Up @@ -303,7 +302,7 @@ func TestStatusWithSubscription(t *testing.T) {
d.Data = &ce
return nil
}
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil, true)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil)
assert.Nil(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -361,7 +360,7 @@ func TestStatusWithOutSubscription(t *testing.T) {
d.Data = &ce
return nil
}
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, nil, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil, true)
server, err := ceHttp.InitServer(serverAddress.String(), hostPort, storePath, in, out, closeCh, onStatusReceiveOverrideFn, nil)
assert.Nil(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -395,5 +394,5 @@ func TestStatusWithOutSubscription(t *testing.T) {
}

func TestTeardown(t *testing.T) {
_ = os.Remove(fmt.Sprintf("./%s.json", path.Join(storePath, clientClientID.String())))
_ = os.Remove(fmt.Sprintf("./%s.json", clientClientID.String()))
}
1 change: 0 additions & 1 deletion pkg/protocol/http/https.go

This file was deleted.

Loading