Preface
In Tendermint Study Notes (2): Reading and Analyzing the P2P Source Code, we walked through the call relationships and business logic of the `p2p` code and saw that the smallest unit of `p2p` communication is the `node`, so this article focuses on the `node`. Since the `node` touches the entire lifecycle of a `tendermint` system, we take the `node` startup flow as the entry point for analyzing the related functionality.
1. Source Code Analysis
1.1 The tendermint startup entry point
Like a typical Go project, tendermint's main entry point lives in the `cmd` directory and uses cobra to build the command line. The layout of the `cmd/tendermint` directory is as follows:
```bash
.
├── commands                  # implementations of the individual commands
│   ├── compact.go            # experimental-compact-goleveldb, force compacts the tendermint storage engine (only GoLevelDB supported)
│   ├── debug                 # debug-related commands
│   │   ├── debug.go          # DebugCmd defines the root command containing subcommands that assist in debugging running Tendermint processes
│   │   ├── dump.go           # Continuously poll a Tendermint process and dump debugging data into a single location
│   │   ├── io.go
│   │   ├── kill.go           # Kill a Tendermint process while aggregating and packaging debugging data
│   │   └── util.go
│   ├── gen_node_key.go       # GenNodeKeyCmd allows the generation of a node key. It prints the node's ID to the standard output
│   ├── gen_validator.go      # GenValidatorCmd allows the generation of a keypair for a validator
│   ├── init.go               # InitFilesCmd initialises a fresh Tendermint Core instance
│   ├── light.go              # LightCmd represents the base command when called without any subcommands
│   ├── probe_upnp.go         # ProbeUpnpCmd adds capabilities to test the UPnP functionality
│   ├── reindex_event.go      # ReIndexEventCmd constructs a command to re-index events in a block height interval
│   ├── reindex_event_test.go
│   ├── replay.go             # ReplayCmd / ReplayConsoleCmd allow replaying of messages from the WAL (optionally in a console)
│   ├── reset.go              # ResetAllCmd / ResetStateCmd remove the databases; ResetPrivValidatorCmd resets the private validator files
│   ├── reset_test.go
│   ├── rollback.go           # RollbackStateCmd rolls back tendermint state by one height
│   ├── root.go               # RootCmd is the root command for Tendermint core
│   ├── root_test.go
│   ├── run_node.go           # NewRunNodeCmd returns the command that allows the CLI to start a node. It can be used with a custom PrivValidator and in-process ABCI application
│   ├── show_node_id.go       # ShowNodeIDCmd dumps the node's ID to the standard output
│   ├── show_validator.go     # ShowValidatorCmd adds capabilities for showing the validator info
│   ├── testnet.go            # TestnetFilesCmd allows initialisation of files for a Tendermint testnet
│   └── version.go            # VersionCmd ...
└── main.go                   # main entry point
```
Combining cobra with the directory layout, it is clear that `main.go` is the place to start, while the `commands` directory holds the implementation of each command. Let's look at that file first:
```go
package main

import (
	"os"
	"path/filepath"

	cmd "github.com/tendermint/tendermint/cmd/tendermint/commands"
	"github.com/tendermint/tendermint/cmd/tendermint/commands/debug"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/cli"
	nm "github.com/tendermint/tendermint/node"
)

func main() {
	rootCmd := cmd.RootCmd
	rootCmd.AddCommand(
		cmd.GenValidatorCmd,
		cmd.InitFilesCmd,
		cmd.ProbeUpnpCmd,
		cmd.LightCmd,
		cmd.ReIndexEventCmd,
		cmd.ReplayCmd,
		cmd.ReplayConsoleCmd,
		cmd.ResetAllCmd,
		cmd.ResetPrivValidatorCmd,
		cmd.ResetStateCmd,
		cmd.ShowValidatorCmd,
		cmd.TestnetFilesCmd,
		cmd.ShowNodeIDCmd,
		cmd.GenNodeKeyCmd,
		cmd.VersionCmd,
		cmd.RollbackStateCmd,
		cmd.CompactGoLevelDBCmd,
		debug.DebugCmd,
		cli.NewCompletionCmd(rootCmd, true),
	)

	// NOTE:
	// Users wishing to:
	// * Use an external signer for their validators
	// * Supply an in-proc abci app
	// * Supply a genesis doc file from another source
	// * Provide their own DB implementation
	// can copy this file and use something other than the
	// DefaultNewNode function
	nodeFunc := nm.DefaultNewNode

	// Create & start node
	rootCmd.AddCommand(cmd.NewRunNodeCmd(nodeFunc))

	cmd := cli.PrepareBaseCmd(rootCmd, "TM", os.ExpandEnv(filepath.Join("$HOME", cfg.DefaultTendermintDir)))
	if err := cmd.Execute(); err != nil {
		panic(err)
	}
}
```
As the code shows, the concrete commands are all implemented in the `github.com/tendermint/tendermint/cmd/tendermint/commands` package, i.e. the `commands` directory shown above. The entry point for starting a node is `cmd.NewRunNodeCmd(nodeFunc)`. We first look at how `nodeFunc` is created; the corresponding `github.com/tendermint/tendermint/node` package is the `node` directory mentioned earlier.
First, consider the `NewRunNodeCmd` function, the direct entry point for starting a tendermint node. It is implemented in `run_node.go`:
```go
// NewRunNodeCmd returns the command that allows the CLI to start a node.
// It can be used with a custom PrivValidator and in-process ABCI application.
func NewRunNodeCmd(nodeProvider nm.Provider) *cobra.Command {
	cmd := &cobra.Command{
		Use:     "start",
		Aliases: []string{"node", "run"},
		Short:   "Run the tendermint node",
		RunE: func(cmd *cobra.Command, args []string) error {
			if err := checkGenesisHash(config); err != nil {
				return err
			}

			n, err := nodeProvider(config, logger)
			if err != nil {
				return fmt.Errorf("failed to create node: %w", err)
			}

			if err := n.Start(); err != nil {
				return fmt.Errorf("failed to start node: %w", err)
			}

			logger.Info("Started node", "nodeInfo", n.Switch().NodeInfo())

			// Stop upon receiving SIGTERM or CTRL-C.
			tmos.TrapSignal(logger, func() {
				if n.IsRunning() {
					if err := n.Stop(); err != nil {
						logger.Error("unable to stop the node", "error", err)
					}
				}
			})

			// Run forever.
			select {}
		},
	}

	AddNodeFlags(cmd)
	return cmd
}
```
This function builds the node `n` from the injected `nodeProvider`. The `config` and `logger` variables used during construction are created earlier, while the command line itself is being assembled; they live in `root.go` in the `commands` directory:
```go
var (
	config = cfg.DefaultConfig()
	logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout))
)
```
The node is then started by calling `n.Start()`. `Start` belongs to the unified service abstraction tendermint uses to start and stop all of its services consistently; the implementation is in `libs/service/service.go`:
```go
// Start implements Service by calling OnStart (if defined). An error will be
// returned if the service is already running or stopped. Not to start the
// stopped service, you need to call Reset.
func (bs *BaseService) Start() error {
	if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		if atomic.LoadUint32(&bs.stopped) == 1 {
			bs.Logger.Error(fmt.Sprintf("Not starting %v service -- already stopped", bs.name),
				"impl", bs.impl)
			// revert flag
			atomic.StoreUint32(&bs.started, 0)
			return ErrAlreadyStopped
		}

		bs.Logger.Info("service start",
			"msg", log.NewLazySprintf("Starting %v service", bs.name),
			"impl", bs.impl.String())

		err := bs.impl.OnStart()
		if err != nil {
			// revert flag
			atomic.StoreUint32(&bs.started, 0)
			return err
		}
		return nil
	}

	bs.Logger.Debug("service start",
		"msg", log.NewLazySprintf("Not starting %v service -- already started", bs.name),
		"impl", bs.impl)
	return ErrAlreadyStarted
}
```
This implementation ultimately calls the `OnStart()` of the concrete service. The reason `Start` can be invoked as a method of `Node` is that the `Node` struct embeds `service.BaseService`, which holds the concrete `Service` in its `impl` field; as long as `Node` implements that interface, `BaseService` can dispatch to it through `impl`.
```go
type Node struct {
	service.BaseService
	......
}

type BaseService struct {
	Logger  log.Logger
	name    string
	started uint32 // atomic
	stopped uint32 // atomic
	quit    chan struct{}

	// The "subclass" of BaseService
	impl Service
}
```
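A minimal, self-contained sketch of this embedding pattern (not the actual tendermint code) may make the call path clearer: `BaseService.Start` dispatches to whatever implementation was registered as `impl`, so a struct that embeds `BaseService` and overrides `OnStart` gets the lifecycle management for free.

```go
package main

import "fmt"

// Service is the minimal lifecycle interface (simplified stand-in for libs/service).
type Service interface {
	OnStart() error
}

// BaseService dispatches Start to the registered implementation.
type BaseService struct {
	name string
	impl Service // the "subclass"
}

func NewBaseService(name string, impl Service) *BaseService {
	return &BaseService{name: name, impl: impl}
}

// Start forwards to the concrete OnStart, mirroring bs.impl.OnStart() above.
func (bs *BaseService) Start() error {
	fmt.Printf("starting %s service\n", bs.name)
	return bs.impl.OnStart()
}

// OnStart is a no-op default, so embedders only override it when needed.
func (bs *BaseService) OnStart() error { return nil }

// MyNode plays the role of Node: it embeds BaseService and overrides OnStart.
type MyNode struct {
	BaseService
}

func (n *MyNode) OnStart() error {
	fmt.Println("MyNode.OnStart called")
	return nil
}

func main() {
	n := &MyNode{}
	n.BaseService = *NewBaseService("Node", n) // register n itself as impl
	_ = n.Start()                              // prints both messages
}
```

This mirrors how `node.BaseService = *service.NewBaseService(logger, "Node", node)` is set at the end of `NewNode` (shown later).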
Finally, the custom `TrapSignal` function listens for the system's termination signals.
```go
// TrapSignal catches the SIGTERM/SIGINT and executes cb function. After that it exits
// with code 0.
func TrapSignal(logger logger, cb func()) {
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		for sig := range c {
			logger.Info("signal trapped", "msg", log.NewLazySprintf("captured %v, exiting...", sig))
			if cb != nil {
				cb()
			}
			os.Exit(0)
		}
	}()
}
```
On the other hand, look again at `DefaultNewNode`, the function that is passed directly into `NewRunNodeCmd`. It is also defined in the `github.com/tendermint/tendermint/node` package. In short, every call path ends up in the `node` package, which we analyze next.
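As the NOTE in `main.go` points out, users who want an in-process ABCI app or a custom PrivValidator can copy that file and use something other than `DefaultNewNode`. Below is a hedged sketch of what such a custom provider might look like, built only from the constructors shown later in this article; the function name `customNewNode` and the import selection are illustrative assumptions, not part of the tendermint codebase:

```go
import (
	"github.com/tendermint/tendermint/abci/example/kvstore"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	nm "github.com/tendermint/tendermint/node"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/privval"
	"github.com/tendermint/tendermint/proxy"
)

// customNewNode is a hypothetical nm.Provider that wires an in-process
// ABCI application instead of letting DefaultClientCreator decide.
func customNewNode(config *cfg.Config, logger log.Logger) (*nm.Node, error) {
	nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
	if err != nil {
		return nil, err
	}
	return nm.NewNode(
		config,
		privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
		nodeKey,
		proxy.NewLocalClientCreator(kvstore.NewApplication()), // in-process ABCI app
		nm.DefaultGenesisDocProviderFunc(config),
		nm.DefaultDBProvider,
		nm.DefaultMetricsProvider(config.Instrumentation),
		logger,
	)
}
```

Such a provider would then be registered with `rootCmd.AddCommand(cmd.NewRunNodeCmd(customNewNode))` in place of `nm.DefaultNewNode`.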
1.2 The Node creation process
The `node` directory contains only the following four files, and `node.go` is where all the real work happens.
```bash
.
├── doc.go        # notes on how to construct a node
├── id.go         # node identity, not yet in use
├── node.go       # core node implementation
└── node_test.go  # node tests
```
Continuing from the previous section, we first analyze the `DefaultNewNode` function:
```go
// DefaultNewNode returns a Tendermint node with default settings for the
// PrivValidator, ClientCreator, GenesisDoc, and DBProvider.
// It implements NodeProvider.
func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
	nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
	if err != nil {
		return nil, fmt.Errorf("failed to load or gen node key %s: %w", config.NodeKeyFile(), err)
	}

	return NewNode(config, // config, passed in from run_node.go
		privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()), // load the validator's private key
		nodeKey, // node key, passed in from run_node.go
		proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()), // build the default application proxy
		DefaultGenesisDocProviderFunc(config),          // read the genesis doc
		DefaultDBProvider,                              // build the database provider
		DefaultMetricsProvider(config.Instrumentation), // build the metrics provider
		logger, // logger, passed in from run_node.go
	)
}

//---------------------------------------------------------
// privval/file.go

// LoadOrGenFilePV loads a FilePV from the given filePaths
// or else generates a new one and saves it to the filePaths.
func LoadOrGenFilePV(keyFilePath, stateFilePath string) *FilePV {
	var pv *FilePV
	if tmos.FileExists(keyFilePath) {
		pv = LoadFilePV(keyFilePath, stateFilePath)
	} else {
		pv = GenFilePV(keyFilePath, stateFilePath)
		pv.Save()
	}
	return pv
}

//---------------------------------------------------------
// proxy/client.go

// DefaultClientCreator returns a default ClientCreator, which will create a
// local client if addr is one of: 'counter', 'counter_serial', 'kvstore',
// 'persistent_kvstore' or 'noop', otherwise - a remote client.
func DefaultClientCreator(addr, transport, dbDir string) ClientCreator {
	switch addr {
	case "counter":
		return NewLocalClientCreator(counter.NewApplication(false))
	case "counter_serial":
		return NewLocalClientCreator(counter.NewApplication(true))
	case "kvstore":
		return NewLocalClientCreator(kvstore.NewApplication())
	case "persistent_kvstore":
		return NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(dbDir))
	case "e2e":
		app, err := e2e.NewApplication(e2e.DefaultConfig(dbDir))
		if err != nil {
			panic(err)
		}
		return NewLocalClientCreator(app)
	case "noop":
		return NewLocalClientCreator(types.NewBaseApplication())
	default:
		mustConnect := false // loop retrying
		return NewRemoteClientCreator(addr, transport, mustConnect)
	}
}

//---------------------------------------------------------
// node/node.go

// GenesisDocProvider returns a GenesisDoc.
// It allows the GenesisDoc to be pulled from sources other than the
// filesystem, for instance from a distributed key-value store cluster.
type GenesisDocProvider func() (*types.GenesisDoc, error)

// DefaultGenesisDocProviderFunc returns a GenesisDocProvider that loads
// the GenesisDoc from the config.GenesisFile() on the filesystem.
func DefaultGenesisDocProviderFunc(config *cfg.Config) GenesisDocProvider {
	return func() (*types.GenesisDoc, error) {
		return types.GenesisDocFromFile(config.GenesisFile())
	}
}

//---------------------------------------------------------
// node/node.go

// DBProvider takes a DBContext and returns an instantiated DB.
type DBProvider func(*DBContext) (dbm.DB, error)

// DefaultDBProvider returns a database using the DBBackend and DBDir
// specified in the ctx.Config.
func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
	dbType := dbm.BackendType(ctx.Config.DBBackend)
	return dbm.NewDB(ctx.ID, dbType, ctx.Config.DBDir())
}

//---------------------------------------------------------
// node/node.go

// MetricsProvider returns a consensus, p2p and mempool Metrics.
type MetricsProvider func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics)

// DefaultMetricsProvider returns Metrics build using Prometheus client library
// if Prometheus is enabled. Otherwise, it returns no-op Metrics.
func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
	return func(chainID string) (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
		if config.Prometheus {
			return cs.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				p2p.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				mempl.PrometheusMetrics(config.Namespace, "chain_id", chainID),
				sm.PrometheusMetrics(config.Namespace, "chain_id", chainID)
		}
		return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
	}
}
```
This function first obtains the node key via `p2p.LoadOrGenNodeKey` and then calls `NewNode` to build the node; the constructors passed in are shown above and contain nothing unusual. Before analyzing `NewNode`, let's look at the `Node` struct definition:
```go
// Node is the highest level interface to a full Tendermint node.
// It includes all configuration information and running services.
type Node struct {
	service.BaseService // service base: unified management of all tendermint components

	// config
	config        *cfg.Config
	genesisDoc    *types.GenesisDoc   // initial validator set
	privValidator types.PrivValidator // local node's validator key

	// network and node identity
	transport   *p2p.MultiplexTransport
	sw          *p2p.Switch  // p2p connections
	addrBook    pex.AddrBook // known peers
	nodeInfo    p2p.NodeInfo
	nodeKey     *p2p.NodeKey // our node privkey
	isListening bool

	// services and reactors
	eventBus          *types.EventBus // pub/sub for services
	stateStore        sm.Store
	blockStore        *store.BlockStore // store the blockchain to disk
	bcReactor         p2p.Reactor       // for fast-syncing
	mempoolReactor    p2p.Reactor       // for gossipping transactions
	mempool           mempl.Mempool
	stateSync         bool                    // whether the node should state sync on startup
	stateSyncReactor  *statesync.Reactor      // for hosting and restoring state sync snapshots
	stateSyncProvider statesync.StateProvider // provides state data for bootstrapping a node
	stateSyncGenesis  sm.State                // provides the genesis state for state sync
	consensusState    *cs.State               // latest consensus state
	consensusReactor  *cs.Reactor             // for participating in the consensus
	pexReactor        *pex.Reactor            // for exchanging peer addresses
	evidencePool      *evidence.Pool          // tracking evidence
	proxyApp          proxy.AppConns          // connection to the application
	rpcListeners      []net.Listener          // rpc servers
	txIndexer         txindex.TxIndexer
	blockIndexer      indexer.BlockIndexer
	indexerService    *txindex.IndexerService
	prometheusSrv     *http.Server
}
```
The `Node` struct holds the node's network information as well as handles to the various stores, reactors, communication services and monitoring services; all of them are constructed in `NewNode`:
```go
// NewNode returns a new, ready to go, Tendermint Node.
func NewNode(config *cfg.Config,
	privValidator types.PrivValidator,
	nodeKey *p2p.NodeKey,
	clientCreator proxy.ClientCreator,
	genesisDocProvider GenesisDocProvider,
	dbProvider DBProvider,
	metricsProvider MetricsProvider,
	logger log.Logger,
	options ...Option,
) (*Node, error) {
	blockStore, stateDB, err := initDBs(config, dbProvider)
	if err != nil {
		return nil, err
	}

	stateStore := sm.NewStore(stateDB, sm.StoreOptions{
		DiscardABCIResponses: config.Storage.DiscardABCIResponses,
	})

	state, genDoc, err := LoadStateFromDBOrGenesisDocProvider(stateDB, genesisDocProvider)
	if err != nil {
		return nil, err
	}

	// Create the proxyApp and establish connections to the ABCI app (consensus, mempool, query).
	proxyApp, err := createAndStartProxyAppConns(clientCreator, logger)
	if err != nil {
		return nil, err
	}

	// EventBus and IndexerService must be started before the handshake because
	// we might need to index the txs of the replayed block as this might not have happened
	// when the node stopped last time (i.e. the node stopped after it saved the block
	// but before it indexed the txs, or, endblocker panicked)
	eventBus, err := createAndStartEventBus(logger)
	if err != nil {
		return nil, err
	}

	indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
		genDoc.ChainID, dbProvider, eventBus, logger)
	if err != nil {
		return nil, err
	}

	// If an address is provided, listen on the socket for a connection from an
	// external signing process.
	if config.PrivValidatorListenAddr != "" {
		// FIXME: we should start services inside OnStart
		privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, genDoc.ChainID, logger)
		if err != nil {
			return nil, fmt.Errorf("error with private validator socket client: %w", err)
		}
	}

	pubKey, err := privValidator.GetPubKey()
	if err != nil {
		return nil, fmt.Errorf("can't get pubkey: %w", err)
	}

	// Determine whether we should attempt state sync.
	stateSync := config.StateSync.Enable && !onlyValidatorIsUs(state, pubKey)
	if stateSync && state.LastBlockHeight > 0 {
		logger.Info("Found local state with non-zero height, skipping state sync")
		stateSync = false
	}

	// Create the handshaker, which calls RequestInfo, sets the AppVersion on the state,
	// and replays any blocks as necessary to sync tendermint with the app.
	consensusLogger := logger.With("module", "consensus")
	if !stateSync {
		if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
			return nil, err
		}

		// Reload the state. It will have the Version.Consensus.App set by the
		// Handshake, and may have other modifications as well (ie. depending on
		// what happened during block replay).
		state, err = stateStore.Load()
		if err != nil {
			return nil, fmt.Errorf("cannot load state: %w", err)
		}
	}

	// Determine whether we should do fast sync. This must happen after the handshake, since the
	// app may modify the validator set, specifying ourself as the only validator.
	fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, pubKey)

	logNodeStartupInfo(state, pubKey, logger, consensusLogger)

	csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)

	// Make MempoolReactor
	mempool, mempoolReactor := createMempoolAndMempoolReactor(config, proxyApp, state, memplMetrics, logger)

	// Make Evidence Reactor
	evidenceReactor, evidencePool, err := createEvidenceReactor(config, dbProvider, stateDB, blockStore, logger)
	if err != nil {
		return nil, err
	}

	// make block executor for consensus and blockchain reactors to execute blocks
	blockExec := sm.NewBlockExecutor(
		stateStore,
		logger.With("module", "state"),
		proxyApp.Consensus(),
		mempool,
		evidencePool,
		sm.BlockExecutorWithMetrics(smMetrics),
	)

	// Make BlockchainReactor. Don't start fast sync if we're doing a state sync first.
	bcReactor, err := createBlockchainReactor(config, state, blockExec, blockStore, fastSync && !stateSync, logger)
	if err != nil {
		return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
	}

	// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
	// FIXME We need to update metrics here, since other reactors don't have access to them.
	if stateSync {
		csMetrics.StateSyncing.Set(1)
	} else if fastSync {
		csMetrics.FastSyncing.Set(1)
	}
	consensusReactor, consensusState := createConsensusReactor(
		config, state, blockExec, blockStore, mempool, evidencePool,
		privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger,
	)

	// Set up state sync reactor, and schedule a sync if requested.
	// FIXME The way we do phased startups (e.g. replay -> fast sync -> consensus) is very messy,
	// we should clean this whole thing up. See:
	// https://github.com/tendermint/tendermint/issues/4644
	stateSyncReactor := statesync.NewReactor(
		*config.StateSync,
		proxyApp.Snapshot(),
		proxyApp.Query(),
		config.StateSync.TempDir,
	)
	stateSyncReactor.SetLogger(logger.With("module", "statesync"))

	nodeInfo, err := makeNodeInfo(config, nodeKey, txIndexer, genDoc, state)
	if err != nil {
		return nil, err
	}

	// Setup Transport.
	transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp)

	// Setup Switch.
	p2pLogger := logger.With("module", "p2p")
	sw := createSwitch(
		config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
		stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger,
	)

	err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
	if err != nil {
		return nil, fmt.Errorf("could not add peers from persistent_peers field: %w", err)
	}

	err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
	if err != nil {
		return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
	}

	addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
	if err != nil {
		return nil, fmt.Errorf("could not create addrbook: %w", err)
	}

	// Optionally, start the pex reactor
	//
	// TODO:
	//
	// We need to set Seeds and PersistentPeers on the switch,
	// since it needs to be able to use these (and their DNS names)
	// even if the PEX is off. We can include the DNS name in the NetAddress,
	// but it would still be nice to have a clear list of the current "PersistentPeers"
	// somewhere that we can return with net_info.
	//
	// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
	// Note we currently use the addrBook regardless at least for AddOurAddress
	var pexReactor *pex.Reactor
	if config.P2P.PexReactor {
		pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
	}

	if config.RPC.PprofListenAddress != "" {
		go func() {
			logger.Info("Starting pprof server", "laddr", config.RPC.PprofListenAddress)
			//nolint:gosec,nolintlint // G114: Use of net/http serve function that has no support for setting timeouts
			logger.Error("pprof server error", "err", http.ListenAndServe(config.RPC.PprofListenAddress, nil))
		}()
	}

	node := &Node{
		config:        config,
		genesisDoc:    genDoc,
		privValidator: privValidator,

		transport: transport,
		sw:        sw,
		addrBook:  addrBook,
		nodeInfo:  nodeInfo,
		nodeKey:   nodeKey,

		stateStore:       stateStore,
		blockStore:       blockStore,
		bcReactor:        bcReactor,
		mempoolReactor:   mempoolReactor,
		mempool:          mempool,
		consensusState:   consensusState,
		consensusReactor: consensusReactor,
		stateSyncReactor: stateSyncReactor,
		stateSync:        stateSync,
		stateSyncGenesis: state, // Shouldn't be necessary, but need a way to pass the genesis state
		pexReactor:       pexReactor,
		evidencePool:     evidencePool,
		proxyApp:         proxyApp,
		txIndexer:        txIndexer,
		indexerService:   indexerService,
		blockIndexer:     blockIndexer,
		eventBus:         eventBus,
	}
	node.BaseService = *service.NewBaseService(logger, "Node", node)

	for _, option := range options {
		option(node)
	}

	return node, nil
}
```
`NewNode` first initializes the block store and state database via `initDBs` (the default storage backend is `goleveldb`), then wraps the state storage with `sm.NewStore`, and finally obtains the current state from the database or the genesis file via `LoadStateFromDBOrGenesisDocProvider`. The details of the state storage design are left for a later chapter.
```go
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
	var blockStoreDB dbm.DB
	blockStoreDB, err = dbProvider(&DBContext{"blockstore", config})
	if err != nil {
		return
	}
	blockStore = store.NewBlockStore(blockStoreDB)

	stateDB, err = dbProvider(&DBContext{"state", config})
	if err != nil {
		return
	}

	return
}

//--------------------------------------
// state/store.go

// dbStore wraps a db (github.com/tendermint/tm-db)
type dbStore struct {
	db dbm.DB
	StoreOptions
}

type StoreOptions struct {
	// DiscardABCIResponses determines whether or not the store
	// retains all ABCIResponses. If DiscardABCiResponses is enabled,
	// the store will maintain only the response object from the latest
	// height.
	DiscardABCIResponses bool
}

var _ Store = (*dbStore)(nil)

// NewStore creates the dbStore of the state pkg.
func NewStore(db dbm.DB, options StoreOptions) Store {
	return dbStore{db, options}
}

//------------------------------------------------------------------------------

var genesisDocKey = []byte("genesisDoc")

// LoadStateFromDBOrGenesisDocProvider attempts to load the state from the
// database, or creates one using the given genesisDocProvider. On success this also
// returns the genesis doc loaded through the given provider.
func LoadStateFromDBOrGenesisDocProvider(
	stateDB dbm.DB,
	genesisDocProvider GenesisDocProvider,
) (sm.State, *types.GenesisDoc, error) {
	// Get genesis doc
	genDoc, err := loadGenesisDoc(stateDB)
	if err != nil {
		genDoc, err = genesisDocProvider()
		if err != nil {
			return sm.State{}, nil, err
		}
		// save genesis doc to prevent a certain class of user errors (e.g. when it
		// was changed, accidentally or not). Also good for audit trail.
		if err := saveGenesisDoc(stateDB, genDoc); err != nil {
			return sm.State{}, nil, err
		}
	}
	stateStore := sm.NewStore(stateDB, sm.StoreOptions{
		DiscardABCIResponses: false,
	})
	state, err := stateStore.LoadFromDBOrGenesisDoc(genDoc)
	if err != nil {
		return sm.State{}, nil, err
	}
	return state, genDoc, nil
}

// panics if failed to unmarshal bytes
func loadGenesisDoc(db dbm.DB) (*types.GenesisDoc, error) {
	b, err := db.Get(genesisDocKey)
	if err != nil {
		panic(err)
	}
	if len(b) == 0 {
		return nil, errors.New("genesis doc not found")
	}
	var genDoc *types.GenesisDoc
	err = tmjson.Unmarshal(b, &genDoc)
	if err != nil {
		panic(fmt.Sprintf("Failed to load genesis doc due to unmarshaling error: %v (bytes: %X)", err, b))
	}
	return genDoc, nil
}

// panics if failed to marshal the given genesis document
func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) error {
	b, err := tmjson.Marshal(genDoc)
	if err != nil {
		return fmt.Errorf("failed to save genesis doc due to marshaling error: %w", err)
	}
	if err := db.SetSync(genesisDocKey, b); err != nil {
		return err
	}
	return nil
}
```
After the stores and databases are built, `NewNode` creates the proxy services used for communication: `createAndStartProxyAppConns` sets up the ABCI connections, while `createAndStartEventBus` and `createAndStartIndexerService` listen for events and build the transaction and block indexes.
```go
func createAndStartProxyAppConns(clientCreator proxy.ClientCreator, logger log.Logger) (proxy.AppConns, error) {
	proxyApp := proxy.NewAppConns(clientCreator)
	proxyApp.SetLogger(logger.With("module", "proxy"))
	if err := proxyApp.Start(); err != nil {
		return nil, fmt.Errorf("error starting proxy app connections: %v", err)
	}
	return proxyApp, nil
}

func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {
	eventBus := types.NewEventBus()
	eventBus.SetLogger(logger.With("module", "events"))
	if err := eventBus.Start(); err != nil {
		return nil, err
	}
	return eventBus, nil
}

func createAndStartIndexerService(
	config *cfg.Config,
	chainID string,
	dbProvider DBProvider,
	eventBus *types.EventBus,
	logger log.Logger,
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
	var (
		txIndexer    txindex.TxIndexer
		blockIndexer indexer.BlockIndexer
	)

	switch config.TxIndex.Indexer {
	case "kv":
		store, err := dbProvider(&DBContext{"tx_index", config})
		if err != nil {
			return nil, nil, nil, err
		}

		txIndexer = kv.NewTxIndex(store)
		blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))

	case "psql":
		if config.TxIndex.PsqlConn == "" {
			return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
		}
		es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
		}
		txIndexer = es.TxIndexer()
		blockIndexer = es.BlockIndexer()

	default:
		txIndexer = &null.TxIndex{}
		blockIndexer = &blockidxnull.BlockerIndexer{}
	}

	indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus, false)
	indexerService.SetLogger(logger.With("module", "txindex"))

	if err := indexerService.Start(); err != nil {
		return nil, nil, nil, err
	}

	return indexerService, txIndexer, blockIndexer, nil
}
```
Next the validator is set up. By default the node uses its own local key as the validator, but if the config specifies an external validator address, the node tries to connect to it through `createAndStartPrivValidatorSocketClient`:
```go
func createAndStartPrivValidatorSocketClient(
	listenAddr, chainID string,
	logger log.Logger,
) (types.PrivValidator, error) {
	pve, err := privval.NewSignerListener(listenAddr, logger)
	if err != nil {
		return nil, fmt.Errorf("failed to start private validator: %w", err)
	}

	pvsc, err := privval.NewSignerClient(pve, chainID)
	if err != nil {
		return nil, fmt.Errorf("failed to start private validator: %w", err)
	}

	// try to get a pubkey from private validate first time
	_, err = pvsc.GetPubKey()
	if err != nil {
		return nil, fmt.Errorf("can't get pubkey: %w", err)
	}

	const (
		retries = 50 // 50 * 100ms = 5s total
		timeout = 100 * time.Millisecond
	)
	pvscWithRetries := privval.NewRetrySignerClient(pvsc, retries, timeout)

	return pvscWithRetries, nil
}
```
Once the validator is set up and its `pubKey` has been obtained, the node decides, based on the config, whether to perform state sync. When local state data already exists, the node instead builds its local state through `doHandshake`, whose core is the `Handshake` function that replays block data as needed; the replay flow itself will be covered later when we analyze the consensus module.
```go
func doHandshake(
	stateStore sm.Store,
	state sm.State,
	blockStore sm.BlockStore,
	genDoc *types.GenesisDoc,
	eventBus types.BlockEventPublisher,
	proxyApp proxy.AppConns,
	consensusLogger log.Logger,
) error {
	handshaker := cs.NewHandshaker(stateStore, state, blockStore, genDoc)
	handshaker.SetLogger(consensusLogger)
	handshaker.SetEventBus(eventBus)
	if err := handshaker.Handshake(proxyApp); err != nil {
		return fmt.Errorf("error during handshake: %v", err)
	}
	return nil
}

//--------------------------------------
// consensus/replay.go

// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {

	// Handshake is done via ABCI Info on the query conn.
	res, err := proxyApp.Query().InfoSync(proxy.RequestInfo)
	if err != nil {
		return fmt.Errorf("error calling Info: %v", err)
	}

	blockHeight := res.LastBlockHeight
	if blockHeight < 0 {
		return fmt.Errorf("got a negative last block height (%d) from the app", blockHeight)
	}
	appHash := res.LastBlockAppHash

	h.logger.Info("ABCI Handshake App Info",
		"height", blockHeight,
		"hash", appHash,
		"software-version", res.Version,
		"protocol-version", res.AppVersion,
	)

	// Only set the version if there is no existing state.
	if h.initialState.LastBlockHeight == 0 {
		h.initialState.Version.Consensus.App = res.AppVersion
	}

	// Replay blocks up to the latest in the blockstore.
	_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
	if err != nil {
		return fmt.Errorf("error on replay: %v", err)
	}

	h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced",
		"appHeight", blockHeight, "appHash", appHash)

	// TODO: (on restart) replay mempool

	return nil
}
```
Once the state data is ready, the node checks whether fast sync should be enabled and then logs its startup status:
```go
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
	// Log the version info.
	logger.Info("Version info",
		"tendermint_version", version.TMCoreSemVer,
		"block", version.BlockProtocol,
		"p2p", version.P2PProtocol,
	)

	// If the state and software differ in block version, at least log it.
	if state.Version.Consensus.Block != version.BlockProtocol {
		logger.Info("Software and state have different block protocols",
			"software", version.BlockProtocol,
			"state", state.Version.Consensus.Block,
		)
	}

	addr := pubKey.Address()
	// Log whether this node is a validator or an observer
	if state.Validators.HasAddress(addr) {
		consensusLogger.Info("This node is a validator", "addr", addr, "pubKey", pubKey)
	} else {
		consensusLogger.Info("This node is not a validator", "addr", addr, "pubKey", pubKey)
	}
}
```
After logging, the function moves on to building the reactors and the other functional modules. It first wires up the metrics for each component and then builds tendermint's core reactors via `createMempoolAndMempoolReactor`, `createEvidenceReactor`, `createBlockchainReactor`, `createConsensusReactor`, and `statesync.NewReactor`:
```go
// check the config and build the mempool of the specified version
func createMempoolAndMempoolReactor(
	config *cfg.Config,
	proxyApp proxy.AppConns,
	state sm.State,
	memplMetrics *mempl.Metrics,
	logger log.Logger,
) (mempl.Mempool, p2p.Reactor) {
	switch config.Mempool.Version {
	case cfg.MempoolV1:
		mp := mempoolv1.NewTxMempool(
			logger,
			config.Mempool,
			proxyApp.Mempool(),
			state.LastBlockHeight,
			mempoolv1.WithMetrics(memplMetrics),
			mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
		)

		reactor := mempoolv1.NewReactor(
			config.Mempool,
			mp,
		)
		if config.Consensus.WaitForTxs() {
			mp.EnableTxsAvailable()
		}

		return mp, reactor

	case cfg.MempoolV0:
		mp := mempoolv0.NewCListMempool(
			config.Mempool,
			proxyApp.Mempool(),
			state.LastBlockHeight,
			mempoolv0.WithMetrics(memplMetrics),
			mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
			mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
		)

		mp.SetLogger(logger)

		reactor := mempoolv0.NewReactor(
			config.Mempool,
			mp,
		)
		if config.Consensus.WaitForTxs() {
			mp.EnableTxsAvailable()
		}

		return mp, reactor

	default:
		return nil, nil
	}
}

func createEvidenceReactor(config *cfg.Config, dbProvider DBProvider,
	stateDB dbm.DB, blockStore *store.BlockStore, logger log.Logger,
) (*evidence.Reactor, *evidence.Pool, error) {
	evidenceDB, err := dbProvider(&DBContext{"evidence", config})
	if err != nil {
		return nil, nil, err
	}
	evidenceLogger := logger.With("module", "evidence")
	evidencePool, err := evidence.NewPool(evidenceDB, sm.NewStore(stateDB, sm.StoreOptions{
		DiscardABCIResponses: config.Storage.DiscardABCIResponses,
	}), blockStore)
	if err != nil {
		return nil, nil, err
	}
	evidenceReactor := evidence.NewReactor(evidencePool)
	evidenceReactor.SetLogger(evidenceLogger)
	return evidenceReactor, evidencePool, nil
}

func createBlockchainReactor(config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore *store.BlockStore,
	fastSync bool,
	logger log.Logger,
) (bcReactor p2p.Reactor, err error) {
	switch config.FastSync.Version {
	case "v0":
		bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
	case "v1":
		bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
	case "v2":
		bcReactor = bcv2.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
	default:
		return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
	}

	bcReactor.SetLogger(logger.With("module", "blockchain"))
	return bcReactor, nil
}

func createConsensusReactor(config *cfg.Config,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	mempool mempl.Mempool,
	evidencePool *evidence.Pool,
	privValidator types.PrivValidator,
	csMetrics *cs.Metrics,
	waitSync bool,
	eventBus *types.EventBus,
	consensusLogger log.Logger,
) (*cs.Reactor, *cs.State) {
	consensusState := cs.NewState(
		config.Consensus,
		state.Copy(),
		blockExec,
		blockStore,
		mempool,
		evidencePool,
		cs.StateMetrics(csMetrics),
	)
	consensusState.SetLogger(consensusLogger)
	if privValidator != nil {
		consensusState.SetPrivValidator(privValidator)
	}
	consensusReactor := cs.NewReactor(consensusState, waitSync, cs.ReactorMetrics(csMetrics))
	consensusReactor.SetLogger(consensusLogger)
	// services which will be publishing and/or subscribing for messages (events)
	// consensusReactor will set it on consensusState and blockExecutor
	consensusReactor.SetEventBus(eventBus)
	return consensusReactor, consensusState
}

// statesync/reactor.go

// NewReactor creates a new state sync reactor.
func NewReactor(
	cfg config.StateSyncConfig,
	conn proxy.AppConnSnapshot,
	connQuery proxy.AppConnQuery,
	tempDir string,
) *Reactor {
	r := &Reactor{
		cfg:       cfg,
		conn:      conn,
		connQuery: connQuery,
	}
	r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)

	return r
}
```
After these components are built, the node's communication layer is constructed: the `transport` and `switch` components covered in the previous p2p article are created, peers are added via `AddPersistentPeers` and `AddUnconditionalPeerIDs`, and the address book is built with `createAddrBookAndSetOnSwitch`:
```go
func createTransport(
	config *cfg.Config,
	nodeInfo p2p.NodeInfo,
	nodeKey *p2p.NodeKey,
	proxyApp proxy.AppConns,
) (
	*p2p.MultiplexTransport,
	[]p2p.PeerFilterFunc,
) {
	var (
		mConnConfig = p2p.MConnConfig(config.P2P)
		transport   = p2p.NewMultiplexTransport(nodeInfo, *nodeKey, mConnConfig)
		connFilters = []p2p.ConnFilterFunc{}
		peerFilters = []p2p.PeerFilterFunc{}
	)

	if !config.P2P.AllowDuplicateIP {
		connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
	}

	// Filter peers by addr or pubkey with an ABCI query.
	// If the query return code is OK, add peer.
	if config.FilterPeers {
		connFilters = append(
			connFilters,
			// ABCI query for address filtering.
			func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
				res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
				})
				if err != nil {
					return err
				}
				if res.IsErr() {
					return fmt.Errorf("error querying abci app: %v", res)
				}

				return nil
			},
		)

		peerFilters = append(
			peerFilters,
			// ABCI query for ID filtering.
			func(_ p2p.IPeerSet, p p2p.Peer) error {
				res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
					Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
				})
				if err != nil {
					return err
				}
				if res.IsErr() {
					return fmt.Errorf("error querying abci app: %v", res)
				}

				return nil
			},
		)
	}

	p2p.MultiplexTransportConnFilters(connFilters...)(transport)

	// Limit the number of incoming connections.
	max := config.P2P.MaxNumInboundPeers + len(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
	p2p.MultiplexTransportMaxIncomingConnections(max)(transport)

	return transport, peerFilters
}

// As mentioned in the previous article, the switch and each reactor register
// with one another during AddReactor so they can call each other later.
func createSwitch(config *cfg.Config,
	transport p2p.Transport,
	p2pMetrics *p2p.Metrics,
	peerFilters []p2p.PeerFilterFunc,
	mempoolReactor p2p.Reactor,
	bcReactor p2p.Reactor,
	stateSyncReactor *statesync.Reactor,
	consensusReactor *cs.Reactor,
	evidenceReactor *evidence.Reactor,
	nodeInfo p2p.NodeInfo,
	nodeKey *p2p.NodeKey,
	p2pLogger log.Logger,
) *p2p.Switch {
	sw := p2p.NewSwitch(
		config.P2P,
		transport,
		p2p.WithMetrics(p2pMetrics),
		p2p.SwitchPeerFilters(peerFilters...),
	)
	sw.SetLogger(p2pLogger)
	sw.AddReactor("MEMPOOL", mempoolReactor)
	sw.AddReactor("BLOCKCHAIN", bcReactor)
	sw.AddReactor("CONSENSUS", consensusReactor)
	sw.AddReactor("EVIDENCE", evidenceReactor)
	sw.AddReactor("STATESYNC", stateSyncReactor)

	sw.SetNodeInfo(nodeInfo)
	sw.SetNodeKey(nodeKey)

	p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
	return sw
}

func createAddrBookAndSetOnSwitch(config *cfg.Config, sw *p2p.Switch,
	p2pLogger log.Logger, nodeKey *p2p.NodeKey,
) (pex.AddrBook, error) {
	addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
	addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))

	// Add ourselves to addrbook to prevent dialing ourselves
	if config.P2P.ExternalAddress != "" {
		addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ExternalAddress))
		if err != nil {
			return nil, fmt.Errorf("p2p.external_address is incorrect: %w", err)
		}
		addrBook.AddOurAddress(addr)
	}
	if config.P2P.ListenAddress != "" {
		addr, err := p2p.NewNetAddressString(p2p.IDAddressString(nodeKey.ID(), config.P2P.ListenAddress))
		if err != nil {
			return nil, fmt.Errorf("p2p.laddr is incorrect: %w", err)
		}
		addrBook.AddOurAddress(addr)
	}

	sw.SetAddrBook(addrBook)

	return addrBook, nil
}
```
Once the communication layer is in place, and depending on the config, `createPEXReactorAndAddToSwitch` is called to create a special `pexReactor`, which takes over dialing peers and exchanging peer addresses between nodes:
```go
func createPEXReactorAndAddToSwitch(addrBook pex.AddrBook, config *cfg.Config,
	sw *p2p.Switch, logger log.Logger,
) *pex.Reactor {
	// TODO persistent peers ? so we can have their DNS addrs saved
	pexReactor := pex.NewReactor(addrBook,
		&pex.ReactorConfig{
			Seeds:    splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
			SeedMode: config.P2P.SeedMode,
			// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
			// blocks assuming 10s blocks ~ 28 hours.
			// TODO (melekes): make it dynamic based on the actual block latencies
			// from the live network.
			// https://github.com/tendermint/tendermint/issues/3523
			SeedDisconnectWaitPeriod:     28 * time.Hour,
			PersistentPeersMaxDialPeriod: config.P2P.PersistentPeersMaxDialPeriod,
		})
	pexReactor.SetLogger(logger.With("module", "pex"))
	sw.AddReactor("PEX", pexReactor)
	return pexReactor
}
```
At this point the `node` construction flow is essentially complete; the remaining code in `NewNode` just assembles the `node` object, so we won't dwell on it. In summary, creating a node consists of the following steps (a minimal end-to-end sketch follows the list):
- Initialize the block store and state store
- Initialize the communication services and the indexer service
- Initialize the validator role
- Restore the stored state
- Build the core components: `mempool`, `Evidence`, `Blockchain`, `Consensus`, `statesync`
- Set up node communication and build the `pexReactor`
- Assemble the node object
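To make the flow concrete, here is a minimal, hedged sketch of driving these steps programmatically with the functions seen above; it mirrors what `run_node.go` does rather than being the actual CLI code, and `SetRoot` is an assumption about how the home directory is wired (error handling trimmed):

```go
// Sketch: create and run a node in-process, assuming an already-initialized home directory.
config := cfg.DefaultConfig()
config.SetRoot(os.ExpandEnv(filepath.Join("$HOME", cfg.DefaultTendermintDir))) // assumption: SetRoot points config at the home dir
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))

n, err := nm.DefaultNewNode(config, logger) // runs all the construction steps listed above
if err != nil {
	panic(err)
}
if err := n.Start(); err != nil { // BaseService.Start -> Node.OnStart (section 1.3)
	panic(err)
}
defer func() { _ = n.Stop() }() // BaseService.Stop -> Node.OnStop (section 1.4)
select {}                       // run forever, as the CLI does
```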
1.3 The Node startup process
In tendermint, the core services are started and stopped uniformly through the `OnStart` and `OnStop` functions. Let's look at startup first:
```go
// OnStart starts the Node. It implements service.Service.
func (n *Node) OnStart() error {
	now := tmtime.Now()
	genTime := n.genesisDoc.GenesisTime
	if genTime.After(now) {
		n.Logger.Info("Genesis time is in the future. Sleeping until then...", "genTime", genTime)
		time.Sleep(genTime.Sub(now))
	}

	// Add private IDs to addrbook to block those peers being added
	n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))

	// Start the RPC server before the P2P server
	// so we can eg. receive txs for the first block
	if n.config.RPC.ListenAddress != "" {
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	if n.config.Instrumentation.Prometheus &&
		n.config.Instrumentation.PrometheusListenAddr != "" {
		n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
	}

	// Start the transport.
	addr, err := p2p.NewNetAddressString(p2p.IDAddressString(n.nodeKey.ID(), n.config.P2P.ListenAddress))
	if err != nil {
		return err
	}
	if err := n.transport.Listen(*addr); err != nil {
		return err
	}

	n.isListening = true

	// Start the switch (the P2P server).
	err = n.sw.Start()
	if err != nil {
		return err
	}

	// Always connect to persistent peers
	err = n.sw.DialPeersAsync(splitAndTrimEmpty(n.config.P2P.PersistentPeers, ",", " "))
	if err != nil {
		return fmt.Errorf("could not dial peers from persistent_peers field: %w", err)
	}

	// Run state sync
	if n.stateSync {
		bcR, ok := n.bcReactor.(fastSyncReactor)
		if !ok {
			return fmt.Errorf("this blockchain reactor does not support switching from state sync")
		}
		err := startStateSync(n.stateSyncReactor, bcR, n.consensusReactor, n.stateSyncProvider,
			n.config.StateSync, n.config.FastSyncMode, n.stateStore, n.blockStore, n.stateSyncGenesis)
		if err != nil {
			return fmt.Errorf("failed to start state sync: %w", err)
		}
	}

	return nil
}
```
During startup, the node first checks the genesis time and sleeps until it if it lies in the future, then starts the RPC service via `startRPC` (deliberately before the p2p layer, so transactions for the first block can already be received):
```go
// ConfigureRPC makes sure RPC has all the objects it needs to operate.
func (n *Node) ConfigureRPC() error {
	pubKey, err := n.privValidator.GetPubKey()
	if err != nil {
		return fmt.Errorf("can't get pubkey: %w", err)
	}
	rpccore.SetEnvironment(&rpccore.Environment{
		ProxyAppQuery:   n.proxyApp.Query(),
		ProxyAppMempool: n.proxyApp.Mempool(),

		StateStore:     n.stateStore,
		BlockStore:     n.blockStore,
		EvidencePool:   n.evidencePool,
		ConsensusState: n.consensusState,
		P2PPeers:       n.sw,
		P2PTransport:   n,

		PubKey:           pubKey,
		GenDoc:           n.genesisDoc,
		TxIndexer:        n.txIndexer,
		BlockIndexer:     n.blockIndexer,
		ConsensusReactor: n.consensusReactor,
		EventBus:         n.eventBus,
		Mempool:          n.mempool,

		Logger: n.Logger.With("module", "rpc"),

		Config: *n.config.RPC,
	})
	if err := rpccore.InitGenesisChunks(); err != nil {
		return err
	}

	return nil
}

func (n *Node) startRPC() ([]net.Listener, error) {
	err := n.ConfigureRPC()
	if err != nil {
		return nil, err
	}

	listenAddrs := splitAndTrimEmpty(n.config.RPC.ListenAddress, ",", " ")

	if n.config.RPC.Unsafe {
		rpccore.AddUnsafeRoutes()
	}

	config := rpcserver.DefaultConfig()
	config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
	config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
	config.MaxOpenConnections = n.config.RPC.MaxOpenConnections
	// If necessary adjust global WriteTimeout to ensure it's greater than
	// TimeoutBroadcastTxCommit.
	// See https://github.com/tendermint/tendermint/issues/3435
	if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
		config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
	}

	// we may expose the rpc over both a unix and tcp socket
	listeners := make([]net.Listener, len(listenAddrs))
	for i, listenAddr := range listenAddrs {
		mux := http.NewServeMux()
		rpcLogger := n.Logger.With("module", "rpc-server")
		wmLogger := rpcLogger.With("protocol", "websocket")
		wm := rpcserver.NewWebsocketManager(rpccore.Routes,
			rpcserver.OnDisconnect(func(remoteAddr string) {
				err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
				if err != nil && err != tmpubsub.ErrSubscriptionNotFound {
					wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
				}
			}),
			rpcserver.ReadLimit(config.MaxBodyBytes),
			rpcserver.WriteChanCapacity(n.config.RPC.WebSocketWriteBufferSize),
		)
		wm.SetLogger(wmLogger)
		mux.HandleFunc("/websocket", wm.WebsocketHandler)
		rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
		listener, err := rpcserver.Listen(
			listenAddr,
			config,
		)
		if err != nil {
			return nil, err
		}

		var rootHandler http.Handler = mux
		if n.config.RPC.IsCorsEnabled() {
			corsMiddleware := cors.New(cors.Options{
				AllowedOrigins: n.config.RPC.CORSAllowedOrigins,
				AllowedMethods: n.config.RPC.CORSAllowedMethods,
				AllowedHeaders: n.config.RPC.CORSAllowedHeaders,
			})
			rootHandler = corsMiddleware.Handler(mux)
		}
		if n.config.RPC.IsTLSEnabled() {
			go func() {
				if err := rpcserver.ServeTLS(
					listener,
					rootHandler,
					n.config.RPC.CertFile(),
					n.config.RPC.KeyFile(),
					rpcLogger,
					config,
				); err != nil {
					n.Logger.Error("Error serving server with TLS", "err", err)
				}
			}()
		} else {
			go func() {
				if err := rpcserver.Serve(
					listener,
					rootHandler,
					rpcLogger,
					config,
				); err != nil {
					n.Logger.Error("Error serving server", "err", err)
				}
			}()
		}

		listeners[i] = listener
	}

	// we expose a simplified api over grpc for convenience to app devs
	grpcListenAddr := n.config.RPC.GRPCListenAddress
	if grpcListenAddr != "" {
		config := rpcserver.DefaultConfig()
		config.MaxBodyBytes = n.config.RPC.MaxBodyBytes
		config.MaxHeaderBytes = n.config.RPC.MaxHeaderBytes
		// NOTE: GRPCMaxOpenConnections is used, not MaxOpenConnections
		config.MaxOpenConnections = n.config.RPC.GRPCMaxOpenConnections
		// If necessary adjust global WriteTimeout to ensure it's greater than
		// TimeoutBroadcastTxCommit.
		// See https://github.com/tendermint/tendermint/issues/3435
		if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit {
			config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
		}
		listener, err := rpcserver.Listen(grpcListenAddr, config)
		if err != nil {
			return nil, err
		}
		go func() {
			if err := grpccore.StartGRPCServer(listener); err != nil {
				n.Logger.Error("Error starting gRPC server", "err", err)
			}
		}()
		listeners = append(listeners, listener)
	}

	return listeners, nil
}
```
Next, the monitoring service is started via `startPrometheusServer`:
```go
// startPrometheusServer starts a Prometheus HTTP server, listening for metrics
// collectors on addr.
func (n *Node) startPrometheusServer(addr string) *http.Server {
	srv := &http.Server{
		Addr: addr,
		Handler: promhttp.InstrumentMetricHandler(
			prometheus.DefaultRegisterer, promhttp.HandlerFor(
				prometheus.DefaultGatherer,
				promhttp.HandlerOpts{MaxRequestsInFlight: n.config.Instrumentation.MaxOpenConnections},
			),
		),
		ReadHeaderTimeout: readHeaderTimeout,
	}
	go func() {
		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			// Error starting or closing listener:
			n.Logger.Error("Prometheus HTTP server ListenAndServe", "err", err)
		}
	}()
	return srv
}
```
Finally, p2p communication is started (transport listen, switch start, dialing persistent peers), and if state sync is enabled it is kicked off via `startStateSync`:
```go
// startStateSync starts an asynchronous state sync process, then switches to fast sync mode.
func startStateSync(ssR *statesync.Reactor, bcR fastSyncReactor, conR *cs.Reactor,
	stateProvider statesync.StateProvider, config *cfg.StateSyncConfig, fastSync bool,
	stateStore sm.Store, blockStore *store.BlockStore, state sm.State,
) error {
	ssR.Logger.Info("Starting state sync")

	if stateProvider == nil {
		var err error
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		stateProvider, err = statesync.NewLightClientStateProvider(
			ctx,
			state.ChainID, state.Version, state.InitialHeight,
			config.RPCServers, light.TrustOptions{
				Period: config.TrustPeriod,
				Height: config.TrustHeight,
				Hash:   config.TrustHashBytes(),
			}, ssR.Logger.With("module", "light"))
		if err != nil {
			return fmt.Errorf("failed to set up light client state provider: %w", err)
		}
	}

	go func() {
		state, commit, err := ssR.Sync(stateProvider, config.DiscoveryTime)
		if err != nil {
			ssR.Logger.Error("State sync failed", "err", err)
			return
		}
		err = stateStore.Bootstrap(state)
		if err != nil {
			ssR.Logger.Error("Failed to bootstrap node with new state", "err", err)
			return
		}
		err = blockStore.SaveSeenCommit(state.LastBlockHeight, commit)
		if err != nil {
			ssR.Logger.Error("Failed to store last seen commit", "err", err)
			return
		}

		if fastSync {
			// FIXME Very ugly to have these metrics bleed through here.
			conR.Metrics.StateSyncing.Set(0)
			conR.Metrics.FastSyncing.Set(1)
			err = bcR.SwitchToFastSync(state)
			if err != nil {
				ssR.Logger.Error("Failed to switch to fast sync", "err", err)
				return
			}
		} else {
			conR.SwitchToConsensus(state, true)
		}
	}()
	return nil
}
```
The most important part of the whole startup sequence is starting the `switch`: it goes through the same base service `Start` function, which ultimately calls the `switch`'s `OnStart`.
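For reference, here is a simplified sketch of what the switch's `OnStart` roughly does, based on the p2p analysis in the previous article; the exact code lives in `p2p/switch.go` and may differ in detail. The key point is that it starts every registered reactor and then begins accepting inbound peers.

```go
// Simplified sketch, not the verbatim tendermint implementation.
func (sw *Switch) OnStart() error {
	// Start all the registered reactors
	// (MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, STATESYNC, PEX...).
	for _, reactor := range sw.reactors {
		if err := reactor.Start(); err != nil {
			return fmt.Errorf("failed to start %v: %w", reactor, err)
		}
	}

	// Start accepting inbound peers from the transport.
	go sw.acceptRoutine()

	return nil
}
```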
1.4 The Node shutdown process
The core of shutdown is `OnStop`. There is not much to analyze here: it simply stops each reactor and communication service in turn.
```go
// OnStop stops the Node. It implements service.Service.
func (n *Node) OnStop() {
	n.BaseService.OnStop()

	n.Logger.Info("Stopping Node")

	// first stop the non-reactor services
	if err := n.eventBus.Stop(); err != nil {
		n.Logger.Error("Error closing eventBus", "err", err)
	}
	if err := n.indexerService.Stop(); err != nil {
		n.Logger.Error("Error closing indexerService", "err", err)
	}

	// now stop the reactors
	if err := n.sw.Stop(); err != nil {
		n.Logger.Error("Error closing switch", "err", err)
	}

	if err := n.transport.Close(); err != nil {
		n.Logger.Error("Error closing transport", "err", err)
	}

	n.isListening = false

	// finally stop the listeners / external services
	for _, l := range n.rpcListeners {
		n.Logger.Info("Closing rpc listener", "listener", l)
		if err := l.Close(); err != nil {
			n.Logger.Error("Error closing listener", "listener", l, "err", err)
		}
	}

	if pvsc, ok := n.privValidator.(service.Service); ok {
		if err := pvsc.Stop(); err != nil {
			n.Logger.Error("Error closing private validator", "err", err)
		}
	}

	if n.prometheusSrv != nil {
		if err := n.prometheusSrv.Shutdown(context.Background()); err != nil {
			// Error from closing listeners, or context timeout:
			n.Logger.Error("Prometheus HTTP server Shutdown", "err", err)
		}
	}
	if n.blockStore != nil {
		if err := n.blockStore.Close(); err != nil {
			n.Logger.Error("problem closing blockstore", "err", err)
		}
	}
	if n.stateStore != nil {
		if err := n.stateStore.Close(); err != nil {
			n.Logger.Error("problem closing statestore", "err", err)
		}
	}
}
```
2. Summary
- Combined with the previous article's analysis of the `P2P` source code, it is clear that in a `Tendermint` node every `Reactor` is ultimately started by the start of the `Switch`, and each `Reactor` in turn starts the functionality of its own sub-modules. Likewise, stopping the reactors is triggered by stopping the `Switch`.