Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronise cache before publishing events #10668

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ var (
log = util.NewLogger("main")
cfgFile string

ignoreEmpty = "" // ignore empty keys
ignoreLogs = []string{"log"} // ignore log messages, including warn/error
ignoreMqtt = []string{"log", "auth", "releaseNotes"} // excessive size may crash certain brokers
ignoreLogs = []string{"log"} // ignore log messages, including warn/error
ignoreMqtt = []string{"log", "auth", "releaseNotes"} // excessive size may crash certain brokers
)

// rootCmd represents the base command when called without any subcommands
Expand Down Expand Up @@ -139,8 +138,11 @@ func runRoot(cmd *cobra.Command, args []string) {
cache := util.NewCache()
go cache.Run(pipe.NewDropper(ignoreLogs...).Pipe(tee.Attach()))

// create web server
// web socket hub
socketHub := server.NewSocketHub()
go socketHub.Run(tee.Attach(), cache)

// web server
httpd := server.NewHTTPd(fmt.Sprintf(":%d", conf.Network.Port), socketHub)

// metrics
Expand All @@ -153,9 +155,6 @@ func runRoot(cmd *cobra.Command, args []string) {
httpd.Router().PathPrefix("/debug/").Handler(http.DefaultServeMux)
}

// publish to UI
go socketHub.Run(pipe.NewDropper(ignoreEmpty).Pipe(tee.Attach()), cache)

// setup values channel
valueChan := make(chan util.Param)
go tee.Run(valueChan)
Expand Down Expand Up @@ -193,13 +192,13 @@ func runRoot(cmd *cobra.Command, args []string) {

// setup database
if err == nil && conf.Influx.URL != "" {
configureInflux(conf.Influx, site, pipe.NewDropper(append(ignoreLogs, ignoreEmpty)...).Pipe(tee.Attach()))
configureInflux(conf.Influx, site, pipe.NewDropper(ignoreLogs...).Pipe(tee.Attach()))
}

// setup mqtt publisher
if err == nil && conf.Mqtt.Broker != "" {
publisher := server.NewMQTT(strings.Trim(conf.Mqtt.Topic, "/"))
go publisher.Run(site, pipe.NewDropper(append(ignoreMqtt, ignoreEmpty)...).Pipe(tee.Attach()))
go publisher.Run(site, pipe.NewDropper(ignoreMqtt...).Pipe(tee.Attach()))
}

// announce on mDNS
Expand All @@ -213,9 +212,9 @@ func runRoot(cmd *cobra.Command, args []string) {
}

// setup messaging
var pushChan chan push.Event
var pushChan chan<- push.Event
if err == nil {
pushChan, err = configureMessengers(conf.Messaging, valueChan, cache)
pushChan, err = configureMessengers(conf.Messaging, cache)
}

// run shutdown functions on stop
Expand Down
4 changes: 2 additions & 2 deletions cmd/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func configureEEBus(conf map[string]interface{}) error {
}

// setup messaging
func configureMessengers(conf messagingConfig, valueChan chan util.Param, cache *util.Cache) (chan push.Event, error) {
func configureMessengers(conf messagingConfig, cache *util.Cache) (chan<- push.Event, error) {
messageChan := make(chan push.Event, 1)

messageHub, err := push.NewHub(conf.Events, cache)
Expand All @@ -555,7 +555,7 @@ func configureMessengers(conf messagingConfig, valueChan chan util.Param, cache
messageHub.Add(impl)
}

go messageHub.Run(messageChan, valueChan)
go messageHub.Run(messageChan, cache.Attach())

return messageChan, nil
}
Expand Down
17 changes: 7 additions & 10 deletions push/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ func (h *Hub) apply(ev Event, tmpl string) (string, error) {
}

// Run is the Hub's main publishing loop
func (h *Hub) Run(events <-chan Event, valueChan chan util.Param) {
func (h *Hub) Run(events <-chan Event, valueChan <-chan util.Param) {
log := util.NewLogger("push")

for ev := range events {
fmt.Println("event", ev)

if len(h.sender) == 0 {
continue
}
Expand All @@ -85,11 +87,6 @@ func (h *Hub) Run(events <-chan Event, valueChan chan util.Param) {
continue
}

// let cache catch up, refs https://github.com/evcc-io/evcc/pull/445
flushC := util.Flusher()
valueChan <- util.Param{Val: flushC}
<-flushC

title, err := h.apply(ev, definition.Title)
if err != nil {
log.ERROR.Printf("invalid title template for %s: %v", ev.Event, err)
Expand All @@ -102,12 +99,12 @@ func (h *Hub) Run(events <-chan Event, valueChan chan util.Param) {
continue
}

for _, sender := range h.sender {
if strings.TrimSpace(msg) != "" {
if strings.TrimSpace(msg) != "" {
for _, sender := range h.sender {
go sender.Send(title, msg)
} else {
log.DEBUG.Printf("did not send empty message template for %s: %v", ev.Event, err)
}
} else {
log.DEBUG.Printf("did not send empty message template for %s: %v", ev.Event, err)
}
}
}
57 changes: 25 additions & 32 deletions server/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/evcc-io/evcc/core/site"
"github.com/evcc-io/evcc/util"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxapi "github.com/influxdata/influxdb-client-go/v2/api"
influxlog "github.com/influxdata/influxdb-client-go/v2/log"
)

Expand All @@ -30,11 +30,10 @@ type InfluxConfig struct {
// Influx is a influx publisher
type Influx struct {
sync.Mutex
log *util.Logger
clock clock.Clock
client influxdb2.Client
org string
database string
log *util.Logger
clock clock.Clock
client influxdb2.Client
writer influxapi.WriteAPI
}

// NewInfluxClient creates new publisher for influx
Expand All @@ -52,28 +51,32 @@ func NewInfluxClient(url, token, org, user, password, database string) *Influx {
// handle error logging in writer
influxlog.Log = nil

writer := client.WriteAPI(org, database)

// log errors
go func() {
for err := range writer.Errors() {
// log async as we're part of the logging loop
go log.ERROR.Println(err)
}
}()

return &Influx{
log: log,
clock: clock.New(),
client: client,
org: org,
database: database,
log: log,
clock: clock.New(),
client: client,
writer: writer,
}
}

// pointWriter is the minimal interface for influxdb2 api.Writer
type pointWriter interface {
WritePoint(point *write.Point)
}

// writePoint asynchronously writes a point to influx
func (m *Influx) writePoint(writer pointWriter, key string, fields map[string]any, tags map[string]string) {
func (m *Influx) writePoint(key string, fields map[string]any, tags map[string]string) {
m.log.TRACE.Printf("write %s=%v (%v)", key, fields, tags)
writer.WritePoint(influxdb2.NewPoint(key, tags, fields, m.clock.Now()))
m.writer.WritePoint(influxdb2.NewPoint(key, tags, fields, m.clock.Now()))
}

// writeComplexPoint asynchronously writes a point to influx
func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags map[string]string) {
func (m *Influx) writeComplexPoint(param util.Param, tags map[string]string) {
fields := make(map[string]any)

switch val := param.Val.(type) {
Expand Down Expand Up @@ -124,29 +127,19 @@ func (m *Influx) writeComplexPoint(writer pointWriter, param util.Param, tags ma
fields["value"] = v
tags["id"] = strconv.Itoa(i + 1)

m.writePoint(writer, key, fields, tags)
m.writePoint(key, fields, tags)
}
}
}

return
}

m.writePoint(writer, param.Key, fields, tags)
m.writePoint(param.Key, fields, tags)
}

// Run Influx publisher
func (m *Influx) Run(site site.API, in <-chan util.Param) {
writer := m.client.WriteAPI(m.org, m.database)

// log errors
go func() {
for err := range writer.Errors() {
// log async as we're part of the logging loop
go m.log.ERROR.Println(err)
}
}()

// add points to batch for async writing
for param := range in {
tags := make(map[string]string)
Expand All @@ -159,7 +152,7 @@ func (m *Influx) Run(site site.API, in <-chan util.Param) {
}
}

m.writeComplexPoint(writer, param, tags)
m.writeComplexPoint(param, tags)
}

m.client.Close()
Expand Down
38 changes: 22 additions & 16 deletions util/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,14 @@ package util
import (
"fmt"
"sync"
"time"
)

// Cache is a data store
type Cache struct {
sync.Mutex
val map[string]Param
}

// flush is the value type used as parameter for flushing the cache.
// Flushing is implemented by closing the channel. At this time, it is guaranteed
// that the cache has catched up processing all pending messages.
type flush chan struct{}

// Flusher returns a new flush channel
func Flusher() flush {
return make(flush)
val map[string]Param
recv chan Param
}

// NewCache creates cache
Expand All @@ -28,23 +20,37 @@ func NewCache() *Cache {
}
}

// Attach creates a new receiver channel and attaches it to the tee
func (c *Cache) Attach() <-chan Param {
if c.recv != nil {
panic("cache already attached")
}

c.recv = make(chan Param, 16)
return c.recv
}

// Run adds input channel's values to cache
func (c *Cache) Run(in <-chan Param) {
log := NewLogger("cache")

for p := range in {
if flushC, ok := p.Val.(flush); ok {
close(flushC)
continue
}

key := p.Key
if p.Loadpoint != nil {
key = fmt.Sprintf("lp-%d/%s", *p.Loadpoint+1, key)
}

log.TRACE.Printf("%s: %v", key, p.Val)
c.Add(p.UniqueID(), p)

// send downstream after adding to cache
if c.recv != nil {
select {
case c.recv <- p:
case <-time.After(time.Second):
fmt.Println("blocked: cache", p)
}
}
}
}

Expand Down
19 changes: 10 additions & 9 deletions util/tee.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package util

import (
"fmt"
"reflect"
"sync"
"time"
)

// TeeAttacher allows attaching a listener to a tee
type TeeAttacher interface {
Attach() <-chan Param
}

// Tee distributes parameters to subscribers
type Tee struct {
mu sync.Mutex
Expand Down Expand Up @@ -41,10 +38,14 @@ func (t *Tee) Run(in <-chan Param) {
}
}

t.mu.Lock()
for _, recv := range t.recv {
recv <- msg
for i, recv := range t.recv {
t.mu.Lock()
select {
case recv <- msg:
case <-time.After(time.Second):
fmt.Println("blocked: tee", i, msg)
}
t.mu.Unlock()
}
t.mu.Unlock()
}
}
Loading