From f3018a6c4911b000d94c9d3b3bcf5e88922405a2 Mon Sep 17 00:00:00 2001 From: Charles Billette Date: Thu, 12 Sep 2024 11:24:56 -0400 Subject: [PATCH] Add price fetching and storing capabilities Introduced a new module for fetching historical prices from an external API and storing them in the database. This includes adding necessary database schema modifications, SQL queries, and background tracking logic in the main application. --- cmd/honey-tracker/main.go | 34 ++++++ data/psql.go | 26 +++++ data/sql.go | 4 + .../models/example/dbt_all_payments.sql | 21 ++-- .../models/example/dbt_payment_per_month.sql | 4 +- go.mod | 2 +- price/fetcher.go | 109 ++++++++++++++++++ price/models.go | 13 +++ 8 files changed, 200 insertions(+), 13 deletions(-) create mode 100644 price/fetcher.go create mode 100644 price/models.go diff --git a/cmd/honey-tracker/main.go b/cmd/honey-tracker/main.go index 71a1c5c..2e46480 100644 --- a/cmd/honey-tracker/main.go +++ b/cmd/honey-tracker/main.go @@ -6,6 +6,8 @@ import ( "os" "time" + "github.com/streamingfast/honey-tracker/price" + "github.com/spf13/cobra" "github.com/streamingfast/bstream" "github.com/streamingfast/cli/sflags" @@ -117,6 +119,12 @@ func rootRun(cmd *cobra.Command, args []string) error { sinker.OnTerminating(func(err error) { logger.Error("sinker terminating", zap.Error(err)) }) + + go func() { + err := trackPrice(db) + panic(err) + }() + err = sinker.Run(ctx) if err != nil { return fmt.Errorf("runnning sinker:%w", err) @@ -132,6 +140,32 @@ func main() { fmt.Println("Goodbye!") } +func trackPrice(db *data.Psql, logger *zap.Logger) error { + for { + from, _, err := db.FetchLastPrice() + if err != nil { + return fmt.Errorf("fetching last price: %w", err) + } + + prices := make(chan *price.HistoricalPrice) + go func() { + err := price.Fetch(from, time.Now(), prices, logger) + if err != nil { + panic(err) + } + }() + + for historicalPrice := range prices { + t := time.Unix(historicalPrice.UnixTime, 0) + err := db.InsertPrice(t, historicalPrice.Value) + if err != nil { + return fmt.Errorf("inserting price: %w", err) + } + } + time.Sleep(2 * time.Minute) + } +} + func checkError(err error) { if err != nil { panic(err) diff --git a/data/psql.go b/data/psql.go index 47a4808..1904e3d 100644 --- a/data/psql.go +++ b/data/psql.go @@ -4,6 +4,7 @@ import ( "database/sql" "errors" "fmt" + "time" "go.uber.org/zap" @@ -35,6 +36,7 @@ type PreparedStatement struct { insertMapConsumption *sql.Stmt insertBurn *sql.Stmt insertCursor *sql.Stmt + insertPrice *sql.Stmt } var preparedStatement *PreparedStatement @@ -155,6 +157,11 @@ func NewPostgreSQL(psqlInfo *PsqlInfo, logger *zap.Logger) *Psql { panic(err) } + insertPrice, err := db.Prepare("INSERT INTO hivemapper.prices (timestamp, price) VALUES ($1, $2) ON CONFLICT (timestamp) DO NOTHING") + if err != nil { + panic(err) + } + preparedStatement = &PreparedStatement{ insertMint: insertMing, insertTransaction: insertTransaction, @@ -177,6 +184,7 @@ func NewPostgreSQL(psqlInfo *PsqlInfo, logger *zap.Logger) *Psql { insertMapConsumption: insertMapConsumption, insertBurn: insertBurn, insertCursor: insertCursor, + insertPrice: insertPrice, } return &Psql{ @@ -577,6 +585,14 @@ func (p *Psql) insertBurns(dbTransactionID int64, burn *pb.Burn) (dbMintID int64 return } +func (p *Psql) InsertPrice(timestamp time.Time, price float64) error { + _, err := preparedStatement.insertPrice.Exec(timestamp, price) + if err != nil { + return fmt.Errorf("inserting price: %w", err) + } + return nil +} + func (p *Psql) HandleBurns(dbBlockID int64, burns []*pb.Burn) error { for _, burn := range burns { dbTransactionID, err := p.handleTransaction(dbBlockID, burn.TrxHash) @@ -636,6 +652,16 @@ func (p *Psql) FetchCursor() (*sink.Cursor, error) { return nil, nil } +func (p *Psql) FetchLastPrice() (timestamp time.Time, price float64, err error) { + row := p.db.QueryRow("SELECT timestamp, price FROM hivemapper.prices ORDER BY timestamp DESC LIMIT 1") + if row.Err() != nil { + return time.Now(), 0, fmt.Errorf("selecting price: %w", row.Err()) + } + + err = row.Scan(×tamp, &price) + return timestamp, price, err +} + func (p *Psql) BeginTransaction() error { tx, err := p.db.Begin() if err != nil { diff --git a/data/sql.go b/data/sql.go index 1d693d6..2010f90 100644 --- a/data/sql.go +++ b/data/sql.go @@ -13,6 +13,10 @@ CREATE TABLE IF NOT EXISTS hivemapper.blocks ( hash TEXT NOT NULL, timestamp TIMESTAMP NOT NULL ); +CREATE TABLE IF NOT EXISTS hivemapper.prices ( + timestamp TIMESTAMP PRIMARY KEY, + price DECIMAL NOT NULL +); CREATE TABLE IF NOT EXISTS hivemapper.transactions ( id SERIAL PRIMARY KEY, diff --git a/dbt/hivemapper/models/example/dbt_all_payments.sql b/dbt/hivemapper/models/example/dbt_all_payments.sql index ccbfd92..b281d99 100644 --- a/dbt/hivemapper/models/example/dbt_all_payments.sql +++ b/dbt/hivemapper/models/example/dbt_all_payments.sql @@ -4,7 +4,8 @@ indexes=[ {'columns': ['type']}, {'columns': ['block_number']}, - {'columns': ['timestamp']}, + {'columns': ['block_time']}, + {'columns': ['transaction_hash']}, {'columns': ['payee_address']}, ] ) @@ -16,7 +17,7 @@ select 'regular' as type, 0 as map_consumption_reward, 0 as operational_payment, 0 as fleet_payment, - b.timestamp, + b.timestamp as block_time, b.number as block_number, t.hash as trx_hash, m.to_address payee_address, @@ -37,7 +38,7 @@ select 'ai_payments' as type, 0 as map_consumption_reward, 0 as operational_payment, 0 as fleet_payment, - b.timestamp, + b.timestamp as block_time, b.number as block_number, t.hash as trx_hash, m.to_address payee_address, @@ -58,7 +59,7 @@ select 'map_consumption_reward' as type, m.amount as map_consumption_reward, 0 as operational_payment, 0 as fleet_payment, - b.timestamp, + b.timestamp as block_time, b.number as block_number, t.hash as trx_hash, m.to_address payee_address, @@ -79,7 +80,7 @@ select 'operational_payments' as type, 0 as map_consumption_reward, m.amount as operational_payment, 0 as fleet_payment, - b.timestamp, + b.timestamp as block_time, b.number as block_number, t.hash as trx_hash, m.to_address payee_address, @@ -100,11 +101,11 @@ select 'fleet_payments' as type, 0 as map_consumption_reward, 0 as operational_payment, m.amount as fleet_payment, - b.timestamp, - b.number as block_number, - t.hash as trx_hash, - m.to_address payee_address, - p.fleet_mint_id = m.id as is_fleet + b.timestamp as block_time, + b.number as block_number, + t.hash as trx_hash, + m.to_address as payee_address, + p.fleet_mint_id = m.id as is_fleet from hivemapper.blocks b inner join hivemapper.transactions t on t.block_id = b.id inner join hivemapper.mints m on m.transaction_id = t.id diff --git a/dbt/hivemapper/models/example/dbt_payment_per_month.sql b/dbt/hivemapper/models/example/dbt_payment_per_month.sql index 58a0829..ebbf41f 100644 --- a/dbt/hivemapper/models/example/dbt_payment_per_month.sql +++ b/dbt/hivemapper/models/example/dbt_payment_per_month.sql @@ -1,11 +1,11 @@ {{ config(materialized='table') }} select - DATE_TRUNC('month', p.timestamp) as month, + DATE_TRUNC('month', p.block_time) as month, sum(p.regular_payment) as regular, sum(p.ai_payment) as ai, sum(p.map_consumption_reward) as map_consumption, sum(p.operational_payment) as operational, sum(p.fleet_payment) as fleet from hivemapper.dbt_all_payments p -group by DATE_TRUNC('month', p.timestamp) \ No newline at end of file +group by DATE_TRUNC('month', p.block_time) \ No newline at end of file diff --git a/go.mod b/go.mod index 9a73d8f..78e47d0 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/streamingfast/honey-tracker -go 1.20 +go 1.22 require ( github.com/lib/pq v1.10.9 diff --git a/price/fetcher.go b/price/fetcher.go new file mode 100644 index 0000000..604acfe --- /dev/null +++ b/price/fetcher.go @@ -0,0 +1,109 @@ +package price + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "time" + + "go.uber.org/zap" +) + +type timeRange struct { + From time.Time `json:"from"` + To time.Time `json:"to"` +} + +func Fetch(fromTime time.Time, toTime time.Time, out chan *HistoricalPrice, logger *zap.Logger) error { + var chunks []*timeRange + timeDiff := toTime.Sub(fromTime) + chunksCount := timeDiff.Nanoseconds() / int64(24*time.Hour) + logger.Info("chunks count:", zap.Int64("count", chunksCount)) + + if chunksCount == 0 { + chunks = append(chunks, &timeRange{ + From: fromTime, + To: toTime, + }) + } else { + startTime := fromTime + for range chunksCount { + endTime := startTime.Add(24 * time.Hour) + chunks = append(chunks, &timeRange{ + From: startTime, + To: endTime, + }) + startTime = endTime + } + } + + for _, chunk := range chunks { + prices, err := fetch(chunk.From, chunk.To, "5m", logger) + if err != nil { + return fmt.Errorf("fetching chunk: %w", err) + } + firstPrice := prices[0] + fixed := false + for _, p := range prices { + if !fixed && p.UnixTime != chunk.From.Unix() { + diff := p.UnixTime - chunk.From.Unix() + fixCount := diff / int64(5*time.Minute) + for i := int64(0); i <= fixCount; i++ { + out <- &HistoricalPrice{ + UnixTime: chunk.From.Add(5 * time.Minute * time.Duration(i)).Unix(), + Value: firstPrice.Value, + } + } + fixed = true + } + out <- p + } + time.Sleep(100 * time.Millisecond) //prevent rate limit 13 request sec + } + + close(out) + + fmt.Println("price channel closed") + + return nil +} + +func fetch(fromTime time.Time, toTime time.Time, period string, logger *zap.Logger) ([]*HistoricalPrice, error) { + fmt.Println("fetching from:", fromTime, "to:", toTime) + client := &http.Client{} + url := fmt.Sprintf( + "https://public-api.birdeye.so/defi/history_price?address=4vMsoUT2BWatFweudnQM1xedRLfJgJ7hswhcpz4xgBTy&address_type=token&type=%s&time_from=%d&time_to=%d", + period, + fromTime.Unix(), + toTime.Unix(), + ) + logger.Info("calling:", zap.String("url", url)) + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("creating request: %w", err) + } + + req.Header.Add("X-Api-Key", "c28c10a82a614930af83a48a3b47189d") + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("sending request: %w", err) + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + response := &Response{} + err = json.Unmarshal(body, response) + if err != nil { + return nil, fmt.Errorf("unmarshaling response: %w", err) + } + + if !response.Success { + return nil, fmt.Errorf("unsuccess request: %s", string(body)) + } + return response.Data.Items, err + +} diff --git a/price/models.go b/price/models.go new file mode 100644 index 0000000..fa0fcd3 --- /dev/null +++ b/price/models.go @@ -0,0 +1,13 @@ +package price + +type Response struct { + Data struct { + Items []*HistoricalPrice `json:"items"` + } `json:"data"` + Success bool `json:"success"` +} + +type HistoricalPrice struct { + UnixTime int64 `json:"unixTime"` + Value float64 `json:"value"` +}