Skip to content

Refactor Rearchitecture

Tamjid Ahmed edited this page Nov 2, 2021 · 2 revisions

Reason for this refactor + redesign is to address existing issues in the price oracle. These are (but not limited to):

Proposal

  1. Decouple logging. See code example 1
  2. Heartbeat on demon(never ending) go-routines. This will help us monitor the go-routines, should they need restart/termination. See code example 2
  3. Add zitter on periodic queries to update the data. (Subscription functions.)

Code example

  1. Decoupling logging idea
type Logger interface {
  Errorw(string, ...interface{})
  Infow(string, ...interface{})
}

type Logging struct {
  sugar *zap.SugaredLogger
}

func (l *Logging) Errorw(msg string, keyAndValues ...interface{}) {
  l.sugar.Errorw(msg, keyAndValues)
}

func (l *Logging) Infow(msg string, keyAndValues ...interface{}) {
  l.sugar.Infow(msg, keyAndValues)
}

// In Code
xxx := someStruct {
  ...
  l : Logging{zap.New()},
  ...
}
// In test
type stabLogger struct {
  sb strings.builder
}

func (sl *stabLogger) Errorw(msg string, keyAndValues ...interface{}) {
  fmt.Sprintf(sl.sb, msg, keyAndValues)
}

func (sl *stabLogger) Infow(msg string, keyAndValues ...interface{}) {
  fmt.Sprintf(sl.sb, msg, keyAndValues)
}

// Then in test we can capture logs by calling sl.sb.String()

  1. Implementation of heartbeat, something like this
type template func(<-chan interface{}, time.Duration) <-chan interface{}

func or(chans ...<-chan interface{}) <-chan interface{} {
  switch len(chans) {
  case 0:
    return nil
  case 1:
    return chans[0]
  }
  orDone := make(chan interface{})
  go func() {
    defer close(orDone)
    switch len(chans) {
    case 2:
      select {
      case <-chans[0]:
      case <-chans[1]:
      }
    default:
      select {
      case <-chans[0]:
      case <-chans[1]:
      case <-chans[2]:
      case <-or(append(chans[3:], orDone)...):
      }
    }
  }()
  return orDone
}

func newStewards(timeout time.Duration, f template) template {
  return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
    heartbeat := make(chan interface{})
    go func() {
      defer close(heartbeat)
      var fDone chan interface{}
      var fHeartbit <-chan interface{}

      start := func() {
        fDone = make(chan interface{})
        fHeartbit = f(or(fDone, done), timeout/2)
      }
      start()
      pulse := time.Tick(pulseInterval)
    monitorLoop:
      for {
        monitorTimeout := time.After(timeout)
        for {
          select {
          case <-pulse:
            select {
            case heartbeat <- struct{}{}:
            default:
            }
          case _, ok := <-done:
            log.Println("Channel open", ok)
            return
          case <-monitorTimeout:
            log.Println("Child function not responding; restarting")
            close(fDone)
            start()
            continue monitorLoop
          case <-fHeartbit: // Handle fatal errors if we need to.
            continue monitorLoop
          }
        }
      }
    }()
    return heartbeat
  }
}

Clone this wiki locally