1. Introduction

In Tendermint Study Notes (2): Reading and Analyzing the P2P Source Code, we analyzed the call relationships and business logic of the p2p source code and saw that the basic unit of p2p communication is the node, so this article examines the node itself. Because the node spans the entire lifecycle of a tendermint system, we use the node startup flow as the entry point for analyzing the related functionality.

2. Source Code Analysis

2.1 The tendermint startup entry point

As in typical Go projects, tendermint's main entry point lives in the cmd directory, with cobra as the command-line framework. The layout of cmd/tendermint is as follows:

.
├── commands # implementations of the individual commands
│ ├── compact.go # experimental-compact-goleveldb, force compacts the tendermint storage engine (only GoLevelDB supported)
│ ├── debug # debug-related commands
│ │ ├── debug.go # DebugCmd defines the root command containing subcommands that assist in debugging running Tendermint processes.
│ │ ├── dump.go # Continuously poll a Tendermint process and dump debugging data into a single location
│ │ ├── io.go
│ │ ├── kill.go # Kill a Tendermint process while aggregating and packaging debugging data
│ │ └── util.go
│ ├── gen_node_key.go # GenNodeKeyCmd allows the generation of a node key. It prints node's ID to the standard output.
│ ├── gen_validator.go # GenValidatorCmd allows the generation of a keypair for a validator
│ ├── init.go # InitFilesCmd initialises a fresh Tendermint Core instance.
│ ├── light.go # LightCmd represents the base command when called without any subcommands
│ ├── probe_upnp.go # ProbeUpnpCmd adds capabilities to test the UPnP functionality.
│ ├── reindex_event.go # ReIndexEventCmd constructs a command to re-index events in a block height interval.
│ ├── reindex_event_test.go
│ ├── replay.go # ReplayCmd allows replaying of messages from the WAL; ReplayConsoleCmd allows replaying of messages from the WAL in a console
│ ├── reset.go # ResetAllCmd removes the database of this Tendermint core instance; ResetStateCmd removes the database of the specified instance; ResetPrivValidatorCmd resets the private validator files
│ ├── reset_test.go
│ ├── rollback.go # RollbackStateCmd rolls back tendermint state by one height
│ ├── root.go # RootCmd is the root command for Tendermint core.
│ ├── root_test.go
│ ├── run_node.go # NewRunNodeCmd returns the command that allows the CLI to start a node. It can be used with a custom PrivValidator and in-process ABCI application.
│ ├── show_node_id.go # ShowNodeIDCmd dumps the node's ID to the standard output.
│ ├── show_validator.go # ShowValidatorCmd adds capabilities for showing the validator info.
│ ├── testnet.go # TestnetFilesCmd allows initialisation of files for a Tendermint testnet.
│ └── version.go # VersionCmd ...
 
└── main.go # main entry point

Given cobra and this directory layout, main.go is clearly the place to start, with the commands directory holding the implementation of each command. Its source first:

package main

import (
"os"
"path/filepath"

cmd "github.com/tendermint/tendermint/cmd/tendermint/commands"
"github.com/tendermint/tendermint/cmd/tendermint/commands/debug"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/cli"
nm "github.com/tendermint/tendermint/node"
)

func main() {
rootCmd := cmd.RootCmd
rootCmd.AddCommand(
cmd.GenValidatorCmd,
cmd.InitFilesCmd,
cmd.ProbeUpnpCmd,
cmd.LightCmd,
cmd.ReIndexEventCmd,
cmd.ReplayCmd,
cmd.ReplayConsoleCmd,
cmd.ResetAllCmd,
cmd.ResetPrivValidatorCmd,
cmd.ResetStateCmd,
cmd.ShowValidatorCmd,
cmd.TestnetFilesCmd,
cmd.ShowNodeIDCmd,
cmd.GenNodeKeyCmd,
cmd.VersionCmd,
cmd.RollbackStateCmd,
cmd.CompactGoLevelDBCmd,
debug.DebugCmd,
cli.NewCompletionCmd(rootCmd, true),
)

// NOTE:
// Users wishing to:
// * Use an external signer for their validators
// * Supply an in-proc abci app
// * Supply a genesis doc file from another source
// * Provide their own DB implementation
// can copy this file and use something other than the
// DefaultNewNode function
nodeFunc := nm.DefaultNewNode

// Create & start node
rootCmd.AddCommand(cmd.NewRunNodeCmd(nodeFunc))

cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", cfg.DefaultTendermintDir)))
if err := cmd.Execute(); err != nil {
panic(err)
}
}

As the code shows, the concrete command implementations all live in the github.com/tendermint/tendermint/cmd/tendermint/commands package, i.e. the commands directory above. The entry point for starting a node is cmd.NewRunNodeCmd(nodeFunc), so we first look at how the node is created through nodeFunc; the github.com/tendermint/tendermint/node package it comes from is the node directory mentioned earlier.

First, the NewRunNodeCmd function, the direct entry point for starting a tendermint node, implemented in run_node.go:

// NewRunNodeCmd returns the command that allows the CLI to start a node.
// It can be used with a custom PrivValidator and in-process ABCI application.
func NewRunNodeCmd(nodeProvider nm.Provider) *cobra.Command {
cmd := &cobra.Command{
Use: "start",
Aliases: []string{"node", "run"},
Short: "Run the tendermint node",
RunE: func(cmd *cobra.Command, args []string) error {
if err := checkGenesisHash(config); err != nil {
return err
}

n, err := nodeProvider(config, logger)
if err != nil {
return fmt.Errorf("failed to create node: %w", err)
}

if err := n.Start(); err != nil {
return fmt.Errorf("failed to start node: %w", err)
}

logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo())

// Stop upon receiving SIGTERM or CTRL-C.
tmos.TrapSignal(logger, func() {
if n.IsRunning() {
if err := n.Stop(); err != nil {
logger.Error("unable to stop the node", "error", err)
}
}
})

// Run forever.
select {}
},
}

AddNodeFlags(cmd)
return cmd
}

This function builds the node n through the supplied nodeProvider. The config and logger variables it passes along are created up front while the command line is being assembled, in root.go of the commands directory:

var (
config = cfg.DefaultConfig()
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
)

The node is then started by calling n.Start(). Start belongs to the unified service base that tendermint uses to start and stop all of its services; the implementation lives in libs/service/service.go:

// Start implements Service by calling OnStart (if defined). An error will be
// returned if the service is already running or stopped. Not to start the
// stopped service, you need to call Reset.
func (bs *BaseService) Start() error {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
bs.Logger.Error(fmt.Sprintf("Not starting %v service -- already stopped", bs.name),
"impl", bs.impl)
// revert flag
atomic.StoreUint32(&bs.started, 0)
return ErrAlreadyStopped
}
bs.Logger.Info("service start",
"msg",
log.NewLazySprintf("Starting %v service", bs.name),
"impl",
bs.impl.String())
err := bs.impl.OnStart()
if err != nil {
// revert flag
atomic.StoreUint32(&bs.started, 0)
return err
}
return nil
}
bs.Logger.Debug("service start",
"msg",
log.NewLazySprintf("Not starting %v service -- already started", bs.name),
"impl",
bs.impl)
return ErrAlreadyStarted
}

Clearly, this implementation ultimately calls the concrete implementation's OnStart(). The reason Start, a method of BaseService, can be called on a Node is that the Node struct embeds service.BaseService, which keeps the Service implementation in its impl field; as long as Node implements OnStart (and thus the Service interface), the promoted Start method dispatches to it (see the sketch after the definitions below):

type Node struct {
service.BaseService
......
}

type BaseService struct {
Logger log.Logger
name string
started uint32 // atomic
stopped uint32 // atomic
quit chan struct{}

// The "subclass" of BaseService
impl Service
}
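
To make the dispatch concrete, here is a minimal, self-contained sketch of the same pattern (all names except Start/OnStart are invented for illustration): the embedded base keeps a reference to the outer value and forwards Start to its OnStart.

package main

import "fmt"

// Servicer is a trimmed-down stand-in for tendermint's Service interface.
type Servicer interface {
	OnStart() error
}

// Base mimics service.BaseService: it holds the "subclass" in impl.
type Base struct {
	impl Servicer
}

// Start forwards to the concrete implementation, as BaseService.Start does.
func (b *Base) Start() error { return b.impl.OnStart() }

// MyNode mimics Node: it embeds Base and supplies its own OnStart.
type MyNode struct {
	Base
}

func (n *MyNode) OnStart() error {
	fmt.Println("node started")
	return nil
}

func main() {
	n := &MyNode{}
	n.Base = Base{impl: n} // mirrors node.BaseService = *service.NewBaseService(logger, "Node", node)
	if err := n.Start(); err != nil { // Start is promoted from Base and dispatches to MyNode.OnStart
		panic(err)
	}
}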

Finally, the custom TrapSignal function listens for the system's termination signals:

// TrapSignal catches the SIGTERM/SIGINT and executes cb function. After that it exits
// with code 0.
func TrapSignal(logger logger, cb func()) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
for sig := range c {
logger.Info("signal trapped", "msg", log.NewLazySprintf("captured %v, exiting...", sig))
if cb != nil {
cb()
}
os.Exit(0)
}
}()
}

On the other hand, consider DefaultNewNode, which is passed directly as a value into NewRunNodeCmd and is also defined in the github.com/tendermint/tendermint/node package. In short, every call path ultimately leads to the node package, which we analyze next.

2.2 Node creation

The node directory contains only the following four files, of which node.go does all the real work:

.
├── doc.go # package docs on how a node is constructed
├── id.go # node identity; not yet in use
├── node.go # the core node implementation
└── node_test.go # node tests

Following on from the previous section, we start with the DefaultNewNode function:

// DefaultNewNode returns a Tendermint node with default settings for the
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
// It implements NodeProvider.
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
if err != nil {
return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
}

return NewNode(config, // config, passed in from run_node.go
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()), // load or generate the local validator's key and state files
nodeKey, // the node's p2p private key, loaded above
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), // build the default ABCI client creator
DefaultGenesisDocProviderFunc(config), // provider that reads the genesis document
DefaultDBProvider, // database provider
DefaultMetricsProvider(config.Instrumentation), // metrics provider
logger, // logger, passed in from run_node.go
)
}

//---------------------------------------------------------
// privval/file.go
// LoadOrGenFilePV loads a FilePV from the given filePaths
// or else generates a new one and saves it to the filePaths.
func LoadOrGenFilePV(keyFilePath, stateFilePath string) *FilePV {
var pv *FilePV
if tmos.FileExists(keyFilePath) {
pv = LoadFilePV(keyFilePath, stateFilePath)
} else {
pv = GenFilePV(keyFilePath, stateFilePath)
pv.Save()
}
return pv
}

//---------------------------------------------------------
// proxy/client.go
// DefaultClientCreator returns a default ClientCreator, which will create a
// local client if addr is one of: 'counter', 'counter_serial', 'kvstore',
// 'persistent_kvstore' or 'noop', otherwise - a remote client.
func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
switch addr {
case "counter":
return NewLocalClientCreator(counter.NewApplication(false))
case "counter_serial":
return NewLocalClientCreator(counter.NewApplication(true))
case "kvstore":
return NewLocalClientCreator(kvstore.NewApplication())
case "persistent_kvstore":
return NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(dbDir))
case "e2e":
app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
if err != nil {
panic(err)
}
return NewLocalClientCreator(app)
case "noop":
return NewLocalClientCreator(types.NewBaseApplication())
default:
mustConnect := false // loop retrying
return NewRemoteClientCreator(addr, transport, mustConnect)
}
}

//---------------------------------------------------------
// node/node.go
// GenesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
type GenesisDocProvider func() (*types.GenesisDoc, error)

// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
// the GenesisDoc from the config.GenesisFile() on the filesystem.
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
return func() (*types.GenesisDoc, error) {
return types.GenesisDocFromFile(config.GenesisFile())
}
}

//---------------------------------------------------------
// node/node.go
// DBProvider takes a DBContext and returns an instantiated DB.
type DBProvider func(*DBContext) (dbm.DB, error)
// DefaultDBProvider returns a database using the DBBackend and DBDir
// specified in the ctx.Config.
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
dbType := dbm.BackendType(ctx.Config.DBBackend)
return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
}

//---------------------------------------------------------
// node/node.go
// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)
// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
if config.Prometheus {
return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
}
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
}
}

This function first obtains the node key via p2p.LoadOrGenNodeKey and then calls NewNode to construct the node. The providers passed to it are built as shown above; all of them are straightforward construction logic.
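
As the NOTE in main.go points out, anything DefaultNewNode wires together can be swapped for your own provider. A minimal sketch of such a provider (customProvider is a hypothetical name; it reuses the kvstore example application that ships with the abci package as an in-process app, and otherwise mirrors DefaultNewNode):

import (
	"github.com/tendermint/tendermint/abci/example/kvstore"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	nm "github.com/tendermint/tendermint/node"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
)

// customProvider mirrors DefaultNewNode but swaps the remote ABCI client
// for an in-process kvstore application.
func customProvider(config *cfg.Config, logger log.Logger) (*nm.Node, error) {
	nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
	if err != nil {
		return nil, err
	}
	return nm.NewNode(
		config,
		privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
		nodeKey,
		proxy.NewLocalClientCreator(kvstore.NewApplication()), // in-proc ABCI app
		nm.DefaultGenesisDocProviderFunc(config),
		nm.DefaultDBProvider,
		nm.DefaultMetricsProvider(config.Instrumentation),
		logger,
	)
}

Passing customProvider instead of nm.DefaultNewNode to cmd.NewRunNodeCmd is all it takes to run the application in-process. Before analyzing NewNode itself, let's first look at the definition of the Node struct: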

// Node is the highest level interface to a full Tendermint node.
// It includes all configuration information and running services.
type Node struct {
service.BaseService // base service: unified lifecycle management for all tendermint components

// configuration
config *cfg.Config
genesisDoc *types.GenesisDoc // initial validator set
privValidator types.PrivValidator // local node's validator key

// network: connection and node info
transport *p2p.MultiplexTransport
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
nodeKey *p2p.NodeKey // our node privkey
isListening bool

// services: the various services and reactor-related handles
eventBus *types.EventBus // pub/sub for services

stateStore sm.Store
blockStore *store.BlockStore // store the blockchain to disk

bcReactor p2p.Reactor // for fast-syncing
mempoolReactor p2p.Reactor // for gossipping transactions
mempool mempl.Mempool
stateSync bool // whether the node should state sync on startup
stateSyncReactor *statesync.Reactor // for hosting and restoring state sync snapshots
stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
stateSyncGenesis sm.State // provides the genesis state for state sync
consensusState *cs.State // latest consensus state
consensusReactor *cs.Reactor // for participating in the consensus
pexReactor *pex.Reactor // for exchanging peer addresses
evidencePool *evidence.Pool // tracking evidence

proxyApp proxy.AppConns // connection to the application
rpcListeners []net.Listener // rpc servers
txIndexer txindex.TxIndexer
blockIndexer indexer.BlockIndexer
indexerService *txindex.IndexerService

prometheusSrv *http.Server
}

The Node struct holds the node's network information together with handles to the various stores, reactors, communication services and monitoring facilities; all of them are constructed in NewNode:

// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
genesisDocProvider GenesisDocProvider,
dbProvider DBProvider,
metricsProvider MetricsProvider,
logger log.Logger,
options ...Option,
) (*Node, error) {
blockStore, stateDB, err := initDBs(config, dbProvider)
if err != nil {
return nil, err
}

stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
})

state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
if err != nil {
return nil, err
}

// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
if err != nil {
return nil, err
}

// EventBus and IndexerService must be started before the handshake because
// we might need to index the txs of the replayed block as this might not have happened
// when the node stopped last time (i.e. the node stopped after it saved the block
// but before it indexed the txs, or, endblocker panicked)
eventBus, err := createAndStartEventBus(logger)
if err != nil {
return nil, err
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}

// If an address is provided, listen on the socket for a connection from an
// external signing process.
if config.PrivValidatorListenAddr != "" {
// FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
if err != nil {
return nil, fmt.Errorf("error with private validator socket client: %w", err)
}
}

pubKey, err := privValidator.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}

// Determine whether we should attempt state sync.
stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
if stateSync && state.LastBlockHeight > 0 {
logger.Info("Found local state with non-zero height, skipping state sync")
stateSync = false
}

// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
// and replays any blocks as necessary to sync tendermint with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}

// Reload the state. It will have the Version.Consensus.App set by the
// Handshake, and may have other modifications as well (ie. depending on
// what happened during block replay).
state, err = stateStore.Load()
if err != nil {
return nil, fmt.Errorf("cannot load state: %w", err)
}
}

// Determine whether we should do fast sync. This must happen after the handshake, since the
// app may modify the validator set, specifying ourself as the only validator.
fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, pubKey)

logNodeStartupInfo(state, pubKey, logger, consensusLogger)

csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

// Make MempoolReactor
mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)

// Make Evidence Reactor
evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
if err != nil {
return nil, err
}

// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(
stateStore,
logger.With("module", "state"),
proxyApp.Consensus(),
mempool,
evidencePool,
sm.BlockExecutorWithMetrics(smMetrics),
)

// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
if err != nil {
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}

// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
if stateSync {
csMetrics.StateSyncing.Set(1)
} else if fastSync {
csMetrics.FastSyncing.Set(1)
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
)

// Set up state sync reactor, and schedule a sync if requested.
// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
// we should clean this whole thing up. See:
// https://github.com/tendermint/tendermint/issues/4644
stateSyncReactor := statesync.NewReactor(
*config.StateSync,
proxyApp.Snapshot(),
proxyApp.Query(),
config.StateSync.TempDir,
)
stateSyncReactor.SetLogger(logger.With("module", "statesync"))

nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
if err != nil {
return nil, err
}

// Setup Transport.
transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)

// Setup Switch.
p2pLogger := logger.With("module", "p2p")
sw := createSwitch(
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
return nil, fmt.Errorf("could not create addrbook: %w", err)
}

// Optionally, start the pex reactor
//
// TODO:
//
// We need to set Seeds and PersistentPeers on the switch,
// since it needs to be able to use these (and their DNS names)
// even if the PEX is off. We can include the DNS name in the NetAddress,
// but it would still be nice to have a clear list of the current "PersistentPeers"
// somewhere that we can return with net_info.
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var pexReactor *pex.Reactor
if config.P2P.PexReactor {
pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
}

if config.RPC.PprofListenAddress != "" {
go func() {
logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
//nolint:gosec,nolintlint // G114: Use of net/http serve function that has no support for setting timeouts
logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
}()
}

node := &Node{
config: config,
genesisDoc: genDoc,
privValidator: privValidator,

transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,

stateStore: stateStore,
blockStore: blockStore,
bcReactor: bcReactor,
mempoolReactor: mempoolReactor,
mempool: mempool,
consensusState: consensusState,
consensusReactor: consensusReactor,
stateSyncReactor: stateSyncReactor,
stateSync: stateSync,
stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
pexReactor: pexReactor,
evidencePool: evidencePool,
proxyApp: proxyApp,
txIndexer: txIndexer,
indexerService: indexerService,
blockIndexer: blockIndexer,
eventBus: eventBus,
}
node.BaseService = *service.NewBaseService(logger, "Node", node)

for _, option := range options {
option(node)
}

return node, nil
}

NewNode first initializes the block store and the state database via initDBs (the default backend is goleveldb), then wraps the state storage with sm.NewStore, and finally loads the current state from the genesis file or the database via LoadStateFromDBOrGenesisDocProvider. The design of state storage itself is analyzed in a later chapter.

func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
var blockStoreDB dbm.DB
blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
if err != nil {
return
}
blockStore = store.NewBlockStore(blockStoreDB)

stateDB, err = dbProvider(&DBContext{"state", config})
if err != nil {
return
}

return
}

//--------------------------------------
//state/store.go
// dbStore wraps a db (github.com/tendermint/tm-db)
type dbStore struct {
db dbm.DB

StoreOptions
}

type StoreOptions struct {

// DiscardABCIResponses determines whether or not the store
// retains all ABCIResponses. If DiscardABCiResponses is enabled,
// the store will maintain only the response object from the latest
// height.
DiscardABCIResponses bool
}

var _ Store = (*dbStore)(nil)

// NewStore creates the dbStore of the state pkg.
func NewStore(db dbm.DB, options StoreOptions) Store {
return dbStore{db, options}
}

//------------------------------------------------------------------------------

var genesisDocKey = []byte("genesisDoc")

// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the
// database, or creates one using the given genesisDocProvider. On success this also
// returns the genesis doc loaded through the given provider.
func LoadStateFromDBOrGenesisDocProvider(
stateDB dbm.DB,
genesisDocProvider GenesisDocProvider,
) (sm.State, *types.GenesisDoc, error) {
// Get genesis doc
genDoc, err := loadGenesisDoc(stateDB)
if err != nil {
genDoc, err = genesisDocProvider()
if err != nil {
return sm.State{}, nil, err
}
// save genesis doc to prevent a certain class of user errors (e.g. when it
// was changed, accidentally or not). Also good for audit trail.
if err := saveGenesisDoc(stateDB, genDoc); err != nil {
return sm.State{}, nil, err
}
}
stateStore := sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: false,
})
state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
if err != nil {
return sm.State{}, nil, err
}
return state, genDoc, nil
}

// panics if failed to unmarshal bytes
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
b, err := db.Get(genesisDocKey)
if err != nil {
panic(err)
}
if len(b) == 0 {
return nil, errors.New("genesis doc not found")
}
var genDoc *types.GenesisDoc
err = tmjson.Unmarshal(b, &genDoc)
if err != nil {
panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b))
}
return genDoc, nil
}

// panics if failed to marshal the given genesis document
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
b, err := tmjson.Marshal(genDoc)
if err != nil {
return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err)
}
if err := db.SetSync(genesisDocKey, b); err != nil {
return err
}

return nil
}
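
Because the DBProvider is injected, tests and embedders can swap the storage backend without touching NewNode. A minimal sketch (inMemDBProvider is a hypothetical name; it assumes the tm-db package tendermint already depends on):

import (
	nm "github.com/tendermint/tendermint/node"
	dbm "github.com/tendermint/tm-db"
)

// inMemDBProvider satisfies node.DBProvider: every DBContext ("blockstore",
// "state", "tx_index", "evidence", ...) gets a throwaway in-memory store.
func inMemDBProvider(ctx *nm.DBContext) (dbm.DB, error) {
	return dbm.NewMemDB(), nil
}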

After the stores and databases are built, NewNode creates the proxy services used for communication: createAndStartProxyAppConns sets up the ABCI connections, while createAndStartEventBus and createAndStartIndexerService listen for events and build the transaction and block indexes.

func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
proxyApp := proxy.NewAppConns(clientCreator)
proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil {
return nil, fmt.Errorf("error starting proxy app connections: %v", err)
}
return proxyApp, nil
}

func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
if err := eventBus.Start(); err != nil {
return nil, err
}
return eventBus, nil
}

func createAndStartIndexerService(
config *cfg.Config,
chainID string,
dbProvider DBProvider,
eventBus *types.EventBus,
logger log.Logger,
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
var (
txIndexer txindex.TxIndexer
blockIndexer indexer.BlockIndexer
)

switch config.TxIndex.Indexer {
case "kv":
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, nil, nil, err
}

txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))

case "psql":
if config.TxIndex.PsqlConn == "" {
return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
}
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
}
txIndexer = es.TxIndexer()
blockIndexer = es.BlockIndexer()

default:
txIndexer = &null.TxIndex{}
blockIndexer = &blockidxnull.BlockerIndexer{}
}

indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
indexerService.SetLogger(logger.With("module", "txindex"))

if err := indexerService.Start(); err != nil {
return nil, nil, nil, err
}

return indexerService, txIndexer, blockIndexer, nil
}

Next the validator is constructed. By default the node acts as its own validator, but if the config specifies the address of an external validator, the node tries to connect to it via createAndStartPrivValidatorSocketClient:

func createAndStartPrivValidatorSocketClient(
listenAddr,
chainID string,
logger log.Logger,
) (types.PrivValidator, error) {
pve, err := privval.NewSignerListener(listenAddr, logger)
if err != nil {
return nil, fmt.Errorf("failed to start private validator: %w", err)
}

pvsc, err := privval.NewSignerClient(pve, chainID)
if err != nil {
return nil, fmt.Errorf("failed to start private validator: %w", err)
}

// try to get a pubkey from private validate first time
_, err = pvsc.GetPubKey()
if err != nil {
return nil, fmt.Errorf("can't get pubkey: %w", err)
}

const (
retries = 50 // 50 * 100ms = 5s total
timeout = 100 * time.Millisecond
)
pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)

return pvscWithRetries, nil
}
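
This branch is driven by a single config field; a hypothetical override pointing the node at an external signer such as tmkms might look like this (the address value is illustrative; tcp:// and unix:// schemes are supported):

config := cfg.DefaultConfig()
// corresponds to priv_validator_laddr in config.toml
config.PrivValidatorListenAddr = "tcp://0.0.0.0:26659"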

Once the validator is built and its pubKey obtained, the node checks the configuration to decide whether to run state sync. When local state data already exists, it instead builds up local state directly through doHandshake, whose core is the Handshake call, which replays block data as needed; the replay flow itself is covered later when we analyze the consensus module.

func doHandshake(
stateStore sm.Store,
state sm.State,
blockStore sm.BlockStore,
genDoc *types.GenesisDoc,
eventBus types.BlockEventPublisher,
proxyApp proxy.AppConns,
consensusLogger log.Logger,
) error {
handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
handshaker.SetLogger(consensusLogger)
handshaker.SetEventBus(eventBus)
if err := handshaker.Handshake(proxyApp); err != nil {
return fmt.Errorf("error during handshake: %v", err)
}
return nil
}

//--------------------------------------
//consensus/replay.go
// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {

// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().InfoSync(proxy.RequestInfo)
if err != nil {
return fmt.Errorf("error calling Info: %v", err)
}

blockHeight := res.LastBlockHeight
if blockHeight < 0 {
return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
}
appHash := res.LastBlockAppHash

h.logger.Info("ABCI Handshake App Info",
"height", blockHeight,
"hash", appHash,
"software-version", res.Version,
"protocol-version", res.AppVersion,
)

// Only set the version if there is no existing state.
if h.initialState.LastBlockHeight == 0 {
h.initialState.Version.Consensus.App = res.AppVersion
}

// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}

h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
"appHeight", blockHeight, "appHash", appHash)

// TODO: (on restart) replay mempool

return nil
}

With the state data ready, the node checks whether fast sync is enabled and then logs its startup status:

func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
// Log the version info.
logger.Info("Version info",
"tendermint_version", version.TMCoreSemVer,
"block", version.BlockProtocol,
"p2p", version.P2PProtocol,
)

// If the state and software differ in block version, at least log it.
if state.Version.Consensus.Block != version.BlockProtocol {
logger.Info("Software and state have different block protocols",
"software", version.BlockProtocol,
"state", state.Version.Consensus.Block,
)
}

addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
} else {
consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
}
}

After logging, construction of the reactors and the other functional modules begins: first the metrics for each component are created, then createMempoolAndMempoolReactor, createEvidenceReactor, createBlockchainReactor, createConsensusReactor and statesync.NewReactor build tendermint's core reactors:

// inspect the config and build the mempool version it specifies
func createMempoolAndMempoolReactor(
config *cfg.Config,
proxyApp proxy.AppConns,
state sm.State,
memplMetrics *mempl.Metrics,
logger log.Logger,
) (mempl.Mempool, p2p.Reactor) {
switch config.Mempool.Version {
case cfg.MempoolV1:
mp := mempoolv1.NewTxMempool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)

reactor := mempoolv1.NewReactor(
config.Mempool,
mp,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}

return mp, reactor

case cfg.MempoolV0:
mp := mempoolv0.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)

mp.SetLogger(logger)

reactor := mempoolv0.NewReactor(
config.Mempool,
mp,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}

return mp, reactor

default:
return nil, nil
}
}

func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger,
) (*evidence.Reactor, *evidence.Pool, error) {
evidenceDB, err := dbProvider(&DBContext{"evidence", config})
if err != nil {
return nil, nil, err
}
evidenceLogger := logger.With("module", "evidence")
evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB, sm.StoreOptions{
DiscardABCIResponses: config.Storage.DiscardABCIResponses,
}), blockStore)
if err != nil {
return nil, nil, err
}
evidenceReactor := evidence.NewReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger)
return evidenceReactor, evidencePool, nil
}

func createBlockchainReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore *store.BlockStore,
fastSync bool,
logger log.Logger,
) (bcReactor p2p.Reactor, err error) {
switch config.FastSync.Version {
case "v0":
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
case "v1":
bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
case "v2":
bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}

bcReactor.SetLogger(logger.With("module", "blockchain"))
return bcReactor, nil
}

func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool mempl.Mempool,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
waitSync bool,
eventBus *types.EventBus,
consensusLogger log.Logger,
) (*cs.Reactor, *cs.State) {
consensusState := cs.NewState(
config.Consensus,
state.Copy(),
blockExec,
blockStore,
mempool,
evidencePool,
cs.StateMetrics(csMetrics),
)
consensusState.SetLogger(consensusLogger)
if privValidator != nil {
consensusState.SetPrivValidator(privValidator)
}
consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
consensusReactor.SetLogger(consensusLogger)
// services which will be publishing and/or subscribing for messages (events)
// consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus)
return consensusReactor, consensusState
}

//statesync/reactor.go
// NewReactor creates a new state sync reactor.
func NewReactor(
cfg config.StateSyncConfig,
conn proxy.AppConnSnapshot,
connQuery proxy.AppConnQuery,
tempDir string,
) *Reactor {

r := &Reactor{
cfg: cfg,
conn: conn,
connQuery: connQuery,
}
r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)

return r
}

With those components built, the node's communication layer comes next: the transport, switch and related p2p components covered in the previous article. Persistent and unconditional peers are added via AddPersistentPeers and AddUnconditionalPeerIDs, and the address book is built with createAddrBookAndSetOnSwitch:

func createTransport(
config *cfg.Config,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
proxyApp proxy.AppConns,
) (
*p2p.MultiplexTransport,
[]p2p.PeerFilterFunc,
) {
var (
mConnConfig = p2p.MConnConfig(config.P2P)
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)

if !config.P2P.AllowDuplicateIP {
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
}

// Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer.
if config.FilterPeers {
connFilters = append(
connFilters,
// ABCI query for address filtering.
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}

return nil
},
)

peerFilters = append(
peerFilters,
// ABCI query for ID filtering.
func(_ p2p.IPeerSet, p p2p.Peer) error {
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("error querying abci app: %v", res)
}

return nil
},
)
}

p2p.MultiplexTransportConnFilters(connFilters...)(transport)

// Limit the number of incoming connections.
max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
p2p.MultiplexTransportMaxIncomingConnections(max)(transport)

return transport, peerFilters
}

// As noted in the previous article, during AddReactor the switch and the
// reactor register with each other so that each can call the other.
func createSwitch(config *cfg.Config,
transport p2p.Transport,
p2pMetrics *p2p.Metrics,
peerFilters []p2p.PeerFilterFunc,
mempoolReactor p2p.Reactor,
bcReactor p2p.Reactor,
stateSyncReactor *statesync.Reactor,
consensusReactor *cs.Reactor,
evidenceReactor *evidence.Reactor,
nodeInfo p2p.NodeInfo,
nodeKey *p2p.NodeKey,
p2pLogger log.Logger,
) *p2p.Switch {
sw := p2p.NewSwitch(
config.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.AddReactor("STATESYNC", stateSyncReactor)

sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)

p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
return sw
}

func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
p2pLogger log.Logger, nodeKey *p2p.NodeKey,
) (pex.AddrBook, error) {
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))

// Add ourselves to addrbook to prevent dialing ourselves
if config.P2P.ExternalAddress != "" {
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress))
if err != nil {
return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}
if config.P2P.ListenAddress != "" {
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress))
if err != nil {
return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
}
addrBook.AddOurAddress(addr)
}

sw.SetAddrBook(addrBook)

return addrBook, nil
}

After the communication layer is complete, and if the configuration enables it, createPEXReactorAndAddToSwitch creates a special pexReactor that takes over peer connections and peer-information exchange:

func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
sw *p2p.Switch, logger log.Logger,
) *pex.Reactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewReactor(addrBook,
&pex.ReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
})
pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor)
return pexReactor
}

At this point the node construction flow is essentially complete; the remainder of NewNode just assembles the Node object and applies options, so we won't dwell on it. In summary, creating a node involves the following steps (a minimal end-to-end usage sketch follows the list):

  • Initialize the block store and state store
  • Initialize the communication and indexer services
  • Initialize the validator role
  • Restore the persisted state
  • Build the core components: mempool, Evidence, Blockchain, Consensus and statesync
  • Establish inter-node communication and build the pexReactor
  • Assemble the node
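
A minimal end-to-end sketch tying these steps together (a hypothetical embedding program; it assumes a home directory already initialized with tendermint init, and mirrors what run_node.go does for us):

package main

import (
	"os"

	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	tmos "github.com/tendermint/tendermint/libs/os"
	nm "github.com/tendermint/tendermint/node"
)

func main() {
	config := cfg.DefaultConfig()
	config.SetRoot(os.ExpandEnv("$HOME/.tendermint")) // assumes `tendermint init` has run
	logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

	// Create the node with the default providers...
	n, err := nm.DefaultNewNode(config, logger)
	if err != nil {
		panic(err)
	}
	// ...start it (BaseService.Start -> Node.OnStart)...
	if err := n.Start(); err != nil {
		panic(err)
	}
	// ...and stop it cleanly on SIGTERM/CTRL-C.
	tmos.TrapSignal(logger, func() {
		if n.IsRunning() {
			_ = n.Stop()
		}
	})
	select {} // block forever; TrapSignal exits the process
}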

2.3 Node startup

In tendermint, the core services are uniformly started and stopped through the OnStart and OnStop functions. Startup first:

// OnStart starts the Node. It implements service.Service.
func (n *Node) OnStart() error {
now := tmtime.Now()
genTime := n.genesisDoc.GenesisTime
if genTime.After(now) {
n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
time.Sleep(genTime.Sub(now))
}

// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))

// Start the RPC server before the P2P server
// so we can eg. receive txs for the first block
if n.config.RPC.ListenAddress != "" {
listeners, err := n.startRPC()
if err != nil {
return err
}
n.rpcListeners = listeners
}

if n.config.Instrumentation.Prometheus &&
n.config.Instrumentation.PrometheusListenAddr != "" {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
}

// Start the transport.
addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
if err != nil {
return err
}
if err := n.transport.Listen(*addr); err != nil {
return err
}

n.isListening = true

// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
return err
}

// Always connect to persistent peers
err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
if err != nil {
return fmt.Errorf("could not dial peers from persistent_peers field: %w", err)
}

// Run state sync
if n.stateSync {
bcR, ok := n.bcReactor.(fastSyncReactor)
if !ok {
return fmt.Errorf("this blockchain reactor does not support switching from state sync")
}
err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
if err != nil {
return fmt.Errorf("failed to start state sync: %w", err)
}
}

return nil
}

During startup the node first waits until genesis time if it lies in the future, then starts the RPC servers via startRPC:

// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *Node) ConfigureRPC() error {
pubKey, err := n.privValidator.GetPubKey()
if err != nil {
return fmt.Errorf("can't get pubkey: %w", err)
}
rpccore.SetEnvironment(&rpccore.Environment{
ProxyAppQuery: n.proxyApp.Query(),
ProxyAppMempool: n.proxyApp.Mempool(),

StateStore: n.stateStore,
BlockStore: n.blockStore,
EvidencePool: n.evidencePool,
ConsensusState: n.consensusState,
P2PPeers: n.sw,
P2PTransport: n,

PubKey: pubKey,
GenDoc: n.genesisDoc,
TxIndexer: n.txIndexer,
BlockIndexer: n.blockIndexer,
ConsensusReactor: n.consensusReactor,
EventBus: n.eventBus,
Mempool: n.mempool,

Logger: n.Logger.With("module", "rpc"),

Config: *n.config.RPC,
})
if err := rpccore.InitGenesisChunks(); err != nil {
return err
}

return nil
}

func (n *Node) startRPC() ([]net.Listener, error) {
err := n.ConfigureRPC()
if err != nil {
return nil, err
}

listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")

if n.config.RPC.Unsafe {
rpccore.AddUnsafeRoutes()
}

config := rpcserver.DefaultConfig()
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}

// we may expose the rpc over both a unix and tcp socket
listeners := make([]net.Listener, len(listenAddrs))
for i, listenAddr := range listenAddrs {
mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server")
wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes,
rpcserver.OnDisconnect(func(remoteAddr string) {
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}),
rpcserver.ReadLimit(config.MaxBodyBytes),
rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
)
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
listener, err := rpcserver.Listen(
listenAddr,
config,
)
if err != nil {
return nil, err
}

var rootHandler http.Handler = mux
if n.config.RPC.IsCorsEnabled() {
corsMiddleware := cors.New(cors.Options{
AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
AllowedMethods: n.config.RPC.CORSAllowedMethods,
AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
})
rootHandler = corsMiddleware.Handler(mux)
}
if n.config.RPC.IsTLSEnabled() {
go func() {
if err := rpcserver.ServeTLS(
listener,
rootHandler,
n.config.RPC.CertFile(),
n.config.RPC.KeyFile(),
rpcLogger,
config,
); err != nil {
n.Logger.Error("Error serving server with TLS", "err", err)
}
}()
} else {
go func() {
if err := rpcserver.Serve(
listener,
rootHandler,
rpcLogger,
config,
); err != nil {
n.Logger.Error("Error serving server", "err", err)
}
}()
}

listeners[i] = listener
}

// we expose a simplified api over grpc for convenience to app devs
grpcListenAddr := n.config.RPC.GRPCListenAddress
if grpcListenAddr != "" {
config := rpcserver.DefaultConfig()
config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
// NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}
listener, err := rpcserver.Listen(grpcListenAddr, config)
if err != nil {
return nil, err
}
go func() {
if err := grpccore.StartGRPCServer(listener); err != nil {
n.Logger.Error("Error starting gRPC server", "err", err)
}
}()
listeners = append(listeners, listener)

}

return listeners, nil
}

Next, startPrometheusServer starts the metrics service:

// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on addr.
func (n *Node) startPrometheusServer(addr string) *http.Server {
srv := &http.Server{
Addr: addr,
Handler: promhttp.InstrumentMetricHandler(
prometheus.DefaultRegisterer, promhttp.HandlerFor(
prometheus.DefaultGatherer,
promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
),
),
ReadHeaderTimeout: readHeaderTimeout,
}
go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
}
}()
return srv
}

Finally, p2p communication is started and, if requested, state sync is kicked off:

// startStateSync starts an asynchronous state sync process, then switches to fast sync mode.
func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor,
stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool,
stateStore sm.Store, blockStore *store.BlockStore, state sm.State,
) error {
ssR.Logger.Info("Starting state sync")

if stateProvider == nil {
var err error
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stateProvider, err = statesync.NewLightClientStateProvider(
ctx,
state.ChainID, state.Version, state.InitialHeight,
config.RPCServers, light.TrustOptions{
Period: config.TrustPeriod,
Height: config.TrustHeight,
Hash: config.TrustHashBytes(),
}, ssR.Logger.With("module", "light"))
if err != nil {
return fmt.Errorf("failed to set up light client state provider: %w", err)
}
}

go func() {
state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
if err != nil {
ssR.Logger.Error("State sync failed", "err", err)
return
}
err = stateStore.Bootstrap(state)
if err != nil {
ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
return
}
err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
if err != nil {
ssR.Logger.Error("Failed to store last seen commit", "err", err)
return
}

if fastSync {
// FIXME Very ugly to have these metrics bleed through here.
conR.Metrics.StateSyncing.Set(0)
conR.Metrics.FastSyncing.Set(1)
err = bcR.SwitchToFastSync(state)
if err != nil {
ssR.Logger.Error("Failed to switch to fast sync", "err", err)
return
}
} else {
conR.SwitchToConsensus(state, true)
}
}()
return nil
}

The heart of the whole startup sequence is starting the switch, which likewise goes through the base service's Start function and ultimately lands in the switch's OnStart.
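
A simplified sketch of that OnStart, paraphrased from p2p/switch.go (not a verbatim excerpt): it starts every registered reactor and then spawns the loop that accepts inbound peers.

// Paraphrased sketch of p2p.Switch.OnStart: start each reactor, then accept peers.
func (sw *Switch) OnStart() error {
	for _, reactor := range sw.reactors {
		if err := reactor.Start(); err != nil {
			return fmt.Errorf("failed to start %v: %w", reactor, err)
		}
	}
	go sw.acceptRoutine() // accept inbound connections from the transport
	return nil
}

This is why every reactor's lifecycle hangs off the switch's, as the summary below notes.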

2.4 Node shutdown

The core of shutdown is OnStop. There is little to analyze: it simply stops the reactors and communication services one by one:

// OnStop stops the Node. It implements service.Service.
func (n *Node) OnStop() {
n.BaseService.OnStop()

n.Logger.Info("Stopping Node")

// first stop the non-reactor services
if err := n.eventBus.Stop(); err != nil {
n.Logger.Error("Error closing eventBus", "err", err)
}
if err := n.indexerService.Stop(); err != nil {
n.Logger.Error("Error closing indexerService", "err", err)
}

// now stop the reactors
if err := n.sw.Stop(); err != nil {
n.Logger.Error("Error closing switch", "err", err)
}

if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}

n.isListening = false

// finally stop the listeners / external services
for _, l := range n.rpcListeners {
n.Logger.Info("Closing rpc listener", "listener", l)
if err := l.Close(); err != nil {
n.Logger.Error("Error closing listener", "listener", l, "err", err)
}
}

if pvsc, ok := n.privValidator.(service.Service); ok {
if err := pvsc.Stop(); err != nil {
n.Logger.Error("Error closing private validator", "err", err)
}
}

if n.prometheusSrv != nil {
if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
// Error from closing listeners, or context timeout:
n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
}
}
if n.blockStore != nil {
if err := n.blockStore.Close(); err != nil {
n.Logger.Error("problem closing blockstore", "err", err)
}
}
if n.stateStore != nil {
if err := n.stateStore.Close(); err != nil {
n.Logger.Error("problem closing statestore", "err", err)
}
}
}

3. Summary

  • Combined with the previous article's analysis of the P2P source code, it is clear that within a Tendermint node every Reactor is started as a consequence of the Switch starting, with each Reactor in turn starting the sub-components of its own module; likewise, the reactors are stopped as a consequence of the Switch stopping.