Skip to content

Commit

Permalink
feat: wait on reserve
Browse files Browse the repository at this point in the history
  • Loading branch information
ralph-pichler committed May 15, 2021
1 parent e9e591b commit cd54501
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 13 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
run: |
echo -e "127.0.0.10\tregistry.localhost" | sudo tee -a /etc/hosts
for ((i=0; i<REPLICA; i++)); do echo -e "127.0.1.$((i+1))\tbee-${i}.localhost bee-${i}-debug.localhost"; done | sudo tee -a /etc/hosts
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --bootnode /dnsaddr/localhost --geth --k3s --pay-threshold 1500000000000 --postage
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --bootnode /dnsaddr/localhost --geth --k3s --pay-threshold 1000000000000 --postage
- name: Test pingpong
id: pingpong-1
run: until ./beekeeper check pingpong --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"; do echo "waiting for pingpong..."; sleep .3; done
Expand All @@ -73,7 +73,7 @@ jobs:
run: ./beekeeper check fullconnectivity --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"
- name: Test settlements
id: settlements-1
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 1500000000000
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 1000000000000
- name: Test pushsync (chunks)
id: pushsync-chunks-1
run: ./beekeeper check pushsync --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" --chunks-per-node 3 --upload-chunks --retry-delay 10s
Expand Down Expand Up @@ -101,7 +101,7 @@ jobs:
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (Node connection and clef enabled)
run: |
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --geth --clef --k3s --pay-threshold 1500000000000 --postage
timeout 30m ./beeinfra.sh install --local -r "${REPLICA}" --geth --clef --k3s --pay-threshold 1000000000000 --postage
- name: Test pingpong
id: pingpong-2
run: until ./beekeeper check pingpong --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"; do echo "waiting for pingpong..."; sleep .3; done
Expand All @@ -110,7 +110,7 @@ jobs:
run: ./beekeeper check fullconnectivity --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"
- name: Test settlements
id: settlements-2
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 1500000000000
run: ./beekeeper check settlements --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}" --upload-node-count "${REPLICA}" -t 1000000000000
- name: Destroy the cluster
run: |
./beeinfra.sh uninstall
Expand All @@ -126,7 +126,7 @@ jobs:
cp /etc/rancher/k3s/k3s.yaml ~/.kube/config
- name: Set testing cluster (storage incentives setup)
run: |
timeout 10m ./beeinfra.sh install --local -r "${REPLICA}" --geth --k3s --pay-threshold 1500000000000 --postage --db-capacity 100
timeout 10m ./beeinfra.sh install --local -r "${REPLICA}" --geth --k3s --pay-threshold 1000000000000 --postage --db-capacity 100
- name: Test gc
id: gc-chunk-1
run: ./beekeeper check gc --cache-capacity 100 --api-scheme http --debug-api-scheme http --disable-namespace --debug-api-domain localhost --api-domain localhost --node-count "${REPLICA}"
Expand Down
72 changes: 70 additions & 2 deletions pkg/accounting/accounting.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type accountingPeer struct {
paymentThreshold *big.Int // the threshold at which the peer expects us to pay
refreshTimestamp int64 // last time we attempted time-based settlement
paymentOngoing bool // indicate if we are currently settling with the peer
paymentChan chan struct{}
}

// Accounting is the main implementation of the accounting interface.
Expand Down Expand Up @@ -145,6 +146,44 @@ func NewAccounting(
}, nil
}

func (a *Accounting) increasedExpectedDebt(peer swarm.Address, accountingPeer *accountingPeer, price uint64) (*big.Int, error) {
currentBalance, err := a.Balance(peer)
if err != nil {
if !errors.Is(err, ErrPeerNoBalance) {
return nil, fmt.Errorf("failed to load balance: %w", err)
}
}
currentDebt := new(big.Int).Neg(currentBalance)
if currentDebt.Cmp(big.NewInt(0)) < 0 {
currentDebt.SetInt64(0)
}

bigPrice := new(big.Int).SetUint64(price)
nextReserved := new(big.Int).Add(accountingPeer.reservedBalance, bigPrice)

expectedDebt := new(big.Int).Add(currentDebt, nextReserved)

threshold := new(big.Int).Set(accountingPeer.paymentThreshold)
if threshold.Cmp(a.earlyPayment) > 0 {
threshold.Sub(threshold, a.earlyPayment)
} else {
threshold.SetInt64(0)
}

additionalDebt, err := a.SurplusBalance(peer)
if err != nil {
return nil, fmt.Errorf("failed to load surplus balance: %w", err)
}

// uint64 conversion of surplusbalance is safe because surplusbalance is always positive
if additionalDebt.Cmp(big.NewInt(0)) < 0 {
return nil, ErrInvalidValue
}

increasedExpectedDebt := new(big.Int).Add(expectedDebt, additionalDebt)
return increasedExpectedDebt, nil
}

// Reserve reserves a portion of the balance for peer and attempts settlements if necessary.
func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint64) error {
accountingPeer := a.getAccountingPeer(peer)
Expand Down Expand Up @@ -200,8 +239,31 @@ func (a *Accounting) Reserve(ctx context.Context, peer swarm.Address, price uint
// if expectedDebt would still exceed the paymentThreshold at this point block this request
// this can happen if there is a large number of concurrent requests to the same peer
if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
if accountingPeer.paymentOngoing {
accountingPeer.lock.Unlock()
select {
case <-ctx.Done():
accountingPeer.lock.Lock()
return ErrOverdraft
case <-time.After(500 * time.Millisecond):
accountingPeer.lock.Lock()
return ErrOverdraft
case <-accountingPeer.paymentChan:
}
accountingPeer.lock.Lock()
increasedExpectedDebt, err = a.increasedExpectedDebt(peer, accountingPeer, price)
if err != nil {
return err
}

if increasedExpectedDebt.Cmp(accountingPeer.paymentThreshold) > 0 {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
} else {
a.metrics.AccountingBlocksCount.Inc()
return ErrOverdraft
}
}

accountingPeer.reservedBalance = nextReserved
Expand Down Expand Up @@ -401,6 +463,7 @@ func (a *Accounting) getAccountingPeer(peer swarm.Address) *accountingPeer {
shadowReservedBalance: big.NewInt(0),
// initially assume the peer has the same threshold as us
paymentThreshold: new(big.Int).Set(a.paymentThreshold),
paymentChan: make(chan struct{}),
}
a.accountingPeers[peer.String()] = peerData
}
Expand Down Expand Up @@ -591,6 +654,11 @@ func (a *Accounting) NotifyPaymentSent(peer swarm.Address, amount *big.Int, rece
accountingPeer.paymentOngoing = false
accountingPeer.shadowReservedBalance.Sub(accountingPeer.shadowReservedBalance, amount)

select {
case accountingPeer.paymentChan <- struct{}{}:
default:
}

if receivedError != nil {
a.logger.Warningf("accounting: payment failure %v", receivedError)
return
Expand Down
6 changes: 0 additions & 6 deletions pkg/settlement/pseudosettle/pseudosettle.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,6 @@ func (s *Service) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream) (e
paymentAmount.Set(big.NewInt(0))
}

err = s.accountingAPI.Reserve(ctx, p.Address, paymentAmount.Uint64())
if err != nil {
return err
}
defer s.accountingAPI.Release(p.Address, paymentAmount.Uint64())

err = w.WriteMsgWithContext(ctx, &pb.PaymentAck{
Amount: paymentAmount.Bytes(),
Timestamp: timestamp,
Expand Down

0 comments on commit cd54501

Please sign in to comment.