CITA共识状态转换和MQ分析

# Bft模块订阅和发送的消息

//! 1. Subscribe channel
//!
//!     | Queue     | PubModule | Message Type          |
//!     | --------- | --------- | --------------------- |
//!     | consensus | Net       | CompactSignedProposal# |
//!     | consensus | Net       | RawBytes#              |
//!     | consensus | Chain     | RichStatus#            |
//!     | consensus | Auth      | BlockTxs#              |
//!     | consensus | Auth      | VerifyBlockResp#       |
//!     | consensus | Snapshot  | SnapshotReq           |
//!
//! 2. Publish channel
//!
//!     | Queue     | PubModule | SubModule       | Message Type          |
//!     | --------- | --------- | --------------- | --------------------- |
//!     | consensus | Consensus | Auth            | VerifyBlockReq#        |
//!     | consensus | Consensus | Net             | RawBytes#              |
//!     | consensus | Consensus | Chain, Executor | BlockWithProof#        |
//!     | consensus | Consensus | Net             | CompactSignedProposal# |
//!     | consensus | Consensus | Executor        | SignedProposal        |
//!     | consensus | Consensus | Snapshot        | SnapshotResp          |
//!

# 共识的架构

共识主要有MQ(消息队列)通讯模块、交易池、定时模块、WAL(write ahead log)、算法逻辑模块。

   +-------------+       +-------------+       +-----+
   | MQ通讯模块   |<----->|  算法逻辑模块  |<---->| WAL |
   +-------------+       +-------------+       +-----+
          ^                ^    ^
          |                |    |
          |----------------+    |
          |                     |
     +--------+           +-----------+
     | 交易池  |           | 定时模块  |
     +--------+           +-----------+

MQ通讯模块: CITA的消息通过MQ进行周转,MQ通讯模块负责订阅、发布基于MQ的消息。

交易池: 交易池订阅和存储交易信息,并提供交易的打包、生成Block。还进行交易的持久化,实现快速确认的功能。

定时模块: 提供算法定时服务。使用者向定时模块发送定时请求,定时模块在时间到达后,发送确认信息。

WAL: WAL提供预写日志(write ahead log)的服务,持久化各个节点的投票。用来进行节点崩溃后恢复。

算法逻辑模块: 分布式算法逻辑的实现模块,接受共识其它模块发送过来的信息,根据自身的算法要求,进行算法逻辑相应的处理。

# 基本约定

  1. 规定 H 为当前高度,R 为当前轮次,N 为该轮参与共识的节点数量,B 为当前区块,在每一个高度下,达成共识至少需要一轮;

  2. 一个包含 +2/3 的对应于处在 <H, R> 的特定区块或者 nil(空区块)的预投票的集合,称之为锁变化证明(Proof-of-lock-change),简称 PoLC。

# 状态转换图

image-20201129102029596

# 一、状态转换描述

Bft 的代码实现,就是一个大的状态机,以及一个定时器,状态由链上消息通信以及计时器 timeout_process() 推动

pub enum Step {
    Propose, /
    ProposeWait,
    Prevote,
    PrevoteWait,
    PrecommitAuth,
    Precommit,
    PrecommitWait,
    Commit,
    CommitWait,
}

pub struct WaitTimer {
    timer_seter: Receiver<TimeoutInfo>,
    timer_notify: Sender<TimeoutInfo>,
}

Propose:每个节点检查自己是否是 proposer。proposer 广播当前轮次的 proposal

ProposeWait:非提议节点对 proposal 进行基本检查并向 Auth 发送验证请求

Prevote:节点进行预投票

PrevoteWait:等待其他节点的预投票

PrecommitAuth:等待 Auth 返回对区块 B 的校验结果

Precommit:节点进行预提交

PrecommitWait:等待其他节点对特定 proposal 的预提交

Commit:提交区块 B 给 Chain 微服务和 Executor 微服务

CommitWait:等待 Chain 发来的最新状态(rich_status)消息

# 1.1 PROPOSE <H, R> → PROPOSEWAIT <H, R>

新一轮开始时**(Auth >> BlockTxs)**,共识节点处于 Propose<H, R> 状态,共识节点通过计算 (H+R) % N 确定本轮的 proposer<H, R>,接着重置并启动一个计时器 T0 (T0 = 3s) :

  • 如果该共识节点就是本轮的 proposer<H, R>,就广播这一轮的提议 proposal<H, R, B>(Consensus >> Net)
  • 如果该共识节点不是本轮的 proposer<H, R>,就重置并启动一个计时器 T1(T1 = T0 * 24 / 30 * (R + 1) )

共识节点进入 ProposeWait<H, R> 状态。

# 1.2 PROPOSEWAIT <H, R> → PREVOTE <H, R>

  • 如果共识节点是 proposer<H, R> ,共识节点对自己发出的 proposal<H, R, B> 投 prevote<H, R, B>
  • 如果共识节点不是 proposer<H, R> 且在 T1 内收到 proposal<H, R, B>,共识节点对该 proposal<H, R, B> 做基本检查
    • 如果 proposal<H, R, B> 通过了基本检查, 则向 Auth 发送请求验证 B 的合法性**(Consensus >> VerifyBlockReq)**,共识节点对该 proposal<H, R, B> 投 prevote<H, R, B>
    • 如果 proposal<H, R, B> 没有通过基本检查,共识节点对 nil<H, R> 投 prevote<H, R, B>
  • 如果共识节点不是 proposer<H, R> 且在 T1 内没有收到 proposer<H, R> 发出的 proposal<H, R, B>,共识节点对 nil<H, R> 投 prevote<H, R, P>

共识节点将 prevote<H, R, B> 投票保存到本地,并进入 Prevote<H, R> 状态。共识节点重置并启动一个计时器 T2 以重新广播 prevote<H, R, P> 投票。

# 1.3 PREVOTE<H, R> → PREVOTEWAIT<H, R>

共识节点收到 +2/3 的 prevote<H, R, P> 后, 进入 PrevoteWait<H, R> 状态,同时重置并启动一个计时器 T3(T3 = T0 * 1 / 30 = 0.1s)。

# 1.4 PREVOTEWAIT<H, R> → PRECOMMIT<H, R> 或者 PREVOTEWAIT<H, R> → PRECOMMITAUTH<H, R>

  • 如果共识节点在 T3 内收到 +2/3 的共识节点对 proposal<H, R, B> 的 prevote<H, R, B>
    • 如果 Auth 对 B 的验证 **(Auth >> VerifyBlockResp)**通过,共识节点对该 proposal<H, R, B> 投 precommit<H, R, B>,共识节点进入 Precommit<H, R> 状态
    • 如果 Auth 对 B 的验证 **(Auth >> VerifyBlockResp)**不通过,共识节点对 nil<H,R> 投 precommit<H, R, B>,共识节点进入 Precommit<H, R> 状态
    • 如果 Auth 还没有返回对 B 的验证结果,共识节点重置并启动一个 T4(T4 = T0 * 1 / 30 * 15 = 1.5s)计时器,并进入 PrecommitAuth 状态
  • 如果共识节点在 T3 内收到 +2/3 的共识节点对 nil<H, R> 的 prevote<H, R, P>,共识节点对 nil<H,R> 投 precommit<H, R, P>,共识节点进入 Precommit<H, R> 状态
  • 如果共识节点在 T3 内没有满足以上条件,共识节点对 nil<H, R> 投 precommit<H, R, B>,共识节点进入 Precommit<H, R> 状态

如果共识节点投了 precommit<H, R, B>,便重置并启动一个 T5(T5 = T0 * 1 / 30 * 15 = 1.5s)计时器以重发 prevote<H, R, P> 和 precommit<H, R, P>。

# 1.5 PRECOMMITAUTH<H, R> → PRECOMMIT<H, R>

如果共识节点在 T4 时间内没有收到 Auth 返回,就再次向 Auth 请求验证**(Consensus >> VerifyBlockReq)**区块,并一直等待,直到收到 Auth 返回:

  • 如果 Auth 对区块的验证**(Auth >> VerifyBlockResp)**通过,共识节点对 proposal<H, R, B> 投 precommit<H, R, P>,共识节点进入 Precommit<H, R> 状态
  • 如果 Auth 对区块的验证**(Auth >> VerifyBlockResp)**不通过,共识节点对 nil<H, R> 投 prevote<H, R, B> 和 precommit<H, R, B>,共识节点进入Precommit<H, R> 状态

PRECOMMIT<H, R> → PRECOMMITWAIT<H, R>

当共识节点收到 +2/3 的 precommit<H, R, B> 后,进入 PrecommitWait<H, R> 状态,同时重置并启动一个计时器 T6(T6 = T0 * 1 / 30 = 0.1s)。

# 1.6 PRECOMMITWAIT<H, R> → COMMIT<H, R> 或者 PRECOMMITWAIT<H, R> → PREPARE<H, R + 1>

  • 如果共识节点在 T6 内收到 +2/3 的共识节点对特定 proposal<H, R, B> 的 precommit<H, R, B> ,共识节点将区块 B 发送给 Executor 微服务和 Chain 微服务处理**(Consensus >> BlockWithProof)**
  • 如果共识节点在 T6 内收到 +2/3 的共识节点对 nil<H, R> 的 precommit<H, R, B> ,共识节点进入 Propose<H, R+1> 状态
  • 如果共识节点在 T6 内没有满足以上条件,共识节点进入 Propose<H, R+1> 状态

# 1.7 COMMIT<H, R> → COMMITWAIT<H, R>

共识节点收到 Chain 发来的最新共识区块返回的消息 rich_status 后**(Chain >> RichStatus)**,进入 CommitWait<H, R> 状态。

# 1.8 COMMITWAIT<H, R> → PROPOSE<H + 1, 0>

共识节点等待 T0 超时,进入 Propose<H + 1, 0> 状态。

# 二、Subscribe channel

# 2.1 CompactSignedProposal

共识节点通过Net收到Proposer节点的CompactSignedProposal

接到已经签名的 proposal ,判断自身是否有权利投票、判断高度、验证签名、验证该轮是否为此节点提出 proposal 、发送该 proposal 下的 block transaction 给 auth 验证交易是否正常等一系列验证操作,最后给 计时器发送一个计时消息,转至 Step::ProposeWait 状态。

 routing_key!(Net >> CompactSignedProposal) => {
                    let res = self.handle_proposal(&body[..], true, true); // 这个函数是对proposal的详细处理
                    if let Ok((h, r)) = res {
                        trace!(
                            "process {} recieve handle_proposal ok; h: {}, r: {}",
                            self,
                            h,
                            r,
                        );
                        if h == self.height && r == self.round && self.step < Step::Prevote {
                            self.step = Step::ProposeWait;
                            let now = Instant::now();
                            let _ = self.timer_seter.send(TimeoutInfo {
                                timeval: nowG + Duration::new(0, 0),
                                height: h,
                                round: r,
                                step: Step::ProposeWait,
                            });
                        }
                    } else {
                        trace!(
                            "process {} fail handle_proposal {}",
                            self,
                            res.err().unwrap()
                        );
                    }
                }

CompactSignedProposal 消息结构

pub struct CompactSignedProposal {
    // message fields
    pub proposal: ::protobuf::SingularPtrField<CompactProposal>,
    pub signature: ::std::vec::Vec<u8>, // 签名
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct CompactProposal {
    // message fields
    pub block: ::protobuf::SingularPtrField<super::blockchain::CompactBlock>,
    pub islock: bool,
    pub lock_round: u64,
    pub lock_votes: ::protobuf::RepeatedField<Vote>,
    pub round: u64, 
    pub height: u64,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct CompactBlock {
    // message fields
    pub version: u32,
    pub header: ::protobuf::SingularPtrField<BlockHeader>,
    pub body: ::protobuf::SingularPtrField<CompactBlockBody>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct CompactBlockBody {
    // message fields
    pub tx_hashes: ::protobuf::RepeatedField<::std::vec::Vec<u8>>, // 交易内容
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

CompactSignedProposal 样例

recieve CompactSignedProposal content: 
proposal {
  block {
    version: 2
    header {
      prevhash: "\356\001c\341\366\212\250\"R1\356|\332\007T\235y\027PK\307)\252\377\2630>\276\331\232 \007"
      timestamp: 1609302994419
      height: 18
      transactions_root: "\310\340\317\240\351i\210\023~]\303\347<\225#\001\356\036^\336\037\200\346P\261\362Y\227b\307\236\033"
      proof {
        content: "B\000\000\000\000\000\000\0000x525eb27908f10e12417a612fe7b17f7f48699857107f03db96416d0db0cfe8b7\021\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\314\251\254\311{l\030\301x5\027}\323\367\253\177\017\221\0171\236\352\253&U\032=\022\254\027\366\300FL\323\204\t~\036\343\001Y\035>\247\233\243\320\202\303X\215\377\351\327)\027\375\223\304Z\254\212\032\001*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000/\227\2711\\\021&*\016\232p\2651\036\027\305\351Di\241\243\\\3222\314\344\307\365R\314h-;\227\201x{\027\317\327\206Y\240\315F\3656\201\014\242\335;\251\2039\201\211\354}\304TZ\207\354\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\251\276-G\270\002\273K4S&\353z\355\354\000l\362\321M\371\350\3674X0\342\205\3741\000n.\ngSt\303\330\215\222[\022\003u\374\016\253\325\341\242\345:\303+\336\307\006\313\313=1t\004\001"
        type: Bft
      }
      proposer: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
    }
    body {
      tx_hashes: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
      tx_hashes: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
    }
  }
  height: 18
}
signature: "n\247\020\200\346\255\250\225\211&S\2059\247k\202u\335\222\310d\037\354\350a\310\237\265k\305xU\t\004\211>\243NQ\214B9\237\276\350}\320>nwi\307Y\374\317\346,8\023\243\263\373\216\034\000"
 by proposer

# 2.2 RawBytes

RawBytes,接到对 proposal 的投票消息,根据投票的轮数不同分别处理,如果数据正常,则分别进入 Step::PrevoteWaitStep::PrecommitWait 状态。

routing_key!(Net >> RawBytes) => {
                    let raw_bytes = msg.take_raw_bytes().unwrap();
                    let res = self.handle_message(&raw_bytes[..], true);

                    if let Ok((h, r, s)) = res {
                        if s == Step::Prevote {
                            self.proc_prevote(h, r);
                        } else {
                            self.proc_precommit(h, r);
                        }
                    }
                }

RawBytes 样例

receive RawBytes 
height:18 
round:0 
step:Precommit 
author.address:0x019691e3b8aab4d2e0c64827b7539fc0bc232116 hash:Some(0xf6ab3e3be72c74bcc07e2d679ddd223a030a966123b1db323155a263a35be932) 
signature:Signature { r: "08eae1f7561b241a78f0b3ee624fd2d67ab144c613f70e8f7c214e1d1a8b6fe9", 
                     s: "07e57283d763229656468b31417723638ec9aa066a71f4fc0b4e69b2398435f2", 
                     v: "00" }

其中Signature {
    /// Get a slice into the 'r' portion of the data.
    pub fn r(&self) -> &[u8] {
        &self.0[0..32]
    }

    /// Get a slice into the 's' portion of the data.
    pub fn s(&self) -> &[u8] {
        &self.0[32..64]
    }

    /// Get the recovery byte.
    pub fn v(&self) -> u8 {
        self.0[64]
    }
}

# 2.3 RichStatus

RichStatus,消息来自于 chain ,根据传输过来的状态表达的含义有多种,如果高度等于上一次共识高度,则表明块commit成功,bft 状态由 Step::Commit 更改为 Step::CommitWait; 如果高度等于上一次共识高度 - 1,则说明commit出现了异常,bft 重新将上一次的共识结果发送到 MQ(如果还保留着的话)。同时根据 Richstatus 中的共识列表、prev_hash 等重要信息更新自身的信息。

routing_key!(Chain >> RichStatus) => {
                    let rich_status = msg.take_rich_status().unwrap();
                    trace!(
                        "process {} get new local status {:?}",
                        self,
                        rich_status.height
                    );
                    self.receive_new_status(&rich_status);
                    let authorities: Vec<Address> = rich_status
                        .get_nodes()
                        .iter()
                        .map(|node| Address::from_slice(node))
                        .collect();
                    trace!("authorities: [{:?}]", authorities);

                    let validators: Vec<Address> = rich_status
                        .get_validators()
                        .iter()
                        .map(|node| Address::from_slice(node))
                        .collect();
                    trace!("validators: [{:?}]", validators);

                    if validators.contains(&self.params.signer.address) {
                        self.consensus_power = true;
                    } else {
                        info!(
                            "address[{:?}] is not consensus power !",
                            self.params.signer.address
                        );
                        self.consensus_power = false;
                    }
                    self.auth_manage.receive_authorities_list(
                        rich_status.height as usize,
                        &authorities,
                        &validators,
                    );
                    let version = rich_status.get_version();
                    trace!("verison: {}", version);
                    self.version = Some(version);
                }

RichStatus消息结构

pub struct RichStatus {
    // message fields
    pub hash: ::std::vec::Vec<u8>,
    pub height: u64,
    pub nodes: ::protobuf::RepeatedField<::std::vec::Vec<u8>>,
    pub interval: u64,
    pub version: u32,
    pub validators: ::protobuf::RepeatedField<::std::vec::Vec<u8>>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

RichStatus样例

receive RichStatus content: hash: "\223\213%\212\325\002\254qM\344h80\231\247\246\3274\002\257\240\200*e\371\310\326\3432\177\006\303"
height: 18
nodes: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
nodes: "M\244 Fb\347\375\227\323\235!\264v\276\365\313\277\215\"2"
nodes: "\001\226\221\343\270\252\264\322\340\306H'\267S\237\300\274#!\026"
interval: 3000
version: 2
validators: "\001\226\221\343\270\252\264\322\340\306H'\267S\237\300\274#!\026"
validators: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
validators: "M\244 Fb\347\375\227\323\235!\264v\276\365\313\277\215\"2"

# 2.4 BlockTxs

BlockTxs 消息来自于 Auth,chain 落盘成功后的 Richstatus 消息 Auth 接到后,清除交易池中对应的交易,并从交易池中按一定顺序打包一份新的交易,发送给 bft。bft 接到消息后,先序列化到本地,再判断这个高度和轮数是否是自己出块,如果是,验证必要信息后生成一个新的 proposal,广播给整个链,发送计时消息。

 routing_key!(Auth >> BlockTxs) => {
                    let block_txs = msg.take_block_txs().unwrap();
                    debug!(
                        "process {} recieve BlockTxs h: {}", // 接受到BlockTxs
                        self,
                        block_txs.get_height(),
                    );
                    let height = block_txs.get_height() as usize;
                    let msg: Vec<u8> = (&block_txs).try_into().unwrap();
                    self.block_txs.push_back((height, block_txs));
                    let _ = self.wal_log.save(height, LogType::AuthTxs, &msg);
                    let now_height = self.height;
                    let now_round = self.round;
                    let now_step = self.step;
                    if now_height == height + 1
                        && self
     						// 检查是不是proposer
                            .is_round_proposer(now_height, now_round, &self.params.signer.address)  
                            .is_ok()
                        && now_step == Step::ProposeWait
                        && self.proposal.is_none()
                    {
                        self.new_proposal(); // 开启新提议 自己签名、验证后,发送CompactSignedProposal到Net
                        let now = Instant::now();
                        let _ = self.timer_seter.send(TimeoutInfo {
                            timeval: now + Duration::new(0, 0),
                            height: now_height,
                            round: now_round,
                            step: Step::ProposeWait,
                        });
                    }
                }

BlockTxs消息结构:

pub struct BlockTxs { // 块
    // message fields
    pub height: u64,
    pub body: ::protobuf::SingularPtrField<BlockBody>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct BlockBody { // 块体
    // message fields
    pub transactions: ::protobuf::RepeatedField<SignedTransaction>, // 交易列表
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct SignedTransaction { // 验证通过的交易
    // message fields
    pub transaction_with_sig: ::protobuf::SingularPtrField<UnverifiedTransaction>,
    pub tx_hash: ::std::vec::Vec<u8>, // SignedTransaction hash
    pub signer: ::std::vec::Vec<u8>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct UnverifiedTransaction { // 带签名的交易
    // message fields
    pub transaction: ::protobuf::SingularPtrField<Transaction>,
    pub signature: ::std::vec::Vec<u8>,
    pub crypto: Crypto,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct Transaction { // 原始交易内容
    // message fields
    pub to: ::std::string::String,
    pub nonce: ::std::string::String,
    pub quota: u64,
    pub valid_until_block: u64,
    pub data: ::std::vec::Vec<u8>,
    pub value: ::std::vec::Vec<u8>,
    pub chain_id: u32,
    pub version: u32,
    pub to_v1: ::std::vec::Vec<u8>,
    pub chain_id_v1: ::std::vec::Vec<u8>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

BlockTxs样例

recieve BlockTxs h: 17 content: height: 17
body {
  transactions {
    transaction_with_sig {
      transaction {
        nonce: "ea42756e121f49e28ca34f10b02394ce"
        quota: 10000000
        valid_until_block: 104
        data: "\003"
        value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
        version: 2
        to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
        chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
      }
      signature: "\243\360\303\216TPt\304&\316d\223\324\217ugl\375\313\275\177\343\021N>\357yYY\204\261\000\006\3628\"\276\223v\316n\327\206\244d\224\320\205Q\204\3338\023p\265\3175\210T\214f\200\210\350\001"
    }
    tx_hash: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
    signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
  }
  transactions {
    transaction_with_sig {
      transaction {
        nonce: "f28f497522f440c7921a1e27b581dc83"
        quota: 10000000
        valid_until_block: 104
        data: "\004"
        value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
        version: 2
        to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
        chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
      }
      signature: "\260l\202\004v\336RL)\311\372<\003\016\364^i\203\224,\305\237\246\206\344t5\370`y\330\014\007I\231\262\262\374\006\376\277J\324\274\214\034\243NXOv\301S\225v_h\244pI\362\023`\267\000"
    }
    tx_hash: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
    signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
  }
}

# 2.5 VerifyBlockResp

VerifyBlockResp,消息来自于 Auth,Auth 接到 bft 发的交易信息并验证后,返回结果,如果验证无误,移除待验证交易列表中对应的交易,并将该 proposal 序列化到磁盘,状态切换为 Step::Precommit, 检查投票信息,如果投票信息已经大于 2/3 了,那么随即转入 Step::PrecommitWait 状态;如果验证失败,清除待验证交易信息,并广播,投空票,转入 Step::Precommit状态,检查投票信息,如果投票信息已经大于 2/3 了,那么随即转入 Step::PrecommitWait 状态

routing_key!(Auth >> VerifyBlockResp) => {
                    let resp = msg.take_verify_block_resp().unwrap();

                    let block = resp.get_block();
                    let vheight = resp.get_height() as usize;
                    let vround = resp.get_round() as usize;

                    let verify_res = if resp.get_pass() {
                        // Save the verified block which has passed verification by the auth.
                        self.verified_blocks
                            .insert(block.crypt_hash(), block.clone());
                        VerifiedBlockStatus::Ok
                    } else {
                        VerifiedBlockStatus::Err
                    };

                    if let Some(res) = self.unverified_msg.get_mut(&(vheight, vround)) {
                        res.1 = verify_res;
                        let block_bytes: Vec<u8> = block.try_into().unwrap();
                        let msg = serialize(
                            &(vheight, vround, verify_res.value(), block_bytes),
                            Infinite,
                        )
                        .unwrap();
                        let _ = self.wal_log.save(vheight, LogType::VerifiedBlock, &msg);
                        // Send SignedProposal to executor.
                        if let Some(compact_signed_proposal) =
                            res.0.clone().take_compact_signed_proposal()
                        {
                            let signed_proposal = compact_signed_proposal
                                .complete(block.get_body().get_transactions().to_vec());
                            let msg: Message = signed_proposal.into();
                            self.pub_sender
                                .send((
                                    routing_key!(Consensus >> SignedProposal).into(),
                                    msg.try_into().unwrap(),
                                ))
                                .unwrap();
                        }
                    };

                    info!(
                        "process {} recieve Auth VerifyBlockResp h: {}, r: {}, resp: {:?}",
                        self, vheight, vround, verify_res,
                    );
                    if vheight == self.height && vround == self.round {
                        //verify not ok,so clean the proposal info
                        if verify_res.is_ok() {
                            if self.step == Step::PrecommitAuth && self.pre_proc_precommit() {
                                self.change_state_step(vheight, vround, Step::Precommit, false);
                                self.proc_precommit(vheight, vround);
                            }
                        } else {
                            self.clean_proposal_when_verify_failed();
                        }
                    }
                }

VerifyBlockResp消息结构

pub struct VerifyBlockResp {
    // message fields
    pub height: u64,
    pub round: u64,
    pub pass: bool,
    pub block: ::protobuf::SingularPtrField<super::blockchain::Block>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

VerifyBlockResp消息样例

receive VerifyBlockResp content: height: 18
pass: true
block {
  version: 2
  header {
    prevhash: "\356\001c\341\366\212\250\"R1\356|\332\007T\235y\027PK\307)\252\377\2630>\276\331\232 \007"
    timestamp: 1609302994419
    height: 18
    transactions_root: "\310\340\317\240\351i\210\023~]\303\347<\225#\001\356\036^\336\037\200\346P\261\362Y\227b\307\236\033"
    proof {
      content: "B\000\000\000\000\000\000\0000x525eb27908f10e12417a612fe7b17f7f48699857107f03db96416d0db0cfe8b7\021\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\314\251\254\311{l\030\301x5\027}\323\367\253\177\017\221\0171\236\352\253&U\032=\022\254\027\366\300FL\323\204\t~\036\343\001Y\035>\247\233\243\320\202\303X\215\377\351\327)\027\375\223\304Z\254\212\032\001*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000/\227\2711\\\021&*\016\232p\2651\036\027\305\351Di\241\243\\\3222\314\344\307\365R\314h-;\227\201x{\027\317\327\206Y\240\315F\3656\201\014\242\335;\251\2039\201\211\354}\304TZ\207\354\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\251\276-G\270\002\273K4S&\353z\355\354\000l\362\321M\371\350\3674X0\342\205\3741\000n.\ngSt\303\330\215\222[\022\003u\374\016\253\325\341\242\345:\303+\336\307\006\313\313=1t\004\001"
      type: Bft
    }
    proposer: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
  }
  body {
    transactions {
      transaction_with_sig {
        transaction {
          nonce: "ea42756e121f49e28ca34f10b02394ce"
          quota: 10000000
          valid_until_block: 104
          data: "\003"
          value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
          version: 2
          to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
          chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
        }
        signature: "\243\360\303\216TPt\304&\316d\223\324\217ugl\375\313\275\177\343\021N>\357yYY\204\261\000\006\3628\"\276\223v\316n\327\206\244d\224\320\205Q\204\3338\023p\265\3175\210T\214f\200\210\350\001"
      }
      tx_hash: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
      signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
    }
    transactions {
      transaction_with_sig {
        transaction {
          nonce: "f28f497522f440c7921a1e27b581dc83"
          quota: 10000000
          valid_until_block: 104
          data: "\004"
          value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
          version: 2
          to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
          chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
        }
        signature: "\260l\202\004v\336RL)\311\372<\003\016\364^i\203\224,\305\237\246\206\344t5\370`y\330\014\007I\231\262\262\374\006\376\277J\324\274\214\034\243NXOv\301S\225v_h\244pI\362\023`\267\000"
      }
      tx_hash: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
      signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
    }
  }
}

# 2.6 SnapshotReq

CITA 提供了快照工具,给当前区块链某一个节点做快照,保存某个高度的状态、区块等数据,然后将快照恢复到另一个节点/本节点,就可以在较短时间内同步/恢复区块链数据。

收到快照请求,做快照处理

routing_key!(Snapshot >> SnapshotReq) => {
                    self.process_snapshot(msg);
                }

fn process_snapshot(&mut self, mut msg: Message) {
        if let Some(req) = msg.take_snapshot_req() {
            // 不同cmd对应快照的不同操作
            match req.cmd {
                Cmd::Snapshot => {
                    info!("receive Snapshot::Snapshot: {:?}", req);
                    snapshot_response(&self.pub_sender, Resp::SnapshotAck, true);
                }
                Cmd::Begin => { 
                    info!("receive Snapshot::Begin: {:?}", req);
                    self.set_snapshot(true);
                    self.is_cleared = false;
                    snapshot_response(&self.pub_sender, Resp::BeginAck, true);
                }
                Cmd::Restore => {
                    info!("receive Snapshot::Restore: {:?}", req);
                    snapshot_response(&self.pub_sender, Resp::RestoreAck, true);
                }
                Cmd::Clear => {
                    info!("receive Snapshot::Clear: {:?}", req);
                    let walpath = DataPath::wal_path();
                    let tmp_path = DataPath::root_node_path() + "/wal_tmp";
                    self.wal_log = Wal::create(&*tmp_path).unwrap();
                    let _ = fs::remove_dir_all(&walpath);
                    self.wal_log = Wal::create(&*walpath).unwrap();
                    let _ = fs::remove_dir_all(&tmp_path);

                    self.is_cleared = true;

                    snapshot_response(&self.pub_sender, Resp::ClearAck, true);
                }
                Cmd::End => {
                    info!("receive Snapshot::End: {:?}", req);
                    if self.is_cleared {
                        self.consensus_power = false;
                        self.clean_verified_info(0);
                        self.clean_saved_info();
                        self.clean_filter_info();
                        self.block_txs.clear();
                        self.proposals.proposals.clear();
                        self.votes.votes.clear();
                        self.proof = BftProof::from(req.get_proof().clone());
                        self.pre_hash = None;
                        self.block_proof = None;
                        self.change_state_step(
                            req.end_height as usize,
                            0,
                            Step::PrecommitAuth,
                            true,
                        );
                        self.save_wal_proof(req.end_height as usize);
                    }

                    self.set_snapshot(false);
                    self.is_cleared = false;

                    snapshot_response(&self.pub_sender, Resp::EndAck, true); // 返回SnapshotResp
                }
            }
        }
    }

fn snapshot_response(sender: &Sender<(String, Vec<u8>)>, ack: Resp, flag: bool) {
    info!("snapshot_response ack: {:?}, flag: {}", ack, flag);

    let mut resp = SnapshotResp::new();
    resp.set_resp(ack);
    resp.set_flag(flag);
    let msg: Message = resp.into();
    sender
        .send((
            routing_key!(Consensus >> SnapshotResp).into(),
            (&msg).try_into().unwrap(),
        ))
        .unwrap();
}

# 三、Publish channel

# 3.1 VerifyBlockReq

共识节点向Auth模块请求验证 B 的合法性

 fn handle_proposal(
        &mut self,
        body: &[u8],
        wal_flag: bool,
        need_verify: bool,
    ) -> Result<(usize, usize), EngineError> {
        ......
        ......
        ......
        // 请求Auth验证合法性
		if need_verify && !self.verify_req(csp_msg, &compact_block, height, round) {
              warn!("handle_proposal {} verify_req is error", self);
              return Err(EngineError::InvalidTxInProposal);
        }
        ......
        ......
        .....
}


fn verify_req(
        &mut self,
        csp_msg: Message,
        compact_block: &CompactBlock,
        vheight: usize,
        vround: usize,
    ) -> bool {
        let tx_hashes = compact_block.get_body().get_tx_hashes();
        // If there is no transaction, the block don't have to be verified.
        if tx_hashes.is_empty() {
            self.verified_blocks.insert(
                compact_block.crypt_hash(),
                compact_block.clone().complete(Vec::new()),
            );
            return true;
        }
        let verify_ok = compact_block.check_hash();
        if verify_ok {
            let verify_req = {
                let mut verify_req = auth::VerifyBlockReq::new();
                verify_req.set_height(vheight as u64);
                verify_req.set_round(vround as u64);
                verify_req.set_block(compact_block.clone());
                verify_req
            };
            let mut msg: Message = verify_req.into();
            msg.set_origin(csp_msg.get_origin());
            self.pub_sender
                .send((
                    routing_key!(Consensus >> VerifyBlockReq).into(),
                    msg.clone().try_into().unwrap(),
                ))
                .unwrap();
            self.unverified_msg
                .insert((vheight, vround), (csp_msg, VerifiedBlockStatus::Init(0)));
        }
        verify_ok
    }

VerifyBlockReq消息结构

pub struct VerifyBlockReq {
    // message fields
    pub height: u64,
    pub round: u64,
    pub block: ::protobuf::SingularPtrField<super::blockchain::CompactBlock>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

VerifyBlockReq消息样例

send VerifyBlockReq content: 
height: 18
block {
  version: 2
  header {
    prevhash: "\356\001c\341\366\212\250\"R1\356|\332\007T\235y\027PK\307)\252\377\2630>\276\331\232 \007"
    timestamp: 1609302994419
    height: 18
    transactions_root: "\310\340\317\240\351i\210\023~]\303\347<\225#\001\356\036^\336\037\200\346P\261\362Y\227b\307\236\033"
    proof {
      content: "B\000\000\000\000\000\000\0000x525eb27908f10e12417a612fe7b17f7f48699857107f03db96416d0db0cfe8b7\021\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\314\251\254\311{l\030\301x5\027}\323\367\253\177\017\221\0171\236\352\253&U\032=\022\254\027\366\300FL\323\204\t~\036\343\001Y\035>\247\233\243\320\202\303X\215\377\351\327)\027\375\223\304Z\254\212\032\001*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000/\227\2711\\\021&*\016\232p\2651\036\027\305\351Di\241\243\\\3222\314\344\307\365R\314h-;\227\201x{\027\317\327\206Y\240\315F\3656\201\014\242\335;\251\2039\201\211\354}\304TZ\207\354\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\251\276-G\270\002\273K4S&\353z\355\354\000l\362\321M\371\350\3674X0\342\205\3741\000n.\ngSt\303\330\215\222[\022\003u\374\016\253\325\341\242\345:\303+\336\307\006\313\313=1t\004\001"
      type: Bft
    }
    proposer: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
  }
  body {
    tx_hashes: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
    tx_hashes: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
  }
}

# 3.2 RawBytes

RawBytes,在如下函数调用pub_and_broadcast_message,将Bft 签名好的 proposol 或者投票消息,发送到Net然后分发广播

  • pre_proc_prevote
  • pre_proc_precommit
  • retrans_vote
  • clean_proposal_when_verify_failed
fn pub_and_broadcast_message(
        &mut self,
        height: usize,
        round: usize,
        step: Step,
        hash: Option<H256>,
    ) {
        let author = &self.params.signer;
        let msg = serialize(&(height, round, step, author.address, hash), Infinite).unwrap();
        let signature = Signature::sign(author.keypair.privkey(), &msg.crypt_hash()).unwrap();
        let sig = signature.clone();
        let msg = serialize(&(msg, sig), Infinite).unwrap();

        trace!(
            "pub_and_broadcast_message {} begin h: {}, r: {}, s: {}",
            self,
            height,
            round,
            step,
        );
        self.pub_message(msg.clone());
		......
		......
		......
        
    }


fn pub_message(&self, message: Vec<u8>) {
        let msg: Message = message.into();
        self.pub_sender
            .send((
                routing_key!(Consensus >> RawBytes).into(),
                msg.try_into().unwrap(),
            ))
            .unwrap();
    }

RawBytes样例

send RawBytes 
height:18 
round:0 
step:Precommit 
author.address:0x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4 hash:Some(0xf6ab3e3be72c74bcc07e2d679ddd223a030a966123b1db323155a263a35be932) 
signature:Signature { r: "e7e255cc1051abbe6063b39bf6ec21a2c9acf509039196a725d8cc647de7b05d", 
                     s: "3219304b94467cf911a0052dff09e7941142d0cb9a009b1cd5838a71d2b8e9bb", 
                     v: "01" }

# 3.3 BlockWithProof

将共识好的块发送给Chain和Executor,存储块到链上

pub fn pub_block(&self, block: &BlockWithProof) {
        let msg: Message = block.clone().into();
        self.pub_sender
            .send((
                routing_key!(Consensus >> BlockWithProof).into(),
                msg.try_into().unwrap(),
            ))
            .unwrap();
    }

BlockWithProof结构

pub struct BlockWithProof {
    // message fields
    pub blk: ::protobuf::SingularPtrField<Block>,
    pub proof: ::protobuf::SingularPtrField<Proof>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

BlockWithProof样例

send BlockWithProof content: 
blk {
  version: 2
  header {
    prevhash: "\356\001c\341\366\212\250\"R1\356|\332\007T\235y\027PK\307)\252\377\2630>\276\331\232 \007"
    timestamp: 1609302994419
    height: 18
    transactions_root: "\310\340\317\240\351i\210\023~]\303\347<\225#\001\356\036^\336\037\200\346P\261\362Y\227b\307\236\033"
    proof {
      content: "B\000\000\000\000\000\000\0000x525eb27908f10e12417a612fe7b17f7f48699857107f03db96416d0db0cfe8b7\021\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\314\251\254\311{l\030\301x5\027}\323\367\253\177\017\221\0171\236\352\253&U\032=\022\254\027\366\300FL\323\204\t~\036\343\001Y\035>\247\233\243\320\202\303X\215\377\351\327)\027\375\223\304Z\254\212\032\001*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000/\227\2711\\\021&*\016\232p\2651\036\027\305\351Di\241\243\\\3222\314\344\307\365R\314h-;\227\201x{\027\317\327\206Y\240\315F\3656\201\014\242\335;\251\2039\201\211\354}\304TZ\207\354\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\251\276-G\270\002\273K4S&\353z\355\354\000l\362\321M\371\350\3674X0\342\205\3741\000n.\ngSt\303\330\215\222[\022\003u\374\016\253\325\341\242\345:\303+\336\307\006\313\313=1t\004\001"
      type: Bft
    }
    proposer: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
  }
  body {
    transactions {
      transaction_with_sig {
        transaction {
          nonce: "ea42756e121f49e28ca34f10b02394ce"
          quota: 10000000
          valid_until_block: 104
          data: "\003"
          value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
          version: 2
          to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
          chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
        }
        signature: "\243\360\303\216TPt\304&\316d\223\324\217ugl\375\313\275\177\343\021N>\357yYY\204\261\000\006\3628\"\276\223v\316n\327\206\244d\224\320\205Q\204\3338\023p\265\3175\210T\214f\200\210\350\001"
      }
      tx_hash: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
      signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
    }
    transactions {
      transaction_with_sig {
        transaction {
          nonce: "f28f497522f440c7921a1e27b581dc83"
          quota: 10000000
          valid_until_block: 104
          data: "\004"
          value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
          version: 2
          to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
          chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
        }
        signature: "\260l\202\004v\336RL)\311\372<\003\016\364^i\203\224,\305\237\246\206\344t5\370`y\330\014\007I\231\262\262\374\006\376\277J\324\274\214\034\243NXOv\301S\225v_h\244pI\362\023`\267\000"
      }
      tx_hash: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
      signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
    }
  }
}
proof {
  content: "B\000\000\000\000\000\000\0000xf6ab3e3be72c74bcc07e2d679ddd223a030a966123b1db323155a263a35be932\022\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\271x\031@\360}\245g\213]B\203\317\0204\312\t\305\206P\205'\017_z\273\315t\004\316 tB\330\236\221\357\231gR\033Eof\2219\234W\222hc<\036e\270\206\321H\206\334 a\306\236\001*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\010\352\341\367V\033$\032x\360\263\356bO\322\326z\261D\306\023\367\016\217|!N\035\032\213o\351\007\345r\203\327c\"\226VF\2131Aw#c\216\311\252\006jq\364\374\013Ni\2629\2045\362\000*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000\347\342U\314\020Q\253\276`c\263\233\366\354!\242\311\254\365\t\003\221\226\247%\330\314d}\347\260]2\0310K\224F|\371\021\240\005-\377\t\347\224\021B\320\313\232\000\233\034\325\203\212q\322\270\351\273\001"
  type: Bft
}

# 3.4 CompactSignedProposal

将验证签名过的Proposal发送到网络

// Send CompactSignedProposal to nextwork.
        let msg: Message = compact_signed_proposal.into();
        let bmsg: Vec<u8> = (&msg).try_into().unwrap();
        self.pub_sender
            .send((
                routing_key!(Consensus >> CompactSignedProposal).into(),
                msg.try_into().unwrap(),
            ))
            .unwrap();
        bmsg

CompactSignedProposal 消息结构

pub struct CompactSignedProposal {
    // message fields
    pub proposal: ::protobuf::SingularPtrField<CompactProposal>,
    pub signature: ::std::vec::Vec<u8>, // 签名
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct CompactProposal {
    // message fields
    pub block: ::protobuf::SingularPtrField<super::blockchain::CompactBlock>,
    pub islock: bool,
    pub lock_round: u64,
    pub lock_votes: ::protobuf::RepeatedField<Vote>,
    pub round: u64, 
    pub height: u64,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct CompactBlock {
    // message fields
    pub version: u32,
    pub header: ::protobuf::SingularPtrField<BlockHeader>,
    pub body: ::protobuf::SingularPtrField<CompactBlockBody>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct CompactBlockBody {
    // message fields
    pub tx_hashes: ::protobuf::RepeatedField<::std::vec::Vec<u8>>, // 交易内容
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

CompactSignedProposal样例

send CompactSignedProposal content: 
proposal {
  block {
    version: 2
    header {
      prevhash: "\356\001c\341\366\212\250\"R1\356|\332\007T\235y\027PK\307)\252\377\2630>\276\331\232 \007"
      timestamp: 1609302994419
      height: 18
      transactions_root: "\310\340\317\240\351i\210\023~]\303\347<\225#\001\356\036^\336\037\200\346P\261\362Y\227b\307\236\033"
      proof {
        content: "B\000\000\000\000\000\000\0000x525eb27908f10e12417a612fe7b17f7f48699857107f03db96416d0db0cfe8b7\021\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\314\251\254\311{l\030\301x5\027}\323\367\253\177\017\221\0171\236\352\253&U\032=\022\254\027\366\300FL\323\204\t~\036\343\001Y\035>\247\233\243\320\202\303X\215\377\351\327)\027\375\223\304Z\254\212\032\001*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000/\227\2711\\\021&*\016\232p\2651\036\027\305\351Di\241\243\\\3222\314\344\307\365R\314h-;\227\201x{\027\317\327\206Y\240\315F\3656\201\014\242\335;\251\2039\201\211\354}\304TZ\207\354\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\251\276-G\270\002\273K4S&\353z\355\354\000l\362\321M\371\350\3674X0\342\205\3741\000n.\ngSt\303\330\215\222[\022\003u\374\016\253\325\341\242\345:\303+\336\307\006\313\313=1t\004\001"
        type: Bft
      }
      proposer: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
    }
    body {
      tx_hashes: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
      tx_hashes: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
    }
  }
  height: 18
}
signature: "n\247\020\200\346\255\250\225\211&S\2059\247k\202u\335\222\310d\037\354\350a\310\237\265k\305xU\t\004\211>\243NQ\214B9\237\276\350}\320>nwi\307Y\374\317\346,8\023\243\263\373\216\034\000"
 to network

# 3.5 SignedProposal

Executor接到 Bft 发来的已经签名的 proposal,向backlogs插入记录

// Consensus发送
// If consensus has the full verified block, then send it to executor.
        if let Some(block) = self.verified_blocks.get(&block_hash) {
            // Send SignedProposal to executor.
            let signed_proposal = compact_signed_proposal
                .clone()
                .complete(block.get_body().get_transactions().to_vec());
            let msg: Message = signed_proposal.into();
            self.pub_sender
                .send((
                    routing_key!(Consensus >> SignedProposal).into(),
                    msg.try_into().unwrap(),
                ))
                .unwrap();
        };

// Executor接收
// SignedProposal{Proposal { height, ...}, signature}
            routing_key!(Consensus >> SignedProposal) => {
                let mut proposal = msg.take_signed_proposal().unwrap();
                let open_block = OpenBlock::from(proposal.take_proposal().take_block());
                self.backlogs.insert_proposal(open_block)
            }

SignedProposal消息结构

pub struct SignedProposal {
    // message fields
    pub proposal: ::protobuf::SingularPtrField<Proposal>,
    pub signature: ::std::vec::Vec<u8>,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub struct Proposal {
    // message fields
    pub block: ::protobuf::SingularPtrField<super::blockchain::Block>,
    pub islock: bool,
    pub lock_round: u64,
    pub lock_votes: ::protobuf::RepeatedField<Vote>,
    pub round: u64,
    pub height: u64,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

关于BackLog:

// When a block is processed done and proofed, we can say this block is completed. So that our
// chain grows up.
// So, a Backlog `is_completed` return true if open block, proof, and closed block for that
// height are all exist and matched.

SignedProposal样例

end SignedProposal content: 
proposal {
  block {
    version: 2
    header {
      prevhash: "\356\001c\341\366\212\250\"R1\356|\332\007T\235y\027PK\307)\252\377\2630>\276\331\232 \007"
      timestamp: 1609302994419
      height: 18
      transactions_root: "\310\340\317\240\351i\210\023~]\303\347<\225#\001\356\036^\336\037\200\346P\261\362Y\227b\307\236\033"
      proof {
        content: "B\000\000\000\000\000\000\0000x525eb27908f10e12417a612fe7b17f7f48699857107f03db96416d0db0cfe8b7\021\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\003\000\000\000\000\000\000\000*\000\000\000\000\000\000\0000x019691e3b8aab4d2e0c64827b7539fc0bc232116A\000\000\000\000\000\000\000\314\251\254\311{l\030\301x5\027}\323\367\253\177\017\221\0171\236\352\253&U\032=\022\254\027\366\300FL\323\204\t~\036\343\001Y\035>\247\233\243\320\202\303X\215\377\351\327)\027\375\223\304Z\254\212\032\001*\000\000\000\000\000\000\0000x5bd2a07d8e08f50a20acab485ebb8095d6f7c4f4A\000\000\000\000\000\000\000/\227\2711\\\021&*\016\232p\2651\036\027\305\351Di\241\243\\\3222\314\344\307\365R\314h-;\227\201x{\027\317\327\206Y\240\315F\3656\201\014\242\335;\251\2039\201\211\354}\304TZ\207\354\000*\000\000\000\000\000\000\0000x4da4204662e7fd97d39d21b476bef5cbbf8d2232A\000\000\000\000\000\000\000\251\276-G\270\002\273K4S&\353z\355\354\000l\362\321M\371\350\3674X0\342\205\3741\000n.\ngSt\303\330\215\222[\022\003u\374\016\253\325\341\242\345:\303+\336\307\006\313\313=1t\004\001"
        type: Bft
      }
      proposer: "[\322\240}\216\010\365\n \254\253H^\273\200\225\326\367\304\364"
    }
    body {
      transactions {
        transaction_with_sig {
          transaction {
            nonce: "ea42756e121f49e28ca34f10b02394ce"
            quota: 10000000
            valid_until_block: 104
            data: "\003"
            value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
            version: 2
            to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
            chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
          }
          signature: "\243\360\303\216TPt\304&\316d\223\324\217ugl\375\313\275\177\343\021N>\357yYY\204\261\000\006\3628\"\276\223v\316n\327\206\244d\224\320\205Q\204\3338\023p\265\3175\210T\214f\200\210\350\001"
        }
        tx_hash: "\254\332\244\252\036q\337}\223\324\223yC\374\023\243\301\240\360i\241\242\216\270\223AC:\320l]y"
        signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
      }
      transactions {
        transaction_with_sig {
          transaction {
            nonce: "f28f497522f440c7921a1e27b581dc83"
            quota: 10000000
            valid_until_block: 104
            data: "\004"
            value: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
            version: 2
            to_v1: "\033\345\027\006\343Es\0168\234IU\274\3220\nXm\222\272"
            chain_id_v1: "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\001"
          }
          signature: "\260l\202\004v\336RL)\311\372<\003\016\364^i\203\224,\305\237\246\206\344t5\370`y\330\014\007I\231\262\262\374\006\376\277J\324\274\214\034\243NXOv\301S\225v_h\244pI\362\023`\267\000"
        }
        tx_hash: "\212 \235A\216\334\261\200\361\333\346>\227\314<\020Wt\013&\n\312\371\362w::\221\220O\260\324"
        signer: "y\353\361\213\3752h\203L\026\222\203\201\223Rw\232\274\224\206\254\220\036\362f\227'wA\357\277\370g\332\303\321%\204\360\003\223U:5\212?\350\014\263>\250\221\013C\351\207\244\275\021_?\036\003\236"
      }
    }
  }
  height: 18
}
signature: "n\247\020\200\346\255\250\225\211&S\2059\247k\202u\335\222\310d\037\354\350a\310\237\265k\305xU\t\004\211>\243NQ\214B9\237\276\350}\320>nwi\307Y\374\317\346,8\023\243\263\373\216\034\000"

# 3.6 SnapshotResp

CITA 提供了快照工具,给当前区块链某一个节点做快照,保存某个高度的状态、区块等数据,然后将快照恢复到另一个节点/本节点,就可以在较短时间内同步/恢复区块链数据。

返回快照响应

fn snapshot_response(sender: &Sender<(String, Vec<u8>)>, ack: Resp, flag: bool) {
    info!("snapshot_response ack: {:?}, flag: {}", ack, flag);

    let mut resp = SnapshotResp::new();
    resp.set_resp(ack);
    resp.set_flag(flag);
    let msg: Message = resp.into();
    sender
        .send((
            routing_key!(Consensus >> SnapshotResp).into(),
            (&msg).try_into().unwrap(),
        ))
        .unwrap();
}

SnapshotResp消息结构

pub struct SnapshotResp {
    // message fields
    pub resp: Resp,
    pub proof: ::protobuf::SingularPtrField<super::blockchain::Proof>,
    pub height: u64,
    pub flag: bool,
    // special fields
    unknown_fields: ::protobuf::UnknownFields,
    cached_size: ::protobuf::CachedSize,
}

pub enum Resp {
    BeginAck = 0,
    ClearAck = 1,
    SnapshotAck = 2,
    RestoreAck = 3,
    EndAck = 4,
}