Skip to content

Commit

Permalink
Add price fetching and storing capabilities
Browse files Browse the repository at this point in the history
Introduced a new module for fetching historical prices from an external API and storing them in the database. This includes adding necessary database schema modifications, SQL queries, and background tracking logic in the main application.
  • Loading branch information
billettc committed Sep 12, 2024
1 parent 672490e commit f3018a6
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 13 deletions.
34 changes: 34 additions & 0 deletions cmd/honey-tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"os"
"time"

"github.com/streamingfast/honey-tracker/price"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli/sflags"
Expand Down Expand Up @@ -117,6 +119,12 @@ func rootRun(cmd *cobra.Command, args []string) error {
sinker.OnTerminating(func(err error) {
logger.Error("sinker terminating", zap.Error(err))
})

go func() {
err := trackPrice(db)
panic(err)
}()

err = sinker.Run(ctx)
if err != nil {
return fmt.Errorf("runnning sinker:%w", err)
Expand All @@ -132,6 +140,32 @@ func main() {
fmt.Println("Goodbye!")
}

func trackPrice(db *data.Psql, logger *zap.Logger) error {
for {
from, _, err := db.FetchLastPrice()
if err != nil {
return fmt.Errorf("fetching last price: %w", err)
}

prices := make(chan *price.HistoricalPrice)
go func() {
err := price.Fetch(from, time.Now(), prices, logger)
if err != nil {
panic(err)
}
}()

for historicalPrice := range prices {
t := time.Unix(historicalPrice.UnixTime, 0)
err := db.InsertPrice(t, historicalPrice.Value)
if err != nil {
return fmt.Errorf("inserting price: %w", err)
}
}
time.Sleep(2 * time.Minute)
}
}

func checkError(err error) {
if err != nil {
panic(err)
Expand Down
26 changes: 26 additions & 0 deletions data/psql.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"errors"
"fmt"
"time"

"go.uber.org/zap"

Expand Down Expand Up @@ -35,6 +36,7 @@ type PreparedStatement struct {
insertMapConsumption *sql.Stmt
insertBurn *sql.Stmt
insertCursor *sql.Stmt
insertPrice *sql.Stmt
}

var preparedStatement *PreparedStatement
Expand Down Expand Up @@ -155,6 +157,11 @@ func NewPostgreSQL(psqlInfo *PsqlInfo, logger *zap.Logger) *Psql {
panic(err)
}

insertPrice, err := db.Prepare("INSERT INTO hivemapper.prices (timestamp, price) VALUES ($1, $2) ON CONFLICT (timestamp) DO NOTHING")
if err != nil {
panic(err)
}

preparedStatement = &PreparedStatement{
insertMint: insertMing,
insertTransaction: insertTransaction,
Expand All @@ -177,6 +184,7 @@ func NewPostgreSQL(psqlInfo *PsqlInfo, logger *zap.Logger) *Psql {
insertMapConsumption: insertMapConsumption,
insertBurn: insertBurn,
insertCursor: insertCursor,
insertPrice: insertPrice,
}

return &Psql{
Expand Down Expand Up @@ -577,6 +585,14 @@ func (p *Psql) insertBurns(dbTransactionID int64, burn *pb.Burn) (dbMintID int64
return
}

func (p *Psql) InsertPrice(timestamp time.Time, price float64) error {
_, err := preparedStatement.insertPrice.Exec(timestamp, price)
if err != nil {
return fmt.Errorf("inserting price: %w", err)
}
return nil
}

func (p *Psql) HandleBurns(dbBlockID int64, burns []*pb.Burn) error {
for _, burn := range burns {
dbTransactionID, err := p.handleTransaction(dbBlockID, burn.TrxHash)
Expand Down Expand Up @@ -636,6 +652,16 @@ func (p *Psql) FetchCursor() (*sink.Cursor, error) {
return nil, nil
}

func (p *Psql) FetchLastPrice() (timestamp time.Time, price float64, err error) {
row := p.db.QueryRow("SELECT timestamp, price FROM hivemapper.prices ORDER BY timestamp DESC LIMIT 1")
if row.Err() != nil {
return time.Now(), 0, fmt.Errorf("selecting price: %w", row.Err())
}

err = row.Scan(&timestamp, &price)
return timestamp, price, err
}

func (p *Psql) BeginTransaction() error {
tx, err := p.db.Begin()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions data/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ CREATE TABLE IF NOT EXISTS hivemapper.blocks (
hash TEXT NOT NULL,
timestamp TIMESTAMP NOT NULL
);
CREATE TABLE IF NOT EXISTS hivemapper.prices (
timestamp TIMESTAMP PRIMARY KEY,
price DECIMAL NOT NULL
);
CREATE TABLE IF NOT EXISTS hivemapper.transactions (
id SERIAL PRIMARY KEY,
Expand Down
21 changes: 11 additions & 10 deletions dbt/hivemapper/models/example/dbt_all_payments.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
indexes=[
{'columns': ['type']},
{'columns': ['block_number']},
{'columns': ['timestamp']},
{'columns': ['block_time']},
{'columns': ['transaction_hash']},
{'columns': ['payee_address']},
]
)
Expand All @@ -16,7 +17,7 @@ select 'regular' as type,
0 as map_consumption_reward,
0 as operational_payment,
0 as fleet_payment,
b.timestamp,
b.timestamp as block_time,
b.number as block_number,
t.hash as trx_hash,
m.to_address payee_address,
Expand All @@ -37,7 +38,7 @@ select 'ai_payments' as type,
0 as map_consumption_reward,
0 as operational_payment,
0 as fleet_payment,
b.timestamp,
b.timestamp as block_time,
b.number as block_number,
t.hash as trx_hash,
m.to_address payee_address,
Expand All @@ -58,7 +59,7 @@ select 'map_consumption_reward' as type,
m.amount as map_consumption_reward,
0 as operational_payment,
0 as fleet_payment,
b.timestamp,
b.timestamp as block_time,
b.number as block_number,
t.hash as trx_hash,
m.to_address payee_address,
Expand All @@ -79,7 +80,7 @@ select 'operational_payments' as type,
0 as map_consumption_reward,
m.amount as operational_payment,
0 as fleet_payment,
b.timestamp,
b.timestamp as block_time,
b.number as block_number,
t.hash as trx_hash,
m.to_address payee_address,
Expand All @@ -100,11 +101,11 @@ select 'fleet_payments' as type,
0 as map_consumption_reward,
0 as operational_payment,
m.amount as fleet_payment,
b.timestamp,
b.number as block_number,
t.hash as trx_hash,
m.to_address payee_address,
p.fleet_mint_id = m.id as is_fleet
b.timestamp as block_time,
b.number as block_number,
t.hash as trx_hash,
m.to_address as payee_address,
p.fleet_mint_id = m.id as is_fleet
from hivemapper.blocks b
inner join hivemapper.transactions t on t.block_id = b.id
inner join hivemapper.mints m on m.transaction_id = t.id
Expand Down
4 changes: 2 additions & 2 deletions dbt/hivemapper/models/example/dbt_payment_per_month.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
{{ config(materialized='table') }}

select
DATE_TRUNC('month', p.timestamp) as month,
DATE_TRUNC('month', p.block_time) as month,
sum(p.regular_payment) as regular,
sum(p.ai_payment) as ai,
sum(p.map_consumption_reward) as map_consumption,
sum(p.operational_payment) as operational,
sum(p.fleet_payment) as fleet
from hivemapper.dbt_all_payments p
group by DATE_TRUNC('month', p.timestamp)
group by DATE_TRUNC('month', p.block_time)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/streamingfast/honey-tracker

go 1.20
go 1.22

require (
github.com/lib/pq v1.10.9
Expand Down
109 changes: 109 additions & 0 deletions price/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package price

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"

"go.uber.org/zap"
)

type timeRange struct {
From time.Time `json:"from"`
To time.Time `json:"to"`
}

func Fetch(fromTime time.Time, toTime time.Time, out chan *HistoricalPrice, logger *zap.Logger) error {
var chunks []*timeRange
timeDiff := toTime.Sub(fromTime)
chunksCount := timeDiff.Nanoseconds() / int64(24*time.Hour)
logger.Info("chunks count:", zap.Int64("count", chunksCount))

if chunksCount == 0 {
chunks = append(chunks, &timeRange{
From: fromTime,
To: toTime,
})
} else {
startTime := fromTime
for range chunksCount {
endTime := startTime.Add(24 * time.Hour)
chunks = append(chunks, &timeRange{
From: startTime,
To: endTime,
})
startTime = endTime
}
}

for _, chunk := range chunks {
prices, err := fetch(chunk.From, chunk.To, "5m", logger)
if err != nil {
return fmt.Errorf("fetching chunk: %w", err)
}
firstPrice := prices[0]
fixed := false
for _, p := range prices {
if !fixed && p.UnixTime != chunk.From.Unix() {
diff := p.UnixTime - chunk.From.Unix()
fixCount := diff / int64(5*time.Minute)
for i := int64(0); i <= fixCount; i++ {
out <- &HistoricalPrice{
UnixTime: chunk.From.Add(5 * time.Minute * time.Duration(i)).Unix(),
Value: firstPrice.Value,
}
}
fixed = true
}
out <- p
}
time.Sleep(100 * time.Millisecond) //prevent rate limit 13 request sec
}

close(out)

fmt.Println("price channel closed")

return nil
}

func fetch(fromTime time.Time, toTime time.Time, period string, logger *zap.Logger) ([]*HistoricalPrice, error) {
fmt.Println("fetching from:", fromTime, "to:", toTime)
client := &http.Client{}
url := fmt.Sprintf(
"https://public-api.birdeye.so/defi/history_price?address=4vMsoUT2BWatFweudnQM1xedRLfJgJ7hswhcpz4xgBTy&address_type=token&type=%s&time_from=%d&time_to=%d",
period,
fromTime.Unix(),
toTime.Unix(),
)
logger.Info("calling:", zap.String("url", url))
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, fmt.Errorf("creating request: %w", err)
}

req.Header.Add("X-Api-Key", "c28c10a82a614930af83a48a3b47189d")
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("sending request: %w", err)
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response: %w", err)
}
response := &Response{}
err = json.Unmarshal(body, response)
if err != nil {
return nil, fmt.Errorf("unmarshaling response: %w", err)
}

if !response.Success {
return nil, fmt.Errorf("unsuccess request: %s", string(body))
}
return response.Data.Items, err

}
13 changes: 13 additions & 0 deletions price/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package price

type Response struct {
Data struct {
Items []*HistoricalPrice `json:"items"`
} `json:"data"`
Success bool `json:"success"`
}

type HistoricalPrice struct {
UnixTime int64 `json:"unixTime"`
Value float64 `json:"value"`
}

0 comments on commit f3018a6

Please sign in to comment.