Skip to content

Commit

Permalink
fix ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
Mizaro authored and tsaikd committed May 6, 2023
1 parent 682f182 commit 44bcd20
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ linters:
#- lll
- misspell
- nakedret
#- noctx
- noctx
- nolintlint
#- revive
- staticcheck
Expand Down
6 changes: 3 additions & 3 deletions input/http/inputhttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (t *InputConfig) Start(
}

func (t *InputConfig) Request(ctx context.Context, msgChan chan<- logevent.LogEvent) {
data, err := t.SendRequest()
data, err := t.SendRequest(ctx)
extra := map[string]any{
"host": t.hostname,
"url": t.URL,
Expand All @@ -121,13 +121,13 @@ func (t *InputConfig) Request(ctx context.Context, msgChan chan<- logevent.LogEv
}
}

func (t *InputConfig) SendRequest() (data []byte, err error) {
func (t *InputConfig) SendRequest(ctx context.Context) (data []byte, err error) {
var raw []byte
if t.Method != "HEAD" && t.Method != "GET" {
return nil, errors.New("unknown method")
}

req, err := http.NewRequest(t.Method, t.URL, http.NoBody)
req, err := http.NewRequestWithContext(ctx, t.Method, t.URL, http.NoBody)
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions input/httplisten/httplisten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"testing"
"time"

"github.com/tsaikd/gogstash/internal/httpctx"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -46,7 +48,7 @@ input:

time.Sleep(500 * time.Millisecond)

resp, err := http.Post("http://127.0.0.1:8089/", "application/json", bytes.NewReader([]byte("{\"foo\":\"bar\"}")))
resp, err := httpctx.Post(ctx, "http://127.0.0.1:8089/", "application/json", bytes.NewReader([]byte("{\"foo\":\"bar\"}")))
require.NoError(err)
defer resp.Body.Close()

Expand Down Expand Up @@ -91,7 +93,7 @@ input:

client := &http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{RootCAs: roots}}}

resp, err := client.Post("https://127.0.0.1:8989/tls/", "application/json", bytes.NewReader([]byte("{\"foo\":\"bar\"}")))
resp, err := httpctx.ClientPost(ctx, client, "https://127.0.0.1:8989/tls/", "application/json", bytes.NewReader([]byte("{\"foo\":\"bar\"}")))
require.NoError(err)
defer resp.Body.Close()
assert.Equal(http.StatusOK, resp.StatusCode)
Expand Down Expand Up @@ -147,7 +149,7 @@ input:

client := &http.Client{Transport: &transport}

resp1, err := client.Post("https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
resp1, err := httpctx.ClientPost(ctx, client, "https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
defer func(r *http.Response) {
if r != nil && r.Body != nil {
_ = r.Body.Close()
Expand All @@ -157,7 +159,7 @@ input:

// case 2: with correct client cert
tlsConfig.Certificates = []tls.Certificate{clientCert}
resp, err := client.Post("https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
resp, err := httpctx.ClientPost(ctx, client, "https://127.0.0.1:8999/tls2/", "application/json", bytes.NewReader([]byte("{\"foo2\":\"bar2\"}")))
require.NoError(err)
defer func(r *http.Response) {
if r != nil && r.Body != nil {
Expand Down
54 changes: 54 additions & 0 deletions internal/httpctx/contextrequest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package httpctx

import (
"context"
"io"
"net/http"
)

// ClientPost issues a POST to the specified URL.
//
// Caller should close resp.Body when done reading from it.
//
// If the provided body is an io.Closer, it is closed after the
// request.
//
// To set custom headers, use NewRequestWithContext and Client.Do.
func ClientPost(ctx context.Context, c *http.Client, url, contentType string, body io.Reader) (resp *http.Response, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", contentType)
return c.Do(req)
}

// Post issues a POST to the specified URL using DefaultClient.
//
// Caller should close resp.Body when done reading from it.
//
// If the provided body is an io.Closer, it is closed after the
// request.
//
// To set custom headers, use NewRequestWithContext and Client.Do.
func Post(ctx context.Context, url, contentType string, body io.Reader) (resp *http.Response, err error) {
return ClientPost(ctx, http.DefaultClient, url, contentType, body)
}

// ClientGet issues a GET to the specified URL.
//
// Caller should close resp.Body when done reading from it.
func ClientGet(ctx context.Context, c *http.Client, url string) (resp *http.Response, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody)
if err != nil {
return nil, err
}
return c.Do(req)
}

// Get issues a GET to the specified URL using DefaultClient.
//
// Caller should close resp.Body when done reading from it.
func Get(ctx context.Context, url string) (resp *http.Response, err error) {
return ClientGet(ctx, http.DefaultClient, url)
}
25 changes: 15 additions & 10 deletions output/gelf/gelf_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"compress/flate"
"compress/gzip"
"compress/zlib"
"context"
"crypto/rand"
"encoding/json"
"fmt"
Expand All @@ -19,6 +20,8 @@ import (
"strings"
"sync"
"time"

"github.com/tsaikd/gogstash/internal/httpctx"
)

// regex used later
Expand All @@ -39,8 +42,8 @@ type GELFConfig struct {
}

type GELFWriter interface {
WriteCustomMessage(m *Message) error
WriteMessage(sm *SimpleMessage) error
WriteCustomMessage(ctx context.Context, m *Message) error
WriteMessage(ctx context.Context, sm *SimpleMessage) error
}

// UDPWriter implements io.Writer and is used to send both discrete
Expand Down Expand Up @@ -315,7 +318,7 @@ type writerCloserResetter interface {
// specified in the call to NewWriter(). It assumes all the fields are
// filled out appropriately. In general, clients will want to use
// Write, rather than WriteMessage.
func (w *UDPWriter) WriteCustomMessage(m *Message) error {
func (w *UDPWriter) WriteCustomMessage(ctx context.Context, m *Message) error {
w.mu.Lock()
defer w.mu.Unlock()

Expand Down Expand Up @@ -376,15 +379,15 @@ func (w *UDPWriter) WriteCustomMessage(m *Message) error {

// WriteMessage allow to send messsage to gelf Server
// It only request basic fields and will handle conversion & co
func (w *UDPWriter) WriteMessage(sm *SimpleMessage) error {
func (w *UDPWriter) WriteMessage(ctx context.Context, sm *SimpleMessage) error {
cleanExtra, err := prepareExtra(sm.Extra)
if err != nil {
return err
}

sm.Extra = cleanExtra

return w.WriteCustomMessage(constructMessage(sm))
return w.WriteCustomMessage(ctx, constructMessage(sm))
}

func (m *Message) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -422,17 +425,19 @@ type HTTPWriter struct {
httpClient *http.Client
}

func (h HTTPWriter) WriteCustomMessage(m *Message) error {
func (h HTTPWriter) WriteCustomMessage(ctx context.Context, m *Message) error {
mBytes, err := json.Marshal(m)
if err != nil {
return err
}

resp, err := h.httpClient.Post(h.config.Host, "application/json", bytes.NewBuffer(mBytes))
resp, err := httpctx.ClientPost(ctx, h.httpClient, h.config.Host, "application/json", bytes.NewBuffer(mBytes))
if err != nil {
return err
}
defer resp.Body.Close()
if resp != nil && resp.Body != nil {
defer resp.Body.Close()
}

if resp.StatusCode != 204 {
return fmt.Errorf("got code %s, expected 204", resp.Status)
Expand All @@ -443,13 +448,13 @@ func (h HTTPWriter) WriteCustomMessage(m *Message) error {

// WriteMessage allow to send messsage to gelf Server
// It only request basic fields and will handle conversion & co
func (h HTTPWriter) WriteMessage(sm *SimpleMessage) error {
func (h HTTPWriter) WriteMessage(ctx context.Context, sm *SimpleMessage) error {
cleanExtra, err := prepareExtra(sm.Extra)
if err != nil {
return err
}

sm.Extra = cleanExtra

return h.WriteCustomMessage(constructMessage(sm))
return h.WriteCustomMessage(ctx, constructMessage(sm))
}
16 changes: 7 additions & 9 deletions output/gelf/outputgelf.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,13 @@ func (t *OutputConfig) OutputEvent(ctx context.Context, event logevent.LogEvent)
}

for _, w := range t.gelfWriters {
err := w.WriteMessage(
&SimpleMessage{
Extra: event.Extra,
Host: host,
Level: level,
Message: event.Message,
Timestamp: event.Timestamp,
},
)
err := w.WriteMessage(ctx, &SimpleMessage{
Extra: event.Extra,
Host: host,
Level: level,
Message: event.Message,
Timestamp: event.Timestamp,
})
if err != nil {
goglog.Logger.Errorf("outputgelf: %s", err.Error())
err = t.queue.Queue(ctx, event)
Expand Down
2 changes: 1 addition & 1 deletion output/loki/outputloki.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (t *OutputConfig) Output(ctx context.Context, event logevent.LogEvent) (err
}

url := t.URLs[i]
req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(raw))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(raw))
if err != nil {
goglog.Logger.Errorf("output loki: %v", err)
return err
Expand Down
5 changes: 3 additions & 2 deletions output/prometheus/outputprometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package outputprometheus
import (
"context"
"io"
"net/http"
"strings"
"testing"
"time"

"github.com/tsaikd/gogstash/internal/httpctx"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -61,7 +62,7 @@ output:
}

func getMetric() (string, error) {
resp, err := http.Get("http://127.0.0.1:8080/metrics")
resp, err := httpctx.Get(context.Background(), "http://127.0.0.1:8080/metrics")
if err != nil {
return "", err
}
Expand Down

0 comments on commit 44bcd20

Please sign in to comment.