CITA微服务注册及简单pubsub消息通信样例

# 一、RabbitMQ通信协议原理

RabbitMQ基于ampq实现,是一种借助消息队列的通信协议,下面介绍几个RabbitMQ中的名词:

  • proudcer(消息产生者):类似客户端,产生和发布消息,并给定消息格式和交换器类型;

  • exchange(交换器):有direct、fanout、headers、topic四种类型,分别实现了四种不同机制的广播和订阅消息方式,可以通过routingkey绑定消息队列;

  • queue(消息队列):用于缓存未被订阅的已发布消息;

  • consumer(消息消费者):也是客户端,接收消息。

topic交换器的发布逻辑如下:

image-20201210104149220

  • exchange1即为topic类型的交换器,上面通过routingkey绑定了一些消息队列。和每个队列的绑定routingkey不少于一个。

  • Producer发送包含routingkey和Exchange两个字符串的消息,消息会被对应交换器发送到含对应routingkey的队列(所有符合条件的队列均会收到消息,类似范围广播)中。

  • Consumer发送包含routingkey和Exchange的请求,对应交换器会把该交换器上所有符合routingkey条件的绑定对应队列的消息返回,并从消息队列中删去已被订阅的消息。

# 二、CITA中的微服务通信

在CITA中,每个节点上都会部署六个微服务以及相应通信总线。

启动节点后,以链test-chain01的0节点为例,在RabbitMQ可视化web(http://localhost:15672) 中可见到节点上的通信部署情况。

下图为0节点的交换器列表,第一列为vhost名称,第二列为交换器名称,第三列为交换器类型。可见,0节点上共有8个交换器,其中有7个为RabbitMQ的默认交换器,第8个’cita’交换器是cita系统自定义的,类型为topic。

image-20201210093609880

启动节点后发现,只有cita交换器在运行,其他七个交换器没有用到。查看运行时cita交换器的绑定(bindings)消息队列列表,下图为部分绑定情况,第一列为绑定队列名称,第二列为routingkey

image-20201210093619807

此处的绑定列表中和共识相关部分和cita-bft/src/main.rs中列出的共识消息发布/订阅接口是对应的。

下图即为一个CITA节点上(完整通信)包括的所有消息队列。

image-20201210094745740

在cita每个微服务模块中,都实现了基于rabbitmq的微服务通信,下面以cita-bft(共识模块)为例介绍具体通信部分代码。

main.rs中,由start_pubsub()函数初始化交换器与消息队列的绑定,由下代码可见。start_pubsub()函数在common依赖库pubsub/lib.rs中有封装的函数内部代码,在该函数封装中由start_rabbitmq()新建了exchange并实现消息队列绑定,但是在实现通信时,这些封装好的函数一般不用涉及,了解及修改start_pubsub()和订阅消息、发送消息即可。

// mq pubsub module
    let (tx_sub, rx_sub) = channel::unbounded();
    let (tx_pub, rx_pub) = channel::unbounded();
    start_pubsub(
        "consensus",
        routing_key!([
            Net >> CompactSignedProposal,
            Net >> RawBytes,
            Chain >> RichStatus,
            Auth >> BlockTxs,
            Auth >> VerifyBlockResp,
            Snapshot >> SnapshotReq,
        ]),
        tx_sub,
        rx_pub,
    );

由上代码可见,在consensus队列上和交换器共初始化了六个绑定的routingkey,这六个绑定的发送端和消息类型均由名称可以帮助理解代码。这些枚举变量均在common包的route.rs文件中定义了。消息类型在MsgType中定义,微服务模块在SubModules中定义,如下两段代码。

// CITA内置消息类型
pub enum MsgType {
    // Generate MSG-PROTOS struct automatically begin:
    RawBytes,
    Request,
    Response,
    SyncRequest,
    SyncResponse,
    Status,
    RichStatus,
    SignedProposal,
    Block,
    BlockWithProof,
    BlockHeader,
    BlockTxs,
    BlockTxHashes,
    BlockTxHashesReq,
    VerifyBlockReq,
    VerifyBlockResp,
    ExecutedResult,
    SnapshotReq,
    SnapshotResp,
    Miscellaneous,
    MiscellaneousReq,
    BlackList,
    StateSignal,
    GetBlockTxn,
    BlockTxn,
    CompactSignedProposal,
    // Generate MSG-PROTOS struct automatically end.
    All,
    Unknown,
    // TODO This is a issue left over by history.
    //      The Request is too big (send from Jsonrpc).
    //      To remove follow items should be better.
    RequestNewTx,
    RequestNewTxBatch,
    RequestNet,
    LocalSync,
    RequestRpc,
    RequestPeersInfo,
}
// CITA内置模块
pub enum SubModules {
    Jsonrpc,
    Net,
    Chain,
    Consensus,
    Auth,
    Executor,
    Synchronizer,
    Snapshot,
    All,
    Unknown,
}

在各模块消息队列均绑定交换器后,在代码中发布消息代码示例如下,send函数包括routing_key和消息内容两个参数:

self.pub_sender
                .send((
                    routing_key!(Consensus >> VerifyBlockReq).into(),
                    msg.clone().try_into().unwrap(),
                ))
                .unwrap();

订阅消息代码示例如下,一为获取rtkey,二通过匹配rtkey订阅消息:

// 1.获取rtkey
let (key, body) = info;
let rtkey = RoutingKey::from(&key);

// 2.匹配rtkey订阅消息
 match rtkey {
                routing_key!(Net >> CompactSignedProposal) => {
                    let res = self.handle_proposal(&body[..], true, true);
                    if let Ok((h, r)) = res {
                        trace!(
                            "process {} recieve handle_proposal ok; h: {}, r: {}",
                            self,
                            h,
                            r,
                        );
                        if h == self.height && r == self.round && self.step < Step::Prevote {
                            self.step = Step::ProposeWait;
                            let now = Instant::now();
                            let _ = self.timer_seter.send(TimeoutInfo {
                                timeval: now + Duration::new(0, 0),
                                height: h,
                                round: r,
                                step: Step::ProposeWait,
                            });
                        }
                    } else {
                        trace!(
                            "process {} fail handle_proposal {}",
                            self,
                            res.err().unwrap()
                        );
                    }
                }

# 三、CITA中实现微服务通信样例

# 3.1 从现有模块迁移

复制cita-auth模块,重命名为cita-test,考虑以下代码结构,删除其他文件

image-20201210101102538

cita-test 通信框架图

image-20201210110800125

其中,build.rsCargo.toml保留, main.rs是主函数入口,handler.rs是消息处理模块,进行如下改造

其中将auth均改为了test

// main.rs
extern crate cita_crypto as crypto;
extern crate core as chain_core;
#[macro_use]
extern crate libproto;
#[macro_use]
extern crate cita_logger as logger;
#[cfg(test)]
#[macro_use]
extern crate quickcheck;
// #[macro_use]
// extern crate serde_derive;
#[cfg(test)]
extern crate tempfile;
#[macro_use]
extern crate util;
extern crate db as cita_db;
extern crate hashable;

use clap::App;
use handler::MsgHandler;
// use handler::PubMessage;
use libproto::router::{MsgType, RoutingKey, SubModules};
use libproto::Message;
use pubsub::channel;
use pubsub::start_pubsub;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use util::set_panic_handler;

pub mod handler;

include!(concat!(env!("OUT_DIR"), "/build_info.rs"));

fn main() {
    // 初始化 test app
    let matches = App::new("test")
        .version(get_build_info_str(true))
        .long_version(get_build_info_str(false))
        .author("Cryptape")
        .about("CITA Block Chain Node powered by Rust")
        .args_from_usage(
            "-c, --config=[FILE] 'Sets a custom config file'
                          -s, --stdout 'Log to console'",
        )
        .get_matches();

    // 开启日志
    let stdout = matches.is_present("stdout");
    micro_service_init!("cita-test", "CITA:test", stdout);
    info!("Version: {}", get_build_info_str(true));
    info!("This is a test microservice");

    // let config_path = matches.value_of("config").unwrap_or("test.toml");

    // Start publish and subcribe message from MQ.
    // The CITA system runs in a logic nodes, and it contains some components
    // which we called micro-service at their running time.
    // All micro-services connect to a MQ, as this design can keep them loose
    // coupling with each other.

    let (tx_sub, rx_sub) = channel::unbounded();
    let (tx_pub, rx_pub) = channel::unbounded();
    // 初始化交换器与消息队列的绑定
    start_pubsub(
        "test",
        routing_key!([Jsonrpc >> RequestNet, Auth >> RawBytes, Net >> RawBytes]),
        tx_sub,
        rx_pub,
    );

    // 初始化消息handler
    let mut msg_handler = MsgHandler::new(rx_sub, tx_pub);

    let mainhd = thread::spawn(move || loop { // 创建线程
        let intra_message: Message = String::from("test message").into_bytes().into();

        let inter_message: Message = String::from("remote node's request").into_bytes().into();

        // 发送节点内消息
        msg_handler.send_intra_message(intra_message);

        // 发送节点外消息
        msg_handler.send_inter_message(inter_message);

        sleep(Duration::new(3, 0)); // 休眠3秒

        // 接收消息
        msg_handler.recv_msg();

        sleep(Duration::new(2, 0));
    });

    mainhd.join().unwrap();
}

// handler.rs
use libproto::router::{MsgType, RoutingKey, SubModules};
use libproto::Message;
use libproto::{TryFrom, TryInto};
use pubsub::channel::{Receiver, Sender};

pub struct MsgHandler {
    rx_sub: Receiver<(String, Vec<u8>)>,
    tx_pub: Sender<(String, Vec<u8>)>,
}

impl MsgHandler {
    pub fn new(rx_sub: Receiver<(String, Vec<u8>)>, tx_pub: Sender<(String, Vec<u8>)>) -> Self {
        MsgHandler { rx_sub, tx_pub }
    }

    pub fn send_intra_message(&mut self, msg: Message) {
        self.tx_pub
            .send((
                routing_key!(Auth >> RawBytes).into(),
                msg.try_into().unwrap(),
            ))
            .unwrap();
    }

    pub fn send_inter_message(&mut self, msg: Message) {
        self.tx_pub
            .send((
                routing_key!(Consensus >> RawBytes).into(),
                msg.try_into().unwrap(),
            ))
            .unwrap();
    }

    pub fn recv_msg(&mut self) {
        if let Ok((key, payload)) = self.rx_sub.recv() {
            if Message::try_from(&payload).is_err() {
                error!("Can not get message from payload {:?}", &payload);
                return;
            }

            let mut msg = Message::try_from(&payload).unwrap();
            let rounting_key = RoutingKey::from(&key);
            trace!("process message key = {}", key);
            // 匹配rtkey
            match rounting_key {
                // 收到JsonRPC类型消息
                routing_key!(Jsonrpc >> RequestNet) => {
                    info!(
                        "recv message Jsonrpc >> RequestNet {:?}.",
                        msg.take_request()
                    );
                }
				// 收到节点内消息
                routing_key!(Auth >> RawBytes) => {
                    info!(
                        "recv intra message Test >> TestMessageReq {:?}.",
                        msg.take_raw_bytes()
                    );
                }
				// 收到节点外消息
                routing_key!(Net >> RawBytes) => {
                    info!(
                        "recv remote message Net >> RawBytes {:?}.",
                        msg.take_raw_bytes()
                    );
                }

                _ => {
                    error!("receive unexpected message key {}", key);
                }
            }
        }
    }
}

# 3.2 微服务注册

左边是原始版本,右边是修改后的版本

  1. 根目录Cargo.toml下添加cita-test

image-20201210102040685

  1. scripts/cita.sh 添加test

    image-20201210102201354

  2. forever.toml中添加cita-test(CITA启动节点先启动forever微服务,然后用forever启动其他微服务)

    image-20201210102235468

  3. release.sh中添加cita-test(用于编译打包)

    image-20201210102327375

    1. 添加空白的test.toml(test微服务的配置文件)

      image-20201210112311158

# 3.3 测试

  1. 编译代码

    $ sudo ./env.sh make debug
    
  2. 创链,启动CITA

    $ cd target/install
    $ sudo ./bin/cita create --super_admin "0x37d1c7449bfe76fe9c445e626da06265e9377601" --nodes "127.0.0.1:4000,127.0.0.1:4001"
    $ sudo ./bin/cita setup test-chain/0
    $ sudo ./bin/cita setup test-chain/1
    $ sudo ./bin/cita start test-chain/0
    $ sudo ./bin/cita start test-chain/1
    $ sudo ./bin/cita top test-chain/0
    $ sudo ./bin/cita top test-chain/1
    

    编译和启动过程中若出现no such file or directory错误,重启docker后重新执行命令

  3. 查看RabbitMQ Management

    image-20201210104820156

  4. 查看test微服务日志

$ less /test-chain/0/logs/cita-test.log
image-20201210105156356

image-20201210105447474