diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 4b94e0b..4a1a481 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -5,6 +5,8 @@ + + @@ -12,11 +14,15 @@ + - + + + + - - + @@ -85,6 +90,7 @@ + diff --git a/db_models/base.go b/db_models/base.go new file mode 100644 index 0000000..a590c0c --- /dev/null +++ b/db_models/base.go @@ -0,0 +1,28 @@ +package db_models + +import ( + "io/ioutil" + "net/http" + "os" +) + +func GetPublicIP() (string, error) { + resp, err := http.Get("https://ifconfig.me") // important to get the public ip if possible. + if err != nil { + return "", err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil +} + +func GetHostname() string { + hostname, err := os.Hostname() + if err != nil { + return "unknown" + } + return hostname +} diff --git a/db_models/content.go b/db_models/content.go index 4924856..31f7329 100644 --- a/db_models/content.go +++ b/db_models/content.go @@ -1,9 +1,7 @@ package db_models import ( - "bytes" "encoding/json" - "fmt" "gorm.io/gorm" "time" ) @@ -24,22 +22,44 @@ type Content struct { func (u *Content) AfterSave(tx *gorm.DB) (err error) { - // log this on the event log table - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "Content", - LogEventObject: messageBytes, - LogEventId: u.ID, - LogEvent: fmt.Sprintf("Content %d saved", u.ID), - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) - return -} + var contentFromDb Content + tx.Model(&Content{}).Where("id = ?", u.ID).First(&contentFromDb) + + if contentFromDb.ID == 0 { + return + } + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + + log := ContentLog{ + Name: contentFromDb.Name, + Size: contentFromDb.Size, + Cid: contentFromDb.Cid, + RequestingApiKey: contentFromDb.RequestingApiKey, + PieceCommitmentId: contentFromDb.PieceCommitmentId, + Status: contentFromDb.Status, + ConnectionMode: contentFromDb.ConnectionMode, + LastMessage: contentFromDb.LastMessage, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentId: contentFromDb.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } -func Transcode(in, out interface{}) { - buf := new(bytes.Buffer) - json.NewEncoder(buf).Encode(in) - json.NewDecoder(buf).Decode(out) + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "ContentLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + if err != nil { + return err + } + producer.Publish(messageBytes) + + return } diff --git a/db_models/content_deal.go b/db_models/content_deal.go index d1326fc..5eec60f 100644 --- a/db_models/content_deal.go +++ b/db_models/content_deal.go @@ -30,15 +30,54 @@ type ContentDeal struct { UpdatedAt time.Time `json:"updated_at"` } -func (u *ContentDeal) AfterSave(tx *gorm.DB) (err error) { - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentDeal", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) +func (u *ContentDeal) AfterCreate(tx *gorm.DB) (err error) { + + var contentDealLog ContentDeal + tx.Model(&ContentDeal{}).Where("id = ?", u.ID).First(&contentDealLog) + + if contentDealLog.ID == 0 { + return + } + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + + log := ContentDealLog{ + Content: contentDealLog.Content, + PropCid: contentDealLog.PropCid, + DealUUID: contentDealLog.DealUUID, + Miner: contentDealLog.Miner, + DealID: contentDealLog.DealID, + Failed: contentDealLog.Failed, + Verified: contentDealLog.Verified, + Slashed: contentDealLog.Slashed, + FailedAt: contentDealLog.FailedAt, + DTChan: contentDealLog.DTChan, + TransferStarted: contentDealLog.TransferStarted, + TransferFinished: contentDealLog.TransferFinished, + OnChainAt: contentDealLog.OnChainAt, + SealedAt: contentDealLog.SealedAt, + LastMessage: contentDealLog.LastMessage, + DealProtocolVersion: contentDealLog.DealProtocolVersion, + MinerVersion: contentDealLog.MinerVersion, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentDealId: contentDealLog.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "ContentDealLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + if err != nil { + return err + } + producer.Publish(messageBytes) return } diff --git a/db_models/content_deal_proposal.go b/db_models/content_deal_proposal.go index f988e10..619c22c 100644 --- a/db_models/content_deal_proposal.go +++ b/db_models/content_deal_proposal.go @@ -16,15 +16,40 @@ type ContentDealProposal struct { UpdatedAt time.Time `json:"updated_at"` } -func (u *ContentDealProposal) AfterSave(tx *gorm.DB) (err error) { - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentDealProposal", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) +func (u *ContentDealProposal) AfterCreate(tx *gorm.DB) (err error) { + + var contentDealProposal ContentDealProposal + tx.Model(&ContentDealProposal{}).Where("id = ?", u.ID).First(&contentDealProposal) + + if contentDealProposal.ID == 0 { + return + } + + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + + log := ContentDealProposalLog{ + Content: contentDealProposal.Content, + Unsigned: contentDealProposal.Unsigned, + Signed: contentDealProposal.Signed, + Meta: contentDealProposal.Meta, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentDealProposalId: u.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "ContentDealProposalLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + producer.Publish(messageBytes) + return } diff --git a/db_models/content_deal_proposal_parameters.go b/db_models/content_deal_proposal_parameters.go index 5c67195..d5314dc 100644 --- a/db_models/content_deal_proposal_parameters.go +++ b/db_models/content_deal_proposal_parameters.go @@ -20,15 +20,43 @@ type ContentDealProposalParameters struct { UpdatedAt time.Time `json:"updated_at" json:"updated-at"` } -func (u *ContentDealProposalParameters) AfterSave(tx *gorm.DB) (err error) { - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentDealProposalParameters", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) +func (u *ContentDealProposalParameters) AfterCreate(tx *gorm.DB) (err error) { + + var contentDealProposalParams ContentDealProposalParameters + tx.Model(&ContentDealProposalParameters{}).Where("id = ?", u.ID).First(&contentDealProposalParams) + + if contentDealProposalParams.ID == 0 { + return + } + + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + log := ContentDealProposalParametersLog{ + Content: contentDealProposalParams.Content, + Label: contentDealProposalParams.Label, + Duration: contentDealProposalParams.Duration, + StartEpoch: contentDealProposalParams.StartEpoch, + EndEpoch: contentDealProposalParams.EndEpoch, + TransferParams: contentDealProposalParams.TransferParams, + RemoveUnsealedCopy: contentDealProposalParams.RemoveUnsealedCopy, + SkipIPNIAnnounce: contentDealProposalParams.SkipIPNIAnnounce, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentDealProposalParametersId: u.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "ContentDealProposalParametersLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + producer.Publish(messageBytes) + return } diff --git a/db_models/content_miner.go b/db_models/content_miner.go index 7fb896e..0f44a32 100644 --- a/db_models/content_miner.go +++ b/db_models/content_miner.go @@ -14,20 +14,38 @@ type ContentMiner struct { UpdatedAt time.Time `json:"updated_at"` } -func (u *ContentMiner) AfterSave(tx *gorm.DB) (err error) { +func (u *ContentMiner) AfterCreate(tx *gorm.DB) (err error) { - if u.Miner == "" { + var contentMiner ContentMiner + tx.Model(&ContentMiner{}).Where("id = ?", u.ID).First(&contentMiner) + + if contentMiner.ID == 0 { return } - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentMiner", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + + log := ContentMinerLog{ + Content: contentMiner.Content, + Miner: contentMiner.Miner, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentMinerId: u.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "ContentMinerLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + producer.Publish(messageBytes) + return } diff --git a/db_models/content_wallet.go b/db_models/content_wallet.go index 95dc507..0287c77 100644 --- a/db_models/content_wallet.go +++ b/db_models/content_wallet.go @@ -14,20 +14,37 @@ type ContentWallet struct { UpdatedAt time.Time `json:"updated_at"` } -func (u *ContentWallet) AfterSave(tx *gorm.DB) (err error) { +func (u *ContentWallet) AfterCreate(tx *gorm.DB) (err error) { - if u.Wallet == "" { + var contentWallet ContentWallet + tx.Model(&ContentWallet{}).Where("id = ?", u.ID).First(&contentWallet) + + if contentWallet.ID == 0 { + return + } + + // get instance info + ip, err := GetPublicIP() + if err != nil { return } + log := ContentWalletLog{ + Content: contentWallet.Content, + Wallet: contentWallet.Wallet, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentWalletId: u.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "ContentWalletLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + producer.Publish(messageBytes) - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentWallet", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) return } diff --git a/db_models/database.go b/db_models/database.go index 7e2814e..2ab7546 100644 --- a/db_models/database.go +++ b/db_models/database.go @@ -2,6 +2,7 @@ package db_models import ( "fmt" + "github.com/application-research/delta-db/messaging" "gorm.io/gorm/logger" "time" @@ -10,6 +11,17 @@ import ( "gorm.io/gorm" ) +var producer *messaging.DeltaMetricsMessageProducer + +type DeltaMetricsBaseMessage struct { + ObjectType string `json:"object_type"` + Object interface{} `json:"object"` +} + +func init() { + producer = messaging.NewDeltaMetricsMessageProducer() +} + func OpenDatabase(dbDsn string) (*gorm.DB, error) { // use postgres var DB *gorm.DB @@ -35,7 +47,7 @@ func OpenDatabase(dbDsn string) (*gorm.DB, error) { } func ConfigureModels(db *gorm.DB) { - db.AutoMigrate(&Content{}, &ContentDeal{}, &PieceCommitment{}, &MinerInfo{}, &MinerPrice{}, &LogEvent{}, &ContentMiner{}, &ProcessContentCounter{}, &ContentWallet{}, &ContentDealProposalParameters{}, &Wallet{}, &ContentDealProposal{}, &InstanceMeta{}, &RetryDealCount{}) + db.AutoMigrate(&Content{}, &ContentDeal{}, &PieceCommitment{}, &MinerInfo{}, &MinerPrice{}, &messaging.LogEvent{}, &ContentMiner{}, &ProcessContentCounter{}, &ContentWallet{}, &ContentDealProposalParameters{}, &Wallet{}, &ContentDealProposal{}, &InstanceMeta{}, &RetryDealCount{}) } type ProcessContentCounter struct { @@ -70,17 +82,6 @@ type MinerPrice struct { UpdatedAt time.Time `json:"updated_at"` } -type Wallet struct { - ID int64 `gorm:"primaryKey"` - UuId string `json:"uuid"` - Addr string `json:"addr"` - Owner string `json:"owner"` - KeyType string `json:"key_type"` - PrivateKey string `json:"private_key"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` -} - type AdminUser struct { ID int64 `gorm:"primaryKey"` Username string `json:"username"` diff --git a/db_models/instance_meta.go b/db_models/instance_meta.go index 8137418..cbab5c3 100644 --- a/db_models/instance_meta.go +++ b/db_models/instance_meta.go @@ -1,14 +1,16 @@ package db_models import ( - "encoding/json" - "gorm.io/gorm" "time" ) type InstanceMeta struct { // gorm id ID int64 `gorm:"primary_key" json:"id"` + InstanceHostName string `json:"instance_host_name"` + InstanceNodeName string `json:"instance_node_name"` + OSDetails string `json:"os_details"` + PublicIp string `json:"public_ip"` MemoryLimit uint64 `json:"memory_limit"` CpuLimit uint64 `json:"cpu_limit"` StorageLimit uint64 `json:"storage_limit"` @@ -28,25 +30,3 @@ type InstanceMeta struct { CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } - -func (u *InstanceMeta) BeforeSave(tx *gorm.DB) (err error) { - return -} - -func (u *InstanceMeta) BeforeCreate(tx *gorm.DB) (err error) { - return -} - -func (u *InstanceMeta) AfterSave(tx *gorm.DB) (err error) { - // log this on the event log table - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "InstanceMeta", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) - return -} diff --git a/db_models/log_event.go b/db_models/log_event.go index af66260..297ebb4 100644 --- a/db_models/log_event.go +++ b/db_models/log_event.go @@ -1,46 +1 @@ package db_models - -import ( - "encoding/json" - "fmt" - "github.com/application-research/delta-db/messaging" - "gorm.io/gorm" - "time" -) - -var producer *messaging.DeltaMetricsMessageProducer - -type DeltaMetricsBaseMessage struct { - ObjectType string `json:"object_type"` - Object interface{} `json:"object"` -} - -func init() { - producer = messaging.NewDeltaMetricsMessageProducer() -} - -// time series log events -type LogEvent struct { - ID int64 `gorm:"primaryKey"` // auto increment - LogEventType string `json:"log_event_type"` // content, deal, piece_commitment, upload, miner, info - LogEventObject []byte `json:"event_object"` - LogEventId int64 `json:"log_event_id"` // object id - LogEvent string `json:"log_event"` // description - Collected bool `json:"collected"` // has this event been collected by the collector? - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -func (u *LogEvent) AfterCreate(tx *gorm.DB) (err error) { - // send to collector.. - fmt.Println("LogEvent AfterSave -- time to send it to the collector!!") - deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ - ObjectType: "LogEvent", - Object: u, - } - messageBytes, err := json.Marshal(deltaMetricsBaseMessage) - producer.Publish(messageBytes) - // update the log event to say it has been collected - - return -} diff --git a/db_models/log_models.go b/db_models/log_models.go new file mode 100644 index 0000000..73e6d44 --- /dev/null +++ b/db_models/log_models.go @@ -0,0 +1,267 @@ +package db_models + +import ( + "time" +) + +// +//type LogEvent struct { +// ID int64 `gorm:"primaryKey"` // auto increment +// LogEventType string `json:"log_event"` // content, deal, piece_commitment, upload, miner, info +// LogEventObject string `json:"event_object"` +// LogEventId int64 `json:"log_event_id"` // object id +// LogEvent string `json:"log_event"` // description +// CreatedAt time.Time `json:"created_at"` // auto set +// UpdatedAt time.Time `json:"updated_at"` +//} + +// action events +type DeltaStartupLogs struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + OSDetails string `json:"os_details"` + IPAddress string `json:"ip_address"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type DealEndpointRequestLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + Information string `json:"information"` +} + +type DealContentRequestLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequestingApiKey string `json:"requesting_api_key"` + EventMessage string `json:"event_message"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type DealPieceCommitmentRequestLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequestingApiKey string `json:"requesting_api_key"` + EventMessage string `json:"event_message"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type ContentPrepareLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequestingApiKey string `json:"requesting_api_key"` + EventMessage string `json:"event_message"` + ContentDealProposal ContentDealProposal `json:"content_deal_proposal"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} +type ContentAnnounceLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequestingApiKey string `json:"requesting_api_key"` + EventMessage string `json:"event_message"` + ContentDealProposal ContentDealProposal `json:"content_deal_proposal"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type OpenStatsLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type RepairRequestLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + EventMessage string `json:"event_message"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type NodeRequestLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +// job events +type PieceCommitmentJobLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type StorageDealMakeJobLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type DataTransferStatusJobLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type InstanceMetaJobLog struct { + ID int64 + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +// ContentLog time series log events +type ContentLog struct { + ID int64 `gorm:"primaryKey"` // auto increment + Name string `json:"name"` + Size int64 `json:"size"` + Cid string `json:"cid"` + RequestingApiKey string `json:"requesting_api_key,omitempty"` + PieceCommitmentId int64 `json:"piece_commitment_id,omitempty"` + Status string `json:"status"` + ConnectionMode string `json:"connection_mode"` // offline or online + LastMessage string `json:"last_message"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + SystemContentId int64 `json:"system_content_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +// ContentDealLog time series content deal events +type ContentDealLog struct { + ID int64 `gorm:"primaryKey"` + Content int64 `json:"content" gorm:"index:,option:CONCURRENTLY"` + PropCid string `json:"propCid"` + DealUUID string `json:"dealUuid"` + Miner string `json:"miner"` + DealID int64 `json:"dealId"` + Failed bool `json:"failed"` + Verified bool `json:"verified"` + Slashed bool `json:"slashed"` + FailedAt time.Time `json:"failedAt,omitempty"` + DTChan string `json:"dtChan" gorm:"index"` + TransferStarted time.Time `json:"transferStarted"` + TransferFinished time.Time `json:"transferFinished"` + OnChainAt time.Time `json:"onChainAt"` + SealedAt time.Time `json:"sealedAt"` + LastMessage string `json:"lastMessage"` + DealProtocolVersion string `json:"deal_protocol_version"` + MinerVersion string `json:"miner_version,omitempty"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemContentDealId int64 `json:"system_content_deal_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type ContentMinerLog struct { + ID int64 `gorm:"primaryKey"` + Content int64 `json:"content" gorm:"index:,option:CONCURRENTLY"` + Miner string `json:"miner"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemContentMinerId int64 `json:"system_content_miner_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type ContentWalletLog struct { + ID int64 `gorm:"primaryKey"` + Content int64 `json:"content" gorm:"index:,option:CONCURRENTLY"` + Wallet string `json:"wallet_meta"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemContentWalletId int64 `json:"system_content_miner_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type PieceCommitmentLog struct { + ID int64 `gorm:"primaryKey"` + Cid string `json:"cid"` + Piece string `json:"piece"` + Size int64 `json:"size"` + PaddedPieceSize uint64 `json:"padded_piece_size"` + UnPaddedPieceSize uint64 `json:"unnpadded_piece_size"` + Status string `json:"status"` // open, in-progress, completed (closed). + LastMessage string `json:"last_message"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemContentPieceCommitmentId int64 `json:"system_content_piece_commitment_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type ContentDealProposalLog struct { + ID int64 `gorm:"primaryKey"` + Content int64 `json:"content" gorm:"index:,option:CONCURRENTLY"` + Unsigned string `json:"unsigned"` + Signed string `json:"signed"` + Meta string `json:"meta"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemContentDealProposalId int64 `json:"system_content_deal_proposal_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type ContentDealProposalParametersLog struct { + ID int64 `gorm:"primaryKey"` + Content int64 `json:"content" gorm:"index:,option:CONCURRENTLY"` + Label string `json:"label,omitempty"` + Duration int64 `json:"duration,omitempty"` + StartEpoch int64 `json:"start_epoch,omitempty"` + EndEpoch int64 `json:"end_epoch,omitempty"` + TransferParams string `json:"transfer_params,omitempty"` + RemoveUnsealedCopy bool `json:"remove_unsealed_copy,omitempty"` + SkipIPNIAnnounce bool `json:"skip_ipni_announce,omitempty"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemContentDealProposalParametersId int64 `json:"system_content_deal_proposal_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + +type WalletLog struct { + ID int64 `gorm:"primaryKey"` + UuId string `json:"uuid"` + Addr string `json:"addr"` + Owner string `json:"owner"` + KeyType string `json:"key_type"` + PrivateKey string `json:"private_key"` + NodeInfo string `json:"node_info"` + RequesterInfo string `json:"requester_info"` + RequestingApiKey string `json:"requesting_api_key"` + SystemWalletId int64 `json:"system_content_deal_proposal_id"` + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} diff --git a/db_models/piece_commitment.go b/db_models/piece_commitment.go index 2b52ebe..d471a7d 100644 --- a/db_models/piece_commitment.go +++ b/db_models/piece_commitment.go @@ -2,7 +2,6 @@ package db_models import ( "encoding/json" - "fmt" "gorm.io/gorm" "time" ) @@ -20,38 +19,42 @@ type PieceCommitment struct { UpdatedAt time.Time `json:"updated_at"` } -func (u *PieceCommitment) BeforeSave(tx *gorm.DB) (err error) { - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentMiner Save", - LogEventId: u.ID, - LogEvent: fmt.Sprintf("ContentMiner %d saved", u.ID), - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, - }) - return -} +func (u *PieceCommitment) AfterCreate(tx *gorm.DB) (err error) { -func (u *PieceCommitment) BeforeCreate(tx *gorm.DB) (err error) { - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "ContentMiner Create", - LogEventId: u.ID, - LogEvent: fmt.Sprintf("ContentMiner %d create", u.ID), - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, - }) - return -} + var pieceComm PieceCommitment + tx.Model(&PieceCommitment{}).Where("id = ?", u.ID).First(&pieceComm) + + if pieceComm.ID == 0 { + return + } + + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + log := PieceCommitmentLog{ + Cid: pieceComm.Cid, + Piece: pieceComm.Piece, + Size: pieceComm.Size, + PaddedPieceSize: pieceComm.PaddedPieceSize, + UnPaddedPieceSize: pieceComm.UnPaddedPieceSize, + Status: pieceComm.Status, + LastMessage: pieceComm.LastMessage, + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemContentPieceCommitmentId: u.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "PieceCommitmentLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + producer.Publish(messageBytes) -func (u *PieceCommitment) AfterSave(tx *gorm.DB) (err error) { - // log this on the event log table - messageBytes, err := json.Marshal(u) - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "PieceCommitment", - LogEventObject: messageBytes, - LogEventId: u.ID, - Collected: false, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) return } diff --git a/db_models/repair_request.go b/db_models/repair_request.go index 94c3215..e4a8014 100644 --- a/db_models/repair_request.go +++ b/db_models/repair_request.go @@ -1,8 +1,6 @@ package db_models import ( - "fmt" - "gorm.io/gorm" "time" ) @@ -13,36 +11,3 @@ type RepairRequest struct { CreatedAt time.Time `json:"created_at"` UpdatedAt time.Time `json:"updated_at"` } - -func (u *RepairRequest) BeforeSave(tx *gorm.DB) (err error) { - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "Content Save", - LogEventId: u.ID, - LogEvent: fmt.Sprintf("Content %d saved", u.ID), - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, - }) - return -} - -func (u *RepairRequest) BeforeCreate(tx *gorm.DB) (err error) { - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "Content Create", - LogEventId: u.ID, - LogEvent: fmt.Sprintf("Content %d create", u.ID), - CreatedAt: time.Time{}, - UpdatedAt: time.Time{}, - }) - return -} - -func (u *RepairRequest) AfterSave(tx *gorm.DB) (err error) { - tx.Model(&LogEvent{}).Save(&LogEvent{ - LogEventType: "After Content Save", - LogEventId: u.ID, - LogEvent: fmt.Sprintf("After content %d saved", u.ID), - CreatedAt: time.Now(), - UpdatedAt: time.Now(), - }) - return -} diff --git a/db_models/wallet.go b/db_models/wallet.go new file mode 100644 index 0000000..c9e4724 --- /dev/null +++ b/db_models/wallet.go @@ -0,0 +1,59 @@ +package db_models + +import ( + "encoding/json" + "gorm.io/gorm" + "time" +) + +// time series log events +type Wallet struct { + ID int64 `gorm:"primaryKey"` + UuId string `json:"uuid"` + Addr string `json:"addr"` + Owner string `json:"owner"` + KeyType string `json:"key_type"` + PrivateKey string `json:"private_key"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +func (u *Wallet) AfterCreate(tx *gorm.DB) (err error) { + var walletFromDb Wallet + tx.Model(&Wallet{}).Where("id = ?", u.ID).First(&walletFromDb) + + if walletFromDb.ID == 0 { + return + } + // get instance info + ip, err := GetPublicIP() + if err != nil { + return + } + + log := WalletLog{ + UuId: walletFromDb.UuId, + Addr: walletFromDb.Addr, + Owner: walletFromDb.Owner, + KeyType: "REDACTED", + PrivateKey: "REDACTED", + NodeInfo: GetHostname(), + RequesterInfo: ip, + SystemWalletId: u.ID, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + deltaMetricsBaseMessage := DeltaMetricsBaseMessage{ + ObjectType: "WalletLog", + Object: log, + } + + messageBytes, err := json.Marshal(deltaMetricsBaseMessage) + if err != nil { + return err + } + producer.Publish(messageBytes) + + return +} diff --git a/event_models/base.go b/event_models/base.go deleted file mode 100644 index bbf5b47..0000000 --- a/event_models/base.go +++ /dev/null @@ -1 +0,0 @@ -package event_models diff --git a/event_models/log_event.go b/event_models/log_event.go deleted file mode 100644 index c39400c..0000000 --- a/event_models/log_event.go +++ /dev/null @@ -1,186 +0,0 @@ -package event_models - -import ( - "github.com/application-research/delta-db/db_models" - "time" -) - -// -//type LogEvent struct { -// ID int64 `gorm:"primaryKey"` // auto increment -// LogEventType string `json:"log_event"` // content, deal, piece_commitment, upload, miner, info -// LogEventObject string `json:"event_object"` -// LogEventId int64 `json:"log_event_id"` // object id -// LogEvent string `json:"log_event"` // description -// CreatedAt time.Time `json:"created_at"` // auto set -// UpdatedAt time.Time `json:"updated_at"` -//} - -// action events -type DeltaStartupLogs struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - OSDetails string `json:"os_details"` - IPAddress string `json:"ip_address"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type DealEndpointRequestLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - Information string `json:"information"` -} - -type DealContentRequestLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequestingApiKey string `json:"requesting_api_key"` - EventMessage string `json:"event_message"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type DealPieceCommitmentRequestLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequestingApiKey string `json:"requesting_api_key"` - EventMessage string `json:"event_message"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type ContentPrepareLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequestingApiKey string `json:"requesting_api_key"` - EventMessage string `json:"event_message"` - ContentDealProposal db_models.ContentDealProposal `json:"content_deal_proposal"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} -type ContentAnnounceLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequestingApiKey string `json:"requesting_api_key"` - EventMessage string `json:"event_message"` - ContentDealProposal db_models.ContentDealProposal `json:"content_deal_proposal"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type OpenStatsLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type RepairRequestLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - EventMessage string `json:"event_message"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type NodeRequestLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -// job events -type PieceCommitmentJobLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type StorageDealMakeJobLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type DataTransferStatusJobLog struct { - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -type InstanceMetaJobLog struct { - ID int64 - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - CreatedAt time.Time `json:"created_at"` // auto set - UpdatedAt time.Time `json:"updated_at"` -} - -// ContentLog time series log events -type ContentLog struct { - db_models.Content - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - SystemContentId int64 `json:"system_content_id"` -} - -// ContentDealLog time series content deal events -type ContentDealLog struct { - db_models.ContentDeal - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - SystemContentDealId int64 `json:"system_content_deal_id"` -} - -type ContentMinerLog struct { - db_models.ContentMiner - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - SystemContentMinerId int64 `json:"system_content_miner_id"` -} - -type PieceCommitmentLog struct { - db_models.PieceCommitment - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - SystemContentPieceCommitmentId int64 `json:"system_content_piece_commitment_id"` -} - -type ContentDealProposalLog struct { - db_models.ContentDealProposal - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - SystemContentDealProposalId int64 `json:"system_content_deal_proposal_id"` -} - -type ContentDealProposalParameter struct { - db_models.ContentDealProposalParameters - ID int64 `gorm:"primaryKey"` // auto increment - NodeInfo string `json:"node_info"` - RequesterInfo string `json:"requester_info"` - RequestingApiKey string `json:"requesting_api_key"` - SystemContentDealProposalId int64 `json:"system_content_deal_proposal_id"` -} diff --git a/messaging/base.go b/messaging/base.go index 6c0d618..dcb2369 100644 --- a/messaging/base.go +++ b/messaging/base.go @@ -1,10 +1,14 @@ package messaging -import "encoding/json" +import ( + "encoding/json" + "time" +) var ( MetricsTopicUrl = "145.40.77.207:4150" PrimaryTopic = "delta-metric-events" + PrimaryChannel = "delta-logs" producer *DeltaMetricsMessageProducer ) @@ -16,6 +20,16 @@ type DeltaMetricsBaseMessage struct { Object interface{} `json:"object"` } +type LogEvent struct { + ID int64 `gorm:"primaryKey"` // auto increment + LogEventType string `json:"log_event_type"` // content, deal, piece_commitment, upload, miner, info + LogEventObject []byte `json:"event_object"` + LogEventId int64 `json:"log_event_id"` // object id + LogEvent string `json:"log_event"` // description + CreatedAt time.Time `json:"created_at"` // auto set + UpdatedAt time.Time `json:"updated_at"` +} + func init() { producer = NewDeltaMetricsMessageProducer() } @@ -35,6 +49,19 @@ func (p *DeltaMetricsTracer) Trace(message DeltaMetricsBaseMessage) error { return nil } +func (p *DeltaMetricsTracer) TraceLog(message LogEvent) error { + baseMessage := DeltaMetricsBaseMessage{ + ObjectType: "LogEvent", + Object: message, + } + messageBytes, err := json.Marshal(baseMessage) + if err != nil { + return err + } + producer.Publish(messageBytes) + return nil +} + func (p *DeltaMetricsTracer) TraceMessageString(message string) error { err := producer.Publish([]byte(message)) if err != nil { diff --git a/messaging/consumer.go b/messaging/consumer.go index 8ebdfdf..82ce77e 100644 --- a/messaging/consumer.go +++ b/messaging/consumer.go @@ -10,7 +10,7 @@ type DeltaMetricsMessageConsumer struct { } func NewDeltaMetricsMessageConsumer() *DeltaMetricsMessageConsumer { - consumer, err := nsq.NewConsumer(PrimaryTopic, MetricsTopicUrl, nsq.NewConfig()) + consumer, err := nsq.NewConsumer(PrimaryTopic, PrimaryChannel, nsq.NewConfig()) if err != nil { log.Fatalf("Could not create consumer: %v", err) }