Skip to content

Commit

Permalink
bump all dependencies, remove unused flags, no serves firehose V2 pro…
Browse files Browse the repository at this point in the history
…tocol
  • Loading branch information
sduchesneau committed Sep 28, 2022
1 parent 7c7b538 commit 92b76d7
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 135 deletions.
16 changes: 6 additions & 10 deletions cmd/firearweave/cli/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,15 @@ func init() {
}
dmetering.SetDefaultMeter(metering)

var possibleIndexSizes []uint64
for _, size := range viper.GetIntSlice("firehose-block-index-sizes") {
if size < 0 {
return nil, fmt.Errorf("invalid negative size for firehose-block-index-sizes: %d", size)
}
possibleIndexSizes = append(possibleIndexSizes, uint64(size))
mergedBlocksStoreURL, oneBlockStoreURL, forkedBlocksStoreURL, err := getCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}

sfDataDir := runtime.AbsDataDir
return firehoseApp.New(appLogger, &firehoseApp.Config{
MergedBlocksStoreURL: MustReplaceDataDir(sfDataDir, viper.GetString("common-merged-blocks-store-url")),
OneBlocksStoreURL: MustReplaceDataDir(sfDataDir, viper.GetString("common-one-blocks-store-url")),
ForkedBlocksStoreURL: MustReplaceDataDir(sfDataDir, viper.GetString("common-forked-blocks-store-url")),
MergedBlocksStoreURL: mergedBlocksStoreURL,
OneBlocksStoreURL: oneBlockStoreURL,
ForkedBlocksStoreURL: forkedBlocksStoreURL,
BlockStreamAddr: viper.GetString("common-live-source-addr"),
GRPCListenAddr: viper.GetString("firehose-grpc-listen-addr"),
GRPCShutdownGracePeriod: time.Second,
Expand Down
7 changes: 7 additions & 0 deletions cmd/firearweave/cli/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cli

import (
"github.com/streamingfast/logging"
)

var rootLog, _ = logging.RootLogger("firearweave", "github.com/streamingfast/firehose-arweave/cmd/firearweave")
3 changes: 0 additions & 3 deletions cmd/firearweave/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ import (
"github.com/streamingfast/derr"
"github.com/streamingfast/dlauncher/flags"
"github.com/streamingfast/dlauncher/launcher"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

var rootLog, _ = logging.RootLogger("firearweave", "github.com/streamingfast/firehose-arweave/cmd/firearweave/cli")

var RootCmd = &cobra.Command{Use: "firearweave", Short: "Arweave on StreamingFast"}
var allFlags = make(map[string]bool) // used as global because of async access to cobra init functions

Expand Down
37 changes: 12 additions & 25 deletions cmd/firearweave/cli/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,22 @@ func init() {
// and avoid the duplication? Note that this duplicate happens in many other apps, we might need to re-think our
// init flow and call init after the factory and giving it the instantiated app...
InitFunc: func(runtime *launcher.Runtime) (err error) {
sfDataDir := runtime.AbsDataDir

if err = mkdirStorePathIfLocal(mustReplaceDataDir(sfDataDir, viper.GetString("common-merged-blocks-store-url"))); err != nil {
return
}

if err = mkdirStorePathIfLocal(mustReplaceDataDir(sfDataDir, viper.GetString("common-one-blocks-store-url"))); err != nil {
return
}

if err = mkdirStorePathIfLocal(mustReplaceDataDir(sfDataDir, viper.GetString("merger-state-file"))); err != nil {
return
}

return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
sfDataDir := runtime.AbsDataDir
mergedBlocksStoreURL, oneBlockStoreURL, forkedBlocksStoreURL, err := getCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}
return mergerApp.New(&mergerApp.Config{

StorageMergedBlocksFilesPath: MustReplaceDataDir(sfDataDir, viper.GetString("common-merged-blocks-store-url")),
StorageOneBlockFilesPath: MustReplaceDataDir(sfDataDir, viper.GetString("common-one-blocks-store-url")),
StorageForkedBlocksFilesPath: MustReplaceDataDir(sfDataDir, viper.GetString("common-forked-blocks-store-url")),

GRPCListenAddr: viper.GetString("merger-grpc-listen-addr"),
PruneForkedBlocksAfter: viper.GetUint64("merger-prune-forked-blocks-after"),
StopBlock: viper.GetUint64("merger-stop-block"),
TimeBetweenPruning: viper.GetDuration("merger-time-between-store-pruning"),
TimeBetweenPolling: viper.GetDuration("merger-time-between-store-lookups"),
StorageOneBlockFilesPath: oneBlockStoreURL,
StorageMergedBlocksFilesPath: mergedBlocksStoreURL,
StorageForkedBlocksFilesPath: forkedBlocksStoreURL,
GRPCListenAddr: viper.GetString("merger-grpc-listen-addr"),
PruneForkedBlocksAfter: viper.GetUint64("merger-prune-forked-blocks-after"),
StopBlock: viper.GetUint64("merger-stop-block"),
TimeBetweenPruning: viper.GetDuration("merger-time-between-store-pruning"),
TimeBetweenPolling: viper.GetDuration("merger-time-between-store-lookups"),
}), nil
},
})
Expand Down
12 changes: 6 additions & 6 deletions cmd/firearweave/cli/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,18 @@ func nodeFactoryFunc(flagPrefix, kind string) func(*launcher.Runtime) (launcher.
}

blockStreamServer := blockstream.NewUnmanagedServer(blockstream.ServerOptionWithLogger(appLogger))
oneBlockStoreURL := mustReplaceDataDir(sfDataDir, viper.GetString("common-one-blocks-store-url"))
mergedBlockStoreURL := mustReplaceDataDir(sfDataDir, viper.GetString("common-merged-blocks-store-url"))

mergedBlocksStoreURL, oneBlockStoreURL, _, err := getCommonStoresURLs(runtime.AbsDataDir)
workingDir := mustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir"))
gprcListenAdrr := viper.GetString("reader-node-grpc-listen-addr")
mergeThresholdBlockAge := viper.GetString("reader-node-merge-threshold-block-age")
waitTimeForUploadOnShutdown := viper.GetDuration("reader-node-wait-upload-complete-on-shutdown")
oneBlockFileSuffix := viper.GetString("reader-node-one-block-suffix")
blocksChanCapacity := viper.GetInt("reader-node-blocks-chan-capacity")

readerPlugin, err := getReaderLogPlugin(
blockStreamServer,
oneBlockStoreURL,
mergedBlockStoreURL,
mergeThresholdBlockAge,
mergedBlocksStoreURL,
workingDir,
batchStartBlockNum,
batchStopBlockNum,
Expand All @@ -196,7 +194,7 @@ func nodeFactoryFunc(flagPrefix, kind string) func(*launcher.Runtime) (launcher.
Operator: chainOperator,
MindreaderPlugin: readerPlugin,
MetricsAndReadinessManager: metricsAndReadinessManager,
RegisterGRPCService: func(server *grpc.Server) error {
RegisterGRPCService: func(server grpc.ServiceRegistrar) error {
pbheadinfo.RegisterHeadInfoServer(server, blockStreamServer)
pbbstream.RegisterBlockStreamServer(server, blockStreamServer)

Expand Down Expand Up @@ -260,10 +258,12 @@ func buildNodeArguments(nodeDataDir, nodeRole string, endpoints []string, start,
func buildMetricsAndReadinessManager(name string, maxLatency time.Duration) *nodeManager.MetricsAndReadinessManager {
headBlockTimeDrift := metrics.NewHeadBlockTimeDrift(name)
headBlockNumber := metrics.NewHeadBlockNumber(name)
appReadiness := metrics.NewAppReadiness(name)

metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(
headBlockTimeDrift,
headBlockNumber,
appReadiness,
maxLatency,
)
return metricsAndReadinessManager
Expand Down
5 changes: 2 additions & 3 deletions cmd/firearweave/cli/reader-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
package cli

import (
"github.com/streamingfast/firehose-arweave/codec"
"time"

"github.com/streamingfast/firehose-arweave/codec"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/blockstream"
Expand Down Expand Up @@ -45,7 +46,6 @@ func registerReaderNodeFlags(cmd *cobra.Command) error {
hostname value is a good value to use.
`))
cmd.Flags().Duration("reader-node-wait-upload-complete-on-shutdown", 30*time.Second, "When the reader is shutting down, it will wait up to that amount of time for the archiver to finish uploading the blocks before leaving anyway")
cmd.Flags().String("reader-node-merge-threshold-block-age", "24h", "When processing blocks with a blocktime older than this threshold, they will be automatically merged")

return nil
}
Expand All @@ -54,7 +54,6 @@ func getReaderLogPlugin(
blockStreamServer *blockstream.Server,
oneBlockStoreURL string,
mergedBlockStoreURL string,
mergeThresholdBlockAge string,
workingDir string,
batchStartBlockNum uint64,
batchStopBlockNum uint64,
Expand Down
7 changes: 5 additions & 2 deletions cmd/firearweave/cli/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ func init() {
return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
sfDataDir := runtime.AbsDataDir
_, oneBlockStoreURL, _, err := getCommonStoresURLs(runtime.AbsDataDir)
if err != nil {
return nil, err
}
return relayerApp.New(&relayerApp.Config{
SourcesAddr: viper.GetStringSlice("relayer-source"),
OneBlocksURL: MustReplaceDataDir(sfDataDir, viper.GetString("common-one-blocks-store-url")),
OneBlocksURL: oneBlockStoreURL,
GRPCListenAddr: viper.GetString("relayer-grpc-listen-addr"),
MaxSourceLatency: viper.GetDuration("relayer-max-source-latency"),
}), nil
Expand Down
8 changes: 2 additions & 6 deletions cmd/firearweave/cli/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,9 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/streamingfast/dgrpc"
"github.com/streamingfast/dlauncher/launcher"
)

func init() {
dgrpc.Verbosity = 2
}

func setupCmd(cmd *cobra.Command) error {
cmd.SilenceUsage = true

Expand Down Expand Up @@ -62,10 +57,11 @@ func setupCmd(cmd *cobra.Command) error {

launcher.SetupLogger(rootLog, &launcher.LoggingOptions{
WorkingDir: viper.GetString("global-data-dir"),
Verbosity: viper.GetInt("global-verbose"),
Verbosity: viper.GetInt("global-verbose") + 1,
LogFormat: viper.GetString("global-log-format"),
LogToFile: isMatchingCommand(cmds, logToFileOn) && viper.GetBool("global-log-to-file"),
LogListenAddr: viper.GetString("global-log-level-switcher-listen-addr"),
LogToStderr: true,
})
launcher.SetupTracing("firearweave")
launcher.SetupAnalyticsMetrics(rootLog, viper.GetString("global-metrics-listen-addr"), viper.GetString("global-pprof-listen-addr"))
Expand Down
36 changes: 36 additions & 0 deletions cmd/firearweave/cli/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"path/filepath"
"strings"

"github.com/spf13/viper"
"github.com/streamingfast/cli"
"github.com/streamingfast/logging"
"go.uber.org/zap"
Expand All @@ -44,6 +45,41 @@ func mkdirStorePathIfLocal(storeURL string) (err error) {
return
}

var commonStoresCreated bool
var indexStoreCreated bool

func mustGetCommonStoresURLs(dataDir string) (mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL string) {
var err error
mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL, err = getCommonStoresURLs(dataDir)
if err != nil {
panic(err)
}

return
}

func getCommonStoresURLs(dataDir string) (mergedBlocksStoreURL, oneBlocksStoreURL, forkedBlocksStoreURL string, err error) {
mergedBlocksStoreURL = MustReplaceDataDir(dataDir, viper.GetString("common-merged-blocks-store-url"))
oneBlocksStoreURL = MustReplaceDataDir(dataDir, viper.GetString("common-one-block-store-url"))
forkedBlocksStoreURL = MustReplaceDataDir(dataDir, viper.GetString("common-forked-blocks-store-url"))

if commonStoresCreated {
return
}

if err = mkdirStorePathIfLocal(forkedBlocksStoreURL); err != nil {
return
}
if err = mkdirStorePathIfLocal(oneBlocksStoreURL); err != nil {
return
}
if err = mkdirStorePathIfLocal(mergedBlocksStoreURL); err != nil {
return
}
commonStoresCreated = true
return
}

func getDirsToMake(storeURL string) []string {
parts := strings.Split(storeURL, "://")
if len(parts) > 1 {
Expand Down
1 change: 0 additions & 1 deletion devel/standard/standard.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ start:
# debugging
reader-node-debug-firehose-logs: false
reader-node-log-to-zap: true
reader-node-merge-threshold-block-age: never
# Once fully live with chain, those should be removed, they are used so that Firehose serves
# blocks even if the chain is not live yet.
relayer-max-source-latency: 999999999s
Expand Down
28 changes: 12 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,33 @@ module github.com/streamingfast/firehose-arweave
go 1.16

require (
cloud.google.com/go/iam v0.3.0 // indirect
github.com/ShinyTrinkets/overseer v0.3.0
github.com/abourget/llerrgroup v0.2.0 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/spf13/cobra v1.4.0
github.com/spf13/viper v1.8.1
github.com/streamingfast/bstream v0.0.2-0.20220809161028-014a633a8d8e
github.com/streamingfast/bstream v0.0.2-0.20220916182101-7a027bfdcffb
github.com/streamingfast/cli v0.0.4-0.20220113202443-f7bcefa38f7e
github.com/streamingfast/dauth v0.0.0-20220404140613-a40f4cd81626
github.com/streamingfast/derr v0.0.0-20220526184630-695c21740145
github.com/streamingfast/dgrpc v0.0.0-20220307180102-b2d417ac8da7
github.com/streamingfast/dlauncher v0.0.0-20220307153121-5674e1b64d40
github.com/streamingfast/dlauncher v0.0.0-20220909121534-7a9aa91dbb32
github.com/streamingfast/dmetering v0.0.0-20220307162406-37261b4b3de9
github.com/streamingfast/dmetrics v0.0.0-20220307162521-2389094ab4a1
github.com/streamingfast/dstore v0.1.1-0.20220607202639-35118aeaf648
github.com/streamingfast/firehose v0.1.1-0.20220804184723-a790c529fe15
github.com/streamingfast/dmetrics v0.0.0-20220811180000-3e513057d17c
github.com/streamingfast/dstore v0.1.1-0.20220921155016-7a52fdb3fe5f
github.com/streamingfast/firehose v0.1.1-0.20220909121738-2f3bc007ea2b
github.com/streamingfast/firehose-arweave/types v0.0.0-20220509041238-3d3270820c99
github.com/streamingfast/logging v0.0.0-20220511154537-ce373d264338
github.com/streamingfast/merger v0.0.3-0.20220803202246-1277c51d3487
github.com/streamingfast/node-manager v0.0.2-0.20220804015313-01ef0ea2678c
github.com/streamingfast/merger v0.0.3-0.20220909122033-9ca15beb25f5
github.com/streamingfast/node-manager v0.0.2-0.20220912235129-6c08463b0c01
github.com/streamingfast/pbgo v0.0.6-0.20220801202203-c32e42ac42a8
github.com/streamingfast/relayer v0.0.2-0.20220802193804-8c63614023a9
github.com/streamingfast/sf-tools v0.0.0-20220808190933-7bf947e2cc81
github.com/stretchr/testify v1.7.1
github.com/streamingfast/relayer v0.0.2-0.20220909122435-e67fbc964fd9
github.com/streamingfast/sf-tools v0.0.0-20220830151952-184d6e9a6bb9
github.com/stretchr/testify v1.8.0
go.uber.org/zap v1.21.0
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.0
)

replace github.com/ShinyTrinkets/overseer => github.com/streamingfast/overseer v0.2.1-0.20210326144022-ee491780e3ef
Loading

0 comments on commit 92b76d7

Please sign in to comment.