Skip to content

Commit

Permalink
improve metric endpoint reporting (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n authored Jul 24, 2024
1 parent ae25de6 commit f8a2512
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 186 deletions.
11 changes: 3 additions & 8 deletions aggregator/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"google.golang.org/grpc/peer"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"

"github.com/AvaProtocol/ap-avs/core/config"
Expand Down Expand Up @@ -59,14 +57,14 @@ type OperatorPool struct {
db storage.Storage
}

func (o *OperatorPool) Checkin(payload *avsproto.Checkin, remoteIp string) error {
func (o *OperatorPool) Checkin(payload *avsproto.Checkin) error {
now := time.Now()

status := &OperatorNode{
Address: payload.Address,
LastPingEpoch: now.Unix(),
MetricsPort: payload.MetricsPort,
RemoteIP: remoteIp,
RemoteIP: payload.RemoteIP,
Version: payload.Version,
}

Expand Down Expand Up @@ -98,10 +96,7 @@ func (o *OperatorPool) GetAll() []*OperatorNode {
}

func (r *RpcServer) Ping(ctx context.Context, payload *avsproto.Checkin) (*avsproto.CheckinResp, error) {
p, _ := peer.FromContext(ctx)
remoteIp := strings.Split(p.Addr.String(), ":")[0]

if err := r.operatorPool.Checkin(payload, remoteIp); err != nil {
if err := r.operatorPool.Checkin(payload); err != nil {
return nil, fmt.Errorf("cannot update operator status error: %w", err)
}

Expand Down
5 changes: 3 additions & 2 deletions aggregator/resources/index.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@
<div class="min-w-0 flex-auto">
{{ if ne .Version "" }}
<p class="text-sm font-semibold leading-6 text-white">
v{{ .Version }}
{{ .Version }}
</p>
{{ end }}
<!--
{{ if gt .MetricsPort 0 }}
<p class="text-sm leading-6 text-white">
<a href="http://{{ .RemoteIP }}:{{ .MetricsPort }}/metrics">Metric</a>
</p>
{{ end }}
{{ end }}-->
</div>
</div>

Expand Down
40 changes: 40 additions & 0 deletions core/ipfetcher/ipfetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ipfetcher

import (
"io/ioutil"
"net"
"net/http"
"strings"
"time"
)

// GetIP returns this host's public IP address as reported by
// https://icanhazip.com.
//
// The response body is trimmed of surrounding whitespace and must parse as an
// IPv4 or IPv6 literal. On any failure (request error, read error, or a body
// that is not an IP address) it returns an empty string and a non-nil error;
// a non-IP body is reported as a *net.ParseError.
func GetIP() (string, error) {
	// Longest body prefix echoed back in a parse error, so that an unexpected
	// HTML error page does not end up verbatim in the caller's logs.
	const maxEchoed = 64

	// A dedicated client with explicit timeouts: 30s for the whole exchange,
	// 10s each for the TCP dial and the TLS handshake.
	client := &http.Client{
		Timeout: 30 * time.Second,
		Transport: &http.Transport{
			DialContext: (&net.Dialer{
				Timeout: 10 * time.Second,
			}).DialContext,
			TLSHandshakeTimeout: 10 * time.Second,
			// The transport is created per call and never reused, so a
			// kept-alive connection would only linger (with its goroutines)
			// after this function returns. Close it with the response instead.
			DisableKeepAlives: true,
		},
	}

	resp, err := client.Get("https://icanhazip.com")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}

	ip := strings.TrimSpace(string(body))

	// Reject anything that is not an IP literal (empty body, error page,
	// rate-limit message, ...) rather than handing it to the caller as an
	// address.
	if net.ParseIP(ip) == nil {
		echoed := ip
		if len(echoed) > maxEchoed {
			echoed = echoed[:maxEchoed]
		}
		return "", &net.ParseError{Type: "IP address", Text: echoed}
	}

	return ip, nil
}
52 changes: 43 additions & 9 deletions operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/AvaProtocol/ap-avs/version"

"github.com/AvaProtocol/ap-avs/core/timekeeper"
"github.com/AvaProtocol/ap-avs/core/ipfetcher"

// insecure for local dev
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -67,6 +68,8 @@ type OperatorConfig struct {
EnableNodeApi bool `yaml:"enable_node_api"`

DbPath string `yaml:"db_path"`

PublicMetricsPort int32
}

type Operator struct {
Expand Down Expand Up @@ -106,7 +109,7 @@ type Operator struct {

elapsing *timekeeper.Elapsing

metricsPort int32
publicIP string
}

func RunWithConfig(configPath string) {
Expand Down Expand Up @@ -276,12 +279,6 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {
}
aggregatorRpcClient := avsproto.NewAggregatorClient(aggregatorConn)

parts := strings.Split(c.EigenMetricsIpPortAddress, ":")
if len(parts) !=2 {
panic(fmt.Errorf("EigenMetricsIpPortAddress: %s in operator config file is malform", c.EigenMetricsIpPortAddress))
}
metricsPort, _ := strconv.Atoi(parts[1])

operator := &Operator{
config: c,
logger: logger,
Expand Down Expand Up @@ -310,8 +307,6 @@ func NewOperatorFromConfig(c OperatorConfig) (*Operator, error) {

txManager: txMgr,
elapsing: elapsing,

metricsPort: int32(metricsPort),
}

operator.PopulateKnownConfigByChainID(chainId)
Expand Down Expand Up @@ -387,6 +382,45 @@ func (o *Operator) retryConnect() error{
return nil
}

// GetPublicIP returns the operator's public IP address, looking it up through
// ipfetcher on first use and remembering the result on the operator.
//
// The lookup is optimistic: when it fails the error is only logged and the
// (empty) result is returned, so the next call tries again. The address is
// sent to the aggregator together with the public metrics port, where the two
// together identify the operator's metrics endpoint.
func (o *Operator) GetPublicIP() string {
	// Already resolved by an earlier call.
	if o.publicIP != "" {
		return o.publicIP
	}

	ip, err := ipfetcher.GetIP()
	o.publicIP = ip
	if err != nil {
		// Not fatal: the public ip is only needed for metric scraping and a
		// later call will retry the lookup.
		o.logger.Errorf("error fetching public ip address %v", err)
	}

	return o.publicIP
}

// GetPublicMetricPort returns the port on which this operator's metrics
// endpoint is publicly reachable.
//
// Resolution order:
//  1. c.PublicMetricsPort, when it is already set to a positive value;
//  2. the PUBLIC_METRICS_PORT environment variable, when non-empty;
//  3. the port part of c.EigenMetricsIpPortAddress.
//
// The resolved value is stored in c.PublicMetricsPort and returned. A port
// that is not a decimal integer in the range 1-65535 resolves to 0; because 0
// is never treated as cached, the next call resolves it again.
//
// It panics when step 3 is reached and EigenMetricsIpPortAddress does not
// consist of exactly two colon-separated parts.
func (c *OperatorConfig) GetPublicMetricPort() int32 {
	// Fast path: a port was set explicitly or resolved by an earlier call.
	if c.PublicMetricsPort > 0 {
		return c.PublicMetricsPort
	}

	// The environment variable takes precedence over the config file address.
	port := os.Getenv("PUBLIC_METRICS_PORT")
	if port == "" {
		parts := strings.Split(c.EigenMetricsIpPortAddress, ":")
		if len(parts) != 2 {
			panic(fmt.Errorf("EigenMetricsIpPortAddress: %s in operator config file is malform", c.EigenMetricsIpPortAddress))
		}

		port = parts[1]
	}

	// Validate before narrowing to int32: a non-numeric value, a negative
	// value, or one above 65535 is not a usable TCP port, and an unchecked
	// conversion of a large value could wrap around to a bogus port number.
	portNum, err := strconv.Atoi(port)
	if err != nil || portNum < 1 || portNum > 65535 {
		portNum = 0
	}

	c.PublicMetricsPort = int32(portNum)
	return c.PublicMetricsPort
}


// // Takes a NewTaskCreatedLog struct as input and returns a TaskResponseHeader struct.
// // The TaskResponseHeader struct is the struct that is signed and sent to the contract as a task response.
// func (o *Operator) ProcessNewTaskCreatedLog(newTaskCreatedLog *cstaskmanager.ContractAutomationTaskManagerNewTaskCreated) *cstaskmanager.IAutomationTaskManagerTaskResponse {
Expand Down
3 changes: 2 additions & 1 deletion operator/worker_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func (o *Operator) PingServer() {
// TODO: generate signature with bls key
Signature: "pending",
Version: version.Get(),
MetricsPort: o.metricsPort,
RemoteIP: o.GetPublicIP(),
MetricsPort: o.config.GetPublicMetricPort(),
})

elapsed := time.Now().Sub(start)
Expand Down
Loading

0 comments on commit f8a2512

Please sign in to comment.