Skip to content

Commit

Permalink
Replace client-server with peer-to-peer mixing
Browse files Browse the repository at this point in the history
This commit converts dcrwallet from the CoinShuffle++ mixing server to perform
a similar protocol over peer-to-peer messages that propagate the Decred
network.  Besides the swap between the different client libraries, this commit
also adds support for sending and receiving mixing messages over RPC and SPV
modes.

The existing --csppserver option will no longer enable mixing using the
server.  Instead, the option has been deprecated in favor of a --mixing
option.  However, until the option is removed, a configured --csppserver will
imply using the new mixing protocol.

It is recommend, but not required, to build csppsolver and either install it
to PATH, or provide the path to the executable with dcrwallet's --csppsolver
option.  The solver is a necessary component to complete a mix, but only one
participant in the mix is required to provide it.  The solver requires the C
library libflint (including its development headers, if your distribution
creates separate -dev packages this way), and once these dependencies are met,
csppsolver can be installed with:

  go install decred.org/cspp/v2/cmd/csppsolver@latest
  • Loading branch information
jrick committed May 16, 2024
1 parent 475b13a commit bb04b75
Show file tree
Hide file tree
Showing 31 changed files with 1,131 additions and 525 deletions.
7 changes: 7 additions & 0 deletions chain/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/gcs/v4"
"github.com/decred/dcrd/mixing"
dcrdtypes "github.com/decred/dcrd/rpc/jsonrpc/types/v4"
"github.com/decred/dcrd/txscript/v4/stdaddr"
"github.com/decred/dcrd/wire"
Expand Down Expand Up @@ -37,6 +38,12 @@ func (s *Syncer) PublishTransactions(ctx context.Context, txs ...*wire.MsgTx) er
return s.rpc.PublishTransactions(ctx, txs...)
}

// PublishMixMessages submits each mixing message to the dcrd mixpool for acceptance.
// If accepted, the messages are published to other peers.
func (s *Syncer) PublishMixMessages(ctx context.Context, msgs ...mixing.Message) error {
return s.rpc.PublishMixMessages(ctx, msgs...)
}

// LoadTxFilter is part of the wallet.NetworkBackend interface.
func (s *Syncer) LoadTxFilter(ctx context.Context, reload bool, addrs []stdaddr.Address, outpoints []wire.OutPoint) error {
return s.rpc.LoadTxFilter(ctx, reload, addrs, outpoints)
Expand Down
94 changes: 82 additions & 12 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,27 @@ import (
"crypto/x509"
"encoding/json"
"fmt"
"hash"
"net"
"runtime/trace"
"sync"
"sync/atomic"
"time"

"decred.org/dcrwallet/v4/errors"
"decred.org/dcrwallet/v4/internal/loggers"
"decred.org/dcrwallet/v4/rpc/client/dcrd"
"decred.org/dcrwallet/v4/validate"
"decred.org/dcrwallet/v4/wallet"
"github.com/decred/dcrd/blockchain/stake/v5"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/crypto/blake256"
"github.com/decred/dcrd/wire"
"github.com/jrick/wsrpc/v2"
"golang.org/x/sync/errgroup"
)

var requiredAPIVersion = semver{Major: 8, Minor: 0, Patch: 0}
var requiredAPIVersion = semver{Major: 8, Minor: 2, Patch: 0}

// Syncer implements wallet synchronization services by processing
// notifications from a dcrd JSON-RPC server.
Expand All @@ -40,6 +43,9 @@ type Syncer struct {
rpc *dcrd.RPC
notifier *notifier

blake256Hasher hash.Hash
blake256HasherMu sync.Mutex

discoverAccts bool
mu sync.Mutex

Expand Down Expand Up @@ -68,10 +74,11 @@ type RPCOptions struct {
// NewSyncer creates a Syncer that will sync the wallet using dcrd JSON-RPC.
func NewSyncer(w *wallet.Wallet, r *RPCOptions) *Syncer {
return &Syncer{
wallet: w,
opts: r,
discoverAccts: !w.Locked(),
relevantTxs: make(map[chainhash.Hash][]*wire.MsgTx),
wallet: w,
opts: r,
blake256Hasher: blake256.New(),
discoverAccts: !w.Locked(),
relevantTxs: make(map[chainhash.Hash][]*wire.MsgTx),
}
}

Expand Down Expand Up @@ -625,6 +632,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return err
}

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
// Run wallet background goroutines (currently, this just runs
// mixclient).
return s.wallet.Run(ctx)
})

if s.wallet.VotingEnabled() {
err = s.rpc.Call(ctx, "notifywinningtickets", nil)
if err != nil {
Expand Down Expand Up @@ -692,20 +706,46 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
return err
}

// Request notifications for mixing messages.
if s.wallet.MixingEnabled() {
err = s.rpc.Call(ctx, "notifymixmessages", nil)
if err != nil {
return err
}

// Populate existing mix pair requests.
mixPRs, err := s.rpc.MixPairRequests(ctx)
if err != nil {
return err
}
s.blake256HasherMu.Lock()
for _, pr := range mixPRs {
pr.WriteHash(s.blake256Hasher)
}
s.blake256HasherMu.Unlock()
for _, pr := range mixPRs {
loggers.MixcLog.Debugf("accepting PR hash %s at initial sync", pr.Hash())
s.wallet.AcceptMixMessage(pr)
}
}

log.Infof("Blockchain sync completed, wallet ready for general usage.")

// Wait for notifications to finish before returning
defer func() {
<-s.notifier.closed
}()

select {
case <-ctx.Done():
client.Close()
return ctx.Err()
case <-client.Done():
return client.Err()
}
g.Go(func() error {
select {
case <-ctx.Done():
client.Close()
return ctx.Err()
case <-client.Done():
return client.Err()
}
})
return g.Wait()
}

type notifier struct {
Expand Down Expand Up @@ -749,6 +789,11 @@ func (n *notifier) Notify(method string, params json.RawMessage) error {
if err != nil {
log.Error(errors.E(op, err))
}
case "mixmessage":
err := s.mixMessage(ctx, params)
if err != nil {
log.Error(errors.E(op, err))
}
}
return nil
}
Expand Down Expand Up @@ -872,3 +917,28 @@ func (s *Syncer) storeTSpend(ctx context.Context, params json.RawMessage) error
}
return s.wallet.AddTSpend(*tx)
}

func (s *Syncer) mixMessage(ctx context.Context, params json.RawMessage) error {
if !s.wallet.MixingEnabled() {
log.Debugf("Ignoring mixmessage notification: mixing disabled")
return nil
}

msg, err := dcrd.MixMessage(params)
if err != nil {
return err
}
s.blake256HasherMu.Lock()
msg.WriteHash(s.blake256Hasher)
s.blake256HasherMu.Unlock()
msgHash := msg.Hash()
err = s.wallet.AcceptMixMessage(msg)
if err == nil {
loggers.MixpLog.Debugf("Accepted mix message %T %s by %x",
msg, &msgHash, msg.Pub())
} else {
loggers.MixpLog.Debugf("Rejected mix message %T %s by %x",
msg, &msgHash, msg.Pub())
}
return err
}
59 changes: 26 additions & 33 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"os"
Expand All @@ -19,6 +17,7 @@ import (
"strconv"
"strings"

"decred.org/cspp/v2/solverrpc"
"decred.org/dcrwallet/v4/errors"
"decred.org/dcrwallet/v4/internal/cfgutil"
"decred.org/dcrwallet/v4/internal/loggers"
Expand Down Expand Up @@ -175,10 +174,11 @@ type config struct {
IssueClientCert bool `long:"issueclientcert" description:"Notify a client cert and key over the TX pipe for RPC authentication"`

// CSPP
CSPPServer string `long:"csppserver" description:"Network address of CoinShuffle++ server"`
CSPPServerCA string `long:"csppserver.ca" description:"CoinShuffle++ Certificate Authority"`
dialCSPPServer func(ctx context.Context, network, addr string) (net.Conn, error)
MixedAccount string `long:"mixedaccount" description:"Account/branch used to derive CoinShuffle++ mixed outputs and voting rewards"`
CSPPServer string `long:"csppserver" description:"(deprecated) Network address of CoinShuffle++ server"`
CSPPServerCA string `long:"csppserver.ca" description:"(deprecated) CoinShuffle++ TLS Certificate Authority"`
Mixing bool `long:"mixing" description:"Enable mixing support"`
CSPPSolver *cfgutil.ExplicitString `long:"csppsolver" description:"Path to CSPP solver executable (if not in PATH)"`
MixedAccount string `long:"mixedaccount" description:"Account/branch used to derive CoinShuffle++ mixed outputs and voting rewards"`
mixedAccount string
mixedBranch uint32
TicketSplitAccount string `long:"ticketsplitaccount" description:"Account to derive fresh addresses from for mixed ticket splits; uses mixedaccount if unset"`
Expand Down Expand Up @@ -388,6 +388,7 @@ func loadConfig(ctx context.Context) (*config, []string, error) {
DisableCoinTypeUpgrades: defaultDisableCoinTypeUpgrades,
CircuitLimit: defaultCircuitLimit,
MixSplitLimit: defaultMixSplitLimit,
CSPPSolver: cfgutil.NewExplicitString(solverrpc.SolverProcess),

// Ticket Buyer Options
TBOpts: ticketBuyerOptions{
Expand Down Expand Up @@ -774,38 +775,30 @@ func loadConfig(ctx context.Context) (*config, []string, error) {
}
}

// Create CoinShuffle++ TLS dialer based on server name and certificate
// authority settings.
csppTLSConfig := new(tls.Config)
if cfg.CSPPServer != "" {
csppTLSConfig.ServerName, _, err = net.SplitHostPort(cfg.CSPPServer)
if err != nil {
err := errors.Errorf("Cannot parse CoinShuffle++ "+
"server name %q: %v", cfg.CSPPServer, err)
fmt.Fprintln(os.Stderr, err.Error())
return loadConfigError(err)
log.Warnf("--csppserver option is deprecated; set --mixing instead")
if !cfg.Mixing {
log.Warnf("Enabling --mixing option due to --csppserver being configured")
cfg.Mixing = true
}
}
if cfg.CSPPServerCA != "" {
cfg.CSPPServerCA = cleanAndExpandPath(cfg.CSPPServerCA)
ca, err := os.ReadFile(cfg.CSPPServerCA)
if err != nil {
err := errors.Errorf("Cannot read CoinShuffle++ "+
"Certificate Authority file: %v", err)
fmt.Fprintln(os.Stderr, err.Error())
return loadConfigError(err)

var solverMustWork bool
if cfg.Mixing {
if cfg.CSPPSolver.ExplicitlySet() {
solverrpc.SolverProcess = cfg.CSPPSolver.Value
solverMustWork = true
} else if err := solverrpc.StartSolver(); err == nil {
solverMustWork = true
} else {
log.Warnf("Unable to start csppsolver; must rely on " +
"other peers publishing results")
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(ca)
csppTLSConfig.RootCAs = pool
}
cfg.dialCSPPServer = func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := cfg.dial(ctx, network, addr)
if err != nil {
return nil, err
}
conn = tls.Client(conn, csppTLSConfig)
return conn, nil
if solverMustWork && !testStartedSolverWorks() {
err := errors.Errorf("csppsolver process is not operating properly")
fmt.Fprintln(os.Stderr, err)
return loadConfigError(err)
}

// Parse mixedaccount account/branch
Expand Down
26 changes: 12 additions & 14 deletions dcrwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ func run(ctx context.Context) error {
}
loader := ldr.NewLoader(activeNet.Params, dbDir, stakeOptions,
cfg.GapLimit, cfg.WatchLast, cfg.AllowHighFees, cfg.RelayFee.Amount,
cfg.AccountGapLimit, cfg.DisableCoinTypeUpgrades, cfg.ManualTickets,
cfg.MixSplitLimit)
loader.DialCSPPServer = cfg.dialCSPPServer
cfg.AccountGapLimit, cfg.DisableCoinTypeUpgrades, !cfg.Mixing,
cfg.ManualTickets, cfg.MixSplitLimit)

// Stop any services started by the loader after the shutdown procedure is
// initialized and this function returns.
Expand Down Expand Up @@ -259,7 +258,7 @@ func run(ctx context.Context) error {

if cfg.VSPOpts.URL != "" {
changeAccountName := cfg.ChangeAccount
if changeAccountName == "" && cfg.CSPPServer == "" {
if changeAccountName == "" && !cfg.Mixing {
log.Warnf("Change account not set, using "+
"purchase account %q", cfg.PurchaseAccount)
changeAccountName = cfg.PurchaseAccount
Expand Down Expand Up @@ -314,19 +313,19 @@ func run(ctx context.Context) error {
var (
purchaseAccount uint32 // enableticketbuyer
votingAccount uint32 // enableticketbuyer
mixedAccount uint32 // (enableticketbuyer && csppserver) || mixchange
changeAccount uint32 // (enableticketbuyer && csppserver) || mixchange
ticketSplitAccount uint32 // enableticketbuyer && csppserver
mixedAccount uint32 // (enableticketbuyer && mixing) || mixchange
changeAccount uint32 // (enableticketbuyer && mixing) || mixchange
ticketSplitAccount uint32 // enableticketbuyer && mixing

votingAddr = cfg.TBOpts.votingAddress
poolFeeAddr = cfg.poolAddress
)
if cfg.EnableTicketBuyer {
purchaseAccount = lookup("purchaseaccount", cfg.PurchaseAccount)
if cfg.CSPPServer != "" {
if cfg.Mixing {
poolFeeAddr = nil
}
if cfg.CSPPServer != "" && cfg.TBOpts.VotingAccount == "" {
if cfg.Mixing && cfg.TBOpts.VotingAccount == "" {
err := errors.New("cannot run mixed ticketbuyer without --votingaccount")
log.Error(err)
return err
Expand All @@ -336,11 +335,11 @@ func run(ctx context.Context) error {
votingAddr = nil
}
}
if (cfg.EnableTicketBuyer && cfg.CSPPServer != "") || cfg.MixChange {
if (cfg.EnableTicketBuyer && cfg.Mixing) || cfg.MixChange {
mixedAccount = lookup("mixedaccount", cfg.mixedAccount)
changeAccount = lookup("changeaccount", cfg.ChangeAccount)
}
if cfg.EnableTicketBuyer && cfg.CSPPServer != "" {
if cfg.EnableTicketBuyer && cfg.Mixing {
ticketSplitAccount = lookup("ticketsplitaccount", cfg.TicketSplitAccount)
}
if err != nil {
Expand All @@ -358,8 +357,7 @@ func run(ctx context.Context) error {
c.PoolFeeAddr = poolFeeAddr
c.Limit = int(cfg.TBOpts.Limit)
c.VotingAccount = votingAccount
c.CSPPServer = cfg.CSPPServer
c.DialCSPPServer = cfg.dialCSPPServer
c.Mixing = cfg.Mixing
c.MixChange = cfg.MixChange
c.MixedAccount = mixedAccount
c.MixedAccountBranch = cfg.mixedBranch
Expand Down Expand Up @@ -400,7 +398,7 @@ func run(ctx context.Context) error {
// Start wallet, voting and network gRPC services after a
// wallet is loaded.
loader.RunAfterLoad(func(w *wallet.Wallet) {
rpcserver.StartWalletService(gRPCServer, w, cfg.dialCSPPServer)
rpcserver.StartWalletService(gRPCServer, w)
rpcserver.StartNetworkService(gRPCServer, w)
rpcserver.StartVotingService(gRPCServer, w)
})
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/decred/dcrd/dcrutil/v4 v4.0.2
github.com/decred/dcrd/gcs/v4 v4.1.0
github.com/decred/dcrd/hdkeychain/v3 v3.1.2
github.com/decred/dcrd/mixing v0.1.0
github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.2.0
github.com/decred/dcrd/rpcclient/v8 v8.0.1
github.com/decred/dcrd/txscript/v4 v4.1.1
Expand All @@ -34,9 +35,9 @@ require (
github.com/jrick/logrotate v1.0.0
github.com/jrick/wsrpc/v2 v2.3.5
go.etcd.io/bbolt v1.3.8
golang.org/x/crypto v0.14.0
golang.org/x/sync v0.3.0
golang.org/x/term v0.13.0
golang.org/x/crypto v0.23.0
golang.org/x/sync v0.7.0
golang.org/x/term v0.20.0
google.golang.org/grpc v1.45.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
google.golang.org/protobuf v1.27.1
Expand All @@ -51,9 +52,9 @@ require (
github.com/decred/dcrd/dcrec/edwards/v2 v2.0.3 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)
Loading

0 comments on commit bb04b75

Please sign in to comment.