Skip to content

Commit

Permalink
(DAL) Remove config id reference (#2290)
Browse files Browse the repository at this point in the history
* wip

* fix: fix test failure

* fix: update based on feedbacks
  • Loading branch information
nick-bisonai authored Sep 28, 2024
1 parent fd3d9ae commit 1bc27b2
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 71 deletions.
1 change: 1 addition & 0 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Proof = types.Proof
type GlobalAggregate = types.GlobalAggregate

type SubmissionData struct {
Symbol string
GlobalAggregate GlobalAggregate
Proof Proof
}
Expand Down
1 change: 1 addition & 0 deletions node/pkg/aggregator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func PublishGlobalAggregateAndProof(ctx context.Context, name string, globalAggr
return nil
}
data := SubmissionData{
Symbol: name,
GlobalAggregate: globalAggregate,
Proof: proof,
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/apiv2/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (s *ServerV2) HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
}

func (s *ServerV2) SymbolsHandler(w http.ResponseWriter, r *http.Request) {
result := make([]string, 0, len(s.hub.Configs))
for key := range s.hub.Configs {
result := make([]string, 0, len(s.hub.Symbols))
for key := range s.hub.Symbols {
result = append(result, key)
}

Expand Down
44 changes: 34 additions & 10 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package dal

import (
"context"
"errors"
"fmt"
"os"
"time"

Expand All @@ -12,13 +12,16 @@ import (
"bisonai.com/miko/node/pkg/dal/hub"
"bisonai.com/miko/node/pkg/dal/utils/keycache"
"bisonai.com/miko/node/pkg/dal/utils/stats"
errorsentinel "bisonai.com/miko/node/pkg/error"
"bisonai.com/miko/node/pkg/utils/request"

"github.com/rs/zerolog/log"
)

type Config = types.Config

const baseMikoConfigUrl = "https://config.orakl.network/%s_configs.json"

func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")

Expand All @@ -28,25 +31,26 @@ func Run(ctx context.Context) error {
keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)

adminEndpoint := os.Getenv("ORAKL_NODE_ADMIN_URL")
if adminEndpoint == "" {
return errors.New("ORAKL_NODE_ADMIN_URL is not set")
chain := os.Getenv("CHAIN")
if chain == "" {
log.Error().Msg("CHAIN environment variable not set")
return errorsentinel.ErrDalChainEnvNotFound
}

configs, err := fetchConfigs(ctx, adminEndpoint)
symbols, err := fetchSymbols(chain)
if err != nil {
log.Error().Err(err).Msg("Failed to fetch configs")
log.Error().Err(err).Msg("Failed to fetch symbols")
return err
}

collector, err := collector.NewCollector(ctx, configs)
collector, err := collector.NewCollector(ctx, symbols)
if err != nil {
log.Error().Err(err).Msg("Failed to setup collector")
return err
}
collector.Start(ctx)

hub := hub.HubSetup(ctx, configs)
hub := hub.HubSetup(ctx, symbols)
go hub.Start(ctx, collector)

err = apiv2.Start(ctx, apiv2.WithCollector(collector), apiv2.WithHub(hub), apiv2.WithKeyCache(keyCache), apiv2.WithStatsApp(statsApp))
Expand All @@ -58,6 +62,26 @@ func Run(ctx context.Context) error {
return nil
}

func fetchConfigs(ctx context.Context, endpoint string) ([]Config, error) {
return request.Request[[]Config](request.WithEndpoint(endpoint + "/config"))
func fetchSymbols(chain string) ([]string, error) {
type ConfigEntry struct {
Name string `json:"name"`
}

results, err := request.Request[[]ConfigEntry](
request.WithEndpoint(fmt.Sprintf(baseMikoConfigUrl, chain)),
request.WithTimeout(5*time.Second))
if err != nil {
return nil, err
}

if len(results) == 0 {
return nil, errorsentinel.ErrDalSymbolsNotFound
}

var symbols []string
for _, result := range results {
symbols = append(symbols, result.Name)
}

return symbols, nil
}
48 changes: 26 additions & 22 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ const (
type Config = types.Config

type Collector struct {
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData
Symbols map[int32]string
FeedHashes map[int32][]byte
LatestTimestamps map[int32]time.Time
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData

FeedHashes map[string][]byte
LatestTimestamps map[string]time.Time
LatestData map[string]*dalcommon.OutgoingSubmissionData
CachedWhitelist []klaytncommon.Address

Expand All @@ -50,7 +50,7 @@ type Collector struct {
mu sync.RWMutex
}

func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
func NewCollector(ctx context.Context, symbols []string) (*Collector, error) {
kaiaWebsocketUrl := os.Getenv("KAIA_WEBSOCKET_URL")
if kaiaWebsocketUrl == "" {
return nil, errors.New("KAIA_WEBSOCKET_URL is not set")
Expand Down Expand Up @@ -92,22 +92,20 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
}

collector := &Collector{
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestTimestamps: make(map[int32]time.Time),
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(symbols)),
FeedHashes: make(map[string][]byte, len(symbols)),
LatestTimestamps: make(map[string]time.Time),
LatestData: make(map[string]*dalcommon.OutgoingSubmissionData),
chainReader: chainReader,
CachedWhitelist: initialWhitelist,
submissionProxyContractAddr: submissionProxyContractAddr,
}

redisTopics := []string{}
for _, config := range configs {
collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name))
for _, symbol := range symbols {
collector.OutgoingStream[symbol] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.FeedHashes[symbol] = crypto.Keccak256([]byte(symbol))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(symbol))
}

baseRediscribe, err := db.NewRediscribe(
Expand Down Expand Up @@ -162,7 +160,7 @@ func (c *Collector) GetLatestData(symbol string) (*dalcommon.OutgoingSubmissionD
func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData {
c.mu.RLock()
defer c.mu.RUnlock()
result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.Symbols))
result := make([]dalcommon.OutgoingSubmissionData, 0, len(c.FeedHashes))
for _, value := range c.LatestData {
result = append(result, *value)
}
Expand Down Expand Up @@ -202,9 +200,9 @@ func (c *Collector) compareAndSwapLatestTimestamp(data *aggregator.SubmissionDat
c.mu.Lock()
defer c.mu.Unlock()

old, ok := c.LatestTimestamps[data.GlobalAggregate.ConfigID]
old, ok := c.LatestTimestamps[data.Symbol]
if !ok || data.GlobalAggregate.Timestamp.After(old) {
c.LatestTimestamps[data.GlobalAggregate.ConfigID] = data.GlobalAggregate.Timestamp
c.LatestTimestamps[data.Symbol] = data.GlobalAggregate.Timestamp
return true
}

Expand All @@ -218,7 +216,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su
default:
valid := c.compareAndSwapLatestTimestamp(data)
if !valid {
log.Debug().Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("old data recieved")
log.Debug().Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("old data recieved")
return
}

Expand All @@ -241,15 +239,21 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
c.mu.RLock()
whitelist := c.CachedWhitelist
c.mu.RUnlock()

feedHashBytes, ok := c.FeedHashes[data.Symbol]
if !ok {
return nil, errorsentinel.ErrDalFeedHashNotFound
}

orderedProof, err := orderProof(
ctx,
data.Proof.Proof,
data.GlobalAggregate.Value,
data.GlobalAggregate.Timestamp,
c.Symbols[data.GlobalAggregate.ConfigID],
data.Symbol,
whitelist)
if err != nil {
log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", c.Symbols[data.GlobalAggregate.ConfigID]).Msg("failed to order proof")
log.Error().Err(err).Str("Player", "DalCollector").Str("Symbol", data.Symbol).Msg("failed to order proof")
if errors.Is(err, errorsentinel.ErrDalSignerNotWhitelisted) {
go func(ctx context.Context, chainHelper *websocketchainreader.ChainReader, contractAddress string) {
newList, getAllOraclesErr := getAllOracles(ctx, chainHelper, contractAddress)
Expand All @@ -265,11 +269,11 @@ func (c *Collector) IncomingDataToOutgoingData(ctx context.Context, data *aggreg
return nil, err
}
return &dalcommon.OutgoingSubmissionData{
Symbol: c.Symbols[data.GlobalAggregate.ConfigID],
Symbol: data.Symbol,
Value: strconv.FormatInt(data.GlobalAggregate.Value, 10),
AggregateTime: strconv.FormatInt(data.GlobalAggregate.Timestamp.UnixMilli(), 10),
Proof: formatBytesToHex(orderedProof),
FeedHash: formatBytesToHex(c.FeedHashes[data.GlobalAggregate.ConfigID]),
FeedHash: formatBytesToHex(feedHashBytes),
Decimals: DefaultDecimals,
}, nil
}
Expand Down
42 changes: 20 additions & 22 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"bisonai.com/miko/node/pkg/common/types"
"bisonai.com/miko/node/pkg/dal/collector"
dalcommon "bisonai.com/miko/node/pkg/dal/common"
"bisonai.com/miko/node/pkg/dal/utils/stats"
Expand All @@ -15,16 +14,14 @@ import (
"nhooyr.io/websocket/wsjson"
)

type Config = types.Config

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
}

type Hub struct {
Configs map[string]Config
Clients map[*websocket.Conn]map[string]any
Symbols map[string]struct{}
Clients map[*websocket.Conn]map[string]struct{}
Register chan *websocket.Conn
Unregister chan *websocket.Conn
broadcast map[string]chan *dalcommon.OutgoingSubmissionData
Expand All @@ -36,20 +33,20 @@ const (
CleanupInterval = time.Hour
)

func HubSetup(ctx context.Context, configs []Config) *Hub {
configMap := make(map[string]Config)
for _, config := range configs {
configMap[config.Name] = config
func HubSetup(ctx context.Context, symbols []string) *Hub {
symbolsMap := make(map[string]struct{})
for _, symbol := range symbols {
symbolsMap[symbol] = struct{}{}
}

hub := NewHub(configMap)
hub := NewHub(symbolsMap)
return hub
}

func NewHub(configs map[string]Config) *Hub {
func NewHub(symbols map[string]struct{}) *Hub {
return &Hub{
Configs: configs,
Clients: make(map[*websocket.Conn]map[string]any),
Symbols: symbols,
Clients: make(map[*websocket.Conn]map[string]struct{}),
Register: make(chan *websocket.Conn),
Unregister: make(chan *websocket.Conn),
broadcast: make(map[string]chan *dalcommon.OutgoingSubmissionData),
Expand All @@ -61,8 +58,9 @@ func (h *Hub) Start(ctx context.Context, collector *collector.Collector) {

h.initializeBroadcastChannels(collector)

for symbol := range h.Configs {
go h.broadcastDataForSymbol(ctx, symbol)
for symbol := range h.Symbols {
sym := symbol // Capture loop variable to avoid potential race condition
go h.broadcastDataForSymbol(ctx, sym)
}

go h.cleanupJob(ctx)
Expand All @@ -74,13 +72,13 @@ func (h *Hub) HandleSubscription(ctx context.Context, client *websocket.Conn, ms

subscriptions, ok := h.Clients[client]
if !ok {
subscriptions = map[string]any{}
subscriptions = map[string]struct{}{}
}

valid := []string{}
for _, param := range msg.Params {
symbol := strings.TrimPrefix(param, "submission@")
if _, ok := h.Configs[symbol]; !ok {
if _, ok := h.Symbols[symbol]; !ok {
continue
}
subscriptions[symbol] = struct{}{}
Expand Down Expand Up @@ -116,7 +114,7 @@ func (h *Hub) addClient(client *websocket.Conn) {
if _, ok := h.Clients[client]; ok {
return
}
h.Clients[client] = make(map[string]any)
h.Clients[client] = make(map[string]struct{})
}

func (h *Hub) removeClient(client *websocket.Conn) {
Expand Down Expand Up @@ -146,18 +144,18 @@ func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) {

func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) {
for data := range h.broadcast[symbol] {
go h.castSubmissionData(ctx, data, &symbol)
go h.castSubmissionData(ctx, data, symbol)
}
}

func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol *string) {
func (h *Hub) castSubmissionData(ctx context.Context, data *dalcommon.OutgoingSubmissionData, symbol string) {
var wg sync.WaitGroup

h.mu.RLock()
defer h.mu.RUnlock()

for client, subscriptions := range h.Clients {
if _, ok := subscriptions[*symbol]; ok {
if _, ok := subscriptions[symbol]; ok {
wg.Add(1)
go func(entry *websocket.Conn) {
defer wg.Done()
Expand Down Expand Up @@ -189,7 +187,7 @@ func (h *Hub) cleanup() {
h.mu.Lock()
defer h.mu.Unlock()

newClients := make(map[*websocket.Conn]map[string]any, len(h.Clients))
newClients := make(map[*websocket.Conn]map[string]struct{}, len(h.Clients))
for client, subscriptions := range h.Clients {
if len(subscriptions) > 0 {
newClients[client] = subscriptions
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/dal/tests/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCollectorStartAndStop(t *testing.T) {
collector := testItems.Collector
assert.True(t, collector.IsRunning)

assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.FeedHashes), 0)
collector.Stop()
assert.False(t, collector.IsRunning)
}
Expand All @@ -51,7 +51,7 @@ func TestCollectorStream(t *testing.T) {
time.Sleep(20 * time.Millisecond)

collector := testItems.Collector
assert.Greater(t, len(collector.Symbols), 0)
assert.Greater(t, len(collector.FeedHashes), 0)
assert.True(t, collector.IsRunning)

headers := map[string]string{"X-API-Key": testItems.ApiKey}
Expand Down
Loading

0 comments on commit 1bc27b2

Please sign in to comment.