// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
    service.Service
    FlushStop()

    ID() ID               // peer's cryptographic ID
    RemoteIP() net.IP     // remote IP of the connection
    RemoteAddr() net.Addr // remote address of the connection

    IsOutbound() bool   // did we dial the peer
    IsPersistent() bool // do we redial this peer when we disconnect

    CloseConn() error // close original connection

    NodeInfo() NodeInfo // peer's info
    Status() tmconn.ConnectionStatus
    SocketAddr() *NetAddress // actual address of the socket

    // Deprecated: entities looking to act as peers should implement SendEnvelope instead.
    // Send will be removed in v0.37.
    Send(byte, []byte) bool

    // Deprecated: entities looking to act as peers should implement TrySendEnvelope instead.
    // TrySend will be removed in v0.37.
    TrySend(byte, []byte) bool

    Set(string, interface{})
    Get(string) interface{}

    SetRemovalFailed()
    GetRemovalFailed() bool
}
// EnvelopeSender is implemented by peers that support the newer envelope-based send API.
type EnvelopeSender interface {
    SendEnvelope(Envelope) bool
    TrySendEnvelope(Envelope) bool
}
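Callers that must work with both peer generations can type-assert for EnvelopeSender and fall back to the deprecated byte-slice API. A minimal sketch, assuming Envelope's ChannelID and Message fields and gogoproto marshaling; this helper is illustrative, not part of the p2p package:

// sendToPeer prefers the envelope-based API when the peer implements
// EnvelopeSender and otherwise falls back to the deprecated Send.
func sendToPeer(p Peer, e Envelope) bool {
    if es, ok := p.(EnvelopeSender); ok {
        return es.SendEnvelope(e)
    }
    bz, err := proto.Marshal(e.Message) // gogo/protobuf marshaling assumed
    if err != nil {
        return false
    }
    return p.Send(e.ChannelID, bz)
}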
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
    service.BaseService

    // raw peerConn and the multiplex connection
    peerConn
    mconn *tmconn.MConnection

    // peer's node info and the channels it knows about
    // channels = nodeInfo.Channels
    // cached to avoid copying nodeInfo in hasChannel
    nodeInfo NodeInfo
    channels []byte
// peerConn contains the raw connection and its config.
type peerConn struct {
    outbound   bool     // whether we dialed the remote node (outbound) rather than accepted it
    persistent bool     // whether we should redial this peer after a disconnect
    conn       net.Conn // source connection; wrapped into an MConnection for actual communication
// ID only exists for SecretConnection.
// NOTE: Will panic if conn is not *SecretConnection.
func (pc peerConn) ID() ID {
    return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
}
// Return the IP from the connection RemoteAddr
func (pc peerConn) RemoteIP() net.IP {
    if pc.ip != nil {
        return pc.ip
    }
// FlushStop mimics OnStop but additionally ensures that all successful
// SendEnvelope() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
    p.metricsTicker.Stop()
    p.BaseService.OnStop()
    p.mconn.FlushStop() // stop everything and close the conn
}
// OnStop implements BaseService.
func (p *peer) OnStop() {
    p.metricsTicker.Stop()
    p.BaseService.OnStop()
    if err := p.mconn.Stop(); err != nil { // stop everything and close the conn
        p.Logger.Debug("Error while stopping peer", "err", err)
    }
}
    // Closing quitSendRoutine will cause the sendRoutine to eventually quit.
    // doneSendRoutine is closed when the sendRoutine actually quits.
    quitSendRoutine chan struct{}
    doneSendRoutine chan struct{}

    // Closing quitRecvRoutine will cause the recvRoutine to eventually quit.
    quitRecvRoutine chan struct{}

    // used to ensure FlushStop and OnStop
    // are safe to call concurrently.
    stopMtx tmsync.Mutex

    flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
    pingTimer  *time.Ticker         // send pings periodically

    // close conn if pong is not received in pongTimeout
    pongTimer     *time.Timer
    pongTimeoutCh chan bool // true - timeout, false - peer sent pong
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
func NewMConnectionWithConfig(
    conn net.Conn,
    chDescs []*ChannelDescriptor,
    onReceive receiveCbFunc,
    onError errorCbFunc,
    config MConnConfig,
) *MConnection {
    if config.PongTimeout >= config.PingInterval {
        panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
    }
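Because of that panic, any custom MConnConfig must keep PongTimeout strictly below PingInterval. A small sketch from a caller's perspective, assuming the package is imported as tmconn (as in peer.go) and that netConn, chDescs, onReceive and onError are placeholders the caller already has:

// Build a config whose pong timeout stays below the ping interval,
// otherwise NewMConnectionWithConfig panics at construction time.
cfg := tmconn.DefaultMConnConfig()
cfg.PingInterval = 60 * time.Second
cfg.PongTimeout = 45 * time.Second // must stay < PingInterval

mconn := tmconn.NewMConnectionWithConfig(netConn, chDescs, onReceive, onError, cfg)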
// Queues message to send to this channel.
// Goroutine-safe
// Times out (and returns false) after defaultSendTimeout
func (ch *Channel) sendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    case <-time.After(defaultSendTimeout):
        return false
    }
}

// Queues message to send to this channel.
// Nonblocking, returns true if successful.
// Goroutine-safe
func (ch *Channel) trySendBytes(bytes []byte) bool {
    select {
    case ch.sendQueue <- bytes:
        atomic.AddInt32(&ch.sendQueueSize, 1)
        return true
    default:
        return false
    }
}
// Writes next PacketMsg to w and updates c.recentlySent.
// Not goroutine-safe
func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) {
    packet := ch.nextPacketMsg()
    n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet))
    atomic.AddInt64(&ch.recentlySent, int64(n))
    return
}
// Handles incoming PacketMsgs. It returns a message bytes if message is
// complete. NOTE message bytes may change on next call to recvPacketMsg.
// Not goroutine-safe
func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
    ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)
    var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data)
    if recvCap < recvReceived {
        return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived)
    }
    ch.recving = append(ch.recving, packet.Data...)
    if packet.EOF {
        msgBytes := ch.recving

        // clear the slice without re-allocating.
        // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
        // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes,
        // at which point the recving slice stops being used and should be garbage collected
        ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity)
        return msgBytes, nil
    }
    return nil, nil
}
// Call this periodically to update stats for throttling purposes.
// Not goroutine-safe
func (ch *Channel) updateStats() {
    // Exponential decay of stats.
    // TODO: optimize.
    atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8))
}
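For a sense of scale (numbers made up): each tick multiplies recentlySent by 0.8, so a burst of 10,000 bytes decays to 8,000, then 6,400, 5,120, and so on, dropping below 1,000 after about 11 ticks (0.8^11 ≈ 0.086). This is how a recent burst gradually stops penalizing a channel in sendPacketMsg's ratio calculation below.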
// stopServices stops the BaseService and timers and closes the quitSendRoutine.
// If the quitSendRoutine was already closed, it returns true, otherwise it returns false.
// It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time.
func (c *MConnection) stopServices() (alreadyStopped bool) {
    c.stopMtx.Lock()
    defer c.stopMtx.Unlock()

    select {
    case <-c.quitSendRoutine:
        // already quit
        return true
    default:
    }

    select {
    case <-c.quitRecvRoutine:
        // already quit
        return true
    default:
    }

    // inform the recvRoutine that we are shutting down
    close(c.quitRecvRoutine)
    close(c.quitSendRoutine)
    return false
}
// FlushStop replicates the logic of OnStop.
// It additionally ensures that all successful
// .Send() calls will get flushed before closing
// the connection.
func (c *MConnection) FlushStop() {
    if c.stopServices() {
        return
    }

    // this block is unique to FlushStop
    {
        // wait until the sendRoutine exits
        // so we dont race on calling sendSomePacketMsgs
        <-c.doneSendRoutine

        // Send and flush all pending msgs.
        // Since sendRoutine has exited, we can call this
        // safely
        eof := c.sendSomePacketMsgs()
        for !eof {
            eof = c.sendSomePacketMsgs()
        }
        c.flush()

        // Now we can close the connection
    }

    c.conn.Close()
    // We can't close pong safely here because
    // recvRoutine may write to it after we've stopped.
    // Though it doesn't need to get closed at all,
    // we close it @ recvRoutine.
}
// Returns true if messages from channels were exhausted.
// Blocks in accordance to .sendMonitor throttling.
func (c *MConnection) sendSomePacketMsgs() bool {
    // Block until .sendMonitor says we can write.
    // Once we're ready we send more than we asked for,
    // but amortized it should even out.
    c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true)

    // Now send some PacketMsgs.
    for i := 0; i < numBatchPacketMsgs; i++ {
        if c.sendPacketMsg() {
            return true
        }
    }
    return false
}
// Returns true if messages from channels were exhausted.
func (c *MConnection) sendPacketMsg() bool {
    // Choose a channel to create a PacketMsg from.
    // The chosen channel will be the one whose recentlySent/priority is the least.
    var leastRatio float32 = math.MaxFloat32
    var leastChannel *Channel
    for _, channel := range c.channels {
        // If nothing to send, skip this channel
        if !channel.isSendPending() {
            continue
        }
        // Get ratio, and keep track of lowest ratio.
        ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
        if ratio < leastRatio {
            leastRatio = ratio
            leastChannel = channel
        }
    }

    // Nothing to send?
    if leastChannel == nil {
        return true
    }
    // c.Logger.Info("Found a msgPacket to send")

    // Make & send a PacketMsg from this channel
    _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
    if err != nil {
        c.Logger.Error("Failed to write PacketMsg", "err", err)
        c.stopForError(err)
        return true
    }
    c.sendMonitor.Update(_n)
    c.flushTimer.Set()
    return false
}
sendRoutine handles outgoing messages on the MConnection in its own goroutine. It contains a loop that keeps running until the connection stops running or an error occurs. Inside the loop, a select statement waits on several channels, and the ready cases are handled one at a time:
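An abridged sketch of that loop follows (error handling and the ping/pong wire writes are trimmed; see connection.go for the full version):

func (c *MConnection) sendRoutine() {
    defer c._recover()

FOR_LOOP:
    for {
        select {
        case <-c.flushTimer.Ch: // throttled flush of bufConnWriter
            c.flush()
        case <-c.chStatsTimer.C: // periodically decay per-channel stats
            for _, channel := range c.channels {
                channel.updateStats()
            }
        case <-c.pingTimer.C:
            // ... write a PacketPing and arm the pong timer ...
        case timeout := <-c.pongTimeoutCh: // true means the pong never arrived
            if timeout {
                c.stopForError(fmt.Errorf("pong timeout"))
            }
        case <-c.pong:
            // ... reply to a received ping with a PacketPong ...
        case <-c.quitSendRoutine:
            break FOR_LOOP
        case <-c.send: // some channel has pending data
            if eof := c.sendSomePacketMsgs(); !eof {
                // keep the routine awake: there is still data queued
                select {
                case c.send <- struct{}{}:
                default:
                }
            }
        }
    }
}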
// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer.
// After a whole message has been assembled, it's pushed to onReceive().
// Blocks depending on how the connection is throttled.
// Otherwise, it never blocks.
func (c *MConnection) recvRoutine() {
    defer c._recover()
        _n, err := protoReader.ReadMsg(&packet)
        c.recvMonitor.Update(_n)
        if err != nil {
            // stopServices was invoked and we are shutting down
            // receiving is expected to fail since we will close the connection
            select {
            case <-c.quitRecvRoutine:
                break FOR_LOOP
            default:
            }

            if c.IsRunning() {
                if err == io.EOF {
                    c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
                } else {
                    c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
                }
                c.stopForError(err)
            }
            break FOR_LOOP
        }
        // Read more depending on packet type.
        switch pkt := packet.Sum.(type) {
        case *tmp2p.Packet_PacketPing:
            // TODO: prevent abuse, as they cause flush()'s.
            // https://github.com/tendermint/tendermint/issues/1190
            c.Logger.Debug("Receive Ping")
            select {
            case c.pong <- struct{}{}:
            default:
                // never block
            }
        case *tmp2p.Packet_PacketPong:
            c.Logger.Debug("Receive Pong")
            select {
            case c.pongTimeoutCh <- false:
            default:
                // never block
            }
        case *tmp2p.Packet_PacketMsg:
            channelID := byte(pkt.PacketMsg.ChannelID)
            channel, ok := c.channelsIdx[channelID]
            if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil {
                err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
                c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
                c.stopForError(err)
                break FOR_LOOP
            }

            msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
            if err != nil {
                if c.IsRunning() {
                    c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
                    c.stopForError(err)
                }
                break FOR_LOOP
            }
            if msgBytes != nil {
                c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes)
                // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine
                c.onReceive(channelID, msgBytes)
            }
        default:
            err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
            c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
            c.stopForError(err)
            break FOR_LOOP
        }
    }
// accept is the container to carry the upgraded connection and NodeInfo from an
// asynchronously running routine to the Accept method.
type accept struct {
    netAddr  *NetAddress
    conn     net.Conn
    nodeInfo NodeInfo
    err      error
}
// peerConfig is used to bundle data we need to fully setup a Peer with an
// MConn, provided by the caller of Accept and Dial (currently the Switch). This
// is a temporary measure until reactor setup is less dynamic and we introduce the
// concept of PeerBehaviour to communicate about significant Peer lifecycle
// events.
// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour.
type peerConfig struct {
    chDescs     []*conn.ChannelDescriptor
    onPeerError func(Peer, interface{})
    outbound    bool

    // isPersistent allows you to set a function, which, given socket address
    // (for outbound peers) OR self-reported address (for inbound peers), tells
    // if the peer is persistent or not.
    isPersistent func(*NetAddress) bool

    reactorsByCh  map[byte]Reactor
    msgTypeByChID map[byte]proto.Message
    metrics       *Metrics
    mlc           *metricsLabelCache
}
// Transport emits and connects to Peers. The implementation of Peer is left to
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
type Transport interface {
    // Listening address.
    NetAddress() NetAddress

    // Accept returns a newly connected Peer.
    Accept(peerConfig) (Peer, error)

    // Dial connects to the Peer for the address.
    Dial(NetAddress, peerConfig) (Peer, error)

    // Cleanup any resources associated with Peer.
    Cleanup(Peer)
}
// transportLifecycle bundles the methods for callers to control start and stop
// behaviour.
type transportLifecycle interface {
    Close() error
    Listen(NetAddress) error
}
// Test multiplexTransport for interface completeness.
var _ Transport = (*MultiplexTransport)(nil)
var _ transportLifecycle = (*MultiplexTransport)(nil)
These two assertion lines from the tests show that Transport and transportLifecycle are both abstract interfaces whose concrete implementation is the MultiplexTransport struct. Let's look at that struct's definition and its constructor next.
// ConnFilterFunc to be implemented by filter hooks after a new connection has
// been established. The set of existing connections is passed along together
// with all resolved IPs for the new connection.
type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error
// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection
// and refuses new ones if they come from a known ip.
func ConnDuplicateIPFilter() ConnFilterFunc {
    return func(cs ConnSet, c net.Conn, ips []net.IP) error {
        for _, ip := range ips {
            if cs.HasIP(ip) {
                return ErrRejected{
                    conn:        c,
                    err:         fmt.Errorf("ip<%v> already connected", ip),
                    isDuplicate: true,
                }
            }
        }

        return nil
    }
}
// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
    netAddr                NetAddress
    listener               net.Listener
    maxIncomingConnections int // see MaxIncomingConnections

    acceptc chan accept
    closec  chan struct{}

    // Lookup table for duplicate ip and id checks.
    conns       ConnSet
    connFilters []ConnFilterFunc

    // TODO(xla): This config is still needed as we parameterise peerConn and
    // peer currently. All relevant configuration should be refactored into options
    // with sane defaults.
    mConfig conn.MConnConfig
}
Of the two channels, acceptc carries accepted (and upgraded) connections to the Accept method, while closec is used to shut the MultiplexTransport down. conns is a ConnSet, a lookup table mapping each connection to all of its IPs; the details live in conn_set.go. connFilters holds the connection filters; the default ConnDuplicateIPFilter shown above rejects connections from already-known IPs. resolver is the standard Go resolver used to look up IP addresses. Next we look at the core functions mentioned earlier.
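The shutdown side of that coordination is small: closing closec makes every select that also waits on it (in acceptPeers and Accept) fall through, and closing the listener stops acceptPeers. A condensed sketch of the Close method that implements transportLifecycle:

// Close shuts the transport down: closec unblocks pending selects and the
// listener close makes acceptPeers return.
func (mt *MultiplexTransport) Close() error {
    close(mt.closec)
    if mt.listener != nil {
        return mt.listener.Close()
    }
    return nil
}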
// Accept implements Transport.
func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
    select {
    // This case should never have any side-effectful/blocking operations to
    // ensure that quality peers are ready to be used.
    case a := <-mt.acceptc:
        if a.err != nil {
            return nil, a.err
        }
Next is the Dial function, which establishes outbound connections. It first dials within a timeout via the DialTimeout function in netaddress.go, then filters the connection with filterConn (which resolves its IPs via resolveIPs), upgrades it to an encrypted SecretConnection and performs the NodeInfo handshake in upgrade, and finally the connection is established.
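Put together, Dial is roughly the following (comments and error wrapping trimmed; wrapPeer, the helper that builds the peer around the upgraded connection, is named here as an assumption about the unexported helpers):

func (mt *MultiplexTransport) Dial(addr NetAddress, cfg peerConfig) (Peer, error) {
    c, err := addr.DialTimeout(mt.dialTimeout) // bounded TCP dial
    if err != nil {
        return nil, err
    }
    if err := mt.filterConn(c); err != nil { // resolve IPs, run ConnFilterFuncs
        return nil, err
    }
    secretConn, nodeInfo, err := mt.upgrade(c, &addr) // encrypt + NodeInfo handshake
    if err != nil {
        return nil, err
    }
    cfg.outbound = true
    p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr) // wrap into a peer with an MConnection
    return p, nil
}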
// Cleanup removes the given address from the connections set and
// closes the connection.
func (mt *MultiplexTransport) Cleanup(p Peer) {
    mt.conns.RemoveAddr(p.RemoteAddr())
    _ = p.CloseConn()
}
    if mt.maxIncomingConnections > 0 {
        ln = netutil.LimitListener(ln, mt.maxIncomingConnections)
    }

    mt.netAddr = addr
    mt.listener = ln

    go mt.acceptPeers()

    return nil
}
func (mt *MultiplexTransport) acceptPeers() {
    for {
        c, err := mt.listener.Accept()
        if err != nil {
            // If Close() has been called, silently exit.
            select {
            case _, ok := <-mt.closec:
                if !ok {
                    return
                }
            default:
                // Transport is not closed
            }

            mt.acceptc <- accept{err: err}
            return
        }
        // Connection upgrade and filtering should be asynchronous to avoid
        // Head-of-line blocking[0].
        // Reference: https://github.com/tendermint/tendermint/issues/2047
        //
        // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking
        go func(c net.Conn) {
            defer func() {
                if r := recover(); r != nil {
                    err := ErrRejected{
                        conn:          c,
                        err:           fmt.Errorf("recovered from panic: %v", r),
                        isAuthFailure: true,
                    }
                    select {
                    case mt.acceptc <- accept{err: err}:
                    case <-mt.closec:
                        // Give up if the transport was closed.
                        _ = c.Close()
                        return
                    }
                }
            }()

            var (
                nodeInfo   NodeInfo
                secretConn *conn.SecretConnection
                netAddr    *NetAddress
            )

            select {
            case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}:
                // Make the upgraded peer available.
            case <-mt.closec:
                // Give up if the transport was closed.
                _ = c.Close()
                return
            }
        }(c)
    }
}
// An AddrBook represents an address book from the pex package, which is used
// to store peer addresses.
type AddrBook interface {
    AddAddress(addr *NetAddress, src *NetAddress) error // add a peer address to the book; src is where we learned about it
    AddPrivateIDs([]string)                             // register private IDs (e.g. the local node's) that must not be gossiped
    AddOurAddress(*NetAddress)                          // record one of the local node's own addresses
    OurAddress(*NetAddress) bool                        // report whether the given address belongs to the local node
    MarkGood(ID)                                        // mark a peer as "good" so it is preferred when establishing P2P connections
    RemoveAddress(*NetAddress)                          // remove an address from the book
// SwitchOption sets an optional parameter on the Switch.
type SwitchOption func(*Switch)

// NewSwitch creates a new Switch with the given config.
func NewSwitch(
    cfg *config.P2PConfig,
    transport Transport,
    options ...SwitchOption,
) *Switch {
// AddReactor adds the given reactor to the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
    // Handle each of the reactor's channel descriptors individually.
    for _, chDesc := range reactor.GetChannels() {
        // First get the channel ID and make sure it is unique.
        chID := chDesc.ID
        // No two reactors can share the same channel.
        if sw.reactorsByCh[chID] != nil {
            panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
        }
        // Then register the channel information.
        sw.chDescs = append(sw.chDescs, chDesc)
        sw.reactorsByCh[chID] = reactor
        sw.msgTypeByChID[chID] = chDesc.MessageType
    }
    sw.reactors[name] = reactor
    // Finally give the reactor a reference back to the switch (cross reference).
    reactor.SetSwitch(sw)
    return reactor
}
// RemoveReactor removes the given Reactor from the Switch.
// NOTE: Not goroutine safe.
func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
    for _, chDesc := range reactor.GetChannels() {
        // remove channel description
        for i := 0; i < len(sw.chDescs); i++ {
            if chDesc.ID == sw.chDescs[i].ID {
                sw.chDescs = append(sw.chDescs[:i], sw.chDescs[i+1:]...)
                break
            }
        }
        delete(sw.reactorsByCh, chDesc.ID)
        delete(sw.msgTypeByChID, chDesc.ID)
    }
    delete(sw.reactors, name)
    reactor.SetSwitch(nil)
}

// Reactors returns a map of reactors registered on the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor {
    return sw.reactors
}

// Reactor returns the reactor with the given name.
// NOTE: Not goroutine safe.
func (sw *Switch) Reactor(name string) Reactor {
    return sw.reactors[name]
}

// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
    sw.nodeInfo = nodeInfo
}

// NodeInfo returns the switch's NodeInfo.
// NOTE: Not goroutine safe.
func (sw *Switch) NodeInfo() NodeInfo {
    return sw.nodeInfo
}

// SetNodeKey sets the switch's private key for authenticated encryption.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
    sw.nodeKey = nodeKey
}
//---------------------------------------------------------------------
// Service start/stop
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
    // Start reactors.
    // The reactors are registered in node.go's createSwitch; the five built-in ones are:
    //   sw.AddReactor("MEMPOOL", mempoolReactor)
    //   sw.AddReactor("BLOCKCHAIN", bcReactor)
    //   sw.AddReactor("CONSENSUS", consensusReactor)
    //   sw.AddReactor("EVIDENCE", evidenceReactor)
    //   sw.AddReactor("STATESYNC", stateSyncReactor)
    // Later, while the node is assembled in NewNode, the PEX reactor and any custom reactors are added:
    //   func CustomReactors(reactors map[string]p2p.Reactor) Option  // user-defined reactors
    //   sw.AddReactor("PEX", pexReactor)  // added via createPEXReactorAndAddToSwitch
    for _, reactor := range sw.reactors {
        err := reactor.Start()
        if err != nil {
            return fmt.Errorf("failed to start %v: %w", reactor, err)
        }
    }

    // Start accepting Peers.
    // acceptRoutine accepts inbound connections and manages adding the new peers,
    // handling errors for the various failure cases along the way.
    go sw.acceptRoutine()
    return nil
}
// OnStop implements BaseService. It stops all peers and reactors.
func (sw *Switch) OnStop() {
    // Stop peers
    for _, p := range sw.peers.List() {
        sw.stopAndRemovePeer(p, nil)
    }
    // Stop reactors
    sw.Logger.Debug("Switch: Stopping reactors")
    for _, reactor := range sw.reactors {
        if err := reactor.Stop(); err != nil {
            sw.Logger.Error("error while stopping reactor", "reactor", reactor, "error", err)
        }
    }
}
func (sw *Switch) acceptRoutine() {
    for {
        p, err := sw.transport.Accept(peerConfig{
            chDescs:       sw.chDescs,
            onPeerError:   sw.StopPeerForError,
            reactorsByCh:  sw.reactorsByCh,
            msgTypeByChID: sw.msgTypeByChID,
            metrics:       sw.metrics,
            mlc:           sw.mlc,
            isPersistent:  sw.IsPeerPersistent,
        })
        if err != nil {
            switch err := err.(type) {
            case ErrRejected:
                if err.IsSelf() {
                    // Remove the given address from the address book and add to our addresses
                    // to avoid dialing in the future.
                    addr := err.Addr()
                    sw.addrBook.RemoveAddress(&addr)
                    sw.addrBook.AddOurAddress(&addr)
                }

                continue
            case ErrTransportClosed:
                sw.Logger.Error(
                    "Stopped accept routine, as transport is closed",
                    "numPeers", sw.peers.Size(),
                )
            default:
                sw.Logger.Error(
                    "Accept on transport errored",
                    "err", err,
                    "numPeers", sw.peers.Size(),
                )
                // We could instead have a retry loop around the acceptRoutine,
                // but that would need to stop and let the node shutdown eventually.
                // So might as well panic and let process managers restart the node.
                // There's no point in letting the node run without the acceptRoutine,
                // since it won't be able to accept new connections.
                panic(fmt.Errorf("accept routine exited: %v", err))
            }

            break
        }

        if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
            // Ignore connection if we already have enough peers.
            _, in, _ := sw.NumPeers()
            if in >= sw.config.MaxNumInboundPeers {
                sw.Logger.Info(
                    "Ignoring inbound connection: already have enough inbound peers",
                    "address", p.SocketAddr(),
                    "have", in,
                    "max", sw.config.MaxNumInboundPeers,
                )
// addPeer starts up the Peer and adds it to the Switch. Error is returned if
// the peer is filtered out or failed to start or can't be added.
func (sw *Switch) addPeer(p Peer) error {
    if err := sw.filterPeer(p); err != nil {
        return err
    }

    // Handle the shut down case where the switch has stopped but we're
    // concurrently trying to add a peer.
    if !sw.IsRunning() {
        // XXX should this return an error or just log and terminate?
        sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
        return nil
    }

    // Add some data to the peer, which is required by reactors.
    for _, reactor := range sw.reactors {
        p = reactor.InitPeer(p)
    }

    // Start the peer's send/recv routines.
    // Must start it before adding it to the peer set
    // to prevent Start and Stop from being called concurrently.
    err := p.Start()
    if err != nil {
        // Should never happen
        sw.Logger.Error("Error starting peer", "err", err, "peer", p)
        return err
    }

    // Add the peer to PeerSet. Do this before starting the reactors
    // so that if Receive errors, we will find the peer and remove it.
    // Add should not err since we already checked peers.Has().
    if err := sw.peers.Add(p); err != nil {
        switch err.(type) {
        case ErrPeerRemoval:
            sw.Logger.Error("Error starting peer ",
                " err ", "Peer has already errored and removal was attempted.",
                "peer", p.ID())
        }
        return err
    }
    sw.metrics.Peers.Add(float64(1))

    // Start all the reactor protocols on the peer.
    for _, reactor := range sw.reactors {
        reactor.AddPeer(p)
    }