From 919f152bee0a50707e5e62475fe037484d54e077 Mon Sep 17 00:00:00 2001 From: mudler Date: Mon, 26 Aug 2024 20:09:45 +0200 Subject: [PATCH 1/4] chore(refactor): create the node package Signed-off-by: mudler --- cmd/masa-node/main.go | 26 +-- node/blockchain.go | 214 +++++++++++++++++++++++ {pkg => node}/oracle_node.go | 234 +++----------------------- {pkg => node}/oracle_node_listener.go | 2 +- {pkg => node}/oracle_node_test.go | 2 +- {pkg => node}/subscriptions.go | 2 +- pkg/api/api.go | 6 +- pkg/api/handlers_node.go | 4 +- pkg/api/routes.go | 4 +- pkg/config/options.go | 33 +++- pkg/db/operations.go | 6 +- pkg/db/resolver_cache.go | 10 +- pkg/tests/api_test.go | 6 +- pkg/tests/integration/tracker_test.go | 2 +- pkg/workers/types/request_response.go | 4 +- pkg/workers/worker_manager.go | 6 +- pkg/workers/worker_selection.go | 4 +- 17 files changed, 304 insertions(+), 261 deletions(-) create mode 100644 node/blockchain.go rename {pkg => node}/oracle_node.go (66%) rename {pkg => node}/oracle_node_listener.go (99%) rename {pkg => node}/oracle_node_test.go (98%) rename {pkg => node}/subscriptions.go (99%) diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index b99516e3..e777e470 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -13,7 +13,7 @@ import ( "github.com/sirupsen/logrus" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/api" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/db" @@ -74,22 +74,22 @@ func main() { isValidator := cfg.Validator // Create a new OracleNode - node, err := masa.NewOracleNode(ctx, config.EnableStaked) + masaNode, err := node.NewOracleNode(ctx, config.EnableStaked) if err != nil { logrus.Fatal(err) } - err = node.Start() + err = masaNode.Start() if err != nil { logrus.Fatal(err) } - node.NodeTracker.GetAllNodeData() + masaNode.NodeTracker.GetAllNodeData() if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper { logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.") } if cfg.AllowedPeer { - cfg.AllowedPeerId = node.Host.ID().String() + cfg.AllowedPeerId = masaNode.Host.ID().String() cfg.AllowedPeerPublicKey = keyManager.HexPubKey logrus.Infof("[+] Allowed peer with ID: %s and PubKey: %s", cfg.AllowedPeerId, cfg.AllowedPeerPublicKey) } else { @@ -97,16 +97,16 @@ func main() { } // Init cache resolver - db.InitResolverCache(node, keyManager) + db.InitResolverCache(masaNode, keyManager) // Subscribe and if actor start monitoring actor workers // considering all that matters is if the node is staked // and other peers can do work we only need to check this here // if this peer can or cannot scrape or write that is checked in other places - if node.IsStaked { - node.Host.SetStreamHandler(config.ProtocolWithVersion(config.WorkerProtocol), workers.GetWorkHandlerManager().HandleWorkerStream) - go masa.SubscribeToBlocks(ctx, node) - go node.NodeTracker.ClearExpiredWorkerTimeouts() + if masaNode.IsStaked { + masaNode.Host.SetStreamHandler(config.ProtocolWithVersion(config.WorkerProtocol), workers.GetWorkHandlerManager().HandleWorkerStream) + go node.SubscribeToBlocks(ctx, masaNode) + go masaNode.NodeTracker.ClearExpiredWorkerTimeouts() } // Listen for SIGINT (CTRL+C) @@ -116,7 +116,7 @@ func main() { // Cancel the context when SIGINT is received go func() { <-c - nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String()) + nodeData := masaNode.NodeTracker.GetNodeData(masaNode.Host.ID().String()) if nodeData != nil { nodeData.Left() } @@ -130,7 +130,7 @@ func main() { } }() - router := api.SetupRoutes(node) + router := api.SetupRoutes(masaNode) go func() { err = router.Run() if err != nil { @@ -139,7 +139,7 @@ func main() { }() // Get the multiaddress and IP address of the node - multiAddr := node.GetMultiAddrs() // Get the multiaddress + multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address // Display the welcome message with the multiaddress and IP address config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) diff --git a/node/blockchain.go b/node/blockchain.go new file mode 100644 index 00000000..a3bddb76 --- /dev/null +++ b/node/blockchain.go @@ -0,0 +1,214 @@ +package node + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "os" + "reflect" + "sync" + "time" + + shell "github.com/ipfs/go-ipfs-api" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/masa-finance/masa-oracle/pkg/chain" + "github.com/sirupsen/logrus" +) + +// Blockchain Implementation +var ( + blocksCh = make(chan *pubsub.Message) +) + +type BlockData struct { + Block uint64 `json:"block"` + InputData interface{} `json:"input_data"` + TransactionHash string `json:"transaction_hash"` + PreviousHash string `json:"previous_hash"` + TransactionNonce int `json:"nonce"` +} + +type Blocks struct { + BlockData []BlockData `json:"blocks"` +} + +type BlockEvents struct{} + +type BlockEventTracker struct { + BlockEvents []BlockEvents + BlockTopic *pubsub.Topic + mu sync.Mutex +} + +// HandleMessage processes incoming pubsub messages containing block events. +// It unmarshals the message data into a slice of BlockEvents and appends them +// to the tracker's BlockEvents slice. +func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) { + var blockEvents any + + // Try to decode as base64 first + decodedData, err := base64.StdEncoding.DecodeString(string(m.Data)) + if err == nil { + m.Data = decodedData + } + + // Try to unmarshal as JSON + err = json.Unmarshal(m.Data, &blockEvents) + if err != nil { + // If JSON unmarshal fails, try to interpret as string + blockEvents = string(m.Data) + } + + b.mu.Lock() + defer b.mu.Unlock() + + switch v := blockEvents.(type) { + case []BlockEvents: + b.BlockEvents = append(b.BlockEvents, v...) + case BlockEvents: + b.BlockEvents = append(b.BlockEvents, v) + case map[string]interface{}: + // Convert map to BlockEvents struct + newBlockEvent := BlockEvents{} + // You might need to add logic here to properly convert the map to BlockEvents + b.BlockEvents = append(b.BlockEvents, newBlockEvent) + case []interface{}: + // Convert each item in the slice to BlockEvents + for _, item := range v { + if be, ok := item.(BlockEvents); ok { + b.BlockEvents = append(b.BlockEvents, be) + } + } + case string: + // Handle string data + newBlockEvent := BlockEvents{} + // You might need to add logic here to properly convert the string to BlockEvents + b.BlockEvents = append(b.BlockEvents, newBlockEvent) + default: + logrus.Warnf("[-] Unexpected data type in message: %v", reflect.TypeOf(v)) + } + + blocksCh <- m +} + +func updateBlocks(ctx context.Context, node *OracleNode) error { + + var existingBlocks Blocks + blocks := chain.GetBlockchain(node.Blockchain) + + for _, block := range blocks { + var inputData interface{} + err := json.Unmarshal(block.Data, &inputData) + if err != nil { + inputData = string(block.Data) // Fallback to string if unmarshal fails + } + + blockData := BlockData{ + Block: block.Block, + InputData: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%v", inputData))), + TransactionHash: fmt.Sprintf("%x", block.Hash), + PreviousHash: fmt.Sprintf("%x", block.Link), + TransactionNonce: int(block.Nonce), + } + existingBlocks.BlockData = append(existingBlocks.BlockData, blockData) + } + jsonData, err := json.Marshal(existingBlocks) + if err != nil { + return err + } + + err = node.DHT.PutValue(ctx, "/db/blocks", jsonData) + if err != nil { + logrus.Warningf("[-] Unable to store block on DHT: %v", err) + } + + if os.Getenv("IPFS_URL") != "" { + + infuraURL := fmt.Sprintf("https://%s:%s@%s", os.Getenv("PID"), os.Getenv("PS"), os.Getenv("IPFS_URL")) + sh := shell.NewShell(infuraURL) + + jsonBytes, err := json.Marshal(jsonData) + if err != nil { + logrus.Errorf("[-] Error marshalling JSON: %s", err) + } + + reader := bytes.NewReader(jsonBytes) + + hash, err := sh.AddWithOpts(reader, true, true) + if err != nil { + logrus.Errorf("[-] Error persisting to IPFS: %s", err) + } else { + logrus.Printf("[+] Ledger persisted with IPFS hash: https://dwn.infura-ipfs.io/ipfs/%s\n", hash) + _ = node.DHT.PutValue(ctx, "/db/ipfs", []byte(fmt.Sprintf("https://dwn.infura-ipfs.io/ipfs/%s", hash))) + + } + } + + return nil +} + +func SubscribeToBlocks(ctx context.Context, node *OracleNode) { + if !node.IsValidator { + return + } + + go func() { + err := node.Blockchain.Init() + if err != nil { + logrus.Error(err) + } + }() + + updateTicker := time.NewTicker(time.Second * 60) + defer updateTicker.Stop() + + for { + select { + case block, ok := <-blocksCh: + if !ok { + logrus.Error("[-] Block channel closed") + return + } + if err := processBlock(node, block); err != nil { + logrus.Errorf("[-] Error processing block: %v", err) + // Consider adding a retry mechanism or circuit breaker here + } + + case <-updateTicker.C: + logrus.Info("[+] blockchain tick") + if err := updateBlocks(ctx, node); err != nil { + logrus.Errorf("[-] Error updating blocks: %v", err) + // Consider adding a retry mechanism or circuit breaker here + } + + case <-ctx.Done(): + logrus.Info("[+] Context cancelled, stopping block subscription") + return + } + } +} + +func processBlock(node *OracleNode, block *pubsub.Message) error { + blocks := chain.GetBlockchain(node.Blockchain) + for _, b := range blocks { + if string(b.Data) == string(block.Data) { + return nil // Block already exists + } + } + + if err := node.Blockchain.AddBlock(block.Data); err != nil { + return fmt.Errorf("[-] failed to add block: %w", err) + } + + if node.Blockchain.LastHash != nil { + b, err := node.Blockchain.GetBlock(node.Blockchain.LastHash) + if err != nil { + return fmt.Errorf("[-] failed to get last block: %w", err) + } + b.Print() + } + + return nil +} diff --git a/pkg/oracle_node.go b/node/oracle_node.go similarity index 66% rename from pkg/oracle_node.go rename to node/oracle_node.go index a4cd379b..bd24159d 100644 --- a/pkg/oracle_node.go +++ b/node/oracle_node.go @@ -1,15 +1,9 @@ -package masa +package node import ( - "bytes" "context" - "encoding/base64" - "encoding/json" "fmt" - "os" - "reflect" "strings" - "sync" "time" ethereumCrypto "github.com/ethereum/go-ethereum/crypto" @@ -30,15 +24,12 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" - shell "github.com/ipfs/go-ipfs-api" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/masa-finance/masa-oracle/internal/versioning" "github.com/masa-finance/masa-oracle/pkg/chain" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/masacrypto" myNetwork "github.com/masa-finance/masa-oracle/pkg/network" - pubsub2 "github.com/masa-finance/masa-oracle/pkg/pubsub" + "github.com/masa-finance/masa-oracle/pkg/pubsub" ) type OracleNode struct { @@ -47,10 +38,9 @@ type OracleNode struct { priorityAddrs multiaddr.Multiaddr multiAddrs []multiaddr.Multiaddr DHT *dht.IpfsDHT - Context context.Context PeerChan chan myNetwork.PeerEvent - NodeTracker *pubsub2.NodeEventTracker - PubSubManager *pubsub2.Manager + NodeTracker *pubsub.NodeEventTracker + PubSubManager *pubsub.Manager Signature string IsStaked bool IsValidator bool @@ -60,10 +50,11 @@ type OracleNode struct { IsWebScraper bool IsLlmServer bool StartTime time.Time - WorkerTracker *pubsub2.WorkerEventTracker + WorkerTracker *pubsub.WorkerEventTracker BlockTracker *BlockEventTracker Blockchain *chain.Chain options config.AppOption + Context context.Context } // GetMultiAddrs returns the priority multiaddr for this node. @@ -147,7 +138,7 @@ func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, err return nil, err } - subscriptionManager, err := pubsub2.NewPubSubManager(ctx, hst) + subscriptionManager, err := pubsub.NewPubSubManager(ctx, hst) if err != nil { return nil, err } @@ -156,9 +147,9 @@ func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, err Host: hst, Protocol: config.ProtocolWithVersion(config.OracleProtocol), multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), - Context: ctx, PeerChan: make(chan myNetwork.PeerEvent), - NodeTracker: pubsub2.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()), + NodeTracker: pubsub.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()), + Context: ctx, PubSubManager: subscriptionManager, IsStaked: o.IsStaked, IsValidator: cfg.Validator, @@ -196,9 +187,14 @@ func (node *OracleNode) Start() (err error) { node.Host.SetStreamHandler(node.Protocol, node.handleStream) node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData) + for pid, n := range node.options.ProtocolHandlers { + node.Host.SetStreamHandler(pid, n) + } + if node.IsStaked { node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) } + node.Host.Network().Notify(node.NodeTracker) go node.ListenToNodeTracker() @@ -211,7 +207,7 @@ func (node *OracleNode) Start() (err error) { publicKeyHex = masacrypto.KeyManagerInstance().EthAddress } - myNodeData := pubsub2.GetSelfNodeData(node.Host, node.IsStaked, node.priorityAddrs, publicKeyHex) + myNodeData := pubsub.GetSelfNodeData(node.Host, node.IsStaked, node.priorityAddrs, publicKeyHex) bootstrapNodes, err := myNetwork.GetBootNodesMultiAddress(append(config.GetInstance().Bootnodes, node.options.Bootnodes...)) if err != nil { @@ -228,6 +224,10 @@ func (node *OracleNode) Start() (err error) { return err } + for _, p := range node.options.Services { + go p(node.Context, node.Host) + } + go myNetwork.Discover(node.Context, node.Host, node.DHT, node.Protocol) nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String()) @@ -360,199 +360,3 @@ func (node *OracleNode) LogActiveTopics() { logrus.Info("[-] No active topics.") } } - -// Blockchain Implementation -var ( - blocksCh = make(chan *pubsub.Message) -) - -type BlockData struct { - Block uint64 `json:"block"` - InputData interface{} `json:"input_data"` - TransactionHash string `json:"transaction_hash"` - PreviousHash string `json:"previous_hash"` - TransactionNonce int `json:"nonce"` -} - -type Blocks struct { - BlockData []BlockData `json:"blocks"` -} - -type BlockEvents struct{} - -type BlockEventTracker struct { - BlockEvents []BlockEvents - BlockTopic *pubsub.Topic - mu sync.Mutex -} - -// HandleMessage processes incoming pubsub messages containing block events. -// It unmarshals the message data into a slice of BlockEvents and appends them -// to the tracker's BlockEvents slice. -func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) { - var blockEvents any - - // Try to decode as base64 first - decodedData, err := base64.StdEncoding.DecodeString(string(m.Data)) - if err == nil { - m.Data = decodedData - } - - // Try to unmarshal as JSON - err = json.Unmarshal(m.Data, &blockEvents) - if err != nil { - // If JSON unmarshal fails, try to interpret as string - blockEvents = string(m.Data) - } - - b.mu.Lock() - defer b.mu.Unlock() - - switch v := blockEvents.(type) { - case []BlockEvents: - b.BlockEvents = append(b.BlockEvents, v...) - case BlockEvents: - b.BlockEvents = append(b.BlockEvents, v) - case map[string]interface{}: - // Convert map to BlockEvents struct - newBlockEvent := BlockEvents{} - // You might need to add logic here to properly convert the map to BlockEvents - b.BlockEvents = append(b.BlockEvents, newBlockEvent) - case []interface{}: - // Convert each item in the slice to BlockEvents - for _, item := range v { - if be, ok := item.(BlockEvents); ok { - b.BlockEvents = append(b.BlockEvents, be) - } - } - case string: - // Handle string data - newBlockEvent := BlockEvents{} - // You might need to add logic here to properly convert the string to BlockEvents - b.BlockEvents = append(b.BlockEvents, newBlockEvent) - default: - logrus.Warnf("[-] Unexpected data type in message: %v", reflect.TypeOf(v)) - } - - blocksCh <- m -} - -func updateBlocks(ctx context.Context, node *OracleNode) error { - - var existingBlocks Blocks - blocks := chain.GetBlockchain(node.Blockchain) - - for _, block := range blocks { - var inputData interface{} - err := json.Unmarshal(block.Data, &inputData) - if err != nil { - inputData = string(block.Data) // Fallback to string if unmarshal fails - } - - blockData := BlockData{ - Block: block.Block, - InputData: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%v", inputData))), - TransactionHash: fmt.Sprintf("%x", block.Hash), - PreviousHash: fmt.Sprintf("%x", block.Link), - TransactionNonce: int(block.Nonce), - } - existingBlocks.BlockData = append(existingBlocks.BlockData, blockData) - } - jsonData, err := json.Marshal(existingBlocks) - if err != nil { - return err - } - - err = node.DHT.PutValue(ctx, "/db/blocks", jsonData) - if err != nil { - logrus.Warningf("[-] Unable to store block on DHT: %v", err) - } - - if os.Getenv("IPFS_URL") != "" { - - infuraURL := fmt.Sprintf("https://%s:%s@%s", os.Getenv("PID"), os.Getenv("PS"), os.Getenv("IPFS_URL")) - sh := shell.NewShell(infuraURL) - - jsonBytes, err := json.Marshal(jsonData) - if err != nil { - logrus.Errorf("[-] Error marshalling JSON: %s", err) - } - - reader := bytes.NewReader(jsonBytes) - - hash, err := sh.AddWithOpts(reader, true, true) - if err != nil { - logrus.Errorf("[-] Error persisting to IPFS: %s", err) - } else { - logrus.Printf("[+] Ledger persisted with IPFS hash: https://dwn.infura-ipfs.io/ipfs/%s\n", hash) - _ = node.DHT.PutValue(ctx, "/db/ipfs", []byte(fmt.Sprintf("https://dwn.infura-ipfs.io/ipfs/%s", hash))) - - } - } - - return nil -} - -func SubscribeToBlocks(ctx context.Context, node *OracleNode) { - if !node.IsValidator { - return - } - - go func() { - err := node.Blockchain.Init() - if err != nil { - logrus.Error(err) - } - }() - - updateTicker := time.NewTicker(time.Second * 60) - defer updateTicker.Stop() - - for { - select { - case block, ok := <-blocksCh: - if !ok { - logrus.Error("[-] Block channel closed") - return - } - if err := processBlock(node, block); err != nil { - logrus.Errorf("[-] Error processing block: %v", err) - // Consider adding a retry mechanism or circuit breaker here - } - - case <-updateTicker.C: - logrus.Info("[+] blockchain tick") - if err := updateBlocks(ctx, node); err != nil { - logrus.Errorf("[-] Error updating blocks: %v", err) - // Consider adding a retry mechanism or circuit breaker here - } - - case <-ctx.Done(): - logrus.Info("[+] Context cancelled, stopping block subscription") - return - } - } -} - -func processBlock(node *OracleNode, block *pubsub.Message) error { - blocks := chain.GetBlockchain(node.Blockchain) - for _, b := range blocks { - if string(b.Data) == string(block.Data) { - return nil // Block already exists - } - } - - if err := node.Blockchain.AddBlock(block.Data); err != nil { - return fmt.Errorf("[-] failed to add block: %w", err) - } - - if node.Blockchain.LastHash != nil { - b, err := node.Blockchain.GetBlock(node.Blockchain.LastHash) - if err != nil { - return fmt.Errorf("[-] failed to get last block: %w", err) - } - b.Print() - } - - return nil -} diff --git a/pkg/oracle_node_listener.go b/node/oracle_node_listener.go similarity index 99% rename from pkg/oracle_node_listener.go rename to node/oracle_node_listener.go index 8c026157..78991a9a 100644 --- a/pkg/oracle_node_listener.go +++ b/node/oracle_node_listener.go @@ -1,4 +1,4 @@ -package masa +package node import ( "bufio" diff --git a/pkg/oracle_node_test.go b/node/oracle_node_test.go similarity index 98% rename from pkg/oracle_node_test.go rename to node/oracle_node_test.go index cbfc8df8..e7c594c9 100644 --- a/pkg/oracle_node_test.go +++ b/node/oracle_node_test.go @@ -1,4 +1,4 @@ -package masa +package node import ( "crypto/ecdsa" diff --git a/pkg/subscriptions.go b/node/subscriptions.go similarity index 99% rename from pkg/subscriptions.go rename to node/subscriptions.go index 9c4d3eab..1c3668b3 100644 --- a/pkg/subscriptions.go +++ b/node/subscriptions.go @@ -1,4 +1,4 @@ -package masa +package node import ( "github.com/masa-finance/masa-oracle/pkg/config" diff --git a/pkg/api/api.go b/pkg/api/api.go index 98c7adb3..7bbe2611 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -7,17 +7,17 @@ import ( "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" - masa "github.com/masa-finance/masa-oracle/pkg" + node "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/event" ) type API struct { - Node *masa.OracleNode + Node *node.OracleNode EventTracker *event.EventTracker } // NewAPI creates a new API instance with the given OracleNode. -func NewAPI(node *masa.OracleNode) *API { +func NewAPI(node *node.OracleNode) *API { eventTracker := event.NewEventTracker(nil) if eventTracker == nil { logrus.Error("Failed to create EventTracker") diff --git a/pkg/api/handlers_node.go b/pkg/api/handlers_node.go index 3f3dfed4..69e759de 100644 --- a/pkg/api/handlers_node.go +++ b/pkg/api/handlers_node.go @@ -16,7 +16,7 @@ import ( "github.com/gin-gonic/gin" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -52,7 +52,7 @@ func (api *API) GetNodeDataHandler() gin.HandlerFunc { if endIndex > totalRecords { endIndex = totalRecords } - nodeDataPage := masa.NodeDataPage{ + nodeDataPage := node.NodeDataPage{ Data: allNodeData[startIndex:endIndex], PageNumber: pageNbr, TotalPages: totalPages, diff --git a/pkg/api/routes.go b/pkg/api/routes.go index df5dd8c7..c3bae4e0 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -18,7 +18,7 @@ import ( "path/filepath" "runtime" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" swaggerFiles "github.com/swaggo/files" // swagger embed files ginSwagger "github.com/swaggo/gin-swagger" // ginSwagger middleware ) @@ -31,7 +31,7 @@ var htmlTemplates embed.FS // Routes are added for peers, ads, subscriptions, node data, public keys, // topics, the DHT, node status, and serving HTML pages. Middleware is added // for CORS and templates. -func SetupRoutes(node *masa.OracleNode) *gin.Engine { +func SetupRoutes(node *node.OracleNode) *gin.Engine { gin.SetMode(gin.ReleaseMode) router := gin.Default() diff --git a/pkg/config/options.go b/pkg/config/options.go index 7dc14456..73f9f0ce 100644 --- a/pkg/config/options.go +++ b/pkg/config/options.go @@ -1,10 +1,20 @@ package config +import ( + "context" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" +) + type AppOption struct { - DisableCLIParse bool - IsStaked bool - Bootnodes []string - RandomIdentity bool + DisableCLIParse bool + IsStaked bool + Bootnodes []string + RandomIdentity bool + Services []func(ctx context.Context, node host.Host) + ProtocolHandlers map[protocol.ID]network.StreamHandler } type Option func(*AppOption) @@ -32,3 +42,18 @@ func WithBootNodes(bootnodes ...string) Option { o.Bootnodes = append(o.Bootnodes, bootnodes...) } } + +func WithService(plugins ...func(ctx context.Context, node host.Host)) Option { + return func(o *AppOption) { + o.Services = append(o.Services, plugins...) + } +} + +func WithProtocolHandler(pid protocol.ID, n network.StreamHandler) Option { + return func(o *AppOption) { + if o.ProtocolHandlers == nil { + o.ProtocolHandlers = make(map[protocol.ID]network.StreamHandler) + } + o.ProtocolHandlers[pid] = n + } +} diff --git a/pkg/db/operations.go b/pkg/db/operations.go index ac59c389..cd845076 100644 --- a/pkg/db/operations.go +++ b/pkg/db/operations.go @@ -10,7 +10,7 @@ import ( "os" "time" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/sirupsen/logrus" ) @@ -24,7 +24,7 @@ type WorkEvent struct { // WriteData encapsulates the logic for writing data to the database, // including access control checks from access_control.go. -func WriteData(node *masa.OracleNode, key string, value []byte) error { +func WriteData(node *node.OracleNode, key string, value []byte) error { if !isAuthorized(node.Host.ID().String()) { logrus.WithFields(logrus.Fields{ "nodeID": node.Host.ID().String(), @@ -67,7 +67,7 @@ func WriteData(node *masa.OracleNode, key string, value []byte) error { // ReadData reads the value for the given key from the database. // It requires the host for access control verification before reading. -func ReadData(node *masa.OracleNode, key string) ([]byte, error) { +func ReadData(node *node.OracleNode, key string) ([]byte, error) { logrus.WithFields(logrus.Fields{ "nodeID": node.Host.ID().String(), "isAuthorized": true, diff --git a/pkg/db/resolver_cache.go b/pkg/db/resolver_cache.go index 1c1b7c12..26f6e22e 100644 --- a/pkg/db/resolver_cache.go +++ b/pkg/db/resolver_cache.go @@ -11,7 +11,7 @@ import ( "github.com/masa-finance/masa-oracle/pkg/masacrypto" "github.com/masa-finance/masa-oracle/pkg/pubsub" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -43,7 +43,7 @@ type Record struct { // The purpose of this function is to initialize the resolver cache and perform any necessary setup or configuration. It associates the resolver cache with the provided Masa Oracle node and key manager. // // Note: The specific implementation details of the `InitResolverCache` function are not provided in the given code snippet. The function signature suggests that it initializes the resolver cache, but the actual initialization logic would be present in the function body. -func InitResolverCache(node *masa.OracleNode, keyManager *masacrypto.KeyManager) { +func InitResolverCache(node *node.OracleNode, keyManager *masacrypto.KeyManager) { var err error cachePath := config.GetInstance().CachePath if cachePath == "" { @@ -170,7 +170,7 @@ func QueryAll(ctx context.Context) ([]Record, error) { // sync periodically calls iterateAndPublish to synchronize the node's state with // the dht on the provided interval. It runs this in a loop, exiting // when the context is canceled. -func sync(ctx context.Context, node *masa.OracleNode, interval time.Duration) { +func sync(ctx context.Context, node *node.OracleNode, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() @@ -188,7 +188,7 @@ func sync(ctx context.Context, node *masa.OracleNode, interval time.Duration) { // by querying all records, and publishing each one to the dht. It // logs any errors encountered. This allows periodically syncing the node's // cached data with the latest dht state. -func iterateAndPublish(ctx context.Context, node *masa.OracleNode) { +func iterateAndPublish(ctx context.Context, node *node.OracleNode) { records, err := QueryAll(ctx) if err != nil { logrus.Errorf("[-] Error querying all records: %+v", err) @@ -232,7 +232,7 @@ func iterateAndPublish(ctx context.Context, node *masa.OracleNode) { // monitorNodeData periodically publishes the local node's status to the // dht, and syncs node status data published by other nodes. // It runs a ticker to call iterateAndPublish on the provided interval. -func monitorNodeData(ctx context.Context, node *masa.OracleNode) { +func monitorNodeData(ctx context.Context, node *node.OracleNode) { syncInterval := time.Second * 60 err := node.PubSubManager.Subscribe(config.TopicWithVersion(config.NodeGossipTopic), node.NodeTracker) if err != nil { diff --git a/pkg/tests/api_test.go b/pkg/tests/api_test.go index 6b96717f..8636bb17 100644 --- a/pkg/tests/api_test.go +++ b/pkg/tests/api_test.go @@ -3,16 +3,16 @@ package tests import ( "testing" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/api" ) func TestAPI(t *testing.T) { // Create a new OracleNode instance - node := &masa.OracleNode{} + n := &node.OracleNode{} // Initialize the API - api := api.NewAPI(node) + api := api.NewAPI(n) // Test API initialization if api == nil { diff --git a/pkg/tests/integration/tracker_test.go b/pkg/tests/integration/tracker_test.go index c45e6b13..bf9f46ac 100644 --- a/pkg/tests/integration/tracker_test.go +++ b/pkg/tests/integration/tracker_test.go @@ -5,7 +5,7 @@ import ( "context" "fmt" - . "github.com/masa-finance/masa-oracle/pkg" + . "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" diff --git a/pkg/workers/types/request_response.go b/pkg/workers/types/request_response.go index 5a4ed211..9847f010 100644 --- a/pkg/workers/types/request_response.go +++ b/pkg/workers/types/request_response.go @@ -5,7 +5,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -14,7 +14,7 @@ type Worker struct { IPAddr string AddrInfo *peer.AddrInfo NodeData pubsub.NodeData - Node *masa.OracleNode + Node *node.OracleNode } func NewWorker(isLocal bool, nd *pubsub.NodeData) *Worker { diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index f0aa6bdd..1b030af4 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -14,7 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/sirupsen/logrus" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/event" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" @@ -102,7 +102,7 @@ func (whm *WorkHandlerManager) getWorkHandler(wType data_types.WorkerType) (Work return info.Handler, true } -func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { +func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { category := data_types.WorkerTypeToCategory(workRequest.WorkType) remoteWorkers, localWorker := GetEligibleWorkers(node, category) @@ -181,7 +181,7 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest return response } -func (whm *WorkHandlerManager) sendWorkToWorker(node *masa.OracleNode, worker data_types.Worker, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { +func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker data_types.Worker, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { ctxWithTimeout, cancel := context.WithTimeout(context.Background(), workerConfig.WorkerResponseTimeout) defer cancel() // Cancel the context when done to release resources diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index 69cbb44b..25b2e468 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -6,14 +6,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" - masa "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/pubsub" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) // GetEligibleWorkers Uses the new NodeTracker method to get the eligible workers for a given message type // I'm leaving this returning an array so that we can easily increase the number of workers in the future -func GetEligibleWorkers(node *masa.OracleNode, category pubsub.WorkerCategory) ([]data_types.Worker, *data_types.Worker) { +func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ([]data_types.Worker, *data_types.Worker) { workers := make([]data_types.Worker, 0) nodes := node.NodeTracker.GetEligibleWorkerNodes(category) var localWorker *data_types.Worker From 17c89377e849edc2e37d052aace056126e07e21b Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Wed, 18 Sep 2024 16:54:30 -0700 Subject: [PATCH 2/4] Squashed commit of the following: commit 4940215f134040ef449d3a4cdf059bf03339c90b Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed Sep 18 19:47:58 2024 +0200 build(deps): bump github.com/ollama/ollama from 0.3.9 to 0.3.11 (#558) Bumps [github.com/ollama/ollama](https://github.com/ollama/ollama) from 0.3.9 to 0.3.11. - [Release notes](https://github.com/ollama/ollama/releases) - [Commits](https://github.com/ollama/ollama/compare/v0.3.9...v0.3.11) --- updated-dependencies: - dependency-name: github.com/ollama/ollama dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> commit b3022cc18fd539524876ac69f6970f2d2aa3344d Author: Ettore Di Giacinto Date: Wed Sep 18 16:16:09 2024 +0200 chore(ci): rename gosec workflow Signed-off-by: Ettore Di Giacinto commit 7afe67f426e3bef854e726da6943859312cfdecb Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue Sep 17 17:46:35 2024 +0200 build(deps): bump github.com/gotd/td from 0.108.0 to 0.110.0 (#555) Bumps [github.com/gotd/td](https://github.com/gotd/td) from 0.108.0 to 0.110.0. - [Release notes](https://github.com/gotd/td/releases) - [Commits](https://github.com/gotd/td/compare/v0.108.0...v0.110.0) --- updated-dependencies: - dependency-name: github.com/gotd/td dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> commit d2a7aef64e308fb037d20e05ae596c264ca66f07 Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue Sep 17 17:46:16 2024 +0200 build(deps): bump github.com/sashabaranov/go-openai from 1.29.1 to 1.29.2 (#554) build(deps): bump github.com/sashabaranov/go-openai Bumps [github.com/sashabaranov/go-openai](https://github.com/sashabaranov/go-openai) from 1.29.1 to 1.29.2. - [Release notes](https://github.com/sashabaranov/go-openai/releases) - [Commits](https://github.com/sashabaranov/go-openai/compare/v1.29.1...v1.29.2) --- updated-dependencies: - dependency-name: github.com/sashabaranov/go-openai dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/secscan.yml | 2 +- go.mod | 14 +++++++------- go.sum | 28 ++++++++++++++-------------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/.github/workflows/secscan.yml b/.github/workflows/secscan.yml index a61a9204..b0df6633 100644 --- a/.github/workflows/secscan.yml +++ b/.github/workflows/secscan.yml @@ -9,7 +9,7 @@ on: - cron: '0 0 * * 0' jobs: - tests: + gosec: runs-on: ubuntu-latest env: GO111MODULE: on diff --git a/go.mod b/go.mod index 9365b4b8..e9ed2304 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.5.0 github.com/google/uuid v1.6.0 github.com/gotd/contrib v0.20.0 - github.com/gotd/td v0.108.0 + github.com/gotd/td v0.110.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-leveldb v0.5.0 @@ -30,11 +30,11 @@ require ( github.com/masa-finance/masa-twitter-scraper v0.0.0-20240910224030-76a02c878bd6 github.com/multiformats/go-multiaddr v0.13.0 github.com/multiformats/go-multihash v0.2.3 - github.com/ollama/ollama v0.3.9 + github.com/ollama/ollama v0.3.11 github.com/onsi/ginkgo/v2 v2.20.2 github.com/onsi/gomega v1.34.2 github.com/rivo/tview v0.0.0-20240505185119-ed116790de0f - github.com/sashabaranov/go-openai v1.29.1 + github.com/sashabaranov/go-openai v1.29.2 github.com/sirupsen/logrus v1.9.3 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 @@ -48,7 +48,7 @@ require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/KyleBanks/depth v1.2.1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect - github.com/PuerkitoBio/goquery v1.9.1 // indirect + github.com/PuerkitoBio/goquery v1.9.2 // indirect github.com/andybalholm/cascadia v1.3.2 // indirect github.com/antchfx/htmlquery v1.2.3 // indirect github.com/antchfx/xmlquery v1.3.1 // indirect @@ -231,9 +231,9 @@ require ( github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/metric v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.18.0 // indirect go.uber.org/fx v1.22.2 // indirect diff --git a/go.sum b/go.sum index 2d7f2588..cf81c962 100644 --- a/go.sum +++ b/go.sum @@ -19,8 +19,8 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/PuerkitoBio/goquery v1.5.1/go.mod h1:GsLWisAFVj4WgDibEWF4pvYnkVQBpKBKeU+7zCJoLcc= -github.com/PuerkitoBio/goquery v1.9.1 h1:mTL6XjbJTZdpfL+Gwl5U2h1l9yEkJjhmlTeV9VPW7UI= -github.com/PuerkitoBio/goquery v1.9.1/go.mod h1:cW1n6TmIMDoORQU5IU/P1T3tGFunOeXEpGP2WHRwkbY= +github.com/PuerkitoBio/goquery v1.9.2 h1:4/wZksC3KgkQw7SQgkKotmKljk0M6V8TUvA8Wb4yPeE= +github.com/PuerkitoBio/goquery v1.9.2/go.mod h1:GHPCaP0ODyyxqcNoFGYlAprUFH81NuRPd0GX3Zu2Mvk= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= @@ -306,8 +306,8 @@ github.com/gotd/ige v0.2.2 h1:XQ9dJZwBfDnOGSTxKXBGP4gMud3Qku2ekScRjDWWfEk= github.com/gotd/ige v0.2.2/go.mod h1:tuCRb+Y5Y3eNTo3ypIfNpQ4MFjrnONiL2jN2AKZXmb0= github.com/gotd/neo v0.1.5 h1:oj0iQfMbGClP8xI59x7fE/uHoTJD7NZH9oV1WNuPukQ= github.com/gotd/neo v0.1.5/go.mod h1:9A2a4bn9zL6FADufBdt7tZt+WMhvZoc5gWXihOPoiBQ= -github.com/gotd/td v0.108.0 h1:aU4ishpnJYmkEf8hdNw8O4CRKdpT9pK2lDIIIjm+ADA= -github.com/gotd/td v0.108.0/go.mod h1:rHtaG0hd4EY0ice4f9CVH/JxsA7ZICqkcH3aFSVZplg= +github.com/gotd/td v0.110.0 h1:t68cFivL9U9LQaWsaGO5FhalUYhQg6BUMbkM71W09R0= +github.com/gotd/td v0.110.0/go.mod h1:mwQQQrrAn3wizT37UjBAUB4lTy1j2RHnkRJ4z9ivqGs= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -529,8 +529,8 @@ github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= -github.com/ollama/ollama v0.3.9 h1:m389BH0Gimt/U/dTFAX2p495o3vQgbz22kAsYLRuR2E= -github.com/ollama/ollama v0.3.9/go.mod h1:YrWoNkFnPOYsnDvsf/Ztb1wxU9/IXrNsQHqcxbY2r94= +github.com/ollama/ollama v0.3.11 h1:Fs1B5WjXYUvr5bkMZZpUJfiqIAxrymujRidFABwMeV8= +github.com/ollama/ollama v0.3.11/go.mod h1:YrWoNkFnPOYsnDvsf/Ztb1wxU9/IXrNsQHqcxbY2r94= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= @@ -648,8 +648,8 @@ github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6g github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca h1:NugYot0LIVPxTvN8n+Kvkn6TrbMyxQiuvKdEwFdR9vI= github.com/saintfish/chardet v0.0.0-20120816061221-3af4cd4741ca/go.mod h1:uugorj2VCxiV1x+LzaIdVa9b4S4qGAcH6cbhh4qVxOU= -github.com/sashabaranov/go-openai v1.29.1 h1:AlB+vwpg1tibwr83OKXLsI4V1rnafVyTlw0BjR+6WUM= -github.com/sashabaranov/go-openai v1.29.1/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg= +github.com/sashabaranov/go-openai v1.29.2 h1:jYpp1wktFoOvxHnum24f/w4+DFzUdJnu83trr5+Slh0= +github.com/sashabaranov/go-openai v1.29.2/go.mod h1:lj5b/K+zjTSFxVLijLSTDZuP7adOgerWeFyZLUhAKRg= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -778,12 +778,12 @@ github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQ go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= -go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q= -go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= From 82bc5e55cb3910b87799507d7264bcf03d8dec0f Mon Sep 17 00:00:00 2001 From: mudler Date: Wed, 18 Sep 2024 19:35:04 +0200 Subject: [PATCH 3/4] feat: less singletons - Untie blockchain as a singleton, make it possible to create more instances - Allow to add protocols from the MasaOracle - Untie worker manager and avoid singleton Signed-off-by: mudler --- cmd/masa-node/main.go | 58 +++++++++++++++++++++++-- node/blockchain.go | 28 ++++++------ node/oracle_node.go | 45 +++++++++++++++---- node/oracle_node_listener.go | 4 +- node/protocol.go | 74 ++++++++++++++++++++++++++++++++ node/subscriptions.go | 29 ------------- node/types/subscription.go | 11 +++++ pkg/api/api.go | 16 ++++--- pkg/api/handlers_data.go | 28 ++++++------ pkg/api/handlers_node.go | 20 ++------- pkg/api/handlers_topic.go | 5 +-- pkg/api/routes.go | 6 ++- pkg/config/app.go | 25 +++++++++++ pkg/config/constants.go | 22 ---------- pkg/config/options.go | 51 +++++++++++++++++++--- pkg/db/resolver_cache.go | 5 ++- pkg/pubsub/manager.go | 20 ++++----- pkg/pubsub/node_data.go | 28 +----------- pkg/pubsub/node_event_tracker.go | 10 +++-- pkg/tests/api_test.go | 7 ++- pkg/workers/options.go | 32 ++++++++++++++ pkg/workers/worker_manager.go | 74 +++++++++++++++----------------- 22 files changed, 385 insertions(+), 213 deletions(-) create mode 100644 node/protocol.go delete mode 100644 node/subscriptions.go create mode 100644 node/types/subscription.go create mode 100644 pkg/workers/options.go diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index e777e470..84e9be0c 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -11,6 +11,7 @@ import ( "github.com/masa-finance/masa-oracle/internal/versioning" "github.com/masa-finance/masa-oracle/pkg/workers" + pubsub "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/sirupsen/logrus" "github.com/masa-finance/masa-oracle/node" @@ -73,11 +74,58 @@ func main() { isValidator := cfg.Validator + // WorkerManager configuration + // XXX: this needs to be moved under config, but now it's here as there are import cycles given singletons + workerManagerOptions := []workers.WorkerOptionFunc{} + + if cfg.TwitterScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker) + } + + if cfg.TelegramScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) + } + + if cfg.DiscordScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) + } + + if cfg.WebScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableWebScraperWorker) + } + + workHandlerManager := workers.NewWorkHandlerManager(workerManagerOptions...) + blockChainEventTracker := node.NewBlockChain() + + masaNodeOptions := []config.Option{ + config.EnableStaked, + // config.WithService(), + config.WithEnvironment(config.GetInstance().Environment), + config.WithVersion(config.GetInstance().Version), + } + + // Register the worker manager + masaNodeOptions = append(masaNodeOptions, + config.WithMasaProtocolHandler( + config.WorkerProtocol, + workHandlerManager.HandleWorkerStream, + ), + ) + + pubKeySub := &pubsub.PublicKeySubscriptionHandler{} + + masaNodeOptions = append(masaNodeOptions, + config.WithPubSubHandler(config.PublicKeyTopic, pubKeySub, false), + config.WithPubSubHandler(config.BlockTopic, blockChainEventTracker, true), + ) + // Create a new OracleNode - masaNode, err := node.NewOracleNode(ctx, config.EnableStaked) + masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...) + if err != nil { logrus.Fatal(err) } + err = masaNode.Start() if err != nil { logrus.Fatal(err) @@ -104,8 +152,10 @@ func main() { // and other peers can do work we only need to check this here // if this peer can or cannot scrape or write that is checked in other places if masaNode.IsStaked { - masaNode.Host.SetStreamHandler(config.ProtocolWithVersion(config.WorkerProtocol), workers.GetWorkHandlerManager().HandleWorkerStream) - go node.SubscribeToBlocks(ctx, masaNode) + if masaNode.IsValidator { + go blockChainEventTracker.Start(ctx, masaNode) + } + go masaNode.NodeTracker.ClearExpiredWorkerTimeouts() } @@ -130,7 +180,7 @@ func main() { } }() - router := api.SetupRoutes(masaNode) + router := api.SetupRoutes(masaNode, workHandlerManager, pubKeySub) go func() { err = router.Run() if err != nil { diff --git a/node/blockchain.go b/node/blockchain.go index a3bddb76..b5ac5ba3 100644 --- a/node/blockchain.go +++ b/node/blockchain.go @@ -18,9 +18,6 @@ import ( ) // Blockchain Implementation -var ( - blocksCh = make(chan *pubsub.Message) -) type BlockData struct { Block uint64 `json:"block"` @@ -40,6 +37,13 @@ type BlockEventTracker struct { BlockEvents []BlockEvents BlockTopic *pubsub.Topic mu sync.Mutex + blocksCh chan *pubsub.Message +} + +func NewBlockChain() *BlockEventTracker { + return &BlockEventTracker{ + blocksCh: make(chan *pubsub.Message), + } } // HandleMessage processes incoming pubsub messages containing block events. @@ -90,7 +94,7 @@ func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) { logrus.Warnf("[-] Unexpected data type in message: %v", reflect.TypeOf(v)) } - blocksCh <- m + b.blocksCh <- m } func updateBlocks(ctx context.Context, node *OracleNode) error { @@ -149,24 +153,18 @@ func updateBlocks(ctx context.Context, node *OracleNode) error { return nil } -func SubscribeToBlocks(ctx context.Context, node *OracleNode) { - if !node.IsValidator { - return +func (b *BlockEventTracker) Start(ctx context.Context, node *OracleNode) { + err := node.Blockchain.Init() + if err != nil { + logrus.Error(err) } - go func() { - err := node.Blockchain.Init() - if err != nil { - logrus.Error(err) - } - }() - updateTicker := time.NewTicker(time.Second * 60) defer updateTicker.Stop() for { select { - case block, ok := <-blocksCh: + case block, ok := <-b.blocksCh: if !ok { logrus.Error("[-] Block channel closed") return diff --git a/node/oracle_node.go b/node/oracle_node.go index bd24159d..d54821f7 100644 --- a/node/oracle_node.go +++ b/node/oracle_node.go @@ -143,9 +143,8 @@ func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, err return nil, err } - return &OracleNode{ + n := &OracleNode{ Host: hst, - Protocol: config.ProtocolWithVersion(config.OracleProtocol), multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), PeerChan: make(chan myNetwork.PeerEvent), NodeTracker: pubsub.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()), @@ -160,7 +159,10 @@ func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, err IsLlmServer: cfg.LlmServer, Blockchain: &chain.Chain{}, options: *o, - }, nil + } + + n.Protocol = n.protocolWithVersion(config.OracleProtocol) + return n, nil } func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) { @@ -177,6 +179,26 @@ func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) { return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil } +func getNodeData(host host.Host, isStaked bool, addr multiaddr.Multiaddr, publicEthAddress string) *pubsub.NodeData { + // GetSelfNodeData converts the local node's data into a JSON byte array. + // It populates a NodeData struct with the node's ID, staking status, and Ethereum address. + // The NodeData struct is then marshalled into a JSON byte array. + // Returns nil if there is an error marshalling to JSON. + // Create and populate NodeData + nodeData := pubsub.NewNodeData(addr, host.ID(), publicEthAddress, pubsub.ActivityJoined) + nodeData.MultiaddrsString = addr.String() + nodeData.IsStaked = isStaked + nodeData.IsTwitterScraper = config.GetInstance().TwitterScraper + nodeData.IsDiscordScraper = config.GetInstance().DiscordScraper + nodeData.IsTelegramScraper = config.GetInstance().TelegramScraper + nodeData.IsWebScraper = config.GetInstance().WebScraper + nodeData.IsValidator = config.GetInstance().Validator + nodeData.IsActive = true + nodeData.Version = versioning.ProtocolVersion + + return nodeData +} + // Start initializes the OracleNode by setting up libp2p stream handlers, // connecting to the DHT and bootnodes, and subscribing to topics. It launches // goroutines to handle discovered peers, listen to the node tracker, and @@ -185,14 +207,18 @@ func (node *OracleNode) Start() (err error) { logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String()) node.Host.SetStreamHandler(node.Protocol, node.handleStream) - node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData) + node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData) for pid, n := range node.options.ProtocolHandlers { node.Host.SetStreamHandler(pid, n) } + for protocol, n := range node.options.MasaProtocolHandlers { + node.Host.SetStreamHandler(node.protocolWithVersion(protocol), n) + } + if node.IsStaked { - node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) + node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) } node.Host.Network().Notify(node.NodeTracker) @@ -207,14 +233,14 @@ func (node *OracleNode) Start() (err error) { publicKeyHex = masacrypto.KeyManagerInstance().EthAddress } - myNodeData := pubsub.GetSelfNodeData(node.Host, node.IsStaked, node.priorityAddrs, publicKeyHex) + myNodeData := getNodeData(node.Host, node.IsStaked, node.priorityAddrs, publicKeyHex) bootstrapNodes, err := myNetwork.GetBootNodesMultiAddress(append(config.GetInstance().Bootnodes, node.options.Bootnodes...)) if err != nil { return err } - node.DHT, err = myNetwork.WithDHT(node.Context, node.Host, bootstrapNodes, node.Protocol, config.MasaPrefix, node.PeerChan, myNodeData) + node.DHT, err = myNetwork.WithDHT(node.Context, node.Host, bootstrapNodes, node.Protocol, masaPrefix, node.PeerChan, myNodeData) if err != nil { return err } @@ -235,13 +261,14 @@ func (node *OracleNode) Start() (err error) { nodeData = myNodeData nodeData.SelfIdentified = true } - nodeData.Joined() + nodeData.Joined(node.options.Version) node.NodeTracker.HandleNodeData(*nodeData) // call SubscribeToTopics on startup - if err := SubscribeToTopics(node); err != nil { + if err := node.subscribeToTopics(); err != nil { return err } + node.StartTime = time.Now() return nil diff --git a/node/oracle_node_listener.go b/node/oracle_node_listener.go index 78991a9a..0266c0d8 100644 --- a/node/oracle_node_listener.go +++ b/node/oracle_node_listener.go @@ -42,7 +42,7 @@ func (node *OracleNode) ListenToNodeTracker() { continue } // Publish the JSON data on the node.topic - err = node.PubSubManager.Publish(config.TopicWithVersion(config.NodeGossipTopic), jsonData) + err = node.PublishTopic(config.NodeGossipTopic, jsonData) if err != nil { logrus.Errorf("[-] Error publishing node data: %v", err) } @@ -135,7 +135,7 @@ func (node *OracleNode) SendNodeData(peerID peer.ID) { totalRecords := len(nodeData) totalPages := int(math.Ceil(float64(totalRecords) / float64(config.PageSize))) - stream, err := node.Host.NewStream(node.Context, peerID, config.ProtocolWithVersion(config.NodeDataSyncProtocol)) + stream, err := node.Host.NewStream(node.Context, peerID, node.protocolWithVersion(config.NodeDataSyncProtocol)) if err != nil { // node.NodeTracker.RemoveNodeData(peerID.String()) return diff --git a/node/protocol.go b/node/protocol.go new file mode 100644 index 00000000..b07c814f --- /dev/null +++ b/node/protocol.go @@ -0,0 +1,74 @@ +package node + +import ( + "context" + "fmt" + + "github.com/masa-finance/masa-oracle/node/types" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" + "github.com/masa-finance/masa-oracle/pkg/config" +) + +const ( + masaPrefix = "/masa" +) + +// ProtocolWithVersion returns a libp2p protocol ID string +// with the configured version and environment suffix. +func (node *OracleNode) protocolWithVersion(protocolName string) protocol.ID { + if node.options.Environment == "" { + return protocol.ID(fmt.Sprintf("%s/%s/%s", masaPrefix, protocolName, node.options.Version)) + } + return protocol.ID(fmt.Sprintf("%s/%s/%s-%s", masaPrefix, protocolName, node.options.Version, node.options.Environment)) +} + +// TopicWithVersion returns a topic string with the configured version +// and environment suffix. +func (node *OracleNode) topicWithVersion(protocolName string) string { + if node.options.Environment == "" { + return fmt.Sprintf("%s/%s/%s", masaPrefix, protocolName, node.options.Version) + } + return fmt.Sprintf("%s/%s/%s-%s", masaPrefix, protocolName, node.options.Version, node.options.Environment) +} + +func (node *OracleNode) ProtocolStream(ctx context.Context, peerID peer.ID, protocolName string) (network.Stream, error) { + return node.Host.NewStream(ctx, peerID, node.protocolWithVersion(protocolName)) +} + +// SubscribeToTopics handles the subscription to various topics for an OracleNode. +// It subscribes the node to the NodeGossipTopic, AdTopic, and PublicKeyTopic. +// Each subscription is managed through the node's PubSubManager, which orchestrates the message passing for these topics. +// Errors during subscription are logged and returned, halting the process to ensure the node's correct setup before operation. +func (node *OracleNode) subscribeToTopics() error { + for _, handler := range node.options.PubSubHandles { + if err := node.SubscribeTopic(handler.ProtocolName, handler.Handler, handler.IncludeSelf); err != nil { + return err + } + } + + // Subscribe to NodeGossipTopic to participate in the network's gossip protocol. + if err := node.SubscribeTopic(config.NodeGossipTopic, node.NodeTracker, false); err != nil { + return err + } + + return nil +} + +func (node *OracleNode) PublishTopic(protocolName string, data []byte) error { + return node.PubSubManager.Publish(node.topicWithVersion(protocolName), data) +} + +func (node *OracleNode) PublishTopicMessage(protocolName string, data string) error { + return node.PubSubManager.PublishMessage(node.topicWithVersion(protocolName), data) +} + +func (node *OracleNode) SubscribeTopic(protocolName string, handler types.SubscriptionHandler, includeSelf bool) error { + return node.PubSubManager.AddSubscription(node.topicWithVersion(protocolName), handler, includeSelf) +} + +func (node *OracleNode) Subscribe(protocolName string, handler types.SubscriptionHandler) error { + return node.PubSubManager.Subscribe(node.topicWithVersion(protocolName), handler) +} diff --git a/node/subscriptions.go b/node/subscriptions.go deleted file mode 100644 index 1c3668b3..00000000 --- a/node/subscriptions.go +++ /dev/null @@ -1,29 +0,0 @@ -package node - -import ( - "github.com/masa-finance/masa-oracle/pkg/config" - pubsub2 "github.com/masa-finance/masa-oracle/pkg/pubsub" -) - -// SubscribeToTopics handles the subscription to various topics for an OracleNode. -// It subscribes the node to the NodeGossipTopic, AdTopic, and PublicKeyTopic. -// Each subscription is managed through the node's PubSubManager, which orchestrates the message passing for these topics. -// Errors during subscription are logged and returned, halting the process to ensure the node's correct setup before operation. -func SubscribeToTopics(node *OracleNode) error { - // Subscribe to NodeGossipTopic to participate in the network's gossip protocol. - if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.NodeGossipTopic), node.NodeTracker, false); err != nil { - return err - } - - // Subscribe to BlockTopic to track and process block data within the network. - if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.BlockTopic), &BlockEventTracker{}, true); err != nil { - return err - } - - // Subscribe to PublicKeyTopic to manage and verify public keys within the network. - if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.PublicKeyTopic), &pubsub2.PublicKeySubscriptionHandler{}, false); err != nil { - return err - } - - return nil -} diff --git a/node/types/subscription.go b/node/types/subscription.go new file mode 100644 index 00000000..347555ec --- /dev/null +++ b/node/types/subscription.go @@ -0,0 +1,11 @@ +package types + +import ( + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +// SubscriptionHandler defines the interface for handling pubsub messages. +// Implementations should subscribe to topics and handle incoming messages. +type SubscriptionHandler interface { + HandleMessage(msg *pubsub.Message) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index 7bbe2611..82d0ca12 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -9,15 +9,19 @@ import ( node "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/event" + "github.com/masa-finance/masa-oracle/pkg/pubsub" + "github.com/masa-finance/masa-oracle/pkg/workers" ) type API struct { - Node *node.OracleNode - EventTracker *event.EventTracker + Node *node.OracleNode + EventTracker *event.EventTracker + WorkManager *workers.WorkHandlerManager + PubKeySubscriptionHandler *pubsub.PublicKeySubscriptionHandler } // NewAPI creates a new API instance with the given OracleNode. -func NewAPI(node *node.OracleNode) *API { +func NewAPI(node *node.OracleNode, workManager *workers.WorkHandlerManager, pubkeySubscriptionHandler *pubsub.PublicKeySubscriptionHandler) *API { eventTracker := event.NewEventTracker(nil) if eventTracker == nil { logrus.Error("Failed to create EventTracker") @@ -26,8 +30,10 @@ func NewAPI(node *node.OracleNode) *API { } api := &API{ - Node: node, - EventTracker: eventTracker, + Node: node, + EventTracker: eventTracker, + WorkManager: workManager, + PubKeySubscriptionHandler: pubkeySubscriptionHandler, } logrus.Debugf("Created API instance with EventTracker: %v", api.EventTracker) diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index c186ce8b..e6563575 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -38,7 +38,7 @@ type LLMChat struct { Stream bool `json:"stream"` } -// SendWorkRequest sends a work request to a worker for processing. +// sendWorkRequest sends a work request to a worker for processing. // It marshals the request details into JSON and sends it over a libp2p stream. // It is currently re-using the response channel map for this; however, it could be a simple synchronous call // in which case the worker handlers would be responseible for preparing the data to be sent back to the client @@ -51,13 +51,13 @@ type LLMChat struct { // // Returns: // - error: An error object if the request could not be sent or processed, otherwise nil. -func SendWorkRequest(api *API, requestID string, workType data_types.WorkerType, bodyBytes []byte, wg *sync.WaitGroup) error { +func (api *API) sendWorkRequest(requestID string, workType data_types.WorkerType, bodyBytes []byte, wg *sync.WaitGroup) error { request := data_types.WorkRequest{ WorkType: workType, RequestId: requestID, Data: bodyBytes, } - response := workers.GetWorkHandlerManager().DistributeWork(api.Node, request) + response := api.WorkManager.DistributeWork(api.Node, request) responseChannel, exists := workers.GetResponseChannelMap().Get(requestID) if !exists { return fmt.Errorf("response channel not found") @@ -176,7 +176,7 @@ func (api *API) SearchTweetsProfile() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.TwitterProfile, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.TwitterProfile, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -218,7 +218,7 @@ func (api *API) SearchTweetsRecent() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.Twitter, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.Twitter, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -261,7 +261,7 @@ func (api *API) SearchTwitterFollowers() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.TwitterFollowers, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.TwitterFollowers, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -300,7 +300,7 @@ func (api *API) SearchDiscordProfile() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.DiscordProfile, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.DiscordProfile, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -347,7 +347,7 @@ func (api *API) SearchChannelMessages() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.DiscordChannelMessages, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.DiscordChannelMessages, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return @@ -383,7 +383,7 @@ func (api *API) SearchGuildChannels() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.DiscordGuildChannels, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.DiscordGuildChannels, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -409,7 +409,7 @@ func (api *API) SearchUserGuilds() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.DiscordUserGuilds, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.DiscordUserGuilds, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -571,7 +571,7 @@ func (api *API) WebData() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.Web, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.Web, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -659,7 +659,7 @@ func (api *API) GetChannelMessagesHandler() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.TelegramChannelMessages, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.TelegramChannelMessages, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -716,7 +716,7 @@ func (api *API) LocalLlmChat() gin.HandlerFunc { defer workers.GetResponseChannelMap().Delete(requestID) go handleWorkResponse(c, responseCh, wg) - err = SendWorkRequest(api, requestID, data_types.LLMChat, bodyBytes, wg) + err = api.sendWorkRequest(requestID, data_types.LLMChat, bodyBytes, wg) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } @@ -960,7 +960,7 @@ func (api *API) Test() gin.HandlerFunc { return } - err = api.Node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), bodyBytes) + err = api.Node.PublishTopic(config.BlockTopic, bodyBytes) if err != nil { logrus.Errorf("[-] Error publishing block: %v", err) c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) diff --git a/pkg/api/handlers_node.go b/pkg/api/handlers_node.go index 69e759de..f5f4ef75 100644 --- a/pkg/api/handlers_node.go +++ b/pkg/api/handlers_node.go @@ -206,8 +206,7 @@ func (api *API) PublishPublicKeyHandler() gin.HandlerFunc { } // Publish the public key using its string representation, data, and signature - publicKeyTopic := config.TopicWithVersion(config.PublicKeyTopic) - if err := api.Node.PubSubManager.Publish(publicKeyTopic, msgBytes); err != nil { + if err := api.Node.PublishTopic(config.PublicKeyTopic, msgBytes); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } @@ -229,20 +228,7 @@ func (api *API) GetPublicKeysHandler() gin.HandlerFunc { return } - // Use the PublicKeyTopic constant from the masa package - handler, err := api.Node.PubSubManager.GetHandler(string(config.ProtocolWithVersion(config.PublicKeyTopic))) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) - return - } - - publicKeyHandler, ok := handler.(*pubsub.PublicKeySubscriptionHandler) - if !ok { - c.JSON(http.StatusInternalServerError, gin.H{"error": "handler is not of type PublicKeySubscriptionHandler"}) - return - } - - publicKeys := publicKeyHandler.GetPublicKeys() + publicKeys := api.PubKeySubscriptionHandler.GetPublicKeys() c.JSON(http.StatusOK, gin.H{ "success": true, "publicKeys": publicKeys, @@ -363,7 +349,7 @@ func (api *API) PostNodeStatusHandler() gin.HandlerFunc { logrus.Printf("jsonData %s", jsonData) // Publish the message to the specified topic. - if err := api.Node.PubSubManager.Publish(config.TopicWithVersion(config.NodeGossipTopic), jsonData); err != nil { + if err := api.Node.PublishTopic(config.NodeGossipTopic, jsonData); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } diff --git a/pkg/api/handlers_topic.go b/pkg/api/handlers_topic.go index c3004c20..2512c6ad 100644 --- a/pkg/api/handlers_topic.go +++ b/pkg/api/handlers_topic.go @@ -5,7 +5,6 @@ import ( "github.com/gin-gonic/gin" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -30,7 +29,7 @@ func (api *API) CreateNewTopicHandler() gin.HandlerFunc { topicHandler := pubsub.NewTopicHandler() // Use the AddSubscription method to create the new topic and subscribe the TopicHandler to it. - if err := api.Node.PubSubManager.AddSubscription(config.TopicWithVersion(request.TopicName), topicHandler, false); err != nil { + if err := api.Node.SubscribeTopic(request.TopicName, topicHandler, false); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } @@ -58,7 +57,7 @@ func (api *API) PostToTopicHandler() gin.HandlerFunc { } // Publish the message to the specified topic. - if err := api.Node.PubSubManager.PublishMessage(config.TopicWithVersion(request.TopicName), request.Message); err != nil { + if err := api.Node.PublishTopicMessage(request.TopicName, request.Message); err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index c3bae4e0..4f548367 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -12,6 +12,8 @@ import ( "github.com/golang-jwt/jwt/v4" "github.com/masa-finance/masa-oracle/docs" + "github.com/masa-finance/masa-oracle/pkg/pubsub" + "github.com/masa-finance/masa-oracle/pkg/workers" "github.com/gin-contrib/cors" @@ -31,11 +33,11 @@ var htmlTemplates embed.FS // Routes are added for peers, ads, subscriptions, node data, public keys, // topics, the DHT, node status, and serving HTML pages. Middleware is added // for CORS and templates. -func SetupRoutes(node *node.OracleNode) *gin.Engine { +func SetupRoutes(node *node.OracleNode, workerManager *workers.WorkHandlerManager, pubkeySubscriptionHandler *pubsub.PublicKeySubscriptionHandler) *gin.Engine { gin.SetMode(gin.ReleaseMode) router := gin.Default() - API := NewAPI(node) + API := NewAPI(node, workerManager, pubkeySubscriptionHandler) // Initialize CORS middleware with a configuration that allows all origins and specifies // the HTTP methods and headers that can be used in requests. diff --git a/pkg/config/app.go b/pkg/config/app.go index 198cd800..e9fa077c 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -271,3 +271,28 @@ func (c *AppConfig) HasBootnodes() bool { return c.Bootnodes[0] != "" } + +/* + +func (c *AppConfig) WorkerManagerOptions() []workers.WorkerOptionFunc { + workerManagerOptions := []workers.WorkerOptionFunc{} + if c.TwitterScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker) + } + + if c.TelegramScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) + } + + if c.DiscordScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) + } + + if c.WebScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableWebScraperWorker) + } + + return workerManagerOptions +} + +*/ diff --git a/pkg/config/constants.go b/pkg/config/constants.go index d3fd5a1e..f350ffe3 100644 --- a/pkg/config/constants.go +++ b/pkg/config/constants.go @@ -6,8 +6,6 @@ import ( "io" "net/http" "os" - - "github.com/libp2p/go-libp2p/core/protocol" ) // ModelType defines a type for model strings. @@ -98,7 +96,6 @@ const ( CachePath = "CACHE_PATH" Faucet = "FAUCET" - MasaPrefix = "/masa" OracleProtocol = "oracle_protocol" WorkerProtocol = "worker_protocol" NodeDataSyncProtocol = "nodeDataSync" @@ -126,26 +123,7 @@ const ( LlmCfUrl = "LLM_CF_URL" ) -// ProtocolWithVersion returns a libp2p protocol ID string -// with the configured version and environment suffix. -func ProtocolWithVersion(protocolName string) protocol.ID { - if GetInstance().Environment == "" { - return protocol.ID(fmt.Sprintf("%s/%s/%s", MasaPrefix, protocolName, GetInstance().Version)) - } - return protocol.ID(fmt.Sprintf("%s/%s/%s-%s", MasaPrefix, protocolName, GetInstance().Version, GetInstance().Environment)) -} - -// TopicWithVersion returns a topic string with the configured version -// and environment suffix. -func TopicWithVersion(protocolName string) string { - if GetInstance().Environment == "" { - return fmt.Sprintf("%s/%s/%s", MasaPrefix, protocolName, GetInstance().Version) - } - return fmt.Sprintf("%s/%s/%s-%s", MasaPrefix, protocolName, GetInstance().Version, GetInstance().Environment) -} - // Function to call the Cloudflare API and parse the response - func GetCloudflareModels() ([]string, error) { url := "https://api.cloudflare.com/client/v4/accounts/a72433aa3bb83aecaca1bc8acecdb166/ai/models/search" req, err := http.NewRequest("GET", url, nil) diff --git a/pkg/config/options.go b/pkg/config/options.go index 73f9f0ce..50712bcd 100644 --- a/pkg/config/options.go +++ b/pkg/config/options.go @@ -3,18 +3,30 @@ package config import ( "context" + "github.com/masa-finance/masa-oracle/node/types" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" ) type AppOption struct { - DisableCLIParse bool - IsStaked bool - Bootnodes []string - RandomIdentity bool - Services []func(ctx context.Context, node host.Host) - ProtocolHandlers map[protocol.ID]network.StreamHandler + DisableCLIParse bool + IsStaked bool + Bootnodes []string + RandomIdentity bool + Services []func(ctx context.Context, node host.Host) + PubSubHandles []PubSubHandlers + ProtocolHandlers map[protocol.ID]network.StreamHandler + MasaProtocolHandlers map[string]network.StreamHandler + Environment string + Version string +} + +type PubSubHandlers struct { + ProtocolName string + Handler types.SubscriptionHandler + IncludeSelf bool } type Option func(*AppOption) @@ -57,3 +69,30 @@ func WithProtocolHandler(pid protocol.ID, n network.StreamHandler) Option { o.ProtocolHandlers[pid] = n } } + +func WithEnvironment(env string) Option { + return func(o *AppOption) { + o.Environment = env + } +} + +func WithVersion(version string) Option { + return func(o *AppOption) { + o.Version = version + } +} + +func WithMasaProtocolHandler(pid string, n network.StreamHandler) Option { + return func(o *AppOption) { + if o.MasaProtocolHandlers == nil { + o.MasaProtocolHandlers = make(map[string]network.StreamHandler) + } + o.MasaProtocolHandlers[pid] = n + } +} + +func WithPubSubHandler(protocolName string, handler types.SubscriptionHandler, includeSelf bool) Option { + return func(o *AppOption) { + o.PubSubHandles = append(o.PubSubHandles, PubSubHandlers{protocolName, handler, includeSelf}) + } +} diff --git a/pkg/db/resolver_cache.go b/pkg/db/resolver_cache.go index 26f6e22e..4473c8da 100644 --- a/pkg/db/resolver_cache.go +++ b/pkg/db/resolver_cache.go @@ -234,7 +234,8 @@ func iterateAndPublish(ctx context.Context, node *node.OracleNode) { // It runs a ticker to call iterateAndPublish on the provided interval. func monitorNodeData(ctx context.Context, node *node.OracleNode) { syncInterval := time.Second * 60 - err := node.PubSubManager.Subscribe(config.TopicWithVersion(config.NodeGossipTopic), node.NodeTracker) + + err := node.Subscribe(config.NodeGossipTopic, node.NodeTracker) if err != nil { logrus.Errorf("[-] Error subscribing to node gossip topic: %v", err) } @@ -257,7 +258,7 @@ func monitorNodeData(ctx context.Context, node *node.OracleNode) { } jsonData, _ := json.Marshal(nodeData) - e := node.PubSubManager.Publish(config.TopicWithVersion(config.NodeGossipTopic), jsonData) + e := node.PublishTopic(config.NodeGossipTopic, jsonData) if e != nil { logrus.Errorf("[-] Error publishing node data: %v", e) } diff --git a/pkg/pubsub/manager.go b/pkg/pubsub/manager.go index 9b704b23..ff0f839c 100644 --- a/pkg/pubsub/manager.go +++ b/pkg/pubsub/manager.go @@ -6,22 +6,18 @@ import ( "fmt" "os" + "github.com/masa-finance/masa-oracle/node/types" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/sirupsen/logrus" ) -// SubscriptionHandler defines the interface for handling pubsub messages. -// Implementations should subscribe to topics and handle incoming messages. -type SubscriptionHandler interface { - HandleMessage(msg *pubsub.Message) -} - type Manager struct { ctx context.Context topics map[string]*pubsub.Topic subscriptions map[string]*pubsub.Subscription - handlers map[string]SubscriptionHandler + handlers map[string]types.SubscriptionHandler gossipSub *pubsub.PubSub host host.Host } @@ -40,7 +36,7 @@ func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error) { / ctx: ctx, subscriptions: make(map[string]*pubsub.Subscription), topics: make(map[string]*pubsub.Topic), - handlers: make(map[string]SubscriptionHandler), + handlers: make(map[string]types.SubscriptionHandler), gossipSub: gossipSub, host: host, } @@ -70,7 +66,7 @@ func (sm *Manager) createTopic(topicName string) (*pubsub.Topic, error) { // It creates the topic if needed, subscribes to it, and adds the subscription // and handler to the manager's maps. It launches a goroutine to handle incoming // messages, skipping messages from self, and calling the handler on each message. -func (sm *Manager) AddSubscription(topicName string, handler SubscriptionHandler, includeSelf bool) error { +func (sm *Manager) AddSubscription(topicName string, handler types.SubscriptionHandler, includeSelf bool) error { topic, err := sm.createTopic(topicName) if err != nil { return err @@ -141,9 +137,9 @@ func (sm *Manager) Publish(topic string, data []byte) error { return t.Publish(sm.ctx, data) } -// GetHandler returns the SubscriptionHandler for the given topic name. +// GetHandler returns the types.SubscriptionHandler for the given topic name. // It returns an error if no handler exists for the given topic. -func (sm *Manager) GetHandler(topic string) (SubscriptionHandler, error) { +func (sm *Manager) GetHandler(topic string) (types.SubscriptionHandler, error) { handler, ok := sm.handlers[topic] if !ok { return nil, fmt.Errorf("no handler for topic %s", topic) @@ -209,7 +205,7 @@ func (sm *Manager) PublishMessage(topicName, message string) error { // given topic name. It gets the existing subscription, saves it and the // handler, and starts a goroutine to call the handler for each new message. // Returns an error if unable to get the subscription. -func (sm *Manager) Subscribe(topicName string, handler SubscriptionHandler) error { +func (sm *Manager) Subscribe(topicName string, handler types.SubscriptionHandler) error { sub, err := sm.GetSubscription(topicName) if err != nil { return err diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index d80e9f90..4bd69f6f 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -5,10 +5,6 @@ import ( "fmt" "time" - "github.com/masa-finance/masa-oracle/internal/versioning" - "github.com/masa-finance/masa-oracle/pkg/config" - - "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" @@ -160,7 +156,7 @@ func (n *NodeData) WebScraper() bool { // Joined updates the NodeData when the node joins the network. // It sets the join times, activity, active status, and logs based on stake status. -func (n *NodeData) Joined() { +func (n *NodeData) Joined(nodeVersion string) { now := time.Now() if n.FirstJoinedUnix == 0 { n.FirstJoinedUnix = now.Unix() @@ -170,7 +166,7 @@ func (n *NodeData) Joined() { n.Activity = ActivityJoined n.IsActive = true - n.Version = config.GetInstance().Version + n.Version = nodeVersion logMessage := fmt.Sprintf("[+] %s node joined: %s", map[bool]string{true: "Staked", false: "Unstaked"}[n.IsStaked], n.MultiaddrsString) if n.IsStaked { @@ -248,26 +244,6 @@ func (n *NodeData) UpdateAccumulatedUptime() { n.AccumulatedUptimeStr = n.AccumulatedUptime.String() } -// GetSelfNodeData converts the local node's data into a JSON byte array. -// It populates a NodeData struct with the node's ID, staking status, and Ethereum address. -// The NodeData struct is then marshalled into a JSON byte array. -// Returns nil if there is an error marshalling to JSON. -func GetSelfNodeData(host host.Host, isStaked bool, addr multiaddr.Multiaddr, publicEthAddress string) *NodeData { - // Create and populate NodeData - nodeData := NewNodeData(addr, host.ID(), publicEthAddress, ActivityJoined) - nodeData.MultiaddrsString = addr.String() - nodeData.IsStaked = isStaked - nodeData.IsTwitterScraper = config.GetInstance().TwitterScraper - nodeData.IsDiscordScraper = config.GetInstance().DiscordScraper - nodeData.IsTelegramScraper = config.GetInstance().TelegramScraper - nodeData.IsWebScraper = config.GetInstance().WebScraper - nodeData.IsValidator = config.GetInstance().Validator - nodeData.IsActive = true - nodeData.Version = versioning.ProtocolVersion - - return nodeData -} - func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr) { addrExists := false for _, existingAddr := range n.Multiaddrs { diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index c19f7f59..91231148 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -21,6 +21,7 @@ type NodeEventTracker struct { nodeData *SafeMap nodeDataFile string ConnectBuffer map[string]ConnectBufferEntry + nodeVersion string } type ConnectBufferEntry struct { @@ -35,6 +36,7 @@ type ConnectBufferEntry struct { func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker { net := &NodeEventTracker{ nodeData: NewSafeMap(), + nodeVersion: version, NodeDataChan: make(chan *NodeData), nodeDataFile: fmt.Sprintf("%s_%s_node_data.json", version, environment), ConnectBuffer: make(map[string]ConnectBufferEntry), @@ -83,7 +85,7 @@ func (net *NodeEventTracker) Connected(n network.Network, c network.Conn) { // Node appears already connected, buffer this connect event net.ConnectBuffer[peerID] = ConnectBufferEntry{NodeData: nodeData, ConnectTime: time.Now()} } else { - nodeData.Joined() + nodeData.Joined(net.nodeVersion) err := net.AddOrUpdateNodeData(nodeData, true) if err != nil { logrus.Error("[-] Error adding or updating node data: ", err) @@ -118,7 +120,7 @@ func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn) { if buffered.NodeData != nil { buffered.NodeData.Left() delete(net.ConnectBuffer, peerID) - buffered.NodeData.Joined() + buffered.NodeData.Joined(net.nodeVersion) net.NodeDataChan <- buffered.NodeData } else { nodeData.Left() @@ -306,7 +308,7 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip nd, ok := net.nodeData.Get(nodeData.PeerId.String()) if !ok { nodeData.SelfIdentified = true - nodeData.Joined() + nodeData.Joined(net.nodeVersion) net.NodeDataChan <- nodeData net.nodeData.Set(nodeData.PeerId.String(), nodeData) } else { @@ -385,7 +387,7 @@ func (net *NodeEventTracker) ClearExpiredBufferEntries() { // first force a leave event so that timestamps are updated properly entry.NodeData.Left() // Buffer period expired without a disconnect, process connect - entry.NodeData.Joined() + entry.NodeData.Joined(net.nodeVersion) net.NodeDataChan <- entry.NodeData delete(net.ConnectBuffer, peerID) } diff --git a/pkg/tests/api_test.go b/pkg/tests/api_test.go index 8636bb17..4784fd1a 100644 --- a/pkg/tests/api_test.go +++ b/pkg/tests/api_test.go @@ -5,18 +5,21 @@ import ( "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/api" + "github.com/masa-finance/masa-oracle/pkg/pubsub" + "github.com/masa-finance/masa-oracle/pkg/workers" ) func TestAPI(t *testing.T) { // Create a new OracleNode instance n := &node.OracleNode{} + whm := &workers.WorkHandlerManager{} + pubKeySub := &pubsub.PublicKeySubscriptionHandler{} // Initialize the API - api := api.NewAPI(n) + api := api.NewAPI(n, whm, pubKeySub) // Test API initialization if api == nil { t.Fatal("Failed to initialize API") } - } diff --git a/pkg/workers/options.go b/pkg/workers/options.go new file mode 100644 index 00000000..0f261748 --- /dev/null +++ b/pkg/workers/options.go @@ -0,0 +1,32 @@ +package workers + +type WorkerOption struct { + isTwitterWorker bool + isWebScraperWorker bool + isLLMServerWorker bool + isDiscordScraperWorker bool +} + +type WorkerOptionFunc func(*WorkerOption) + +var EnableTwitterWorker = func(o *WorkerOption) { + o.isTwitterWorker = true +} + +var EnableWebScraperWorker = func(o *WorkerOption) { + o.isWebScraperWorker = true +} + +var EnableLLMServerWorker = func(o *WorkerOption) { + o.isLLMServerWorker = true +} + +var EnableDiscordScraperWorker = func(o *WorkerOption) { + o.isDiscordScraperWorker = true +} + +func (a *WorkerOption) Apply(opts ...WorkerOptionFunc) { + for _, opt := range opts { + opt(a) + } +} diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index 1b030af4..f44b1c66 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -21,20 +21,41 @@ import ( data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) -var ( - instance *WorkHandlerManager - once sync.Once -) +func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager { + options := &WorkerOption{} + options.Apply(opts...) -func GetWorkHandlerManager() *WorkHandlerManager { - once.Do(func() { - instance = &WorkHandlerManager{ - handlers: make(map[data_types.WorkerType]*WorkHandlerInfo), - eventTracker: event.NewEventTracker(nil), - } - instance.setupHandlers() - }) - return instance + whm := &WorkHandlerManager{ + handlers: make(map[data_types.WorkerType]*WorkHandlerInfo), + eventTracker: event.NewEventTracker(nil), + } + + if options.isTwitterWorker { + whm.addWorkHandler(data_types.Twitter, &handlers.TwitterQueryHandler{}) + whm.addWorkHandler(data_types.TwitterFollowers, &handlers.TwitterFollowersHandler{}) + whm.addWorkHandler(data_types.TwitterProfile, &handlers.TwitterProfileHandler{}) + whm.addWorkHandler(data_types.TwitterSentiment, &handlers.TwitterSentimentHandler{}) + whm.addWorkHandler(data_types.TwitterTrends, &handlers.TwitterTrendsHandler{}) + } + + if options.isWebScraperWorker { + whm.addWorkHandler(data_types.Web, &handlers.WebHandler{}) + whm.addWorkHandler(data_types.WebSentiment, &handlers.WebSentimentHandler{}) + } + + if options.isLLMServerWorker { + whm.addWorkHandler(data_types.LLMChat, &handlers.LLMChatHandler{}) + } + + if options.isDiscordScraperWorker { + whm.addWorkHandler(data_types.Discord, &handlers.DiscordProfileHandler{}) + whm.addWorkHandler(data_types.DiscordChannelMessages, &handlers.DiscordChannelHandler{}) + whm.addWorkHandler(data_types.DiscordSentiment, &handlers.DiscordSentimentHandler{}) + whm.addWorkHandler(data_types.DiscordGuildChannels, &handlers.DiscordGuildHandler{}) + whm.addWorkHandler(data_types.DiscordUserGuilds, &handlers.DiscoreUserGuildsHandler{}) + } + + return whm } // ErrHandlerNotFound is an error returned when a work handler cannot be found. @@ -59,31 +80,6 @@ type WorkHandlerManager struct { eventTracker *event.EventTracker } -func (whm *WorkHandlerManager) setupHandlers() { - cfg := config.GetInstance() - if cfg.TwitterScraper { - whm.addWorkHandler(data_types.Twitter, &handlers.TwitterQueryHandler{}) - whm.addWorkHandler(data_types.TwitterFollowers, &handlers.TwitterFollowersHandler{}) - whm.addWorkHandler(data_types.TwitterProfile, &handlers.TwitterProfileHandler{}) - whm.addWorkHandler(data_types.TwitterSentiment, &handlers.TwitterSentimentHandler{}) - whm.addWorkHandler(data_types.TwitterTrends, &handlers.TwitterTrendsHandler{}) - } - if cfg.WebScraper { - whm.addWorkHandler(data_types.Web, &handlers.WebHandler{}) - whm.addWorkHandler(data_types.WebSentiment, &handlers.WebSentimentHandler{}) - } - if cfg.LlmServer { - whm.addWorkHandler(data_types.LLMChat, &handlers.LLMChatHandler{}) - } - if cfg.DiscordScraper { - whm.addWorkHandler(data_types.Discord, &handlers.DiscordProfileHandler{}) - whm.addWorkHandler(data_types.DiscordChannelMessages, &handlers.DiscordChannelHandler{}) - whm.addWorkHandler(data_types.DiscordSentiment, &handlers.DiscordSentimentHandler{}) - whm.addWorkHandler(data_types.DiscordGuildChannels, &handlers.DiscordGuildHandler{}) - whm.addWorkHandler(data_types.DiscordUserGuilds, &handlers.DiscoreUserGuildsHandler{}) - } -} - // addWorkHandler registers a new work handler under a specific name. func (whm *WorkHandlerManager) addWorkHandler(wType data_types.WorkerType, handler WorkHandler) { whm.mu.Lock() @@ -192,7 +188,7 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker da } else { //whm.eventTracker.TrackRemoteWorkerConnection(worker.AddrInfo.ID.String()) logrus.Debugf("[+] Connection established with node: %s", worker.AddrInfo.ID.String()) - stream, err := node.Host.NewStream(ctxWithTimeout, worker.AddrInfo.ID, config.ProtocolWithVersion(config.WorkerProtocol)) + stream, err := node.ProtocolStream(ctxWithTimeout, worker.AddrInfo.ID, config.WorkerProtocol) if err != nil { response.Error = fmt.Sprintf("error opening stream: %v", err) whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) From a20ece81c5043cba533110753d17fdf9acc1535e Mon Sep 17 00:00:00 2001 From: mudler Date: Thu, 19 Sep 2024 16:35:44 +0200 Subject: [PATCH 4/4] feat(node): do not consume config - Untie config from viper business logics - Now options are consistently used inside the node rather the config singletons - Reduce usage of config's singleton - Generate options from Viper configuration - Attach services to main node business logic to start up services Signed-off-by: mudler --- cmd/masa-node/config.go | 88 +++++++++++++++ cmd/masa-node/main.go | 61 +--------- node/options.go | 157 ++++++++++++++++++++++++++ node/oracle_node.go | 106 ++++++++--------- node/oracle_node_listener.go | 6 +- node/protocol.go | 14 +-- pkg/api/handlers_data.go | 6 +- pkg/api/routes.go | 2 +- pkg/config/app.go | 47 +------- pkg/config/options.go | 98 ---------------- pkg/tests/integration/tracker_test.go | 15 ++- 11 files changed, 316 insertions(+), 284 deletions(-) create mode 100644 cmd/masa-node/config.go create mode 100644 node/options.go delete mode 100644 pkg/config/options.go diff --git a/cmd/masa-node/config.go b/cmd/masa-node/config.go new file mode 100644 index 00000000..578ca092 --- /dev/null +++ b/cmd/masa-node/config.go @@ -0,0 +1,88 @@ +package main + +import ( + "github.com/masa-finance/masa-oracle/node" + "github.com/masa-finance/masa-oracle/pkg/config" + pubsub "github.com/masa-finance/masa-oracle/pkg/pubsub" + "github.com/masa-finance/masa-oracle/pkg/workers" +) + +func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) { + // WorkerManager configuration + // XXX: this needs to be moved under config, but now it's here as there are import cycles given singletons + workerManagerOptions := []workers.WorkerOptionFunc{} + + masaNodeOptions := []node.Option{ + node.EnableStaked, + // config.WithService(), + node.WithEnvironment(cfg.Environment), + node.WithVersion(cfg.Version), + node.WithPort(cfg.PortNbr), + node.WithBootNodes(cfg.Bootnodes...), + } + + if cfg.TwitterScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker) + masaNodeOptions = append(masaNodeOptions, node.IsTwitterScraper) + } + + if cfg.TelegramScraper { + // XXX: Telegram scraper is not implemented yet in the worker (?) + masaNodeOptions = append(masaNodeOptions, node.IsTelegramScraper) + } + + if cfg.DiscordScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) + masaNodeOptions = append(masaNodeOptions, node.IsDiscordScraper) + } + + if cfg.WebScraper { + workerManagerOptions = append(workerManagerOptions, workers.EnableWebScraperWorker) + masaNodeOptions = append(masaNodeOptions, node.IsWebScraper) + } + + if cfg.LlmServer { + workerManagerOptions = append(workerManagerOptions, workers.EnableLLMServerWorker) + masaNodeOptions = append(masaNodeOptions, node.IsLlmServer) + } + + workHandlerManager := workers.NewWorkHandlerManager(workerManagerOptions...) + blockChainEventTracker := node.NewBlockChain() + pubKeySub := &pubsub.PublicKeySubscriptionHandler{} + + // TODO: Where the config is involved, move to the config the generation of + // Node options + masaNodeOptions = append(masaNodeOptions, []node.Option{ + // Register the worker manager + node.WithMasaProtocolHandler( + config.WorkerProtocol, + workHandlerManager.HandleWorkerStream, + ), + node.WithPubSubHandler(config.PublicKeyTopic, pubKeySub, false), + node.WithPubSubHandler(config.BlockTopic, blockChainEventTracker, true), + }...) + + if cfg.Validator { + // Subscribe and if actor start monitoring actor workers + // considering all that matters is if the node is staked + // and other peers can do work we only need to check this here + // if this peer can or cannot scrape or write that is checked in other places + masaNodeOptions = append(masaNodeOptions, + node.WithService(blockChainEventTracker.Start), + ) + } + + if cfg.UDP { + masaNodeOptions = append(masaNodeOptions, node.EnableUDP) + } + + if cfg.TCP { + masaNodeOptions = append(masaNodeOptions, node.EnableTCP) + } + + if cfg.Validator { + masaNodeOptions = append(masaNodeOptions, node.IsValidator) + } + + return masaNodeOptions, workHandlerManager, pubKeySub +} diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index 84e9be0c..409688ef 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -9,9 +9,7 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/masa-finance/masa-oracle/internal/versioning" - "github.com/masa-finance/masa-oracle/pkg/workers" - pubsub "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/sirupsen/logrus" "github.com/masa-finance/masa-oracle/node" @@ -74,51 +72,7 @@ func main() { isValidator := cfg.Validator - // WorkerManager configuration - // XXX: this needs to be moved under config, but now it's here as there are import cycles given singletons - workerManagerOptions := []workers.WorkerOptionFunc{} - - if cfg.TwitterScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker) - } - - if cfg.TelegramScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) - } - - if cfg.DiscordScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) - } - - if cfg.WebScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableWebScraperWorker) - } - - workHandlerManager := workers.NewWorkHandlerManager(workerManagerOptions...) - blockChainEventTracker := node.NewBlockChain() - - masaNodeOptions := []config.Option{ - config.EnableStaked, - // config.WithService(), - config.WithEnvironment(config.GetInstance().Environment), - config.WithVersion(config.GetInstance().Version), - } - - // Register the worker manager - masaNodeOptions = append(masaNodeOptions, - config.WithMasaProtocolHandler( - config.WorkerProtocol, - workHandlerManager.HandleWorkerStream, - ), - ) - - pubKeySub := &pubsub.PublicKeySubscriptionHandler{} - - masaNodeOptions = append(masaNodeOptions, - config.WithPubSubHandler(config.PublicKeyTopic, pubKeySub, false), - config.WithPubSubHandler(config.BlockTopic, blockChainEventTracker, true), - ) - + masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg) // Create a new OracleNode masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...) @@ -131,7 +85,6 @@ func main() { logrus.Fatal(err) } - masaNode.NodeTracker.GetAllNodeData() if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper { logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.") } @@ -147,18 +100,6 @@ func main() { // Init cache resolver db.InitResolverCache(masaNode, keyManager) - // Subscribe and if actor start monitoring actor workers - // considering all that matters is if the node is staked - // and other peers can do work we only need to check this here - // if this peer can or cannot scrape or write that is checked in other places - if masaNode.IsStaked { - if masaNode.IsValidator { - go blockChainEventTracker.Start(ctx, masaNode) - } - - go masaNode.NodeTracker.ClearExpiredWorkerTimeouts() - } - // Listen for SIGINT (CTRL+C) c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) diff --git a/node/options.go b/node/options.go new file mode 100644 index 00000000..770a661c --- /dev/null +++ b/node/options.go @@ -0,0 +1,157 @@ +package node + +import ( + "context" + + "github.com/masa-finance/masa-oracle/node/types" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" +) + +type NodeOption struct { + DisableCLIParse bool + IsStaked bool + UDP bool + TCP bool + IsValidator bool + PortNbr int + + IsTwitterScraper bool + IsDiscordScraper bool + IsTelegramScraper bool + IsWebScraper bool + IsLlmServer bool + + Bootnodes []string + RandomIdentity bool + Services []func(ctx context.Context, node *OracleNode) + PubSubHandles []PubSubHandlers + ProtocolHandlers map[protocol.ID]network.StreamHandler + MasaProtocolHandlers map[string]network.StreamHandler + Environment string + Version string +} + +type PubSubHandlers struct { + ProtocolName string + Handler types.SubscriptionHandler + IncludeSelf bool +} + +type Option func(*NodeOption) + +var DisableCLIParse = func(o *NodeOption) { + o.DisableCLIParse = true +} + +var EnableStaked = func(o *NodeOption) { + o.IsStaked = true +} + +var EnableRandomIdentity = func(o *NodeOption) { + o.RandomIdentity = true +} + +var EnableTCP = func(o *NodeOption) { + o.TCP = true +} + +var EnableUDP = func(o *NodeOption) { + o.UDP = true +} + +var IsValidator = func(o *NodeOption) { + o.IsValidator = true +} + +var IsTwitterScraper = func(o *NodeOption) { + o.IsTwitterScraper = true +} + +var IsDiscordScraper = func(o *NodeOption) { + o.IsDiscordScraper = true +} + +var IsTelegramScraper = func(o *NodeOption) { + o.IsTelegramScraper = true +} + +var IsWebScraper = func(o *NodeOption) { + o.IsWebScraper = true +} + +var IsLlmServer = func(o *NodeOption) { + o.IsLlmServer = true +} + +func (a *NodeOption) Apply(opts ...Option) { + for _, opt := range opts { + opt(a) + } +} + +// HasBootnodes checks if the AppConfig has any bootnodes configured. +// It returns true if there is at least one bootnode in the Bootnodes slice and it is not an empty string. +// Otherwise, it returns false, indicating that no bootnodes are configured. +func (a *NodeOption) HasBootnodes() bool { + if len(a.Bootnodes) == 0 { + return false + } + + return a.Bootnodes[0] != "" +} + +func WithBootNodes(bootnodes ...string) Option { + return func(o *NodeOption) { + o.Bootnodes = append(o.Bootnodes, bootnodes...) + } +} + +func WithService(plugins ...func(ctx context.Context, node *OracleNode)) Option { + return func(o *NodeOption) { + o.Services = append(o.Services, plugins...) + } +} + +func WithProtocolHandler(pid protocol.ID, n network.StreamHandler) Option { + return func(o *NodeOption) { + if o.ProtocolHandlers == nil { + o.ProtocolHandlers = make(map[protocol.ID]network.StreamHandler) + } + o.ProtocolHandlers[pid] = n + } +} + +func WithEnvironment(env string) Option { + return func(o *NodeOption) { + o.Environment = env + } +} + +func WithVersion(version string) Option { + return func(o *NodeOption) { + o.Version = version + } +} + +func WithMasaProtocolHandler(pid string, n network.StreamHandler) Option { + return func(o *NodeOption) { + if o.MasaProtocolHandlers == nil { + o.MasaProtocolHandlers = make(map[string]network.StreamHandler) + } + o.MasaProtocolHandlers[pid] = n + } +} + +func WithPubSubHandler(protocolName string, handler types.SubscriptionHandler, includeSelf bool) Option { + return func(o *NodeOption) { + o.PubSubHandles = append(o.PubSubHandles, PubSubHandlers{protocolName, handler, includeSelf}) + } +} + +func WithPort(port int) Option { + return func(o *NodeOption) { + o.PortNbr = port + } +} diff --git a/node/oracle_node.go b/node/oracle_node.go index d54821f7..05a99ddd 100644 --- a/node/oracle_node.go +++ b/node/oracle_node.go @@ -33,28 +33,21 @@ import ( ) type OracleNode struct { - Host host.Host - Protocol protocol.ID - priorityAddrs multiaddr.Multiaddr - multiAddrs []multiaddr.Multiaddr - DHT *dht.IpfsDHT - PeerChan chan myNetwork.PeerEvent - NodeTracker *pubsub.NodeEventTracker - PubSubManager *pubsub.Manager - Signature string - IsStaked bool - IsValidator bool - IsTwitterScraper bool - IsDiscordScraper bool - IsTelegramScraper bool - IsWebScraper bool - IsLlmServer bool - StartTime time.Time - WorkerTracker *pubsub.WorkerEventTracker - BlockTracker *BlockEventTracker - Blockchain *chain.Chain - options config.AppOption - Context context.Context + Host host.Host + Protocol protocol.ID + priorityAddrs multiaddr.Multiaddr + multiAddrs []multiaddr.Multiaddr + DHT *dht.IpfsDHT + PeerChan chan myNetwork.PeerEvent + NodeTracker *pubsub.NodeEventTracker + PubSubManager *pubsub.Manager + Signature string + StartTime time.Time + WorkerTracker *pubsub.WorkerEventTracker + BlockTracker *BlockEventTracker + Blockchain *chain.Chain + Options NodeOption + Context context.Context } // GetMultiAddrs returns the priority multiaddr for this node. @@ -83,12 +76,11 @@ func (node *OracleNode) GetP2PMultiAddrs() ([]multiaddr.Multiaddr, error) { // NewOracleNode creates a new OracleNode instance with the provided context and // staking status. It initializes the libp2p host, DHT, pubsub manager, and other // components needed for an Oracle node to join the network and participate. -func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, error) { - o := &config.AppOption{} +func NewOracleNode(ctx context.Context, opts ...Option) (*OracleNode, error) { + o := &NodeOption{} o.Apply(opts...) // Start with the default scaling limits. - cfg := config.GetInstance(opts...) scalingLimits := rcmgr.DefaultLimits concreteLimits := scalingLimits.AutoScale() limiter := rcmgr.NewFixedLimiter(concreteLimits) @@ -120,13 +112,13 @@ func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, err // sudo sysctl -w net.core.rmem_max=7500000 // sudo sysctl -w net.core.wmem_max=7500000 // sudo sysctl -p - if cfg.UDP { - addrStr = append(addrStr, fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", cfg.PortNbr)) + if o.UDP { + addrStr = append(addrStr, fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", o.PortNbr)) libp2pOptions = append(libp2pOptions, libp2p.Transport(quic.NewTransport)) } - if cfg.TCP { + if o.TCP { securityOptions = append(securityOptions, libp2p.Security(libp2ptls.ID, libp2ptls.New)) - addrStr = append(addrStr, fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", cfg.PortNbr)) + addrStr = append(addrStr, fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", o.PortNbr)) libp2pOptions = append(libp2pOptions, libp2p.Transport(tcp.NewTCPTransport)) libp2pOptions = append(libp2pOptions, libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport)) } @@ -144,21 +136,14 @@ func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, err } n := &OracleNode{ - Host: hst, - multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), - PeerChan: make(chan myNetwork.PeerEvent), - NodeTracker: pubsub.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()), - Context: ctx, - PubSubManager: subscriptionManager, - IsStaked: o.IsStaked, - IsValidator: cfg.Validator, - IsTwitterScraper: cfg.TwitterScraper, - IsDiscordScraper: cfg.DiscordScraper, - IsTelegramScraper: cfg.TelegramScraper, - IsWebScraper: cfg.WebScraper, - IsLlmServer: cfg.LlmServer, - Blockchain: &chain.Chain{}, - options: *o, + Host: hst, + multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), + PeerChan: make(chan myNetwork.PeerEvent), + NodeTracker: pubsub.NewNodeEventTracker(versioning.ProtocolVersion, o.Environment, hst.ID().String()), + Context: ctx, + PubSubManager: subscriptionManager, + Blockchain: &chain.Chain{}, + Options: *o, } n.Protocol = n.protocolWithVersion(config.OracleProtocol) @@ -179,7 +164,7 @@ func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) { return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil } -func getNodeData(host host.Host, isStaked bool, addr multiaddr.Multiaddr, publicEthAddress string) *pubsub.NodeData { +func (node *OracleNode) getNodeData(host host.Host, addr multiaddr.Multiaddr, publicEthAddress string) *pubsub.NodeData { // GetSelfNodeData converts the local node's data into a JSON byte array. // It populates a NodeData struct with the node's ID, staking status, and Ethereum address. // The NodeData struct is then marshalled into a JSON byte array. @@ -187,12 +172,12 @@ func getNodeData(host host.Host, isStaked bool, addr multiaddr.Multiaddr, public // Create and populate NodeData nodeData := pubsub.NewNodeData(addr, host.ID(), publicEthAddress, pubsub.ActivityJoined) nodeData.MultiaddrsString = addr.String() - nodeData.IsStaked = isStaked - nodeData.IsTwitterScraper = config.GetInstance().TwitterScraper - nodeData.IsDiscordScraper = config.GetInstance().DiscordScraper - nodeData.IsTelegramScraper = config.GetInstance().TelegramScraper - nodeData.IsWebScraper = config.GetInstance().WebScraper - nodeData.IsValidator = config.GetInstance().Validator + nodeData.IsStaked = node.Options.IsStaked + nodeData.IsTwitterScraper = node.Options.IsTwitterScraper + nodeData.IsDiscordScraper = node.Options.IsDiscordScraper + nodeData.IsTelegramScraper = node.Options.IsLlmServer + nodeData.IsWebScraper = node.Options.IsWebScraper + nodeData.IsValidator = node.Options.IsValidator nodeData.IsActive = true nodeData.Version = versioning.ProtocolVersion @@ -209,15 +194,15 @@ func (node *OracleNode) Start() (err error) { node.Host.SetStreamHandler(node.Protocol, node.handleStream) node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData) - for pid, n := range node.options.ProtocolHandlers { + for pid, n := range node.Options.ProtocolHandlers { node.Host.SetStreamHandler(pid, n) } - for protocol, n := range node.options.MasaProtocolHandlers { + for protocol, n := range node.Options.MasaProtocolHandlers { node.Host.SetStreamHandler(node.protocolWithVersion(protocol), n) } - if node.IsStaked { + if node.Options.IsStaked { node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) } @@ -225,17 +210,18 @@ func (node *OracleNode) Start() (err error) { go node.ListenToNodeTracker() go node.handleDiscoveredPeers() + go node.NodeTracker.ClearExpiredWorkerTimeouts() var publicKeyHex string - if node.options.RandomIdentity { + if node.Options.RandomIdentity { publicKeyHex, _ = node.generateEthHexKeyForRandomIdentity() } else { publicKeyHex = masacrypto.KeyManagerInstance().EthAddress } - myNodeData := getNodeData(node.Host, node.IsStaked, node.priorityAddrs, publicKeyHex) + myNodeData := node.getNodeData(node.Host, node.priorityAddrs, publicKeyHex) - bootstrapNodes, err := myNetwork.GetBootNodesMultiAddress(append(config.GetInstance().Bootnodes, node.options.Bootnodes...)) + bootstrapNodes, err := myNetwork.GetBootNodesMultiAddress(node.Options.Bootnodes) if err != nil { return err } @@ -250,8 +236,8 @@ func (node *OracleNode) Start() (err error) { return err } - for _, p := range node.options.Services { - go p(node.Context, node.Host) + for _, p := range node.Options.Services { + go p(node.Context, node) } go myNetwork.Discover(node.Context, node.Host, node.DHT, node.Protocol) @@ -261,7 +247,7 @@ func (node *OracleNode) Start() (err error) { nodeData = myNodeData nodeData.SelfIdentified = true } - nodeData.Joined(node.options.Version) + nodeData.Joined(node.Options.Version) node.NodeTracker.HandleNodeData(*nodeData) // call SubscribeToTopics on startup diff --git a/node/oracle_node_listener.go b/node/oracle_node_listener.go index 0266c0d8..f6ad7121 100644 --- a/node/oracle_node_listener.go +++ b/node/oracle_node_listener.go @@ -29,7 +29,7 @@ func (node *OracleNode) ListenToNodeTracker() { case nodeData := <-node.NodeTracker.NodeDataChan: time.Sleep(1 * time.Second) jsonData, err := json.Marshal(nodeData) - if node.IsValidator { + if node.Options.IsValidator { _ = json.Unmarshal(jsonData, &nodeData) err = node.DHT.PutValue(context.Background(), "/db/"+nodeData.PeerId.String(), jsonData) if err != nil { @@ -51,7 +51,7 @@ func (node *OracleNode) ListenToNodeTracker() { // the node start time is greater than 5 minutes ago, // call SendNodeData in a separate goroutine if nodeData.Activity == pubsub2.ActivityJoined && - (!config.GetInstance().HasBootnodes() || time.Since(node.StartTime) > time.Minute*5) { + (!node.Options.HasBootnodes() || time.Since(node.StartTime) > time.Minute*5) { go node.SendNodeData(nodeData.PeerId) } case <-node.Context.Done(): @@ -176,7 +176,7 @@ func (node *OracleNode) ReceiveNodeData(stream network.Stream) { for _, nd := range page.Data { - if node.IsValidator { + if node.Options.IsValidator { for _, p := range page.Data { jsonData, _ := json.Marshal(p) _ = json.Unmarshal(jsonData, &nd) diff --git a/node/protocol.go b/node/protocol.go index b07c814f..b6f6b3cf 100644 --- a/node/protocol.go +++ b/node/protocol.go @@ -19,19 +19,19 @@ const ( // ProtocolWithVersion returns a libp2p protocol ID string // with the configured version and environment suffix. func (node *OracleNode) protocolWithVersion(protocolName string) protocol.ID { - if node.options.Environment == "" { - return protocol.ID(fmt.Sprintf("%s/%s/%s", masaPrefix, protocolName, node.options.Version)) + if node.Options.Environment == "" { + return protocol.ID(fmt.Sprintf("%s/%s/%s", masaPrefix, protocolName, node.Options.Version)) } - return protocol.ID(fmt.Sprintf("%s/%s/%s-%s", masaPrefix, protocolName, node.options.Version, node.options.Environment)) + return protocol.ID(fmt.Sprintf("%s/%s/%s-%s", masaPrefix, protocolName, node.Options.Version, node.Options.Environment)) } // TopicWithVersion returns a topic string with the configured version // and environment suffix. func (node *OracleNode) topicWithVersion(protocolName string) string { - if node.options.Environment == "" { - return fmt.Sprintf("%s/%s/%s", masaPrefix, protocolName, node.options.Version) + if node.Options.Environment == "" { + return fmt.Sprintf("%s/%s/%s", masaPrefix, protocolName, node.Options.Version) } - return fmt.Sprintf("%s/%s/%s-%s", masaPrefix, protocolName, node.options.Version, node.options.Environment) + return fmt.Sprintf("%s/%s/%s-%s", masaPrefix, protocolName, node.Options.Version, node.Options.Environment) } func (node *OracleNode) ProtocolStream(ctx context.Context, peerID peer.ID, protocolName string) (network.Stream, error) { @@ -43,7 +43,7 @@ func (node *OracleNode) ProtocolStream(ctx context.Context, peerID peer.ID, prot // Each subscription is managed through the node's PubSubManager, which orchestrates the message passing for these topics. // Errors during subscription are logged and returned, halting the process to ensure the node's correct setup before operation. func (node *OracleNode) subscribeToTopics() error { - for _, handler := range node.options.PubSubHandles { + for _, handler := range node.Options.PubSubHandles { if err := node.SubscribeTopic(handler.ProtocolName, handler.Handler, handler.IncludeSelf); err != nil { return err } diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index e6563575..fb8fb443 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -537,7 +537,7 @@ func (api *API) SearchAllGuilds() gin.HandlerFunc { // On success, it returns the scraped data in a sanitized JSON response. On failure, it returns an appropriate error message and HTTP status code. func (api *API) WebData() gin.HandlerFunc { return func(c *gin.Context) { - if !api.Node.IsStaked { + if !api.Node.Options.IsStaked { c.JSON(http.StatusBadRequest, gin.H{"error": "Node has not staked and cannot participate"}) return } @@ -836,7 +836,7 @@ func (api *API) CfLlmChat() gin.HandlerFunc { func (api *API) GetBlocks() gin.HandlerFunc { return func(c *gin.Context) { - if !api.Node.IsValidator { + if !api.Node.Options.IsValidator { c.JSON(http.StatusBadRequest, gin.H{"error": "Node is not a validator and cannot access this endpoint"}) return } @@ -902,7 +902,7 @@ func (api *API) GetBlocks() gin.HandlerFunc { func (api *API) GetBlockByHash() gin.HandlerFunc { return func(c *gin.Context) { - if !api.Node.IsValidator { + if !api.Node.Options.IsValidator { c.JSON(http.StatusBadRequest, gin.H{"error": "Node is not a validator and cannot access this endpoint"}) return } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 4f548367..285b5801 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -56,7 +56,7 @@ func SetupRoutes(node *node.OracleNode, workerManager *workers.WorkHandlerManage // Middleware to enforce API token authentication, excluding ignored routes. router.Use(func(c *gin.Context) { - if API.Node.IsStaked { + if API.Node.Options.IsStaked { c.Next() // Proceed to the next middleware or handler as a staked node. return } diff --git a/pkg/config/app.go b/pkg/config/app.go index e9fa077c..26d48d8a 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -110,10 +110,7 @@ type AppConfig struct { // If the unmarshalling fails, the instance is set to nil. // // Subsequent calls to GetInstance will return the same initialized instance. -func GetInstance(options ...Option) *AppConfig { - o := &AppOption{} - o.Apply(options...) - +func GetInstance() *AppConfig { once.Do(func() { instance = &AppConfig{} @@ -122,10 +119,8 @@ func GetInstance(options ...Option) *AppConfig { instance.setFileConfig(viper.GetString("FILE_PATH")) - if !o.DisableCLIParse { - if err := instance.setCommandLineConfig(); err != nil { - logrus.Fatal(err) - } + if err := instance.setCommandLineConfig(); err != nil { + logrus.Fatal(err) } if err := viper.Unmarshal(instance); err != nil { @@ -260,39 +255,3 @@ func (c *AppConfig) LogConfig() { logrus.Infof("%s: %v", field.Name, value) } } - -// HasBootnodes checks if the AppConfig has any bootnodes configured. -// It returns true if there is at least one bootnode in the Bootnodes slice and it is not an empty string. -// Otherwise, it returns false, indicating that no bootnodes are configured. -func (c *AppConfig) HasBootnodes() bool { - if len(c.Bootnodes) == 0 { - return false - } - - return c.Bootnodes[0] != "" -} - -/* - -func (c *AppConfig) WorkerManagerOptions() []workers.WorkerOptionFunc { - workerManagerOptions := []workers.WorkerOptionFunc{} - if c.TwitterScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker) - } - - if c.TelegramScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) - } - - if c.DiscordScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker) - } - - if c.WebScraper { - workerManagerOptions = append(workerManagerOptions, workers.EnableWebScraperWorker) - } - - return workerManagerOptions -} - -*/ diff --git a/pkg/config/options.go b/pkg/config/options.go deleted file mode 100644 index 50712bcd..00000000 --- a/pkg/config/options.go +++ /dev/null @@ -1,98 +0,0 @@ -package config - -import ( - "context" - - "github.com/masa-finance/masa-oracle/node/types" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/protocol" -) - -type AppOption struct { - DisableCLIParse bool - IsStaked bool - Bootnodes []string - RandomIdentity bool - Services []func(ctx context.Context, node host.Host) - PubSubHandles []PubSubHandlers - ProtocolHandlers map[protocol.ID]network.StreamHandler - MasaProtocolHandlers map[string]network.StreamHandler - Environment string - Version string -} - -type PubSubHandlers struct { - ProtocolName string - Handler types.SubscriptionHandler - IncludeSelf bool -} - -type Option func(*AppOption) - -var DisableCLIParse = func(o *AppOption) { - o.DisableCLIParse = true -} - -var EnableStaked = func(o *AppOption) { - o.IsStaked = true -} - -var EnableRandomIdentity = func(o *AppOption) { - o.RandomIdentity = true -} - -func (a *AppOption) Apply(opts ...Option) { - for _, opt := range opts { - opt(a) - } -} - -func WithBootNodes(bootnodes ...string) Option { - return func(o *AppOption) { - o.Bootnodes = append(o.Bootnodes, bootnodes...) - } -} - -func WithService(plugins ...func(ctx context.Context, node host.Host)) Option { - return func(o *AppOption) { - o.Services = append(o.Services, plugins...) - } -} - -func WithProtocolHandler(pid protocol.ID, n network.StreamHandler) Option { - return func(o *AppOption) { - if o.ProtocolHandlers == nil { - o.ProtocolHandlers = make(map[protocol.ID]network.StreamHandler) - } - o.ProtocolHandlers[pid] = n - } -} - -func WithEnvironment(env string) Option { - return func(o *AppOption) { - o.Environment = env - } -} - -func WithVersion(version string) Option { - return func(o *AppOption) { - o.Version = version - } -} - -func WithMasaProtocolHandler(pid string, n network.StreamHandler) Option { - return func(o *AppOption) { - if o.MasaProtocolHandlers == nil { - o.MasaProtocolHandlers = make(map[string]network.StreamHandler) - } - o.MasaProtocolHandlers[pid] = n - } -} - -func WithPubSubHandler(protocolName string, handler types.SubscriptionHandler, includeSelf bool) Option { - return func(o *AppOption) { - o.PubSubHandles = append(o.PubSubHandles, PubSubHandlers{protocolName, handler, includeSelf}) - } -} diff --git a/pkg/tests/integration/tracker_test.go b/pkg/tests/integration/tracker_test.go index bf9f46ac..7f8aa325 100644 --- a/pkg/tests/integration/tracker_test.go +++ b/pkg/tests/integration/tracker_test.go @@ -6,7 +6,6 @@ import ( "fmt" . "github.com/masa-finance/masa-oracle/node" - "github.com/masa-finance/masa-oracle/pkg/config" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -20,9 +19,9 @@ var _ = Describe("Oracle integration tests", func() { n, err := NewOracleNode( ctx, - config.EnableStaked, - config.DisableCLIParse, - config.EnableRandomIdentity, + EnableStaked, + DisableCLIParse, + EnableRandomIdentity, ) Expect(err).ToNot(HaveOccurred()) @@ -39,10 +38,10 @@ var _ = Describe("Oracle integration tests", func() { By(fmt.Sprintf("Generating second node with bootnodes %+v", bootNodes)) n2, err := NewOracleNode(ctx, - config.EnableStaked, - config.DisableCLIParse, - config.WithBootNodes(bootNodes...), - config.EnableRandomIdentity, + EnableStaked, + DisableCLIParse, + WithBootNodes(bootNodes...), + EnableRandomIdentity, ) Expect(err).ToNot(HaveOccurred()) Expect(n.Host.ID()).ToNot(Equal(n2.Host.ID()))