Preface
Tendermint Study Notes (1): Environment Setup gave a basic introduction to the system and set up the development environment. This article analyzes the source code of the latest `release` version, `v0.34.24`.

1. Overall Architecture
Tendermint is open-source software for building blockchain applications. Its core is the consensus engine, and it also provides the ABCI interface through which applications are implemented. Viewed top-down, the system can be divided into four layers:
- Application layer: the application defines the business logic that runs on the blockchain. Applications can be written in any programming language and interact with Tendermint through the ABCI interface (a minimal ABCI sketch follows this list).
- Consensus layer: Tendermint implements a BFT consensus algorithm, Tendermint Core, which ensures that all nodes agree on the same state transitions. The consensus layer handles communication and synchronization between nodes, maintains the safety and consistency of the network, and validates and packages transactions.
- Network layer: the network layer is responsible for transporting messages, including transactions and blocks. Tendermint uses a P2P protocol so that nodes can talk to each other, and connections are encrypted and authenticated.
- Storage layer: the storage layer is responsible for maintaining blockchain data. Tendermint offers a choice of databases, such as an in-memory database or LevelDB, and the data is organized in Merkle tree structures.
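To make the application layer concrete, here is a minimal sketch of an ABCI application, assuming the `abci/types` package of the v0.34 line; the `KVApp` name and its accept-everything behavior are invented purely for illustration.

```golang
package kvapp // illustration only

import (
	abci "github.com/tendermint/tendermint/abci/types"
)

// KVApp is a toy application used only to illustrate the ABCI boundary:
// it accepts every transaction. Embedding BaseApplication supplies no-op
// defaults for the ABCI methods that are not overridden here.
type KVApp struct {
	abci.BaseApplication
}

// CheckTx decides whether a transaction may enter the mempool.
func (app *KVApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
	return abci.ResponseCheckTx{Code: abci.CodeTypeOK}
}

// DeliverTx applies a committed transaction to the application state.
func (app *KVApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
	return abci.ResponseDeliverTx{Code: abci.CodeTypeOK}
}
```

Tendermint Core drives such an application over ABCI, while the consensus, network, and storage layers discussed below take care of everything else.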
2. Source Code Analysis
2.1 P2P Package Structure
The layout of the p2p directory, with short notes, is shown below:
```bash
.
├── README.md
├── base_reactor.go
├── conn                     # lowest-level connection implementation
│   ├── conn_go110.go
│   ├── conn_notgo110.go
│   ├── connection.go        # basic (multiplexed) connection
│   ├── connection_test.go
│   ├── evil_secret_connection_test.go
│   ├── secret_connection.go # encrypted connection
│   ├── secret_connection_test.go
│   └── testdata
│       └── TestDeriveSecretsAndChallengeGolden.golden
├── conn_set.go
├── errors.go
├── fuzz.go
├── key.go
├── key_test.go
├── metrics.go
├── mock
│   ├── peer.go
│   └── reactor.go
├── mocks
│   └── peer.go
├── netaddress.go            # network address storage
├── netaddress_test.go
├── node_info.go             # basic node information
├── node_info_test.go
├── peer.go                  # peer connection used by reactors
├── peer_set.go              # set of peers; each node has multiple peers
├── peer_set_test.go
├── peer_test.go
├── pex                      # PEX reactor, handles peer exchange between nodes
│   ├── addrbook.go
│   ├── addrbook_test.go
│   ├── errors.go
│   ├── file.go
│   ├── known_address.go
│   ├── params.go
│   ├── pex_reactor.go
│   └── pex_reactor_test.go
├── switch.go                # manages P2P connections between nodes
├── switch_test.go
├── test_util.go
├── transport.go             # manages the underlying network transport
├── transport_test.go
├── trust                    # peer trust metrics, currently unused
│   ├── config.go
│   ├── metric.go
│   ├── metric_test.go
│   ├── store.go
│   ├── store_test.go
│   └── ticker.go
├── types.go
└── upnp                     # UPnP probing
    ├── probe.go
    └── upnp.go
```
The p2p package encapsulates point-to-point communication between nodes. According to the README.md in the /p2p directory, there are five core components:
- Connection: the foundation of Tendermint's network communication; all message writes and reads go through this component.
- Peer: the basic communication entity in Tendermint; all communication between nodes, and between a node and the application, goes through peers.
- Node: Tendermint's P2P network has different types of nodes with different requirements for connecting to each other; this component defines what the different node roles need.
- Pex: defines the packets used for peer discovery and peer exchange.
- Config: defines Tendermint's P2P configuration.
2.2 Peer
Looking at the /p2p directory, the first file that stands out is peer.go; in a P2P network, a peer is the smallest entity of point-to-point communication.
First, the definition of the Peer interface. It embeds the service.Service interface and declares a series of methods for retrieving information about the peer, such as its ID, IP address, and connection status. It also defines methods for managing the connection, such as closing it and getting or setting attributes.
```golang
// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
	service.Service
	FlushStop()

	ID() ID               // peer's cryptographic ID
	RemoteIP() net.IP     // remote IP of the connection
	RemoteAddr() net.Addr // remote address of the connection

	IsOutbound() bool   // did we dial the peer
	IsPersistent() bool // do we redial this peer when we disconnect

	CloseConn() error // close original connection

	NodeInfo() NodeInfo // peer's info
	Status() tmconn.ConnectionStatus
	SocketAddr() *NetAddress // actual address of the socket

	// Deprecated: entities looking to act as peers should implement SendEnvelope instead.
	// Send will be removed in v0.37.
	Send(byte, []byte) bool

	// Deprecated: entities looking to act as peers should implement TrySendEnvelope instead.
	// TrySend will be removed in v0.37.
	TrySend(byte, []byte) bool

	Set(string, interface{})
	Get(string) interface{}

	SetRemovalFailed()
	GetRemovalFailed() bool
}

type EnvelopeSender interface {
	SendEnvelope(Envelope) bool
	TrySendEnvelope(Envelope) bool
}
```
Next is the corresponding peer struct, which implements the Peer interface. It contains the fields and methods used to manage the connection to, and information about, another node, plus some extras such as metrics recording.
Specifically, the struct embeds service.BaseService, a peerConn field for the raw connection between the nodes, and an mconn field of type *tmconn.MConnection for the multiplexed connection built on top of it.
It also contains a few other fields: nodeInfo and channels hold the peer's node info and the channels it knows about; Data holds user-defined data; metrics and metricsTicker record the peer's metrics; and mlc caches metric labels.
```golang
// peer implements Peer.
//
// Before using a peer, you will need to perform a handshake on connection.
type peer struct {
	service.BaseService

	// raw peerConn and the multiplex connection
	peerConn
	mconn *tmconn.MConnection

	// peer's node info and the channel it knows about
	// channels = nodeInfo.Channels
	// cached to avoid copying nodeInfo in hasChannel
	nodeInfo NodeInfo
	channels []byte

	// User data
	Data *cmap.CMap

	metrics       *Metrics
	metricsTicker *time.Ticker
	mlc           *metricsLabelCache

	// When removal of a peer fails, we set this flag
	removalAttemptFailed bool
}
```
Next, let's follow the two fields peerConn and mconn. mconn is the basis of all connection-level communication in Tendermint and will be covered in detail in the analysis of connection.go; for now, look at the peerConn code.
Given the outbound field, it is easy to guess that Tendermint has two kinds of connections: inbound and outbound.
```golang
//----------------------------------------------------------

// peerConn contains the raw connection and its config.
type peerConn struct {
	outbound   bool        // whether this is an outbound connection we dialed ourselves
	persistent bool        // whether this connection should be re-established after it drops
	conn       net.Conn    // source connection; wrapped into an mconn during communication
	socketAddr *NetAddress // the actual address of the socket

	// cached RemoteIP()
	ip net.IP // the remote IP of the connection
}

func newPeerConn(
	outbound, persistent bool,
	conn net.Conn,
	socketAddr *NetAddress,
) peerConn {
	return peerConn{
		outbound:   outbound,
		persistent: persistent,
		conn:       conn,
		socketAddr: socketAddr,
	}
}

// ID only exists for SecretConnection.
// NOTE: Will panic if conn is not *SecretConnection.
func (pc peerConn) ID() ID {
	return PubKeyToID(pc.conn.(*tmconn.SecretConnection).RemotePubKey())
}

// Return the IP from the connection RemoteAddr
func (pc peerConn) RemoteIP() net.IP {
	if pc.ip != nil {
		return pc.ip
	}

	host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
	if err != nil {
		panic(err)
	}

	ips, err := net.LookupIP(host)
	if err != nil {
		panic(err)
	}

	pc.ip = ips[0]

	return pc.ip
}
```
newPeerConn is later called by wrapPeer in transport.go, which in turn is called by Accept and Dial; those two are called from acceptRoutine and addOutboundPeerWithConfig in switch.go, the former looping to accept inbound connections and the latter dialing outbound ones. From this it is clear that, in Tendermint's P2P architecture, the lowest-level network connection is implemented in /p2p/conn/connection.go; on top of it, /p2p/peer.go builds the peer pair, /p2p/transport.go operates on the connections, and all peer connections and the routing of system operations are maintained by /p2p/switch.go.
Those files are analyzed later; for now, let's continue with how a peer is constructed.
```golang
type PeerOption func(*peer)

func newPeer(
	pc peerConn,
	mConfig tmconn.MConnConfig,
	nodeInfo NodeInfo,
	reactorsByCh map[byte]Reactor,
	msgTypeByChID map[byte]proto.Message,
	chDescs []*tmconn.ChannelDescriptor,
	onPeerError func(Peer, interface{}),
	mlc *metricsLabelCache,
	options ...PeerOption,
) *peer {
	p := &peer{
		peerConn:      pc,
		nodeInfo:      nodeInfo,
		channels:      nodeInfo.(DefaultNodeInfo).Channels,
		Data:          cmap.NewCMap(),
		metricsTicker: time.NewTicker(metricsTickerDuration),
		metrics:       NopMetrics(),
		mlc:           mlc,
	}

	p.mconn = createMConnection(
		pc.conn,
		p,
		reactorsByCh,
		msgTypeByChID,
		chDescs,
		onPeerError,
		mConfig,
	)
	p.BaseService = *service.NewBaseService(nil, "Peer", p)
	for _, option := range options {
		option(p)
	}

	return p
}
```
Apart from plain parameter passing and service registration, the main work in this function is building mconn, so let's look at the createMConnection function next.
This function first builds the two transport callbacks onReceive and onError. The onReceive callback dispatches to the appropriate reactor based on chID; the functions that register reactors live in switch.go and are called from node.go. The onError callback is likewise passed in as a parameter; following the call chain, it ultimately lands on StopPeerForError in switch.go, which is analyzed later.
```golang
//------------------------------------------------------------------
// helper funcs

func createMConnection(
	conn net.Conn,
	p *peer,
	reactorsByCh map[byte]Reactor,
	msgTypeByChID map[byte]proto.Message,
	chDescs []*tmconn.ChannelDescriptor,
	onPeerError func(Peer, interface{}),
	config tmconn.MConnConfig,
) *tmconn.MConnection {

	onReceive := func(chID byte, msgBytes []byte) {
		reactor := reactorsByCh[chID]
		if reactor == nil {
			// Note that its ok to panic here as it's caught in the conn._recover,
			// which does onPeerError.
			panic(fmt.Sprintf("Unknown channel %X", chID))
		}
		mt := msgTypeByChID[chID]
		msg := proto.Clone(mt)
		err := proto.Unmarshal(msgBytes, msg)
		if err != nil {
			panic(fmt.Errorf("unmarshaling message: %s into type: %s", err, reflect.TypeOf(mt)))
		}
		labels := []string{
			"peer_id", string(p.ID()),
			"chID", fmt.Sprintf("%#x", chID),
		}
		if w, ok := msg.(Unwrapper); ok {
			msg, err = w.Unwrap()
			if err != nil {
				panic(fmt.Errorf("unwrapping message: %s", err))
			}
		}
		p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
		p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
		if nr, ok := reactor.(EnvelopeReceiver); ok {
			nr.ReceiveEnvelope(Envelope{
				ChannelID: chID,
				Src:       p,
				Message:   msg,
			})
		} else {
			reactor.Receive(chID, p, msgBytes)
		}
	}

	onError := func(r interface{}) {
		onPeerError(p, r)
	}

	return tmconn.NewMConnectionWithConfig(
		conn,
		chDescs,
		onReceive,
		onError,
		config,
	)
}
```
Next, the peer's start and stop functions. They mainly start the peer's service, then start the connection and the metrics reporter.
```golang
// OnStart implements BaseService.
func (p *peer) OnStart() error {
	if err := p.BaseService.OnStart(); err != nil {
		return err
	}

	if err := p.mconn.Start(); err != nil {
		return err
	}

	go p.metricsReporter()
	return nil
}

// FlushStop mimics OnStop but additionally ensures that all successful
// SendEnvelope() calls will get flushed before closing the connection.
// NOTE: it is not safe to call this method more than once.
func (p *peer) FlushStop() {
	p.metricsTicker.Stop()
	p.BaseService.OnStop()
	p.mconn.FlushStop() // stop everything and close the conn
}

// OnStop implements BaseService.
func (p *peer) OnStop() {
	p.metricsTicker.Stop()
	p.BaseService.OnStop()
	if err := p.mconn.Stop(); err != nil { // stop everything and close the conn
		p.Logger.Debug("Error while stopping peer", "err", err)
	}
}
```
The rest of peer.go (setters for options and attributes, message send/receive helpers, and so on) follows the same design and is not analyzed further here.
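As an illustration of those send helpers, below is a hypothetical helper a reactor might use to send a message to a peer; the function name and the channel ID 0x30 are made up, while `p2p.Envelope`, `p2p.EnvelopeSender`, and the deprecated `Send` come from the interface shown earlier.

```golang
package example // illustration only

import (
	"github.com/gogo/protobuf/proto"
	"github.com/tendermint/tendermint/p2p"
)

// sendToPeer sends msg on channel 0x30, preferring the newer envelope API
// when the peer supports it and falling back to the deprecated byte-slice API.
func sendToPeer(p p2p.Peer, msg proto.Message) bool {
	if es, ok := p.(p2p.EnvelopeSender); ok {
		return es.SendEnvelope(p2p.Envelope{ChannelID: 0x30, Message: msg})
	}
	bz, err := proto.Marshal(msg)
	if err != nil {
		return false
	}
	return p.Send(0x30, bz) // deprecated path, scheduled for removal in v0.37
}
```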
2.3 Connection
From the analysis of peer.go we know that its low-level communication is implemented in /p2p/conn/connection.go, managed by /p2p/transport.go, and routed by /p2p/switch.go, so let's analyze the low-level communication first.
connection.go first defines the MConnection (multiplex connection) struct used for multiplexed transport.
```golang
type MConnection struct {
	service.BaseService

	conn          net.Conn
	bufConnReader *bufio.Reader
	bufConnWriter *bufio.Writer
	sendMonitor   *flow.Monitor
	recvMonitor   *flow.Monitor
	send          chan struct{}
	pong          chan struct{}
	channels      []*Channel
	channelsIdx   map[byte]*Channel
	onReceive     receiveCbFunc
	onError       errorCbFunc
	errored       uint32
	config        MConnConfig

	// Closing quitSendRoutine will cause the sendRoutine to eventually quit.
	// doneSendRoutine is closed when the sendRoutine actually quits.
	quitSendRoutine chan struct{}
	doneSendRoutine chan struct{}

	// Closing quitRecvRouting will cause the recvRouting to eventually quit.
	quitRecvRoutine chan struct{}

	// used to ensure FlushStop and OnStop
	// are safe to call concurrently.
	stopMtx tmsync.Mutex

	flushTimer *timer.ThrottleTimer // flush writes as necessary but throttled.
	pingTimer  *time.Ticker         // send pings periodically

	// close conn if pong is not received in pongTimeout
	pongTimer     *time.Timer
	pongTimeoutCh chan bool // true - timeout, false - peer sent pong

	chStatsTimer *time.Ticker // update channel stats periodically

	created time.Time // time of creation

	_maxPacketMsgSize int
}
```
Most of the fields are self-explanatory:
- conn is a generic stream-oriented network connection.
- bufConnReader and bufConnWriter wrap the raw net.Conn reads and writes in buffered byte streams, so TCP data can be handled much like file I/O.
- sendMonitor and recvMonitor monitor the sending and receiving state of the connection.
- channels and channelsIdx store the channel information.
- onReceive and onError are the callbacks invoked when a transfer completes or fails.
- config is an MConnConfig struct describing the connection's configuration.
```golang
type MConnConfig struct {
	SendRate int64 `mapstructure:"send_rate"`
	RecvRate int64 `mapstructure:"recv_rate"`

	// Maximum payload size
	MaxPacketMsgPayloadSize int `mapstructure:"max_packet_msg_payload_size"`

	// Interval to flush writes (throttled)
	FlushThrottle time.Duration `mapstructure:"flush_throttle"`

	// Interval to send pings
	PingInterval time.Duration `mapstructure:"ping_interval"`

	// Maximum wait time for pongs
	PongTimeout time.Duration `mapstructure:"pong_timeout"`
}
```
After the data structures, the package provides the constructors for MConnection: DefaultMConnConfig returns the default configuration, while NewMConnection and NewMConnectionWithConfig build the connection. The two constructors differ only in whether the configuration comes from the default-config function, and the former ends up calling the latter.
```golang
// NewMConnectionWithConfig wraps net.Conn and creates multiplex connection with a config
func NewMConnectionWithConfig(
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
	config MConnConfig,
) *MConnection {
	if config.PongTimeout >= config.PingInterval {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		conn:          conn,
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flow.New(0, 0),
		recvMonitor:   flow.New(0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
	}

	// Create channels
	var channelsIdx = map[byte]*Channel{}
	var channels = []*Channel{}

	for _, desc := range chDescs {
		channel := newChannel(mconn, *desc)
		channelsIdx[channel.desc.ID] = channel
		channels = append(channels, channel)
	}
	mconn.channels = channels
	mconn.channelsIdx = channelsIdx

	mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}
```
Let's focus on NewMConnectionWithConfig, shown above. The chDescs argument is used to create the channels and involves the Channel type. The function first creates the MConnection object, then builds a Channel from each descriptor in chDescs and attaches them to the MConnection, and finally registers the object as a service and caches the maximum packet size. A small usage sketch follows; after that we turn to Channel.
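Below is a minimal sketch, from a caller's point of view, of wiring a raw net.Conn into an MConnection with one channel; normally createMConnection in peer.go does this. The dialed address, the channel ID, and the callbacks are invented for illustration, and a real peer would first perform the secret-connection handshake.

```golang
package example // illustration only

import (
	"fmt"
	"net"

	"github.com/tendermint/tendermint/p2p/conn"
)

// dialRaw dials a plain TCP connection and wraps it into an MConnection
// with a single channel, using the default MConnConfig.
func dialRaw() error {
	netConn, err := net.Dial("tcp", "127.0.0.1:26656") // example address
	if err != nil {
		return err
	}

	chDescs := []*conn.ChannelDescriptor{
		{ID: 0x01, Priority: 1, SendQueueCapacity: 1}, // example channel
	}
	onReceive := func(chID byte, msgBytes []byte) {
		fmt.Printf("received %d bytes on channel %X\n", len(msgBytes), chID)
	}
	onError := func(r interface{}) {
		fmt.Println("connection error:", r)
	}

	mconn := conn.NewMConnectionWithConfig(
		netConn, chDescs, onReceive, onError, conn.DefaultMConnConfig(),
	)
	return mconn.Start() // spawns sendRoutine and recvRoutine
}
```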
```golang
// TODO: lowercase.
// NOTE: not goroutine-safe.
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte
	sendQueueSize int32 // atomic.
	recving       []byte
	sending       []byte
	recentlySent  int64 // exponential moving average

	maxPacketMsgPayloadSize int

	Logger log.Logger
}

func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
	desc = desc.FillDefaults()
	if desc.Priority <= 0 {
		panic("Channel default priority must be a positive integer")
	}
	return &Channel{
		conn:                    conn,
		desc:                    desc,
		sendQueue:               make(chan []byte, desc.SendQueueCapacity),
		recving:                 make([]byte, 0, desc.RecvBufferCapacity),
		maxPacketMsgPayloadSize: conn.config.MaxPacketMsgPayloadSize,
	}
}
```
In the Channel struct, sendQueue is the send queue, recving is the receive buffer, and sending is the send buffer. The constructor is again plain parameter passing; the interesting part is the channel's data send and receive functions.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106// Queues message to send to this channel. // Goroutine-safe // Times out (and returns false) after defaultSendTimeout func (ch *Channel) sendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true case <-time.After(defaultSendTimeout): return false } } // Queues message to send to this channel. // Nonblocking, returns true if successful. // Goroutine-safe func (ch *Channel) trySendBytes(bytes []byte) bool { select { case ch.sendQueue <- bytes: atomic.AddInt32(&ch.sendQueueSize, 1) return true default: return false } } // Goroutine-safe func (ch *Channel) loadSendQueueSize() (size int) { return int(atomic.LoadInt32(&ch.sendQueueSize)) } // Goroutine-safe // Use only as a heuristic. func (ch *Channel) canSend() bool { return ch.loadSendQueueSize() < defaultSendQueueCapacity } // Returns true if any PacketMsgs are pending to be sent. // Call before calling nextPacketMsg() // Goroutine-safe func (ch *Channel) isSendPending() bool { if len(ch.sending) == 0 { if len(ch.sendQueue) == 0 { return false } ch.sending = <-ch.sendQueue } return true } // Creates a new PacketMsg to send. // Not goroutine-safe func (ch *Channel) nextPacketMsg() tmp2p.PacketMsg { packet := tmp2p.PacketMsg{ChannelID: int32(ch.desc.ID)} maxSize := ch.maxPacketMsgPayloadSize packet.Data = ch.sending[:tmmath.MinInt(maxSize, len(ch.sending))] if len(ch.sending) <= maxSize { packet.EOF = true ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = false ch.sending = ch.sending[tmmath.MinInt(maxSize, len(ch.sending)):] } return packet } // Writes next PacketMsg to w and updates c.recentlySent. // Not goroutine-safe func (ch *Channel) writePacketMsgTo(w io.Writer) (n int, err error) { packet := ch.nextPacketMsg() n, err = protoio.NewDelimitedWriter(w).WriteMsg(mustWrapPacket(&packet)) atomic.AddInt64(&ch.recentlySent, int64(n)) return } // Handles incoming PacketMsgs. It returns a message bytes if message is // complete. NOTE message bytes may change on next call to recvPacketMsg. // Not goroutine-safe func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) { ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet) var recvCap, recvReceived = ch.desc.RecvMessageCapacity, len(ch.recving) + len(packet.Data) if recvCap < recvReceived { return nil, fmt.Errorf("received message exceeds available capacity: %v < %v", recvCap, recvReceived) } ch.recving = append(ch.recving, packet.Data...) if packet.EOF { msgBytes := ch.recving // clear the slice without re-allocating. // http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go // suggests this could be a memory leak, but we might as well keep the memory for the channel until it closes, // at which point the recving slice stops being used and should be garbage collected ch.recving = ch.recving[:0] // make([]byte, 0, ch.desc.RecvBufferCapacity) return msgBytes, nil } return nil, nil } // Call this periodically to update stats for throttling purposes. // Not goroutine-safe func (ch *Channel) updateStats() { // Exponential decay of stats. // TODO: optimize. atomic.StoreInt64(&ch.recentlySent, int64(float64(atomic.LoadInt64(&ch.recentlySent))*0.8)) }
Among these functions, the first to note are the send functions sendBytes and trySendBytes; the difference is that the former blocks until defaultSendTimeout expires, while the latter is non-blocking.
Then there are writePacketMsgTo and recvPacketMsg for sending and receiving PacketMsgs. PacketMsg is a Tendermint-specific message type defined in the p2p protobufs; at the system level there are also ping and pong packets used for heartbeats between nodes. Concretely, when a node wants to heartbeat another node, it wraps a PacketPing message and sends it over the multiplexed connection (MConnection); on receiving the PacketPing, the target node immediately replies with a PacketPong to show that it is still alive and reachable. Periodic heartbeats thus let nodes confirm that their connections are healthy. All other messages between nodes are likewise wrapped as PacketMsgs for transmission.
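As a sketch of the framing involved, the snippet below writes one heartbeat the way connection.go frames it: a length-delimited protobuf Packet whose oneof payload is a PacketPing. The helper name and writing directly to a net.Conn are simplifications; MConnection actually writes through its buffered bufConnWriter and flushes afterwards.

```golang
package example // illustration only

import (
	"net"

	"github.com/tendermint/tendermint/libs/protoio"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// writePing frames and writes a single PacketPing as a length-delimited
// protobuf Packet.
func writePing(c net.Conn) error {
	w := protoio.NewDelimitedWriter(c)
	_, err := w.WriteMsg(&tmp2p.Packet{
		Sum: &tmp2p.Packet_PacketPing{PacketPing: &tmp2p.PacketPing{}},
	})
	return err
}
```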
With Channel covered, let's return to MConnection and look at its start and stop functions.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899// OnStart implements BaseService func (c *MConnection) OnStart() error { if err := c.BaseService.OnStart(); err != nil { return err } c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = time.NewTicker(c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) c.chStatsTimer = time.NewTicker(updateStats) c.quitSendRoutine = make(chan struct{}) c.doneSendRoutine = make(chan struct{}) c.quitRecvRoutine = make(chan struct{}) go c.sendRoutine() go c.recvRoutine() return nil } // stopServices stops the BaseService and timers and closes the quitSendRoutine. // if the quitSendRoutine was already closed, it returns true, otherwise it returns false. // It uses the stopMtx to ensure only one of FlushStop and OnStop can do this at a time. func (c *MConnection) stopServices() (alreadyStopped bool) { c.stopMtx.Lock() defer c.stopMtx.Unlock() select { case <-c.quitSendRoutine: // already quit return true default: } select { case <-c.quitRecvRoutine: // already quit return true default: } c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() // inform the recvRouting that we are shutting down close(c.quitRecvRoutine) close(c.quitSendRoutine) return false } // FlushStop replicates the logic of OnStop. // It additionally ensures that all successful // .Send() calls will get flushed before closing // the connection. func (c *MConnection) FlushStop() { if c.stopServices() { return } // this block is unique to FlushStop { // wait until the sendRoutine exits // so we dont race on calling sendSomePacketMsgs <-c.doneSendRoutine // Send and flush all pending msgs. // Since sendRoutine has exited, we can call this // safely eof := c.sendSomePacketMsgs() for !eof { eof = c.sendSomePacketMsgs() } c.flush() // Now we can close the connection } c.conn.Close() // We can't close pong safely here because // recvRoutine may write to it after we've stopped. // Though it doesn't need to get closed at all, // we close it @ recvRoutine. // c.Stop() } // OnStop implements BaseService func (c *MConnection) OnStop() { if c.stopServices() { return } c.conn.Close() // We can't close pong safely here because // recvRoutine may write to it after we've stopped. // Though it doesn't need to get closed at all, // we close it @ recvRoutine. }
As can be seen, the start and stop functions mainly maintain the various timers and spawn the two goroutines that send and receive data, sendRoutine and recvRoutine.
Let's look at the implementation of sendRoutine first.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134// sendRoutine polls for packets to send from channels. func (c *MConnection) sendRoutine() { defer c._recover() protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter) FOR_LOOP: for { var _n int var err error SELECTION: select { case <-c.flushTimer.Ch: // NOTE: flushTimer.Set() must be called every time // something is written to .bufConnWriter. c.flush() case <-c.chStatsTimer.C: for _, channel := range c.channels { channel.updateStats() } case <-c.pingTimer.C: c.Logger.Debug("Send Ping") _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{})) if err != nil { c.Logger.Error("Failed to send PacketPing", "err", err) break SELECTION } c.sendMonitor.Update(_n) c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout) c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() { select { case c.pongTimeoutCh <- true: default: } }) c.flush() case timeout := <-c.pongTimeoutCh: if timeout { c.Logger.Debug("Pong timeout") err = errors.New("pong timeout") } else { c.stopPongTimer() } case <-c.pong: c.Logger.Debug("Send Pong") _n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{})) if err != nil { c.Logger.Error("Failed to send PacketPong", "err", err) break SELECTION } c.sendMonitor.Update(_n) c.flush() case <-c.quitSendRoutine: break FOR_LOOP case <-c.send: // Send some PacketMsgs eof := c.sendSomePacketMsgs() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: default: } } } if !c.IsRunning() { break FOR_LOOP } if err != nil { c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err) c.stopForError(err) break FOR_LOOP } } // Cleanup c.stopPongTimer() close(c.doneSendRoutine) } // Returns true if messages from channels were exhausted. // Blocks in accordance to .sendMonitor throttling. func (c *MConnection) sendSomePacketMsgs() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. c.sendMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some PacketMsgs. for i := 0; i < numBatchPacketMsgs; i++ { if c.sendPacketMsg() { return true } } return false } // Returns true if messages from channels were exhausted. func (c *MConnection) sendPacketMsg() bool { // Choose a channel to create a PacketMsg from. // The chosen channel will be the one whose recentlySent/priority is the least. var leastRatio float32 = math.MaxFloat32 var leastChannel *Channel for _, channel := range c.channels { // If nothing to send, skip this channel if !channel.isSendPending() { continue } // Get ratio, and keep track of lowest ratio. ratio := float32(channel.recentlySent) / float32(channel.desc.Priority) if ratio < leastRatio { leastRatio = ratio leastChannel = channel } } // Nothing to send? if leastChannel == nil { return true } // c.Logger.Info("Found a msgPacket to send") // Make & send a PacketMsg from this channel _n, err := leastChannel.writePacketMsgTo(c.bufConnWriter) if err != nil { c.Logger.Error("Failed to write PacketMsg", "err", err) c.stopForError(err) return true } c.sendMonitor.Update(_n) c.flushTimer.Set() return false }
sendRoutine handles messages sent on the MConnection in a dedicated goroutine. It contains a loop that runs until the connection is no longer running or an error occurs. Inside the loop, a select statement waits on several channels:
- If flushTimer.Ch fires (the flush throttle expired), the function calls flush() to write any buffered data to the connection.
- If chStatsTimer.C fires (the channel-stats period elapsed), the function calls channel.updateStats() on every channel of the multiplexed connection.
- If pingTimer.C fires (the ping interval elapsed), the function sends a PacketPing and waits for the corresponding PacketPong; if no PacketPong arrives within c.config.PongTimeout, the pong timer pushes true into the pongTimeoutCh channel.
- If pongTimeoutCh receives a value: true means a pong timeout occurred (injected by the timer set up in the ping case), so the function raises a pong-timeout error and exits the loop; false means the PacketPong arrived, so the pong timer is stopped.
- If the pong channel receives a value (recvRoutine received a PacketPing and a reply is owed), the function writes a PacketPong, updates sendMonitor, and flushes the buffered data.
- If quitSendRoutine receives a value (the connection is shutting down), the function exits the loop.
- If the send channel receives a value (there is data to send), the function calls sendSomePacketMsgs() to send as many PacketMsg messages as possible; if more data remains, it signals the send channel again to keep sendRoutine awake.

After the select block, if c.IsRunning() has become false, the loop stops. If an error occurred at any point, the function logs it, calls stopForError, and exits the loop.

Once the loop finishes, the function stops the pong timer and closes the doneSendRoutine channel.
Apart from watching those channels, the core send path is sendSomePacketMsgs, whose heart is sendPacketMsg. sendPacketMsg iterates over all channels, picks the one that has data pending and the smallest weight ratio := float32(channel.recentlySent) / float32(channel.desc.Priority), and packs its data into a PacketMsg via writePacketMsgTo.
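The selection rule can be restated as a small toy function; the chanStat type and the numbers in the trailing comment are invented purely to show how lower recentlySent and higher Priority win.

```golang
package example // illustration only

import "math"

// chanStat is a toy stand-in for a Channel's scheduling state.
type chanStat struct {
	name         string
	recentlySent int64
	priority     int
	pending      bool
}

// nextChannel mirrors sendPacketMsg's rule: among channels with pending data,
// pick the one with the smallest recentlySent/priority ratio.
func nextChannel(chans []chanStat) string {
	best, bestRatio := "", float32(math.MaxFloat32)
	for _, c := range chans {
		if !c.pending {
			continue
		}
		if ratio := float32(c.recentlySent) / float32(c.priority); ratio < bestRatio {
			bestRatio, best = ratio, c.name
		}
	}
	return best
}

// nextChannel([]chanStat{
//	{"consensus", 4096, 10, true}, // ratio 409.6
//	{"mempool", 1024, 5, true},    // ratio 204.8 -> selected
// }) == "mempool"
```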
Next, the corresponding recvRoutine implementation.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108// recvRoutine reads PacketMsgs and reconstructs the message using the channels' "recving" buffer. // After a whole message has been assembled, it's pushed to onReceive(). // Blocks depending on how the connection is throttled. // Otherwise, it never blocks. func (c *MConnection) recvRoutine() { defer c._recover() protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize) FOR_LOOP: for { // Block until .recvMonitor says we can read. c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true) // Peek into bufConnReader for debugging /* if numBytes := c.bufConnReader.Buffered(); numBytes > 0 { bz, err := c.bufConnReader.Peek(tmmath.MinInt(numBytes, 100)) if err == nil { // return } else { c.Logger.Debug("Error peeking connection buffer", "err", err) // return nil } c.Logger.Info("Peek connection buffer", "numBytes", numBytes, "bz", bz) } */ // Read packet type var packet tmp2p.Packet _n, err := protoReader.ReadMsg(&packet) c.recvMonitor.Update(_n) if err != nil { // stopServices was invoked and we are shutting down // receiving is excpected to fail since we will close the connection select { case <-c.quitRecvRoutine: break FOR_LOOP default: } if c.IsRunning() { if err == io.EOF { c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c) } else { c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err) } c.stopForError(err) } break FOR_LOOP } // Read more depending on packet type. switch pkt := packet.Sum.(type) { case *tmp2p.Packet_PacketPing: // TODO: prevent abuse, as they cause flush()'s. // https://github.com/tendermint/tendermint/issues/1190 c.Logger.Debug("Receive Ping") select { case c.pong <- struct{}{}: default: // never block } case *tmp2p.Packet_PacketPong: c.Logger.Debug("Receive Pong") select { case c.pongTimeoutCh <- false: default: // never block } case *tmp2p.Packet_PacketMsg: channelID := byte(pkt.PacketMsg.ChannelID) channel, ok := c.channelsIdx[channelID] if pkt.PacketMsg.ChannelID < 0 || pkt.PacketMsg.ChannelID > math.MaxUint8 || !ok || channel == nil { err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID) c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) break FOR_LOOP } msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg) if err != nil { if c.IsRunning() { c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) } break FOR_LOOP } if msgBytes != nil { c.Logger.Debug("Received bytes", "chID", channelID, "msgBytes", msgBytes) // NOTE: This means the reactor.Receive runs in the same thread as the p2p recv routine c.onReceive(channelID, msgBytes) } default: err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet)) c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err) c.stopForError(err) break FOR_LOOP } } // Cleanup close(c.pong) for range c.pong { // Drain } }
recvRoutine handles messages received on the MConnection in a dedicated goroutine. It, too, contains a loop that runs until the connection is no longer running or an error occurs.
In each iteration, the loop first blocks in c.recvMonitor.Limit until it is allowed to read, then reads and decodes one packet with the length-delimited protobuf reader and switches on the packet type: the ping, pong, and ordinary messages mentioned earlier.
On a ping, it immediately queues a pong reply; on a pong, it pushes false into c.pongTimeoutCh, cancelling the pong timeout; on an ordinary message, it extracts and validates the channel ID, and if the channel exists it calls recvPacketMsg to process the message; otherwise it treats the situation as an error, breaks out of the loop, and ends the goroutine.
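That dispatch can be summarized by the sketch below, which mirrors the switch in recvRoutine; the function name, the return values, and reading straight from a net.Conn (rather than the buffered reader) are simplifications for illustration.

```golang
package example // illustration only

import (
	"net"

	"github.com/tendermint/tendermint/libs/protoio"
	tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
)

// readPacket reads one length-delimited Packet and reports its kind,
// mirroring the type switch in recvRoutine. maxSize stands in for the
// connection's cached _maxPacketMsgSize.
func readPacket(c net.Conn, maxSize int) (string, error) {
	r := protoio.NewDelimitedReader(c, maxSize)
	var packet tmp2p.Packet
	if _, err := r.ReadMsg(&packet); err != nil {
		return "", err // connection closed or invalid frame
	}
	switch packet.Sum.(type) {
	case *tmp2p.Packet_PacketPing:
		return "ping", nil // recvRoutine queues a pong reply
	case *tmp2p.Packet_PacketPong:
		return "pong", nil // recvRoutine cancels the pong timeout
	case *tmp2p.Packet_PacketMsg:
		return "msg", nil // recvRoutine hands the payload to the owning Channel
	default:
		return "unknown", nil
	}
}
```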
That is the basic communication path in connection.go.
2.4 transport
Next, let's analyze transport.go, which wraps connections into peers and registers them.
The file first defines two structs, accept and peerConfig. The former carries the upgraded connection (net.Conn) and node information (NodeInfo) from an asynchronously running routine back to the Accept method; the latter bundles the information needed to fully set up a peer.
```golang
// accept is the container to carry the upgraded connection and NodeInfo from an
// asynchronously running routine to the Accept method.
type accept struct {
	netAddr  *NetAddress
	conn     net.Conn
	nodeInfo NodeInfo
	err      error
}

// peerConfig is used to bundle data we need to fully setup a Peer with an
// MConn, provided by the caller of Accept and Dial (currently the Switch). This
// a temporary measure until reactor setup is less dynamic and we introduce the
// concept of PeerBehaviour to communicate about significant Peer lifecycle
// events.
// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour.
type peerConfig struct {
	chDescs     []*conn.ChannelDescriptor
	onPeerError func(Peer, interface{})
	outbound    bool

	// isPersistent allows you to set a function, which, given socket address
	// (for outbound peers) OR self-reported address (for inbound peers), tells
	// if the peer is persistent or not.
	isPersistent  func(*NetAddress) bool
	reactorsByCh  map[byte]Reactor
	msgTypeByChID map[byte]proto.Message
	metrics       *Metrics
	mlc           *metricsLabelCache
}
```
Then come the Transport and transportLifecycle interfaces: the former contains the NetAddress, Accept, and Dial methods used to register peers and connect to them, while the latter manages the lifecycle through Close and Listen.
```golang
// Transport emits and connects to Peers. The implementation of Peer is left to
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
type Transport interface {
	// Listening address.
	NetAddress() NetAddress

	// Accept returns a newly connected Peer.
	Accept(peerConfig) (Peer, error)

	// Dial connects to the Peer for the address.
	Dial(NetAddress, peerConfig) (Peer, error)

	// Cleanup any resources associated with Peer.
	Cleanup(Peer)
}

// transportLifecycle bundles the methods for callers to control start and stop
// behaviour.
type transportLifecycle interface {
	Close() error
	Listen(NetAddress) error
}

// Test multiplexTransport for interface completeness.
var _ Transport = (*MultiplexTransport)(nil)
var _ transportLifecycle = (*MultiplexTransport)(nil)
```
The two compile-time assertion lines above show that Transport and transportLifecycle are abstract interfaces whose concrete implementation is the MultiplexTransport struct. Let's look at its definition and constructor.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869// ConnFilterFunc to be implemented by filter hooks after a new connection has // been established. The set of exisiting connections is passed along together // with all resolved IPs for the new connection. type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error // ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection // and refuses new ones if they come from a known ip. func ConnDuplicateIPFilter() ConnFilterFunc { return func(cs ConnSet, c net.Conn, ips []net.IP) error { for _, ip := range ips { if cs.HasIP(ip) { return ErrRejected{ conn: c, err: fmt.Errorf("ip<%v> already connected", ip), isDuplicate: true, } } } return nil } } // MultiplexTransport accepts and dials tcp connections and upgrades them to // multiplexed peers. type MultiplexTransport struct { netAddr NetAddress listener net.Listener maxIncomingConnections int // see MaxIncomingConnections acceptc chan accept closec chan struct{} // Lookup table for duplicate ip and id checks. conns ConnSet connFilters []ConnFilterFunc dialTimeout time.Duration filterTimeout time.Duration handshakeTimeout time.Duration nodeInfo NodeInfo nodeKey NodeKey resolver IPResolver // TODO(xla): This config is still needed as we parameterise peerConn and // peer currently. All relevant configuration should be refactored into options // with sane defaults. mConfig conn.MConnConfig } // NewMultiplexTransport returns a tcp connected multiplexed peer. func NewMultiplexTransport( nodeInfo NodeInfo, nodeKey NodeKey, mConfig conn.MConnConfig, ) *MultiplexTransport { return &MultiplexTransport{ acceptc: make(chan accept), closec: make(chan struct{}), dialTimeout: defaultDialTimeout, filterTimeout: defaultFilterTimeout, handshakeTimeout: defaultHandshakeTimeout, mConfig: mConfig, nodeInfo: nodeInfo, nodeKey: nodeKey, conns: NewConnSet(), resolver: net.DefaultResolver, } }
Of its fields, acceptc and closec are two channels: the former delivers accepted connections, while the latter is used to shut the MultiplexTransport down. conns is a connection set used as a lookup table for "a connection and all of its IPs"; it is implemented in conn_set.go, which is worth reading on its own. connFilters filters connections, with the default ConnDuplicateIPFilter rejecting duplicate IPs, and resolver is Go's standard IP resolver. Next, the core functions mentioned earlier.
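As a sketch of how the filter hook can be extended, here is a hypothetical filter in the same shape as ConnDuplicateIPFilter; the name and the blocked-IP rule are invented, and since filterConn wraps whatever error a filter returns in ErrRejected with isFiltered set, returning a plain error is enough.

```golang
package example // illustration only

import (
	"fmt"
	"net"

	"github.com/tendermint/tendermint/p2p"
)

// ConnBlockIPFilter rejects any connection that resolves to the blocked IP.
func ConnBlockIPFilter(blocked net.IP) p2p.ConnFilterFunc {
	return func(cs p2p.ConnSet, c net.Conn, ips []net.IP) error {
		for _, ip := range ips {
			if ip.Equal(blocked) {
				return fmt.Errorf("ip<%v> is blocked", ip)
			}
		}
		return nil
	}
}
```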
First, the Accept function: it takes the result delivered on the acceptc channel and, using the peerConfig, obtains and builds a peer via wrapPeer.
golang12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758// Accept implements Transport. func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) { select { // This case should never have any side-effectful/blocking operations to // ensure that quality peers are ready to be used. case a := <-mt.acceptc: if a.err != nil { return nil, a.err } cfg.outbound = false return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil case <-mt.closec: return nil, ErrTransportClosed{} } } func (mt *MultiplexTransport) wrapPeer( c net.Conn, ni NodeInfo, cfg peerConfig, socketAddr *NetAddress, ) Peer { persistent := false if cfg.isPersistent != nil { if cfg.outbound { persistent = cfg.isPersistent(socketAddr) } else { selfReportedAddr, err := ni.NetAddress() if err == nil { persistent = cfg.isPersistent(selfReportedAddr) } } } peerConn := newPeerConn( cfg.outbound, persistent, c, socketAddr, ) p := newPeer( peerConn, mt.mConfig, ni, cfg.reactorsByCh, cfg.msgTypeByChID, cfg.chDescs, cfg.onPeerError, cfg.mlc, PeerMetrics(cfg.metrics), ) return p }
Next is the Dial function, which establishes an outbound connection. It first dials within a deadline via DialTimeout from netaddress.go, then filters the connection with filterConn (resolving its IPs with resolveIPs), then uses upgrade to turn the connection into an encrypted SecretConnection keyed by the node key and performs the handshake via handshake, and finally wraps the result into a peer.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279// Dial implements Transport. func (mt *MultiplexTransport) Dial( addr NetAddress, cfg peerConfig, ) (Peer, error) { c, err := addr.DialTimeout(mt.dialTimeout) if err != nil { return nil, err } // TODO(xla): Evaluate if we should apply filters if we explicitly dial. if err := mt.filterConn(c); err != nil { return nil, err } secretConn, nodeInfo, err := mt.upgrade(c, &addr) if err != nil { return nil, err } cfg.outbound = true p := mt.wrapPeer(secretConn, nodeInfo, cfg, &addr) return p, nil } func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { defer func() { if err != nil { _ = c.Close() } }() // Reject if connection is already present. if mt.conns.Has(c) { return ErrRejected{conn: c, isDuplicate: true} } // Resolve ips for incoming conn. ips, err := resolveIPs(mt.resolver, c) if err != nil { return err } errc := make(chan error, len(mt.connFilters)) for _, f := range mt.connFilters { go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) { errc <- f(mt.conns, c, ips) }(f, c, ips, errc) } for i := 0; i < cap(errc); i++ { select { case err := <-errc: if err != nil { return ErrRejected{conn: c, err: err, isFiltered: true} } case <-time.After(mt.filterTimeout): return ErrFilterTimeout{} } } mt.conns.Set(c, ips) return nil } func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) { host, _, err := net.SplitHostPort(c.RemoteAddr().String()) if err != nil { return nil, err } addrs, err := resolver.LookupIPAddr(context.Background(), host) if err != nil { return nil, err } ips := []net.IP{} for _, addr := range addrs { ips = append(ips, addr.IP) } return ips, nil } func (mt *MultiplexTransport) upgrade( c net.Conn, dialedAddr *NetAddress, ) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) { defer func() { if err != nil { _ = mt.cleanup(c) } }() secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey) if err != nil { return nil, nil, ErrRejected{ conn: c, err: fmt.Errorf("secret conn failed: %v", err), isAuthFailure: true, } } // For outgoing conns, ensure connection key matches dialed key. connID := PubKeyToID(secretConn.RemotePubKey()) if dialedAddr != nil { if dialedID := dialedAddr.ID; connID != dialedID { return nil, nil, ErrRejected{ conn: c, id: connID, err: fmt.Errorf( "conn.ID (%v) dialed ID (%v) mismatch", connID, dialedID, ), isAuthFailure: true, } } } nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo) if err != nil { return nil, nil, ErrRejected{ conn: c, err: fmt.Errorf("handshake failed: %v", err), isAuthFailure: true, } } if err := nodeInfo.Validate(); err != nil { return nil, nil, ErrRejected{ conn: c, err: err, isNodeInfoInvalid: true, } } // Ensure connection key matches self reported key. 
if connID != nodeInfo.ID() { return nil, nil, ErrRejected{ conn: c, id: connID, err: fmt.Errorf( "conn.ID (%v) NodeInfo.ID (%v) mismatch", connID, nodeInfo.ID(), ), isAuthFailure: true, } } // Reject self. if mt.nodeInfo.ID() == nodeInfo.ID() { return nil, nil, ErrRejected{ addr: *NewNetAddress(nodeInfo.ID(), c.RemoteAddr()), conn: c, id: nodeInfo.ID(), isSelf: true, } } if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil { return nil, nil, ErrRejected{ conn: c, err: err, id: nodeInfo.ID(), isIncompatible: true, } } return secretConn, nodeInfo, nil } func upgradeSecretConn( c net.Conn, timeout time.Duration, privKey crypto.PrivKey, ) (*conn.SecretConnection, error) { if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { return nil, err } sc, err := conn.MakeSecretConnection(c, privKey) if err != nil { return nil, err } return sc, sc.SetDeadline(time.Time{}) } func handshake( c net.Conn, timeout time.Duration, nodeInfo NodeInfo, ) (NodeInfo, error) { if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { return nil, err } var ( errc = make(chan error, 2) pbpeerNodeInfo tmp2p.DefaultNodeInfo peerNodeInfo DefaultNodeInfo ourNodeInfo = nodeInfo.(DefaultNodeInfo) ) go func(errc chan<- error, c net.Conn) { _, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto()) errc <- err }(errc, c) go func(errc chan<- error, c net.Conn) { protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) _, err := protoReader.ReadMsg(&pbpeerNodeInfo) errc <- err }(errc, c) for i := 0; i < cap(errc); i++ { err := <-errc if err != nil { return nil, err } } peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo) if err != nil { return nil, err } return peerNodeInfo, c.SetDeadline(time.Time{}) } func handshake( c net.Conn, timeout time.Duration, nodeInfo NodeInfo, ) (NodeInfo, error) { if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { return nil, err } var ( errc = make(chan error, 2) pbpeerNodeInfo tmp2p.DefaultNodeInfo peerNodeInfo DefaultNodeInfo ourNodeInfo = nodeInfo.(DefaultNodeInfo) ) go func(errc chan<- error, c net.Conn) { _, err := protoio.NewDelimitedWriter(c).WriteMsg(ourNodeInfo.ToProto()) errc <- err }(errc, c) go func(errc chan<- error, c net.Conn) { protoReader := protoio.NewDelimitedReader(c, MaxNodeInfoSize()) _, err := protoReader.ReadMsg(&pbpeerNodeInfo) errc <- err }(errc, c) for i := 0; i < cap(errc); i++ { err := <-errc if err != nil { return nil, err } } peerNodeInfo, err := DefaultNodeInfoFromToProto(&pbpeerNodeInfo) if err != nil { return nil, err } return peerNodeInfo, c.SetDeadline(time.Time{}) }
Third, the Cleanup function, which removes and closes a connection; there is nothing special about it.
```golang
// Cleanup removes the given address from the connections set and
// closes the connection.
func (mt *MultiplexTransport) Cleanup(p Peer) {
	mt.conns.RemoveAddr(p.RemoteAddr())
	_ = p.CloseConn()
}

func (mt *MultiplexTransport) cleanup(c net.Conn) error {
	mt.conns.Remove(c)

	return c.Close()
}
```
Finally, the lifecycle functions Close and Listen. The former is ordinary shutdown logic; the notable part of the latter is that it finishes by launching an acceptPeers goroutine that keeps accepting incoming connections.
golang1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798// Close implements transportLifecycle. func (mt *MultiplexTransport) Close() error { close(mt.closec) if mt.listener != nil { return mt.listener.Close() } return nil } // Listen implements transportLifecycle. func (mt *MultiplexTransport) Listen(addr NetAddress) error { ln, err := net.Listen("tcp", addr.DialString()) if err != nil { return err } if mt.maxIncomingConnections > 0 { ln = netutil.LimitListener(ln, mt.maxIncomingConnections) } mt.netAddr = addr mt.listener = ln go mt.acceptPeers() return nil } func (mt *MultiplexTransport) acceptPeers() { for { c, err := mt.listener.Accept() if err != nil { // If Close() has been called, silently exit. select { case _, ok := <-mt.closec: if !ok { return } default: // Transport is not closed } mt.acceptc <- accept{err: err} return } // Connection upgrade and filtering should be asynchronous to avoid // Head-of-line blocking[0]. // Reference: https://github.com/tendermint/tendermint/issues/2047 // // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking go func(c net.Conn) { defer func() { if r := recover(); r != nil { err := ErrRejected{ conn: c, err: fmt.Errorf("recovered from panic: %v", r), isAuthFailure: true, } select { case mt.acceptc <- accept{err: err}: case <-mt.closec: // Give up if the transport was closed. _ = c.Close() return } } }() var ( nodeInfo NodeInfo secretConn *conn.SecretConnection netAddr *NetAddress ) err := mt.filterConn(c) if err == nil { secretConn, nodeInfo, err = mt.upgrade(c, nil) if err == nil { addr := c.RemoteAddr() id := PubKeyToID(secretConn.RemotePubKey()) netAddr = NewNetAddress(id, addr) } } select { case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}: // Make the upgraded peer available. case <-mt.closec: // Give up if the transport was closed. _ = c.Close() return } }(c) } }
That is the overall logic of transport.go: it provides the complete connection flow between nodes and ultimately encrypts their communication through the authenticated secret connection.
2.5 switch
Next, let's analyze switch.go, which is responsible for routing. As the name suggests, it plays a role similar to a network switch: it handles connection management between nodes, message routing, and flow control, forwarding messages from one connection to another and maintaining connection state so that communication stays reliable and secure.
The file first defines the Switch struct. Switch is a core component of the Tendermint P2P library, responsible for managing connections between nodes and routing messages; it ties together the Reactor instances, the address book, node information, the message transport, and more.
```golang
type Switch struct {
	service.BaseService // Switch is a service that can be started and stopped.

	config        *config.P2PConfig         // Tendermint P2P configuration (max connections, etc.).
	reactors      map[string]Reactor        // maps reactor names to Reactor instances.
	chDescs       []*conn.ChannelDescriptor // channel descriptors describing each channel's properties.
	reactorsByCh  map[byte]Reactor          // maps each channel ID to the Reactor that owns it.
	msgTypeByChID map[byte]proto.Message    // maps each channel ID to its message type.
	peers         *PeerSet                  // all connected peers: addresses, keys, connection state.
	dialing       *cmap.CMap                // concurrency-safe map of peers currently being dialed.
	reconnecting  *cmap.CMap                // concurrency-safe map of peers we are trying to reconnect to.
	nodeInfo      NodeInfo                  // our node info: ID, network address, version, ...
	nodeKey       *NodeKey                  // our node privkey, used for authenticated encryption of connections.
	addrBook      AddrBook                  // address book of all known peer addresses.
	// peers addresses with whom we'll maintain constant connection
	persistentPeersAddrs []*NetAddress   // addresses of peers we keep persistently connected to.
	unconditionalPeerIDs map[ID]struct{} // peer IDs that are accepted regardless of connection limits.

	transport Transport // Transport implementation used to move P2P messages (e.g. over TCP).

	filterTimeout time.Duration    // timeout applied when running the peer/connection filters.
	peerFilters   []PeerFilterFunc // PeerFilterFunc hooks used to reject unwanted peers.

	rng *rand.Rand // seed for randomizing dial times and orders

	metrics *Metrics           // P2P metrics: throughput, latency, and so on.
	mlc     *metricsLabelCache // cache for metric label values.
}

// PeerFilterFunc to be implemented by filter hooks after a new Peer has been
// fully setup.
type PeerFilterFunc func(IPeerSet, Peer) error
```
Inside the struct there is an AddrBook interface. As described, it defines the usual address-book operations for recording and managing peers' network addresses. Because the Tendermint P2P library has to maintain a book of peer addresses, this interface is used widely; depending on the concrete implementation, its methods may be called by several reactors or other modules to better manage connections and message delivery.
```golang
// An AddrBook represents an address book from the pex package, which is used
// to store peer addresses.
type AddrBook interface {
	AddAddress(addr *NetAddress, src *NetAddress) error // add addr to the book; src is where we learned it from
	AddPrivateIDs([]string)                             // add private IDs (e.g. our own) that must not be gossiped
	AddOurAddress(*NetAddress)                          // record one of our own addresses
	OurAddress(*NetAddress) bool                        // true if the address is one of our own
	MarkGood(ID)                                        // mark a peer as good so it is preferred when dialing
	RemoveAddress(*NetAddress)                          // remove an address from the book
	HasAddress(*NetAddress) bool                        // true if the address is already in the book
	Save()                                              // persist the address book
}
```
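A short sketch of the interface in use, assuming only the methods above; the helper name and flow are invented for illustration (in the real code it is mainly the PEX reactor that drives the address book).

```golang
package example // illustration only

import (
	"github.com/tendermint/tendermint/p2p"
)

// recordPeerAddress stores an address learned from a peer unless it is our
// own or already known, then persists the book.
func recordPeerAddress(book p2p.AddrBook, learned, from *p2p.NetAddress) {
	if book.OurAddress(learned) || book.HasAddress(learned) {
		return
	}
	if err := book.AddAddress(learned, from); err != nil {
		return // e.g. the address failed validation
	}
	book.Save()
}
```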
Then come the Switch constructor and the related option functions; the method to focus on is AddReactor. Each channel registered there is ultimately maintained by the connection layer.
golang123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126// SwitchOption sets an optional parameter on the Switch. type SwitchOption func(*Switch) // NewSwitch creates a new Switch with the given config. func NewSwitch( cfg *config.P2PConfig, transport Transport, options ...SwitchOption, ) *Switch { sw := &Switch{ config: cfg, reactors: make(map[string]Reactor), chDescs: make([]*conn.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), msgTypeByChID: make(map[byte]proto.Message), peers: NewPeerSet(), dialing: cmap.NewCMap(), reconnecting: cmap.NewCMap(), metrics: NopMetrics(), transport: transport, filterTimeout: defaultFilterTimeout, persistentPeersAddrs: make([]*NetAddress, 0), unconditionalPeerIDs: make(map[ID]struct{}), mlc: newMetricsLabelCache(), } // Ensure we have a completely undeterministic PRNG. sw.rng = rand.NewRand() sw.BaseService = *service.NewBaseService(nil, "P2P Switch", sw) for _, option := range options { option(sw) } return sw } // SwitchFilterTimeout sets the timeout used for peer filters. func SwitchFilterTimeout(timeout time.Duration) SwitchOption { return func(sw *Switch) { sw.filterTimeout = timeout } } // SwitchPeerFilters sets the filters for rejection of new peers. func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption { return func(sw *Switch) { sw.peerFilters = filters } } // WithMetrics sets the metrics. func WithMetrics(metrics *Metrics) SwitchOption { return func(sw *Switch) { sw.metrics = metrics } } //--------------------------------------------------------------------- // Switch setup // AddReactor adds the given reactor to the switch. // NOTE: Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { //针对reactor的每个通道信息单独处理 for _, chDesc := range reactor.GetChannels() { //首先获取通道ID并保证唯一性 chID := chDesc.ID // No two reactors can share the same channel. if sw.reactorsByCh[chID] != nil { panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor)) } //接下来注册通道信息 sw.chDescs = append(sw.chDescs, chDesc) sw.reactorsByCh[chID] = reactor sw.msgTypeByChID[chID] = chDesc.MessageType } sw.reactors[name] = reactor //最后将switch注册到reactor中,做到交叉引用 reactor.SetSwitch(sw) return reactor } // RemoveReactor removes the given Reactor from the Switch. // NOTE: Not goroutine safe. func (sw *Switch) RemoveReactor(name string, reactor Reactor) { for _, chDesc := range reactor.GetChannels() { // remove channel description for i := 0; i < len(sw.chDescs); i++ { if chDesc.ID == sw.chDescs[i].ID { sw.chDescs = append(sw.chDescs[:i], sw.chDescs[i+1:]...) break } } delete(sw.reactorsByCh, chDesc.ID) delete(sw.msgTypeByChID, chDesc.ID) } delete(sw.reactors, name) reactor.SetSwitch(nil) } // Reactors returns a map of reactors registered on the switch. // NOTE: Not goroutine safe. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } // Reactor returns the reactor with the given name. // NOTE: Not goroutine safe. func (sw *Switch) Reactor(name string) Reactor { return sw.reactors[name] } // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. // NOTE: Not goroutine safe. func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) { sw.nodeInfo = nodeInfo } // NodeInfo returns the switch's NodeInfo. // NOTE: Not goroutine safe. 
func (sw *Switch) NodeInfo() NodeInfo { return sw.nodeInfo } // SetNodeKey sets the switch's private key for authenticated encryption. // NOTE: Not goroutine safe. func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { sw.nodeKey = nodeKey }
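To see how these pieces are meant to be used together, here is a simplified sketch of what node.go does when it wires reactors into a switch; the function name is invented, the reactors are assumed to be already constructed, and transport listening and peer dialing are omitted.

```golang
package example // illustration only

import (
	"github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
)

// buildSwitch creates a transport, creates the switch on top of it,
// registers the given reactors, and starts everything.
func buildSwitch(
	cfg *config.P2PConfig,
	nodeInfo p2p.NodeInfo,
	nodeKey *p2p.NodeKey,
	reactors map[string]p2p.Reactor,
) (*p2p.Switch, error) {
	transport := p2p.NewMultiplexTransport(nodeInfo, *nodeKey, conn.DefaultMConnConfig())
	sw := p2p.NewSwitch(cfg, transport)
	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	for name, r := range reactors {
		sw.AddReactor(name, r) // registers the reactor's channels and cross-links it with the switch
	}

	return sw, sw.Start() // OnStart: start all reactors, then the acceptRoutine
}
```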
After initialization come OnStart and OnStop, which start and stop the switch as a whole. During startup, all reactors are started first, and then acceptRoutine keeps accepting incoming peers.
```golang
//---------------------------------------------------------------------
// Service start/stop

// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
	// Start reactors
	// Reactors are added in node.go's createSwitch; the five built-in ones are:
	//   sw.AddReactor("MEMPOOL", mempoolReactor)
	//   sw.AddReactor("BLOCKCHAIN", bcReactor)
	//   sw.AddReactor("CONSENSUS", consensusReactor)
	//   sw.AddReactor("EVIDENCE", evidenceReactor)
	//   sw.AddReactor("STATESYNC", stateSyncReactor)
	// Later, during node creation in NewNode, the PEX reactor and any custom reactors are added:
	//   func CustomReactors(reactors map[string]p2p.Reactor) Option  // user-defined reactors
	//   sw.AddReactor("PEX", pexReactor)  // added via createPEXReactorAndAddToSwitch
	for _, reactor := range sw.reactors {
		err := reactor.Start()
		if err != nil {
			return fmt.Errorf("failed to start %v: %w", reactor, err)
		}
	}

	// Start accepting Peers.
	// acceptRoutine accepts inbound connections, adds new peers, and handles errors along the way.
	go sw.acceptRoutine()

	return nil
}

// OnStop implements BaseService. It stops all peers and reactors.
func (sw *Switch) OnStop() {
	// Stop peers
	for _, p := range sw.peers.List() {
		sw.stopAndRemovePeer(p, nil)
	}

	// Stop reactors
	sw.Logger.Debug("Switch: Stopping reactors")
	for _, reactor := range sw.reactors {
		if err := reactor.Stop(); err != nil {
			sw.Logger.Error("error while stopped reactor", "reactor", reactor, "error", err)
		}
	}
}
```
acceptRoutine is shown below. On each iteration it calls sw.transport.Accept() to obtain an inbound connection. If a connection is obtained, a series of checks decides whether to add it as a new peer: unless the peer is unconditional, the routine first checks whether the number of inbound peers has already reached the configured maximum and ignores the connection if so; otherwise it tries to add the peer, ignoring the connection if the peer is filtered out or already present, and adding it to the peers set otherwise.
golang1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889func (sw *Switch) acceptRoutine() { for { p, err := sw.transport.Accept(peerConfig{ chDescs: sw.chDescs, onPeerError: sw.StopPeerForError, reactorsByCh: sw.reactorsByCh, msgTypeByChID: sw.msgTypeByChID, metrics: sw.metrics, mlc: sw.mlc, isPersistent: sw.IsPeerPersistent, }) if err != nil { switch err := err.(type) { case ErrRejected: if err.IsSelf() { // Remove the given address from the address book and add to our addresses // to avoid dialing in the future. addr := err.Addr() sw.addrBook.RemoveAddress(&addr) sw.addrBook.AddOurAddress(&addr) } sw.Logger.Info( "Inbound Peer rejected", "err", err, "numPeers", sw.peers.Size(), ) continue case ErrFilterTimeout: sw.Logger.Error( "Peer filter timed out", "err", err, ) continue case ErrTransportClosed: sw.Logger.Error( "Stopped accept routine, as transport is closed", "numPeers", sw.peers.Size(), ) default: sw.Logger.Error( "Accept on transport errored", "err", err, "numPeers", sw.peers.Size(), ) // We could instead have a retry loop around the acceptRoutine, // but that would need to stop and let the node shutdown eventually. // So might as well panic and let process managers restart the node. // There's no point in letting the node run without the acceptRoutine, // since it won't be able to accept new connections. panic(fmt.Errorf("accept routine exited: %v", err)) } break } if !sw.IsPeerUnconditional(p.NodeInfo().ID()) { // Ignore connection if we already have enough peers. _, in, _ := sw.NumPeers() if in >= sw.config.MaxNumInboundPeers { sw.Logger.Info( "Ignoring inbound connection: already have enough inbound peers", "address", p.SocketAddr(), "have", in, "max", sw.config.MaxNumInboundPeers, ) sw.transport.Cleanup(p) continue } } if err := sw.addPeer(p); err != nil { sw.transport.Cleanup(p) if p.IsRunning() { _ = p.Stop() } sw.Logger.Info( "Ignoring inbound connection: error while adding peer", "err", err, "id", p.ID(), ) } } }
Within acceptRoutine, the call worth noting is sw.addPeer, which finally turns the inbound connection into a peer and registers it with the switch and the reactors.
golang12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455// addPeer starts up the Peer and adds it to the Switch. Error is returned if // the peer is filtered out or failed to start or can't be added. func (sw *Switch) addPeer(p Peer) error { if err := sw.filterPeer(p); err != nil { return err } p.SetLogger(sw.Logger.With("peer", p.SocketAddr())) // Handle the shut down case where the switch has stopped but we're // concurrently trying to add a peer. if !sw.IsRunning() { // XXX should this return an error or just log and terminate? sw.Logger.Error("Won't start a peer - switch is not running", "peer", p) return nil } // Add some data to the peer, which is required by reactors. for _, reactor := range sw.reactors { p = reactor.InitPeer(p) } // Start the peer's send/recv routines. // Must start it before adding it to the peer set // to prevent Start and Stop from being called concurrently. err := p.Start() if err != nil { // Should never happen sw.Logger.Error("Error starting peer", "err", err, "peer", p) return err } // Add the peer to PeerSet. Do this before starting the reactors // so that if Receive errors, we will find the peer and remove it. // Add should not err since we already checked peers.Has(). if err := sw.peers.Add(p); err != nil { switch err.(type) { case ErrPeerRemoval: sw.Logger.Error("Error starting peer ", " err ", "Peer has already errored and removal was attempted.", "peer", p.ID()) } return err } sw.metrics.Peers.Add(float64(1)) // Start all the reactor protocols on the peer. for _, reactor := range sw.reactors { reactor.AddPeer(p) } sw.Logger.Info("Added peer", "peer", p) return nil }
After that come a series of peer-maintenance functions. Taken as a whole, the switch is created and driven from node.go; the switch then maintains the node's own reactors and listens for connection information from other peers.
3. Summary
- Walking through the p2p directory gives a good picture of Tendermint's basic communication flow; notably, all of the core operations are ultimately carried out by the reactors.
- Communication between Tendermint nodes is encrypted and authenticated (via the secret-connection handshake keyed by each node's key), keeping network transport secure and reliable.