Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(refactor): move node logic to the node package #533

Merged
merged 5 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions cmd/masa-node/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package main

import (
"github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/config"
pubsub "github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/masa-finance/masa-oracle/pkg/workers"
)

func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) {
// WorkerManager configuration
// XXX: this needs to be moved under config, but now it's here as there are import cycles given singletons
workerManagerOptions := []workers.WorkerOptionFunc{}

masaNodeOptions := []node.Option{
node.EnableStaked,
// config.WithService(),
node.WithEnvironment(cfg.Environment),
node.WithVersion(cfg.Version),
node.WithPort(cfg.PortNbr),
node.WithBootNodes(cfg.Bootnodes...),
}

if cfg.TwitterScraper {
workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker)
masaNodeOptions = append(masaNodeOptions, node.IsTwitterScraper)
}

if cfg.TelegramScraper {
// XXX: Telegram scraper is not implemented yet in the worker (?)
masaNodeOptions = append(masaNodeOptions, node.IsTelegramScraper)
}

if cfg.DiscordScraper {
workerManagerOptions = append(workerManagerOptions, workers.EnableDiscordScraperWorker)
masaNodeOptions = append(masaNodeOptions, node.IsDiscordScraper)
}

if cfg.WebScraper {
workerManagerOptions = append(workerManagerOptions, workers.EnableWebScraperWorker)
masaNodeOptions = append(masaNodeOptions, node.IsWebScraper)
}

if cfg.LlmServer {
workerManagerOptions = append(workerManagerOptions, workers.EnableLLMServerWorker)
masaNodeOptions = append(masaNodeOptions, node.IsLlmServer)
}

workHandlerManager := workers.NewWorkHandlerManager(workerManagerOptions...)
blockChainEventTracker := node.NewBlockChain()
pubKeySub := &pubsub.PublicKeySubscriptionHandler{}

// TODO: Where the config is involved, move to the config the generation of
// Node options
masaNodeOptions = append(masaNodeOptions, []node.Option{
// Register the worker manager
node.WithMasaProtocolHandler(
config.WorkerProtocol,
workHandlerManager.HandleWorkerStream,
),
node.WithPubSubHandler(config.PublicKeyTopic, pubKeySub, false),
node.WithPubSubHandler(config.BlockTopic, blockChainEventTracker, true),
}...)

if cfg.Validator {
// Subscribe and if actor start monitoring actor workers
// considering all that matters is if the node is staked
// and other peers can do work we only need to check this here
// if this peer can or cannot scrape or write that is checked in other places
masaNodeOptions = append(masaNodeOptions,
node.WithService(blockChainEventTracker.Start),
)
}

if cfg.UDP {
masaNodeOptions = append(masaNodeOptions, node.EnableUDP)
}

if cfg.TCP {
masaNodeOptions = append(masaNodeOptions, node.EnableTCP)
}

if cfg.Validator {
masaNodeOptions = append(masaNodeOptions, node.IsValidator)
}

return masaNodeOptions, workHandlerManager, pubKeySub
}
31 changes: 11 additions & 20 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (
"github.com/multiformats/go-multiaddr"

"github.com/masa-finance/masa-oracle/internal/versioning"
"github.com/masa-finance/masa-oracle/pkg/workers"

"github.com/sirupsen/logrus"

masa "github.com/masa-finance/masa-oracle/pkg"
"github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/api"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/db"
Expand Down Expand Up @@ -73,41 +72,33 @@ func main() {

isValidator := cfg.Validator

masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg)
// Create a new OracleNode
node, err := masa.NewOracleNode(ctx, config.EnableStaked)
masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...)

if err != nil {
logrus.Fatal(err)
}
err = node.Start()

err = masaNode.Start()
if err != nil {
logrus.Fatal(err)
}

node.NodeTracker.GetAllNodeData()
if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper {
logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.")
}

if cfg.AllowedPeer {
cfg.AllowedPeerId = node.Host.ID().String()
cfg.AllowedPeerId = masaNode.Host.ID().String()
cfg.AllowedPeerPublicKey = keyManager.HexPubKey
logrus.Infof("[+] Allowed peer with ID: %s and PubKey: %s", cfg.AllowedPeerId, cfg.AllowedPeerPublicKey)
} else {
logrus.Warn("[-] This node is not set as the allowed peer")
}

// Init cache resolver
db.InitResolverCache(node, keyManager)

// Subscribe and if actor start monitoring actor workers
// considering all that matters is if the node is staked
// and other peers can do work we only need to check this here
// if this peer can or cannot scrape or write that is checked in other places
if node.IsStaked {
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.WorkerProtocol), workers.GetWorkHandlerManager().HandleWorkerStream)
go masa.SubscribeToBlocks(ctx, node)
go node.NodeTracker.ClearExpiredWorkerTimeouts()
}
db.InitResolverCache(masaNode, keyManager)

// Listen for SIGINT (CTRL+C)
c := make(chan os.Signal, 1)
Expand All @@ -116,7 +107,7 @@ func main() {
// Cancel the context when SIGINT is received
go func() {
<-c
nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String())
nodeData := masaNode.NodeTracker.GetNodeData(masaNode.Host.ID().String())
if nodeData != nil {
nodeData.Left()
}
Expand All @@ -130,7 +121,7 @@ func main() {
}
}()

router := api.SetupRoutes(node)
router := api.SetupRoutes(masaNode, workHandlerManager, pubKeySub)
go func() {
err = router.Run()
if err != nil {
Expand All @@ -139,7 +130,7 @@ func main() {
}()

// Get the multiaddress and IP address of the node
multiAddr := node.GetMultiAddrs() // Get the multiaddress
multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress
ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address
// Display the welcome message with the multiaddress and IP address
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)
Expand Down
212 changes: 212 additions & 0 deletions node/blockchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package node

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"reflect"
"sync"
"time"

shell "github.com/ipfs/go-ipfs-api"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/masa-finance/masa-oracle/pkg/chain"
"github.com/sirupsen/logrus"
)

// Blockchain Implementation

type BlockData struct {
Block uint64 `json:"block"`
InputData interface{} `json:"input_data"`
TransactionHash string `json:"transaction_hash"`
PreviousHash string `json:"previous_hash"`
TransactionNonce int `json:"nonce"`
}

type Blocks struct {
BlockData []BlockData `json:"blocks"`
}

type BlockEvents struct{}

type BlockEventTracker struct {
BlockEvents []BlockEvents
BlockTopic *pubsub.Topic
mu sync.Mutex
blocksCh chan *pubsub.Message
}

func NewBlockChain() *BlockEventTracker {
return &BlockEventTracker{
blocksCh: make(chan *pubsub.Message),
}
}

// HandleMessage processes incoming pubsub messages containing block events.
// It unmarshals the message data into a slice of BlockEvents and appends them
// to the tracker's BlockEvents slice.
func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) {
var blockEvents any

// Try to decode as base64 first
decodedData, err := base64.StdEncoding.DecodeString(string(m.Data))
if err == nil {
m.Data = decodedData
}

// Try to unmarshal as JSON
err = json.Unmarshal(m.Data, &blockEvents)
if err != nil {
// If JSON unmarshal fails, try to interpret as string
blockEvents = string(m.Data)
}

b.mu.Lock()
defer b.mu.Unlock()

switch v := blockEvents.(type) {
case []BlockEvents:
b.BlockEvents = append(b.BlockEvents, v...)
case BlockEvents:
b.BlockEvents = append(b.BlockEvents, v)
case map[string]interface{}:
// Convert map to BlockEvents struct
newBlockEvent := BlockEvents{}
// You might need to add logic here to properly convert the map to BlockEvents
b.BlockEvents = append(b.BlockEvents, newBlockEvent)
case []interface{}:
// Convert each item in the slice to BlockEvents
for _, item := range v {
if be, ok := item.(BlockEvents); ok {
b.BlockEvents = append(b.BlockEvents, be)
}
}
case string:
// Handle string data
newBlockEvent := BlockEvents{}
// You might need to add logic here to properly convert the string to BlockEvents
b.BlockEvents = append(b.BlockEvents, newBlockEvent)
default:
logrus.Warnf("[-] Unexpected data type in message: %v", reflect.TypeOf(v))
}

b.blocksCh <- m
}

func updateBlocks(ctx context.Context, node *OracleNode) error {

var existingBlocks Blocks
blocks := chain.GetBlockchain(node.Blockchain)

for _, block := range blocks {
var inputData interface{}
err := json.Unmarshal(block.Data, &inputData)
if err != nil {
inputData = string(block.Data) // Fallback to string if unmarshal fails
}

blockData := BlockData{
Block: block.Block,
InputData: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%v", inputData))),
TransactionHash: fmt.Sprintf("%x", block.Hash),
PreviousHash: fmt.Sprintf("%x", block.Link),
TransactionNonce: int(block.Nonce),
}
existingBlocks.BlockData = append(existingBlocks.BlockData, blockData)
}
jsonData, err := json.Marshal(existingBlocks)
if err != nil {
return err
}

err = node.DHT.PutValue(ctx, "/db/blocks", jsonData)
if err != nil {
logrus.Warningf("[-] Unable to store block on DHT: %v", err)
}

if os.Getenv("IPFS_URL") != "" {

infuraURL := fmt.Sprintf("https://%s:%s@%s", os.Getenv("PID"), os.Getenv("PS"), os.Getenv("IPFS_URL"))
sh := shell.NewShell(infuraURL)

jsonBytes, err := json.Marshal(jsonData)
if err != nil {
logrus.Errorf("[-] Error marshalling JSON: %s", err)
}

reader := bytes.NewReader(jsonBytes)

hash, err := sh.AddWithOpts(reader, true, true)
if err != nil {
logrus.Errorf("[-] Error persisting to IPFS: %s", err)
} else {
logrus.Printf("[+] Ledger persisted with IPFS hash: https://dwn.infura-ipfs.io/ipfs/%s\n", hash)
_ = node.DHT.PutValue(ctx, "/db/ipfs", []byte(fmt.Sprintf("https://dwn.infura-ipfs.io/ipfs/%s", hash)))

}
}

return nil
}

func (b *BlockEventTracker) Start(ctx context.Context, node *OracleNode) {
err := node.Blockchain.Init()
if err != nil {
logrus.Error(err)
}

updateTicker := time.NewTicker(time.Second * 60)
defer updateTicker.Stop()

for {
select {
case block, ok := <-b.blocksCh:
if !ok {
logrus.Error("[-] Block channel closed")
return
}
if err := processBlock(node, block); err != nil {
logrus.Errorf("[-] Error processing block: %v", err)
// Consider adding a retry mechanism or circuit breaker here
}

case <-updateTicker.C:
logrus.Info("[+] blockchain tick")
if err := updateBlocks(ctx, node); err != nil {
logrus.Errorf("[-] Error updating blocks: %v", err)
// Consider adding a retry mechanism or circuit breaker here
}

case <-ctx.Done():
logrus.Info("[+] Context cancelled, stopping block subscription")
return
}
}
}

func processBlock(node *OracleNode, block *pubsub.Message) error {
blocks := chain.GetBlockchain(node.Blockchain)
for _, b := range blocks {
if string(b.Data) == string(block.Data) {
return nil // Block already exists
}
}

if err := node.Blockchain.AddBlock(block.Data); err != nil {
return fmt.Errorf("[-] failed to add block: %w", err)
}

if node.Blockchain.LastHash != nil {
b, err := node.Blockchain.GetBlock(node.Blockchain.LastHash)
if err != nil {
return fmt.Errorf("[-] failed to get last block: %w", err)
}
b.Print()
}

return nil
}
Loading
Loading