Skip to content

Commit

Permalink
Abort HA Realization Logic After Timeout
Browse files Browse the repository at this point in the history
A strange HA behavior was reported in #787, resulting in both instances
being active.

The logs contained an entry of the previous active instance exiting the
HA.realize() method successfully after 1m9s. This, however, should not
be possible as the method's context is deadlined to a minute after the
heartbeat was received.

However, as it turns out, executing COMMIT on a database transaction is
not bound to the transaction's context, allowing to survive longer. To
mitigate this, another context watch was introduced. Doing so allows
directly handing over, while the other instance can now take over due to
the expired heartbeat in the database.

As a related change, the HA.insertEnvironment() method was inlined into
the retryable function to use the deadlined context. Otherwise, this
might block afterwards, as it was used within HA.realize(), but without
the passed context.

Since the retryable HA function may be executed a few times before
succeeding, the inserted heartbeat value will be directly outdated. The
heartbeat logic was slightly altered to always use the latest heartbeat
time value.

In addition, the main loop select cases for hactx.Done() and ctx.Done()
were unified, as hactx is a derived ctx. A closed ctx case may be lost
as the hactx case could have been chosen.
  • Loading branch information
oxzi committed Sep 23, 2024
1 parent 5e81041 commit c13752a
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 30 deletions.
7 changes: 4 additions & 3 deletions cmd/icingadb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,10 @@ func run() int {

cancelHactx()
case <-hactx.Done():
// Nothing to do here, surrounding loop will terminate now.
if ctx.Err() != nil {
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
}
// Otherwise, there is nothing to do here, surrounding loop will terminate now.
case <-ha.Done():
if err := ha.Err(); err != nil {
logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error"))
Expand All @@ -337,8 +340,6 @@ func run() int {
cancelHactx()

return ExitFailure
case <-ctx.Done():
logger.Fatalf("%+v", errors.New("main context closed unexpectedly"))
case s := <-sig:
logger.Infow("Exiting due to signal", zap.String("signal", s.String()))
cancelHactx()
Expand Down
67 changes: 40 additions & 27 deletions pkg/icingadb/ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (h *HA) controller() {
}
tt := t.Time()
if tt.After(now.Add(1 * time.Second)) {
h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt))
h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt))
}
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))
Expand Down Expand Up @@ -218,7 +218,7 @@ func (h *HA) controller() {

// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")
Expand Down Expand Up @@ -264,11 +264,16 @@ func (h *HA) controller() {

// realize a HA cycle triggered by a heartbeat event.
//
// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly
// enforced to abort the realization logic the moment the context expires.
//
// shouldLogRoutineEvents indicates if recurrent events should be logged.
//
// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one
// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens.
func (h *HA) realize(
ctx context.Context,
s *icingaredisv1.IcingaStatus,
t *types.UnixMilli,
envId types.Binary,
shouldLogRoutineEvents bool,
) error {
Expand Down Expand Up @@ -300,6 +305,7 @@ func (h *HA) realize(
if errBegin != nil {
return errors.Wrap(errBegin, "can't start transaction")
}
defer func() { _ = tx.Rollback() }()

query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+
"WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock
Expand Down Expand Up @@ -350,7 +356,7 @@ func (h *HA) realize(
EnvironmentMeta: v1.EnvironmentMeta{
EnvironmentId: envId,
},
Heartbeat: *t,
Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessage())),
Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true},
EndpointId: s.EndpointId,
Icinga2Version: s.Version,
Expand All @@ -370,23 +376,48 @@ func (h *HA) realize(

if takeover != "" {
stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?")
_, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId)
if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil {
return database.CantPerformQuery(err, stmt)
}

if err != nil {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment)
if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}
}

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "can't commit transaction")
// In general, cancellation does not work for commit and rollback. Some database drivers may support a
// context-based abort, but only if the DBMS allows it. This was also discussed in the initial PR adding
// context support to sql: https://github.com/golang/go/issues/15123#issuecomment-245882486
//
// The following is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() -
// which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over
// to the driver's Commit method. The drivers may behave differently. For example, the used
// github.com/go-sql-driver/mysql package calls its internal exec method with a "COMMIT" query, writing and
// reading packets without honoring the context.
//
// In a nutshell, one cannot expect a Commit() call to be covered by the transaction context. Thus, an
// additional check wraps the call, ensuring that this function will be left when the context is done.
commitErrCh := make(chan error, 1)
go func() { commitErrCh <- tx.Commit() }()

select {
case err := <-commitErrCh:
if err != nil {
return errors.Wrap(err, "can't commit transaction")
}
case <-ctx.Done():
return ctx.Err()
}

return nil
},
retry.Retryable,
backoff.NewExponentialWithJitter(256*time.Millisecond, 3*time.Second),
retry.Settings{
// Intentionally no timeout is set, as we use a context with a deadline.
// Intentionally, no timeout is set because a context with a deadline is used and QuickContextExit is set.
OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
log := h.logger.Debugw
Expand Down Expand Up @@ -420,12 +451,6 @@ func (h *HA) realize(
}

if takeover != "" {
// Insert the environment after each heartbeat takeover if it does not already exist in the database
// as the environment may have changed, although this is likely to happen very rarely.
if err := h.insertEnvironment(); err != nil {
return errors.Wrap(err, "can't insert environment")
}

h.signalTakeover(takeover)
} else if otherResponsible {
if state, _ := h.state.Load(); !state.otherResponsible {
Expand All @@ -445,18 +470,6 @@ func (h *HA) realizeLostHeartbeat() {
}
}

// insertEnvironment inserts the environment from the specified state into the database if it does not already exist.
func (h *HA) insertEnvironment() error {
// Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does.
stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment)

if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil {
return database.CantPerformQuery(err, stmt)
}

return nil
}

func (h *HA) removeInstance(ctx context.Context) {
h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId)))
// Intentionally not using h.ctx here as it's already cancelled.
Expand Down
16 changes: 16 additions & 0 deletions pkg/icingaredis/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Heartbeat struct {
active bool
events chan *HeartbeatMessage
lastReceivedMs int64
lastMessageMs int64
cancelCtx context.CancelFunc
client *redis.Client
done chan struct{}
Expand Down Expand Up @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 {
return atomic.LoadInt64(&h.lastReceivedMs)
}

// LastMessage returns the last message's time in ms.
func (h *Heartbeat) LastMessage() int64 {
return atomic.LoadInt64(&h.lastMessageMs)
}

// Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any.
// Implements the io.Closer interface.
func (h *Heartbeat) Close() error {
Expand Down Expand Up @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli())

statsT, err := m.stats.Time()
if err != nil {
h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err))
atomic.StoreInt64(&h.lastMessageMs, 0)
} else {
atomic.StoreInt64(&h.lastMessageMs, statsT.Time().UnixMilli())
}

h.sendEvent(m)
case <-time.After(Timeout):
if h.active {
Expand All @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) {
}

atomic.StoreInt64(&h.lastReceivedMs, 0)
atomic.StoreInt64(&h.lastMessageMs, 0)
case <-ctx.Done():
return ctx.Err()
}
Expand Down

0 comments on commit c13752a

Please sign in to comment.