Skip to content

Commit

Permalink
Websocket plugin: fix and simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
andig authored Apr 13, 2023
1 parent 3ca3566 commit 6f13339
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 89 deletions.
43 changes: 27 additions & 16 deletions provider/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,23 @@ func (p *HTTP) request(url string, body ...string) ([]byte, error) {
return p.val, p.err
}

var _ StringProvider = (*HTTP)(nil)

// StringGetter sends string request
func (p *HTTP) StringGetter() func() (string, error) {
return func() (string, error) {
b, err := p.request(p.url, p.body)

if err == nil && p.pipeline != nil {
b, err = p.pipeline.Process(b)
}

return string(b), err
}
}

var _ FloatProvider = (*HTTP)(nil)

// FloatGetter parses float from request
func (p *HTTP) FloatGetter() func() (float64, error) {
g := p.StringGetter()
Expand All @@ -179,14 +196,13 @@ func (p *HTTP) FloatGetter() func() (float64, error) {
}

f, err := strconv.ParseFloat(s, 64)
if err == nil {
f *= p.scale
}

return f, err
return f * p.scale, err
}
}

var _ IntProvider = (*HTTP)(nil)

// IntGetter parses int64 from request
func (p *HTTP) IntGetter() func() (int64, error) {
g := p.FloatGetter()
Expand All @@ -197,18 +213,7 @@ func (p *HTTP) IntGetter() func() (int64, error) {
}
}

// StringGetter sends string request
func (p *HTTP) StringGetter() func() (string, error) {
return func() (string, error) {
b, err := p.request(p.url, p.body)

if err == nil && p.pipeline != nil {
b, err = p.pipeline.Process(b)
}

return string(b), err
}
}
var _ BoolProvider = (*HTTP)(nil)

// BoolGetter parses bool from request
func (p *HTTP) BoolGetter() func() (bool, error) {
Expand Down Expand Up @@ -236,20 +241,26 @@ func (p *HTTP) set(param string, val interface{}) error {
return err
}

var _ SetIntProvider = (*HTTP)(nil)

// IntSetter sends int request
func (p *HTTP) IntSetter(param string) func(int64) error {
return func(val int64) error {
return p.set(param, val)
}
}

var _ SetStringProvider = (*HTTP)(nil)

// StringSetter sends string request
func (p *HTTP) StringSetter(param string) func(string) error {
return func(val string) error {
return p.set(param, val)
}
}

var _ SetBoolProvider = (*HTTP)(nil)

// BoolSetter sends bool request
func (p *HTTP) BoolSetter(param string) func(bool) error {
return func(val bool) error {
Expand Down
119 changes: 46 additions & 73 deletions provider/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,26 @@ import (
"sync"
"time"

"github.com/evcc-io/evcc/provider/pipeline"
"github.com/evcc-io/evcc/util"
"github.com/evcc-io/evcc/util/jq"
"github.com/evcc-io/evcc/util/request"
"github.com/evcc-io/evcc/util/transport"
"github.com/gorilla/websocket"
"github.com/itchyny/gojq"
)

const retryDelay = 5 * time.Second

// Socket implements websocket request provider
type Socket struct {
*request.Helper
log *util.Logger
mux sync.Mutex
wait *util.Waiter
url string
headers map[string]string
scale float64
jq *gojq.Query
val interface{}
log *util.Logger
mux sync.Mutex
wait *util.Waiter
url string
headers map[string]string
scale float64
pipeline *pipeline.Pipeline
val []byte // Cached http response value
}

func init() {
Expand All @@ -39,15 +38,16 @@ func init() {
// NewSocketProviderFromConfig creates a HTTP provider
func NewSocketProviderFromConfig(other map[string]interface{}) (IntProvider, error) {
cc := struct {
URI string
Headers map[string]string
Jq string
Scale float64
Insecure bool
Auth Auth
Timeout time.Duration
URI string
Headers map[string]string
pipeline.Settings `mapstructure:",squash"`
Scale float64
Insecure bool
Auth Auth
Timeout time.Duration
}{
Headers: make(map[string]string),
Scale: 1,
}

if err := util.DecodeOther(other, &cc); err != nil {
Expand Down Expand Up @@ -83,13 +83,9 @@ func NewSocketProviderFromConfig(other map[string]interface{}) (IntProvider, err
p.Client.Transport = request.NewTripper(log, transport.Insecure())
}

if cc.Jq != "" {
op, err := gojq.Parse(cc.Jq)
if err != nil {
return nil, fmt.Errorf("invalid jq query: %s", p.jq)
}

p.jq = op
var err error
if p.pipeline, err = pipeline.New(cc.Settings); err != nil {
return nil, err
}

go p.listen()
Expand Down Expand Up @@ -127,22 +123,14 @@ func (p *Socket) listen() {
p.log.TRACE.Printf("recv: %s", b)

p.mux.Lock()
if p.jq != nil {
v, err := jq.Query(p.jq, b)
if err == nil {
p.val = v
p.wait.Update()
}
} else {
p.val = string(b)
p.wait.Update()
}
p.val = b
p.wait.Update()
p.mux.Unlock()
}
}
}

func (p *Socket) hasValue() (interface{}, error) {
func (p *Socket) hasValue() ([]byte, error) {
if late := p.wait.Overdue(); late > 0 {
return nil, fmt.Errorf("outdated: %v", late.Truncate(time.Second))
}
Expand All @@ -153,75 +141,60 @@ func (p *Socket) hasValue() (interface{}, error) {
return p.val, nil
}

var _ StringProvider = (*Socket)(nil)

// StringGetter sends string request
func (p *Socket) StringGetter() func() (string, error) {
return func() (string, error) {
v, err := p.hasValue()
b, err := p.hasValue()
if err != nil {
return "", err
}

return jq.String(v)
v, err := p.pipeline.Process(b)

return string(v), err
}
}

var _ FloatProvider = (*Socket)(nil)

// FloatGetter parses float from string getter
func (p *Socket) FloatGetter() func() (float64, error) {
g := p.StringGetter()

return func() (float64, error) {
v, err := p.hasValue()
s, err := g()
if err != nil {
return 0, err
}

// v is always string when jq not used
if p.jq == nil {
v, err = strconv.ParseFloat(v.(string), 64)
if err != nil {
return 0, err
}
}
f, err := strconv.ParseFloat(s, 64)

f, err := jq.Float64(v)
return f * p.scale, err
}
}

var _ IntProvider = (*Socket)(nil)

// IntGetter parses int64 from float getter
func (p *Socket) IntGetter() func() (int64, error) {
return func() (int64, error) {
v, err := p.hasValue()
if err != nil {
return 0, err
}

// v is always string when jq not used
if p.jq == nil {
v, err = strconv.ParseInt(v.(string), 10, 64)
if err != nil {
return 0, err
}
}

i, err := jq.Int64(v)
f := float64(i) * p.scale
g := p.FloatGetter()

return func() (int64, error) {
f, err := g()
return int64(math.Round(f)), err
}
}

var _ BoolProvider = (*Socket)(nil)

// BoolGetter parses bool from string getter
func (p *Socket) BoolGetter() func() (bool, error) {
return func() (bool, error) {
v, err := p.hasValue()
if err != nil {
return false, err
}
g := p.StringGetter()

// v is always string when jq not used
if p.jq == nil {
v = util.Truish(v.(string))
}

return jq.Bool(v)
return func() (bool, error) {
s, err := g()
return util.Truish(s), err
}
}

1 comment on commit 6f13339

@ecoCuyo
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hallo AndiG,

seit der Version V0.116.1 funktioniert die Verwendung von WebSockets mit dem vzlogger push-server nicht mehr.
Siehe Websocket issue "[meter name] jq: empty result" #7633 und Update to 0.116.1/.2/.3 with failure Volkszähler WebSocket API #7532

Gibt es dazu einen Fix oder etwas, das ich in meiner Konfiguration anpassen müsste?

Viele Grüße
Armin

Please sign in to comment.