-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhost.go
113 lines (91 loc) · 2.46 KB
/
host.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package ep2p
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
// "github.com/libp2p/go-libp2p/core/routing"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
maddr "github.com/multiformats/go-multiaddr"
"go.uber.org/zap"
)
const (
defaultStreamProtocol = "/ep2p/unicast/default"
)
type Host struct {
ctx context.Context
dht *DHT
impl host.Host
bootstrapPeers []maddr.Multiaddr
logger *zap.Logger
defaultStreams map[peer.ID]network.Stream
}
func NewServer(bootstrapPeers []maddr.Multiaddr, logger *zap.Logger) *Host {
return NewHost(bootstrapPeers, logger, true)
}
func NewClient(bootstrapPeers []maddr.Multiaddr, logger *zap.Logger) *Host {
return NewHost(bootstrapPeers, logger, false)
}
func NewHost(bootstrapPeers []maddr.Multiaddr, logger *zap.Logger, serverMode bool) *Host {
ho := &Host{
ctx: context.Background(),
bootstrapPeers: bootstrapPeers,
logger: logger,
defaultStreams: make(map[peer.ID]network.Stream),
}
h, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
// libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
// ho.h = h
// dht := newDHT(ho, logger, serverMode)
// ho.dht = dht
// return dht.kademliaDHT, nil
// }),
)
if err != nil {
panic(err)
}
ho.impl = h
ho.dht = newDHT(ho, logger, serverMode)
ho.impl = routedhost.Wrap(h, ho.dht.kademliaDHT)
return ho
}
// For testing purposes
func (h *Host) Impl() host.Host {
return h.impl
}
func (h *Host) ID() peer.ID {
return h.impl.ID()
}
func (h *Host) Desc() []string {
var strs []string
for _, addr := range h.impl.Addrs() {
strs = append(strs, fmt.Sprintf("%v/p2p/%v", addr, h.impl.ID()))
}
return strs
}
func (h *Host) Peers() []peer.ID {
return h.impl.Network().Peers()
}
func (h *Host) NewGossipSub() *GossipSub {
return newGossipSub(h, h.logger)
}
func (h *Host) Send(data []byte, peer peer.ID) error {
if _, found := h.defaultStreams[peer]; !found {
s, err := h.impl.NewStream(h.ctx, peer, defaultStreamProtocol)
if err != nil {
return Libp2pError(err)
}
h.defaultStreams[peer] = s
}
if _, err := h.defaultStreams[peer].Write(data); err != nil {
delete(h.defaultStreams, peer)
return Libp2pError(err)
}
return nil
}
func (h *Host) SetDefaultRecvHandler(callback func(network.Stream)) {
h.impl.SetStreamHandler(defaultStreamProtocol, callback)
}