Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/active peer filters #65

Merged
merged 4 commits into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/db/postgresql/active_peers_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (c *DBClient) getActivePeers() ([]int, error) {
id,
peer_id
FROM peer_info
WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL and to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
`,
LastActivityValidRange,
)
if err != nil {
return activePeers, errors.Wrap(err, "unable to retrieve active peer's ids")
Expand All @@ -62,7 +63,6 @@ func (c *DBClient) getActivePeers() ([]int, error) {
}
activePeers = append(activePeers, id)
}

return activePeers, nil
}

Expand All @@ -73,6 +73,10 @@ func (c *DBClient) activePeersBackup() error {
if err != nil {
return errors.Wrap(err, "unable to backup active peers")
}
if len(activePeers) <= 0 {
log.Infof("tried to persist %d active peers (skipped)", len(activePeers))
return nil
}

// backup the list of active peers
_, err = c.psqlPool.Exec(
Expand Down
93 changes: 71 additions & 22 deletions pkg/db/postgresql/crawler_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
log "github.com/sirupsen/logrus"
)

var (
LastActivityValidRange = 180 // 6 Months
)

// this file contains all the list of queries to extract the metrics from the Crawler (as agnostic as possible from the network)

// Basic call over the whole list of non-deprecated peers
Expand All @@ -20,14 +24,19 @@ func (db *DBClient) GetClientDistribution() (map[string]interface{}, error) {
client_name, count(client_name) as count
FROM peer_info
WHERE
deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
deprecated = 'false' and
attempted = 'true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_name
ORDER BY count DESC;
`,
LastActivityValidRange,
)
// make sure we close the rows and we free the connection/session
defer rows.Close()
if err != nil {
fmt.Print("\n", err.Error())
return cliDist, errors.Wrap(err, "unable to fetch client distribution")
}

Expand Down Expand Up @@ -57,10 +66,14 @@ func (db *DBClient) GetVersionDistribution() (map[string]interface{}, error) {
count(client_version) as cnt
FROM peer_info
WHERE
deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
deprecated = 'false' and
attempted = 'true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_name, client_version
ORDER BY client_name DESC, cnt DESC;
`,
LastActivityValidRange,
)
// make sure we close the rows and we free the connection/session
defer rows.Close()
Expand Down Expand Up @@ -99,11 +112,15 @@ func (db *DBClient) GetGeoDistribution() (map[string]interface{}, error) {
ips.country_code
FROM peer_info
RIGHT JOIN ips on peer_info.ip = ips.ip
WHERE deprecated = 'false' and attempted = 'true' and client_name IS NOT NULL
WHERE deprecated = 'false' and
attempted = 'true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
GROUP BY country_code
ORDER BY cnt DESC;
`,
LastActivityValidRange,
)
// make sure we close the rows and we free the connection/session
defer rows.Close()
Expand Down Expand Up @@ -133,10 +150,15 @@ func (db *DBClient) GetOsDistribution() (map[string]interface{}, error) {
client_os,
count(client_os) as nodes
FROM peer_info
WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL
WHERE deprecated='false' and
attempted='true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_os
ORDER BY nodes DESC;
`)
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
}
Expand All @@ -158,10 +180,15 @@ func (db *DBClient) GetArchDistribution() (map[string]interface{}, error) {
client_arch,
count(client_arch) as nodes
FROM peer_info
WHERE deprecated='false' and attempted='true' and client_name IS NOT NULL
WHERE deprecated='false' and
attempted='true' and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY client_arch
ORDER BY nodes DESC;
`)
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
}
Expand Down Expand Up @@ -193,9 +220,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
ips.mobile
FROM peer_info as pi
INNER JOIN ips ON pi.ip=ips.ip
WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.mobile='true'
WHERE pi.deprecated='false' and
attempted = 'true' and
client_name IS NOT NULL and
ips.mobile='true' and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
`).Scan(&mobile)
`,
LastActivityValidRange,
).Scan(&mobile)
if err != nil {
return summary, err
}
Expand All @@ -218,9 +251,14 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
ips.proxy
FROM peer_info as pi
INNER JOIN ips ON pi.ip=ips.ip
WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.proxy='true'
WHERE pi.deprecated='false' and
attempted = 'true' and
client_name IS NOT NULL and ips.proxy='true' and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
`).Scan(&proxy)
`,
LastActivityValidRange,
).Scan(&proxy)
if err != nil {
return summary, err
}
Expand All @@ -243,9 +281,15 @@ func (db *DBClient) GetHostingDistribution() (map[string]interface{}, error) {
ips.hosting
FROM peer_info as pi
INNER JOIN ips ON pi.ip=ips.ip
WHERE pi.deprecated='false' and attempted = 'true' and client_name IS NOT NULL and ips.hosting='true'
WHERE pi.deprecated='false' and
attempted = 'true' and
client_name IS NOT NULL and
ips.hosting='true' and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as aux
`).Scan(&hosted)
`,
LastActivityValidRange,
).Scan(&hosted)
if err != nil {
return summary, err
}
Expand Down Expand Up @@ -278,12 +322,14 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) {
ELSE '+1s'
END as latency
FROM peer_info
WHERE deprecated=false and client_name IS NOT NULL
WHERE deprecated=false and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
) as t
GROUP BY t.latency
ORDER BY nodes DESC;

`,
LastActivityValidRange,
)
if err != nil {
return summary, err
Expand All @@ -295,11 +341,11 @@ func (db *DBClient) GetRTTDistribution() (map[string]interface{}, error) {
err = rows.Scan(
&rttRange,
&rttValue,
)
)
if err != nil {
return summary, err
return summary, err
}
summary[rttRange] = rttValue
summary[rttRange] = rttValue
}
return summary, nil
}
Expand All @@ -318,13 +364,16 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) {
ip,
count(ip) as nodes
FROM peer_info
WHERE deprecated = false and client_name IS NOT NULL
WHERE deprecated = false and
client_name IS NOT NULL and
to_timestamp(last_activity) > CURRENT_TIMESTAMP - ($1 * INTERVAL '1 DAY')
GROUP BY ip
ORDER BY nodes DESC
) as t
GROUP BY nodes
ORDER BY number_of_ips DESC;
`,
LastActivityValidRange,
)
if err != nil {
return summary, err
Expand All @@ -336,11 +385,11 @@ func (db *DBClient) GetIPDistribution() (map[string]interface{}, error) {
err = rows.Scan(
&nodesPerIP,
&ips,
)
)
if err != nil {
return summary, err
return summary, err
}
summary[fmt.Sprintf("%d", nodesPerIP)] = ips
summary[fmt.Sprintf("%d", nodesPerIP)] = ips
}
return summary, nil
}
2 changes: 1 addition & 1 deletion pkg/db/postgresql/peer_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package postgresql
import (
"time"

"github.com/jackc/pgx/v4"
pgx "github.com/jackc/pgx/v4"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/migalabs/armiarma/pkg/db/models"
"github.com/migalabs/armiarma/pkg/utils"
Expand Down
18 changes: 11 additions & 7 deletions pkg/db/postgresql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
noQueryResult string = "no result"
)


type DBClient struct {
// Control Variables
ctx context.Context
Expand All @@ -49,11 +48,11 @@ type DBClient struct {
}

func NewDBClient(
ctx context.Context,
p2pNetwork utils.NetworkType,
loginStr string,
dailyBackupInt time.Duration,
options ...DBOption) (*DBClient, error) {
ctx context.Context,
p2pNetwork utils.NetworkType,
loginStr string,
dailyBackupInt time.Duration,
options ...DBOption) (*DBClient, error) {
// check if the login string has enough len
if len(loginStr) == 0 {
return nil, errors.New("empty db-endpoint provided")
Expand Down Expand Up @@ -102,7 +101,7 @@ options ...DBOption) (*DBClient, error) {
persistC: persistC,
doneC: make(chan struct{}),
wg: &wg,
persistConnEvents: true,
persistConnEvents: true,
}

// Check for all the available options
Expand Down Expand Up @@ -336,6 +335,11 @@ func (c *DBClient) launchPersister() {
}

func (c *DBClient) dailyBackupheartbeat() {
// make a first backup of the active peers(if any)
err := c.activePeersBackup()
if err != nil {
log.Error(err)
}
ticker := time.NewTicker(c.dailyBackupInterval)
for {
select {
Expand Down
Loading