Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ocpp: cache and re-use initial status (4th attempt) #18597

Merged
merged 6 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charger/ocpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ func NewOCPP(id string, connector int, idTag string,
) (*OCPP, error) {
log := util.NewLogger(fmt.Sprintf("%s-%d", lo.CoalesceOrEmpty(id, "ocpp"), connector))

log.DEBUG.Printf("!! registering %s:%d", id, connector)

cp, err := ocpp.Instance().RegisterChargepoint(id,
func() *ocpp.CP {
return ocpp.NewChargePoint(log, id)
Expand All @@ -164,6 +166,8 @@ func NewOCPP(id string, connector int, idTag string,
return nil, err
}

log.DEBUG.Printf("!! connected %s:%d", id, connector)

if cp.NumberOfConnectors > 0 && connector > cp.NumberOfConnectors {
return nil, fmt.Errorf("invalid connector: %d", connector)
}
Expand Down
27 changes: 23 additions & 4 deletions charger/ocpp/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,28 @@ func NewConnector(log *util.Logger, id int, cp *CP, idTag string) (*Connector, e
remoteIdTag: idTag,
}

err := cp.registerConnector(id, conn)
if err := cp.registerConnector(id, conn); err != nil {
return nil, err
}

// trigger status for all connectors

var ok bool
// apply cached status if available
instance.WithConnectorStatus(cp.ID(), id, func(status *core.StatusNotificationRequest) {
if _, err := cp.OnStatusNotification(status); err == nil {
ok = true
}
})

// only trigger if we don't already have a status
if !ok && cp.HasRemoteTriggerFeature {
if err := cp.TriggerMessageRequest(0, core.StatusNotificationFeatureName); err != nil {
cp.log.WARN.Printf("failed triggering StatusNotification: %v", err)
}
}

return conn, err
return conn, nil
}

func (conn *Connector) TestClock(clock clock.Clock) {
Expand Down Expand Up @@ -88,7 +107,7 @@ func (conn *Connector) WatchDog(timeout time.Duration) {
update := conn.clock.Since(conn.meterUpdated) > timeout
conn.mu.Unlock()

if update {
if update && conn.cp.HasRemoteTriggerFeature {
conn.TriggerMessageRequest(core.MeterValuesFeatureName)
}
}
Expand All @@ -103,7 +122,7 @@ func (conn *Connector) Initialized() error {
case <-conn.statusC:
return nil

case <-trigger: // try to trigger StatusNotification again as last resort
case <-trigger: // try to trigger StatusNotification again as last resort even when the charger does not report RemoteTrigger support
conn.TriggerMessageRequest(core.StatusNotificationFeatureName)

case <-timeout:
Expand Down
2 changes: 2 additions & 0 deletions charger/ocpp/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type connTestSuite struct {
}

func (suite *connTestSuite) SetupTest() {
// setup instance
Instance()
suite.cp = NewChargePoint(util.NewLogger("foo"), "abc")
suite.conn, _ = NewConnector(util.NewLogger("foo"), 1, suite.cp, "")

Expand Down
7 changes: 0 additions & 7 deletions charger/ocpp/cp_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,6 @@ func (cp *CP) Setup(meterValues string, meterInterval time.Duration) error {
cp.log.DEBUG.Printf("failed configuring %s: %v", KeyWebSocketPingInterval, err)
}

// trigger status for all connectors
if cp.HasRemoteTriggerFeature {
if err := cp.TriggerMessageRequest(0, core.StatusNotificationFeatureName); err != nil {
cp.log.WARN.Printf("failed triggering StatusNotification: %v", err)
}
}

return nil
}

Expand Down
124 changes: 68 additions & 56 deletions charger/ocpp/cs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,26 @@ import (

"github.com/evcc-io/evcc/util"
ocpp16 "github.com/lorenzodonini/ocpp-go/ocpp1.6"
"github.com/lorenzodonini/ocpp-go/ocpp1.6/core"
)

type CS struct {
mu sync.Mutex
log *util.Logger
ocpp16.CentralSystem
cps map[string]*CP
init map[string]*sync.Mutex
txnId atomic.Int64
type registration struct {
mu sync.RWMutex
setup sync.RWMutex // serialises chargepoint setup
cp *CP // guarded by setup and CS mutexes
status map[int]*core.StatusNotificationRequest // guarded by mu mutex
}

// Register registers a charge point with the central system.
// The charge point identified by id may already be connected in which case initial connection is triggered.
func (cs *CS) register(id string, new *CP) error {
cs.mu.Lock()
defer cs.mu.Unlock()

cp, ok := cs.cps[id]

// case 1: charge point neither registered nor physically connected
if !ok {
cs.cps[id] = new
return nil
}

// case 2: duplicate registration of id empty
if id == "" {
return errors.New("cannot have >1 charge point with empty station id")
}

// case 3: charge point not registered but physically already connected
if cp == nil {
cs.cps[id] = new
new.connect(true)
}
func newRegistration() *registration {
return &registration{status: make(map[int]*core.StatusNotificationRequest)}
}

return nil
type CS struct {
ocpp16.CentralSystem
mu sync.Mutex
log *util.Logger
regs map[string]*registration // guarded by mu mutex
txnId atomic.Int64
}

// errorHandler logs error channel
Expand All @@ -58,38 +41,67 @@ func (cs *CS) ChargepointByID(id string) (*CP, error) {
cs.mu.Lock()
defer cs.mu.Unlock()

cp, ok := cs.cps[id]
reg, ok := cs.regs[id]
if !ok {
return nil, fmt.Errorf("unknown charge point: %s", id)
}
if cp == nil {
if reg.cp == nil {
return nil, fmt.Errorf("charge point not configured: %s", id)
}
return cp, nil
return reg.cp, nil
}

func (cs *CS) WithConnectorStatus(id string, connector int, fun func(status *core.StatusNotificationRequest)) {
cs.mu.Lock()
defer cs.mu.Unlock()

if reg, ok := cs.regs[id]; ok {
reg.mu.RLock()
if status, ok := reg.status[connector]; ok {
fun(status)
}
reg.mu.RUnlock()
}
}

// RegisterChargepoint registers a charge point with the central system of returns an already registered charge point
func (cs *CS) RegisterChargepoint(id string, newfun func() *CP, init func(*CP) error) (*CP, error) {
cs.mu.Lock()
cpmu, ok := cs.init[id]
if !ok {
cpmu = new(sync.Mutex)
cs.init[id] = cpmu

// prepare shadow state
reg, registered := cs.regs[id]
if !registered {
reg = newRegistration()
cs.regs[id] = reg
}
cs.mu.Unlock()

// serialise on chargepoint id
cpmu.Lock()
defer cpmu.Unlock()
reg.setup.Lock()
defer reg.setup.Unlock()

cp := reg.cp

cs.mu.Unlock()

// setup already completed?
if cp != nil {
// duplicate registration of id empty
if id == "" {
return nil, errors.New("cannot have >1 charge point with empty station id")
}

// already registered?
if cp, err := cs.ChargepointByID(id); err == nil {
return cp, nil
}

// first time- registration should not error
cp := newfun()
if err := cs.register(id, cp); err != nil {
return nil, err
// first time- create the charge point
cp = newfun()

cs.mu.Lock()
reg.cp = cp
cs.mu.Unlock()

if registered {
cp.connect(true)
}

return cp, init(cp)
Expand All @@ -101,28 +113,28 @@ func (cs *CS) NewChargePoint(chargePoint ocpp16.ChargePointConnection) {
defer cs.mu.Unlock()

// check for configured charge point
cp, ok := cs.cps[chargePoint.ID()]
reg, ok := cs.regs[chargePoint.ID()]
if ok {
cs.log.DEBUG.Printf("charge point connected: %s", chargePoint.ID())

// trigger initial connection if charge point is already setup
if cp != nil {
if cp := reg.cp; cp != nil {
cp.connect(true)
}

return
}

// check for configured anonymous charge point
cp, ok = cs.cps[""]
if ok && cp != nil {
reg, ok = cs.regs[""]
if ok && reg.cp != nil {
cp := reg.cp
cs.log.INFO.Printf("charge point connected, registering: %s", chargePoint.ID())

// update id
cp.RegisterID(chargePoint.ID())

cs.cps[chargePoint.ID()] = cp
delete(cs.cps, "")
cs.regs[chargePoint.ID()] = reg
delete(cs.regs, "")

cp.connect(true)

Expand All @@ -133,7 +145,7 @@ func (cs *CS) NewChargePoint(chargePoint ocpp16.ChargePointConnection) {

// register unknown charge point
// when charge point setup is complete, it will eventually be associated with the connected id
cs.cps[chargePoint.ID()] = nil
cs.regs[chargePoint.ID()] = newRegistration()
}

// ChargePointDisconnected implements ocpp16.ChargePointConnectionHandler
Expand Down
9 changes: 9 additions & 0 deletions charger/ocpp/cs_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (cs *CS) OnMeterValues(id string, request *core.MeterValuesRequest) (*core.
}

func (cs *CS) OnStatusNotification(id string, request *core.StatusNotificationRequest) (*core.StatusNotificationConfirmation, error) {
cs.mu.Lock()
// cache status for future cp connection
if reg, ok := cs.regs[id]; ok && request != nil {
reg.mu.Lock()
reg.status[request.ConnectorId] = request
reg.mu.Unlock()
}
cs.mu.Unlock()

if cp, err := cs.ChargepointByID(id); err == nil {
return cp.OnStatusNotification(request)
}
Expand Down
4 changes: 2 additions & 2 deletions charger/ocpp/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func Instance() *CS {

instance = &CS{
log: log,
cps: make(map[string]*CP),
init: make(map[string]*sync.Mutex),
regs: make(map[string]*registration),
CentralSystem: cs,
}

instance.txnId.Store(time.Now().UTC().Unix())

ocppj.SetLogger(instance)
Expand Down