Skip to content

Commit

Permalink
style: run gofumpt
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Aug 21, 2023
1 parent aa73ad5 commit c5a805e
Show file tree
Hide file tree
Showing 135 changed files with 522 additions and 336 deletions.
48 changes: 25 additions & 23 deletions bitswap/benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func BenchmarkFixedDelay(b *testing.B) {
}

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = os.WriteFile("tmp/benchmark.json", out, 0666)
_ = os.WriteFile("tmp/benchmark.json", out, 0o666)
printResults(benchmarkLog)
}

Expand Down Expand Up @@ -182,28 +182,30 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
}

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = os.WriteFile("tmp/benchmark.json", out, 0666)
_ = os.WriteFile("tmp/benchmark.json", out, 0o666)
printResults(benchmarkLog)
}

const datacenterSpeed = 5 * time.Millisecond
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond
const datacenterDistribution = 3 * time.Millisecond
const distribution = 20 * time.Millisecond
const datacenterBandwidth = 125000000.0
const datacenterBandwidthDeviation = 3000000.0
const fastBandwidth = 1250000.0
const fastBandwidthDeviation = 300000.0
const mediumBandwidth = 500000.0
const mediumBandwidthDeviation = 80000.0
const slowBandwidth = 100000.0
const slowBandwidthDeviation = 16500.0
const rootBlockSize = 800
const stdBlockSize = 8000
const largeBlockSize = int64(256 * 1024)
const (
datacenterSpeed = 5 * time.Millisecond
fastSpeed = 60 * time.Millisecond
mediumSpeed = 200 * time.Millisecond
slowSpeed = 800 * time.Millisecond
superSlowSpeed = 4000 * time.Millisecond
datacenterDistribution = 3 * time.Millisecond
distribution = 20 * time.Millisecond
datacenterBandwidth = 125000000.0
datacenterBandwidthDeviation = 3000000.0
fastBandwidth = 1250000.0
fastBandwidthDeviation = 300000.0
mediumBandwidth = 500000.0
mediumBandwidthDeviation = 80000.0
slowBandwidth = 100000.0
slowBandwidthDeviation = 16500.0
rootBlockSize = 800
stdBlockSize = 8000
largeBlockSize = int64(256 * 1024)
)

func BenchmarkRealWorld(b *testing.B) {
benchmarkLog = nil
Expand Down Expand Up @@ -240,7 +242,7 @@ func BenchmarkRealWorld(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 300, 200, slowNetworkDelay, slowBandwidthGenerator, stdBlockSize, bstoreLatency, allToAll, batchFetchAll)
})
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = os.WriteFile("tmp/rw-benchmark.json", out, 0666)
_ = os.WriteFile("tmp/rw-benchmark.json", out, 0o666)
printResults(benchmarkLog)
}

Expand All @@ -263,7 +265,7 @@ func BenchmarkDatacenter(b *testing.B) {
subtestDistributeAndFetchRateLimited(b, 3, 100, datacenterNetworkDelay, datacenterBandwidthGenerator, largeBlockSize, bstoreLatency, allToAll, unixfsFileFetch)
})
out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = os.WriteFile("tmp/rb-benchmark.json", out, 0666)
_ = os.WriteFile("tmp/rb-benchmark.json", out, 0o666)
printResults(benchmarkLog)
}

Expand Down Expand Up @@ -304,7 +306,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
})

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
_ = os.WriteFile("tmp/rb-benchmark.json", out, 0666)
_ = os.WriteFile("tmp/rb-benchmark.json", out, 0o666)
printResults(benchmarkLog)
}

Expand Down
8 changes: 5 additions & 3 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ type bitswap interface {
WantlistForPeer(p peer.ID) []cid.Cid
}

var _ exchange.SessionExchange = (*Bitswap)(nil)
var _ bitswap = (*Bitswap)(nil)
var HasBlockBufferSize = defaults.HasBlockBufferSize
var (
_ exchange.SessionExchange = (*Bitswap)(nil)
_ bitswap = (*Bitswap)(nil)
HasBlockBufferSize = defaults.HasBlockBufferSize
)

type Bitswap struct {
*client.Client
Expand Down
2 changes: 1 addition & 1 deletion bitswap/bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ func TestSendToWantingPeer(t *testing.T) {
if !blkrecvd.Cid().Equals(alpha.Cid()) {
t.Fatal("Wrong block!")
}

}

func TestEmptyKey(t *testing.T) {
Expand Down Expand Up @@ -828,6 +827,7 @@ func (tsl *testingScoreLedger) Start(scorePeer server.ScorePeerFunc) {
tsl.scorePeer = scorePeer
close(tsl.started)
}

func (tsl *testingScoreLedger) Stop() {
close(tsl.closed)
}
Expand Down
3 changes: 2 additions & 1 deletion bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
self peer.ID,
) bssm.Session {
return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
Expand Down
7 changes: 4 additions & 3 deletions bitswap/client/internal/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type WantFunc func(context.Context, []cid.Cid)
// blocks, a want function, and a close function, and returns a channel of
// incoming blocks.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
want WantFunc, cwants func([]cid.Cid),
) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Getter.AsyncGetBlocks")
defer span.End()

Expand Down Expand Up @@ -99,8 +100,8 @@ func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid
// If the context is cancelled or the incoming channel closes, calls cfun with
// any keys corresponding to blocks that were never received.
func handleIncoming(ctx context.Context, sessctx context.Context, remaining *cid.Set,
in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid)) {

in <-chan blocks.Block, out chan blocks.Block, cfun func([]cid.Cid),
) {
ctx, cancel := context.WithCancel(ctx)

// Clean up before exiting this function, and call the cancel function on
Expand Down
5 changes: 2 additions & 3 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func newDontHaveTimeoutMgrWithParams(
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration,
clock clock.Clock,
timeoutsTriggered chan struct{}) *dontHaveTimeoutMgr {

timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
clock: clock,
Expand Down Expand Up @@ -222,7 +222,6 @@ func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// checkForTimeouts checks pending wants to see if any are over the timeout.
// Note: this function should only be called within the lock.
func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {

if len(dhtm.wantQueue) == 0 {
return
}
Expand Down
10 changes: 6 additions & 4 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"go.uber.org/zap"
)

var log = logging.Logger("bitswap")
var sflog = log.Desugar()
var (
log = logging.Logger("bitswap")
sflog = log.Desugar()
)

const (
defaultRebroadcastInterval = 30 * time.Second
Expand Down Expand Up @@ -240,8 +242,8 @@ func newMessageQueue(
maxValidLatency time.Duration,
dhTimeoutMgr DontHaveTimeoutManager,
clock clock.Clock,
events chan messageEvent) *MessageQueue {

events chan messageEvent,
) *MessageQueue {
ctx, cancel := context.WithCancel(ctx)
return &MessageQueue{
ctx: ctx,
Expand Down
11 changes: 8 additions & 3 deletions bitswap/client/internal/messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (fp *fakeDontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
}
fp.ks = s.Keys()
}

func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
fp.lk.Lock()
defer fp.lk.Unlock()
Expand All @@ -74,18 +75,21 @@ func (fp *fakeDontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
}
fp.ks = s.Keys()
}

func (fp *fakeDontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
fp.lk.Lock()
defer fp.lk.Unlock()

fp.latencyUpds = append(fp.latencyUpds, elapsed)
}

func (fp *fakeDontHaveTimeoutMgr) latencyUpdates() []time.Duration {
fp.lk.Lock()
defer fp.lk.Unlock()

return fp.latencyUpds
}

func (fp *fakeDontHaveTimeoutMgr) pendingCount() int {
fp.lk.Lock()
defer fp.lk.Unlock()
Expand All @@ -101,8 +105,8 @@ type fakeMessageSender struct {
}

func newFakeMessageSender(reset chan<- struct{},
messagesSent chan<- []bsmsg.Entry, supportsHave bool) *fakeMessageSender {

messagesSent chan<- []bsmsg.Entry, supportsHave bool,
) *fakeMessageSender {
return &fakeMessageSender{
reset: reset,
messagesSent: messagesSent,
Expand All @@ -126,7 +130,8 @@ func mockTimeoutCb(peer.ID, []cid.Cid) {}
func collectMessages(ctx context.Context,
t *testing.T,
messagesSent <-chan []bsmsg.Entry,
timeout time.Duration) [][]bsmsg.Entry {
timeout time.Duration,
) [][]bsmsg.Entry {
var messagesReceived [][]bsmsg.Entry
timeoutctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
Expand Down
1 change: 0 additions & 1 deletion bitswap/client/internal/notifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (ps *impl) Shutdown() {
// is closed if the |ctx| times out or is cancelled, or after receiving the blocks
// corresponding to |keys|.
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {

blocksCh := make(chan blocks.Block, len(keys))
valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func TestPublishSubscribe(t *testing.T) {
}

assertBlocksEqual(t, blockRecvd, blockSent)

}

func TestSubscribeMany(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions bitswap/client/internal/peermanager/peermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ func (fp *mockPeerQueue) Shutdown() {}
func (fp *mockPeerQueue) AddBroadcastWantHaves(whs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, whs, nil}
}

func (fp *mockPeerQueue) AddWants(wbs []cid.Cid, whs []cid.Cid) {
fp.msgs <- msg{fp.p, wbs, whs, nil}
}

func (fp *mockPeerQueue) AddCancels(cs []cid.Cid) {
fp.msgs <- msg{fp.p, nil, nil, cs}
}

func (fp *mockPeerQueue) ResponseReceived(ks []cid.Cid) {
}

Expand Down Expand Up @@ -271,6 +274,7 @@ func TestSendCancels(t *testing.T) {
func (s *sess) ID() uint64 {
return s.id
}

func (s *sess) SignalAvailability(p peer.ID, isAvailable bool) {
s.available[p] = isAvailable
}
Expand Down Expand Up @@ -332,8 +336,7 @@ func TestSessionRegistration(t *testing.T) {
}
}

type benchPeerQueue struct {
}
type benchPeerQueue struct{}

func (*benchPeerQueue) Startup() {}
func (*benchPeerQueue) Shutdown() {}
Expand Down
4 changes: 4 additions & 0 deletions bitswap/client/internal/peermanager/peerwantmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type gauge struct {
func (g *gauge) Inc() {
g.count++
}

func (g *gauge) Dec() {
g.count--
}
Expand All @@ -40,13 +41,16 @@ func (mpq *mockPQ) Shutdown() {}
func (mpq *mockPQ) AddBroadcastWantHaves(whs []cid.Cid) {
mpq.bcst = append(mpq.bcst, whs...)
}

func (mpq *mockPQ) AddWants(wbs []cid.Cid, whs []cid.Cid) {
mpq.wbs = append(mpq.wbs, wbs...)
mpq.whs = append(mpq.whs, whs...)
}

func (mpq *mockPQ) AddCancels(cs []cid.Cid) {
mpq.cancels = append(mpq.cancels, cs...)
}

func (mpq *mockPQ) ResponseReceived(ks []cid.Cid) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TestNormalSimultaneousFetch(t *testing.T) {
if fpn.queriesMade != 2 {
t.Fatal("Did not dedup provider requests running simultaneously")
}

}

func TestDedupingProviderRequests(t *testing.T) {
Expand Down Expand Up @@ -256,7 +255,6 @@ func TestPeersWithConnectionErrorsNotAddedToPeerList(t *testing.T) {
if len(firstPeersReceived) != 0 || len(secondPeersReceived) != 0 {
t.Fatal("Did not filter out peers with connection issues")
}

}

func TestRateLimitingRequests(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"go.uber.org/zap"
)

var log = logging.Logger("bs:sess")
var sflog = log.Desugar()
var (
log = logging.Logger("bs:sess")
sflog = log.Desugar()
)

const (
broadcastLiveWantsLimit = 64
Expand Down Expand Up @@ -146,8 +148,8 @@ func New(
notif notifications.PubSub,
initialSearchDelay time.Duration,
periodicSearchDelay delay.D,
self peer.ID) *Session {

self peer.ID,
) *Session {
ctx, cancel := context.WithCancel(ctx)
s := &Session{
sw: newSessionWants(broadcastLiveWantsLimit),
Expand Down
2 changes: 0 additions & 2 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func TestSessionGetBlocks(t *testing.T) {
}

_, err := session.GetBlocks(ctx, cids)

if err != nil {
t.Fatal("error getting blocks")
}
Expand Down Expand Up @@ -344,7 +343,6 @@ func TestSessionOnPeersExhausted(t *testing.T) {
cids = append(cids, block.Cid())
}
_, err := session.GetBlocks(ctx, cids)

if err != nil {
t.Fatal("error getting blocks")
}
Expand Down
10 changes: 6 additions & 4 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ type change struct {
availability peerAvailability
}

type onSendFn func(to peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
type onPeersExhaustedFn func([]cid.Cid)
type (
onSendFn func(to peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
onPeersExhaustedFn func([]cid.Cid)
)

// sessionWantSender is responsible for sending want-have and want-block to
// peers. For each want, it sends a single optimistic want-block request to
Expand Down Expand Up @@ -111,8 +113,8 @@ type sessionWantSender struct {
}

func newSessionWantSender(sid uint64, pm PeerManager, spm SessionPeerManager, canceller SessionWantsCanceller,
bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn) sessionWantSender {

bpm *bsbpm.BlockPresenceManager, onSend onSendFn, onPeersExhausted onPeersExhaustedFn,
) sessionWantSender {
ctx, cancel := context.WithCancel(context.Background())
sws := sessionWantSender{
ctx: ctx,
Expand Down
Loading

0 comments on commit c5a805e

Please sign in to comment.