Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support neuron/kafka ping #2591

Merged
merged 9 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions extensions/sinks/kafka/ext/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ type kafkaConf struct {
Headers interface{} `json:"headers"`
}

func (m *kafkaSink) Ping(_ string, props map[string]interface{}) error {
if err := m.Configure(props); err != nil {
return err
}
for _, broker := range strings.Split(m.c.Brokers, ",") {
err := ping(m.tlsConfig, broker)
if err != nil {
return err
}
}
return nil
}

func (m *kafkaSink) Configure(props map[string]interface{}) error {
c := &sinkConf{
Brokers: "localhost:9092",
Expand Down Expand Up @@ -279,3 +292,13 @@ func (m *kafkaSink) parseHeaders(ctx api.StreamContext, data interface{}) ([]kaf
}
return nil, nil
}

func ping(tlsConfig *tls.Config, address string) error {
d := &kafkago.Dialer{TLS: tlsConfig}
c, err := d.Dial("tcp", address)
if err != nil {
return err
}
defer c.Close()
return nil
}
25 changes: 25 additions & 0 deletions extensions/sources/kafka/ext/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type KafkaSource struct {
reader *kafkago.Reader
offset int64
tlsConfig *tls.Config
sc *kafkaSourceConf
}

type kafkaSourceConf struct {
Expand All @@ -44,6 +45,19 @@ type kafkaSourceConf struct {
MaxBytes int `json:"maxBytes"`
}

func (s *KafkaSource) Ping(d string, props map[string]interface{}) error {
if err := s.Configure(d, props); err != nil {
return err
}
for _, broker := range strings.Split(s.sc.Brokers, ",") {
err := ping(s.tlsConfig, broker)
if err != nil {
return err
}
}
return nil
}

func (c *kafkaSourceConf) validate() error {
if len(strings.Split(c.Brokers, ",")) == 0 {
return fmt.Errorf("brokers can not be empty")
Expand Down Expand Up @@ -116,6 +130,7 @@ func (s *KafkaSource) Configure(topic string, props map[string]interface{}) erro
}
reader := kafkago.NewReader(readerConfig)
s.reader = reader
s.sc = kConf
if err := s.reader.SetOffset(kafkago.LastOffset); err != nil {
return err
}
Expand Down Expand Up @@ -183,3 +198,13 @@ func (s *KafkaSource) GetOffset() (interface{}, error) {
func GetSource() api.Source {
return &KafkaSource{}
}

func ping(tlsConfig *tls.Config, address string) error {
d := &kafkago.Dialer{TLS: tlsConfig}
c, err := d.Dial("tcp", address)
if err != nil {
return err
}
defer c.Close()
return nil
}
15 changes: 3 additions & 12 deletions internal/io/neuron/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ package neuron

import (
"encoding/json"
"errors"
"fmt"
"net/url"
"sort"
"sync/atomic"

"github.com/lf-edge/ekuiper/internal/conf"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/errorx"
Expand Down Expand Up @@ -53,18 +51,11 @@ func (s *sink) Ping(_ string, props map[string]interface{}) error {
if err := s.Configure(props); err != nil {
return err
}
ctx := context.Background()
cli, err := createOrGetConnection(ctx, s.c.Url)
u, err := url.Parse(s.c.Url)
if err != nil {
return err
}
defer func() {
closeConnection(ctx, s.c.Url)
}()
if atomic.LoadInt32(&cli.opened) == 1 {
return nil
}
return errors.New("neuron sink ping failed")
return ping(u)
}

func (s *sink) Configure(props map[string]interface{}) error {
Expand Down
19 changes: 19 additions & 0 deletions internal/io/neuron/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/lf-edge/ekuiper/internal/io/mock"
"github.com/lf-edge/ekuiper/pkg/errorx"
)
Expand Down Expand Up @@ -118,3 +120,20 @@ func sinkConnExpTest(t *testing.T) {
t.Errorf("error mismatch:\n\nexp=%s\n\ngot=%s\n\n", expErrStr, err.Error())
}
}

func TestSinkPing(t *testing.T) {
config := map[string]interface{}{
"url": "non-tcp",
"raw": true,
}
s := &sink{}
err := s.Ping("", config)
require.Error(t, err)
config = map[string]interface{}{
"url": "tcp://127.0.0.1:4313",
"raw": true,
}
s = &sink{}
err = s.Ping("", config)
require.Error(t, err)
}
30 changes: 20 additions & 10 deletions internal/io/neuron/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
package neuron

import (
"errors"
"fmt"
"sync/atomic"
"net/http"
"net/url"

"github.com/lf-edge/ekuiper/internal/io/memory/pubsub"
"github.com/lf-edge/ekuiper/internal/topo/context"
"github.com/lf-edge/ekuiper/pkg/api"
"github.com/lf-edge/ekuiper/pkg/cast"
"github.com/lf-edge/ekuiper/pkg/errorx"
"github.com/lf-edge/ekuiper/pkg/infra"
)

Expand All @@ -39,16 +39,11 @@
if err := s.Configure(dataSource, props); err != nil {
return err
}
ctx := context.Background()
cli, err := createOrGetConnection(ctx, s.c.Url)
u, err := url.Parse(s.c.Url)
if err != nil {
return err
}
defer closeConnection(ctx, s.c.Url)
if atomic.LoadInt32(&cli.opened) == 1 {
return nil
}
return errors.New("neuron source ping failed")
return ping(u)
}

func (s *source) Configure(_ string, props map[string]interface{}) error {
Expand Down Expand Up @@ -94,3 +89,18 @@
func GetSource() *source {
return &source{}
}

func ping(u *url.URL) error {
if u.Scheme == "tcp" {
r, err := http.Get(fmt.Sprintf("http://%v/api/v2/ping", u.Host))
if err != nil {
return err
}
if r.StatusCode == http.StatusOK {
return nil
}
return fmt.Errorf("neuron ping failed, code:%v", r.StatusCode)

Check warning on line 102 in internal/io/neuron/source.go

View check run for this annotation

Codecov / codecov/patch

internal/io/neuron/source.go#L99-L102

Added lines #L99 - L102 were not covered by tests
}

return errorx.New("only tcp neuron url support ping")
}
18 changes: 18 additions & 0 deletions internal/io/neuron/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
_ "go.nanomsg.org/mangos/v3/transport/ipc"

"github.com/lf-edge/ekuiper/internal/conf"
Expand Down Expand Up @@ -75,3 +76,20 @@ func connectFailTest(t *testing.T) {
server.Close()
time.Sleep(1 * time.Second)
}

func TestSourcePing(t *testing.T) {
config := map[string]interface{}{
"url": "non-tcp",
"raw": true,
}
s := &source{}
err := s.Ping("", config)
require.Error(t, err)
config = map[string]interface{}{
"url": "tcp://127.0.0.1:4313",
"raw": true,
}
s = &source{}
err = s.Ping("", config)
require.Error(t, err)
}
2 changes: 1 addition & 1 deletion internal/topo/node/source_connector_node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestSubError(t *testing.T) {
var errResult error
go func() {
defer wg.Done()
ticker := time.After(2 * time.Second)
ticker := time.After(5 * time.Second)
for {
select {
case sg := <-errCh:
Expand Down
Loading