From 2682c03a6dfd454fab0068604c2035f382258ead Mon Sep 17 00:00:00 2001 From: gadda Date: Wed, 22 Dec 2021 22:37:06 +0200 Subject: [PATCH 1/2] Custom open connection for different network types --- client.go | 6 ++++- client_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++ options.go | 17 +++++++++++++ options_test.go | 39 ++++++++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 client_test.go create mode 100644 options_test.go diff --git a/client.go b/client.go index f33c7d0..70346a0 100644 --- a/client.go +++ b/client.go @@ -393,7 +393,11 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) + if c.options.CustomOpenConnectionFn != nil{ + conn, err = c.options.CustomOpenConnectionFn(broker, c.options) + } else { + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) + } if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..45a03d0 --- /dev/null +++ b/client_test.go @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Matt Brittan + */ +package mqtt + +import ( + "net" + "net/url" + "strings" + "testing" + "time" +) + +func TestCustomConnectionFunction(t *testing.T) { + // Set netpipe to emu + netClient, netServer := net.Pipe() + defer netClient.Close() + defer netServer.Close() + var firstMessage = "" + go func() { + // read first message only + bytes := make([]byte, 1024) + n, err := netServer.Read(bytes) + if err != nil { + t.Errorf("%v", err) + } + firstMessage = string(bytes[:n]) + }() + // Set custom network connection function and client connect + var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) { + return netClient, nil + } + options := &ClientOptions{ + CustomOpenConnectionFn: customConnectionFunc, + } + brokerAddr := netServer.LocalAddr().Network() + options.AddBroker(brokerAddr) + client := NewClient(options) + + // Try to connect using custom function, wait for 2 seconds, to pass MQTT first message + if token := client.Connect(); token.WaitTimeout(2*time.Second) && token.Error() != nil { + t.Errorf("%v", token.Error()) + } + + // Analyze first message sent by client and received by the server + if len(firstMessage) <= 0 || !strings.Contains(firstMessage, "MQTT") { + t.Error("no message recieved on connect") + } +} diff --git a/options.go b/options.go index e745258..936f3ff 100644 --- a/options.go +++ b/options.go @@ -57,6 +57,11 @@ type ReconnectHandler func(Client, *ClientOptions) // ConnectionAttemptHandler is invoked prior to making the initial connection. type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config +// OpenConnectionFunc is invoked to establish the underlying network connection +// Its purpose if for custom network transports. +// Does not carry out any MQTT specific handshakes. +type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error) + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. // WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy @@ -98,6 +103,7 @@ type ClientOptions struct { WebsocketOptions *WebsocketOptions MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer + CustomOpenConnectionFn OpenConnectionFunc } // NewClientOptions will create a new ClientClientOptions type with some @@ -140,6 +146,7 @@ func NewClientOptions() *ClientOptions { HTTPHeaders: make(map[string][]string), WebsocketOptions: &WebsocketOptions{}, Dialer: &net.Dialer{Timeout: 30 * time.Second}, + CustomOpenConnectionFn: nil, } return o } @@ -429,3 +436,13 @@ func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions { o.Dialer = dialer return o } + +// SetCustomOpenConectionFn replaces the inbuilt function that establishes a network connection with a custom function. +// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example) +// It enables custom networking types in addition to the defaults (tcp, tls, websockets...) +func (o *ClientOptions) SetCustomOpenConectionFn(customOpenConnectionfn OpenConnectionFunc) *ClientOptions { + if customOpenConnectionfn != nil { + o.CustomOpenConnectionFn = customOpenConnectionfn + } + return o +} diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..08d527d --- /dev/null +++ b/options_test.go @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Måns Ansgariusson + */ + +// Portions copyright © 2018 TIBCO Software Inc. +package mqtt + +import ( + "fmt" + "net" + "net/url" + "testing" +) + +func TestSetCustomConnectionOptions(t *testing.T) { + var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) { + return nil, fmt.Errorf("not implemented open connection func") + } + options := &ClientOptions{} + options = options.SetCustomOpenConectionFn(customConnectionFunc) + if options.CustomOpenConnectionFn == nil { + t.Error("custom open connection function cannot be set") + } +} From dee4d3ae6e7215de3db67acbf8e233610f12a736 Mon Sep 17 00:00:00 2001 From: Gil Adda Date: Wed, 22 Dec 2021 22:37:06 +0200 Subject: [PATCH 2/2] Custom open connection for different network types --- client.go | 6 ++++- client_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++++++ options.go | 17 +++++++++++++ options_test.go | 39 ++++++++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 1 deletion(-) create mode 100644 client_test.go create mode 100644 options_test.go diff --git a/client.go b/client.go index f33c7d0..70346a0 100644 --- a/client.go +++ b/client.go @@ -393,7 +393,11 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) { tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig) } // Start by opening the network connection (tcp, tls, ws) etc - conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) + if c.options.CustomOpenConnectionFn != nil{ + conn, err = c.options.CustomOpenConnectionFn(broker, c.options) + } else { + conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, c.options.Dialer) + } if err != nil { ERROR.Println(CLI, err.Error()) WARN.Println(CLI, "failed to connect to broker, trying next") diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..45a03d0 --- /dev/null +++ b/client_test.go @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Matt Brittan + */ +package mqtt + +import ( + "net" + "net/url" + "strings" + "testing" + "time" +) + +func TestCustomConnectionFunction(t *testing.T) { + // Set netpipe to emu + netClient, netServer := net.Pipe() + defer netClient.Close() + defer netServer.Close() + var firstMessage = "" + go func() { + // read first message only + bytes := make([]byte, 1024) + n, err := netServer.Read(bytes) + if err != nil { + t.Errorf("%v", err) + } + firstMessage = string(bytes[:n]) + }() + // Set custom network connection function and client connect + var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) { + return netClient, nil + } + options := &ClientOptions{ + CustomOpenConnectionFn: customConnectionFunc, + } + brokerAddr := netServer.LocalAddr().Network() + options.AddBroker(brokerAddr) + client := NewClient(options) + + // Try to connect using custom function, wait for 2 seconds, to pass MQTT first message + if token := client.Connect(); token.WaitTimeout(2*time.Second) && token.Error() != nil { + t.Errorf("%v", token.Error()) + } + + // Analyze first message sent by client and received by the server + if len(firstMessage) <= 0 || !strings.Contains(firstMessage, "MQTT") { + t.Error("no message recieved on connect") + } +} diff --git a/options.go b/options.go index e745258..936f3ff 100644 --- a/options.go +++ b/options.go @@ -57,6 +57,11 @@ type ReconnectHandler func(Client, *ClientOptions) // ConnectionAttemptHandler is invoked prior to making the initial connection. type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Config +// OpenConnectionFunc is invoked to establish the underlying network connection +// Its purpose if for custom network transports. +// Does not carry out any MQTT specific handshakes. +type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error) + // ClientOptions contains configurable options for an Client. Note that these should be set using the // relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage. // WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy @@ -98,6 +103,7 @@ type ClientOptions struct { WebsocketOptions *WebsocketOptions MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer + CustomOpenConnectionFn OpenConnectionFunc } // NewClientOptions will create a new ClientClientOptions type with some @@ -140,6 +146,7 @@ func NewClientOptions() *ClientOptions { HTTPHeaders: make(map[string][]string), WebsocketOptions: &WebsocketOptions{}, Dialer: &net.Dialer{Timeout: 30 * time.Second}, + CustomOpenConnectionFn: nil, } return o } @@ -429,3 +436,13 @@ func (o *ClientOptions) SetDialer(dialer *net.Dialer) *ClientOptions { o.Dialer = dialer return o } + +// SetCustomOpenConectionFn replaces the inbuilt function that establishes a network connection with a custom function. +// The passed in function should return an open `net.Conn` or an error (see the existing openConnection function for an example) +// It enables custom networking types in addition to the defaults (tcp, tls, websockets...) +func (o *ClientOptions) SetCustomOpenConectionFn(customOpenConnectionfn OpenConnectionFunc) *ClientOptions { + if customOpenConnectionfn != nil { + o.CustomOpenConnectionFn = customOpenConnectionfn + } + return o +} diff --git a/options_test.go b/options_test.go new file mode 100644 index 0000000..08d527d --- /dev/null +++ b/options_test.go @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2021 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Måns Ansgariusson + */ + +// Portions copyright © 2018 TIBCO Software Inc. +package mqtt + +import ( + "fmt" + "net" + "net/url" + "testing" +) + +func TestSetCustomConnectionOptions(t *testing.T) { + var customConnectionFunc OpenConnectionFunc = func(uri *url.URL, options ClientOptions) (net.Conn, error) { + return nil, fmt.Errorf("not implemented open connection func") + } + options := &ClientOptions{} + options = options.SetCustomOpenConectionFn(customConnectionFunc) + if options.CustomOpenConnectionFn == nil { + t.Error("custom open connection function cannot be set") + } +}