# 一、RabbitMQ通信协议原理
RabbitMQ基于ampq实现,是一种借助消息队列的通信协议,下面介绍几个RabbitMQ中的名词:
proudcer(消息产生者):类似客户端,产生和发布消息,并给定消息格式和交换器类型;
exchange(交换器):有direct、fanout、headers、topic四种类型,分别实现了四种不同机制的广播和订阅消息方式,可以通过routingkey绑定消息队列;
queue(消息队列):用于缓存未被订阅的已发布消息;
consumer(消息消费者):也是客户端,接收消息。
topic交换器的发布逻辑如下:

exchange1即为topic类型的交换器,上面通过routingkey绑定了一些消息队列。和每个队列的绑定routingkey不少于一个。
Producer发送包含routingkey和Exchange两个字符串的消息,消息会被对应交换器发送到含对应routingkey的队列(所有符合条件的队列均会收到消息,类似范围广播)中。
Consumer发送包含routingkey和Exchange的请求,对应交换器会把该交换器上所有符合routingkey条件的绑定对应队列的消息返回,并从消息队列中删去已被订阅的消息。
# 二、CITA中的微服务通信
在CITA中,每个节点上都会部署六个微服务以及相应通信总线。
启动节点后,以链test-chain01的0节点为例,在RabbitMQ可视化web(http://localhost:15672) 中可见到节点上的通信部署情况。
下图为0节点的交换器列表,第一列为vhost名称,第二列为交换器名称,第三列为交换器类型。可见,0节点上共有8个交换器,其中有7个为RabbitMQ的默认交换器,第8个’cita’交换器是cita系统自定义的,类型为topic。

启动节点后发现,只有cita交换器在运行,其他七个交换器没有用到。查看运行时cita交换器的绑定(bindings)消息队列列表,下图为部分绑定情况,第一列为绑定队列名称,第二列为routingkey。

此处的绑定列表中和共识相关部分和cita-bft/src/main.rs中列出的共识消息发布/订阅接口是对应的。
下图即为一个CITA节点上(完整通信)包括的所有消息队列。
在cita每个微服务模块中,都实现了基于rabbitmq的微服务通信,下面以cita-bft(共识模块)为例介绍具体通信部分代码。
在main.rs中,由start_pubsub()函数初始化交换器与消息队列的绑定,由下代码可见。start_pubsub()函数在common依赖库pubsub/lib.rs中有封装的函数内部代码,在该函数封装中由start_rabbitmq()新建了exchange并实现消息队列绑定,但是在实现通信时,这些封装好的函数一般不用涉及,了解及修改start_pubsub()和订阅消息、发送消息即可。
// mq pubsub module
let (tx_sub, rx_sub) = channel::unbounded();
let (tx_pub, rx_pub) = channel::unbounded();
start_pubsub(
"consensus",
routing_key!([
Net >> CompactSignedProposal,
Net >> RawBytes,
Chain >> RichStatus,
Auth >> BlockTxs,
Auth >> VerifyBlockResp,
Snapshot >> SnapshotReq,
]),
tx_sub,
rx_pub,
);
由上代码可见,在consensus队列上和交换器共初始化了六个绑定的routingkey,这六个绑定的发送端和消息类型均由名称可以帮助理解代码。这些枚举变量均在common包的route.rs文件中定义了。消息类型在MsgType中定义,微服务模块在SubModules中定义,如下两段代码。
// CITA内置消息类型
pub enum MsgType {
// Generate MSG-PROTOS struct automatically begin:
RawBytes,
Request,
Response,
SyncRequest,
SyncResponse,
Status,
RichStatus,
SignedProposal,
Block,
BlockWithProof,
BlockHeader,
BlockTxs,
BlockTxHashes,
BlockTxHashesReq,
VerifyBlockReq,
VerifyBlockResp,
ExecutedResult,
SnapshotReq,
SnapshotResp,
Miscellaneous,
MiscellaneousReq,
BlackList,
StateSignal,
GetBlockTxn,
BlockTxn,
CompactSignedProposal,
// Generate MSG-PROTOS struct automatically end.
All,
Unknown,
// TODO This is a issue left over by history.
// The Request is too big (send from Jsonrpc).
// To remove follow items should be better.
RequestNewTx,
RequestNewTxBatch,
RequestNet,
LocalSync,
RequestRpc,
RequestPeersInfo,
}
// CITA内置模块
pub enum SubModules {
Jsonrpc,
Net,
Chain,
Consensus,
Auth,
Executor,
Synchronizer,
Snapshot,
All,
Unknown,
}
在各模块消息队列均绑定交换器后,在代码中发布消息代码示例如下,send函数包括routing_key和消息内容两个参数:
self.pub_sender
.send((
routing_key!(Consensus >> VerifyBlockReq).into(),
msg.clone().try_into().unwrap(),
))
.unwrap();
订阅消息代码示例如下,一为获取rtkey,二通过匹配rtkey订阅消息:
// 1.获取rtkey
let (key, body) = info;
let rtkey = RoutingKey::from(&key);
// 2.匹配rtkey订阅消息
match rtkey {
routing_key!(Net >> CompactSignedProposal) => {
let res = self.handle_proposal(&body[..], true, true);
if let Ok((h, r)) = res {
trace!(
"process {} recieve handle_proposal ok; h: {}, r: {}",
self,
h,
r,
);
if h == self.height && r == self.round && self.step < Step::Prevote {
self.step = Step::ProposeWait;
let now = Instant::now();
let _ = self.timer_seter.send(TimeoutInfo {
timeval: now + Duration::new(0, 0),
height: h,
round: r,
step: Step::ProposeWait,
});
}
} else {
trace!(
"process {} fail handle_proposal {}",
self,
res.err().unwrap()
);
}
}
# 三、CITA中实现微服务通信样例
# 3.1 从现有模块迁移
复制cita-auth模块,重命名为cita-test,考虑以下代码结构,删除其他文件
cita-test 通信框架图

其中,build.rs和Cargo.toml保留, main.rs是主函数入口,handler.rs是消息处理模块,进行如下改造
其中将auth均改为了test
// main.rs
extern crate cita_crypto as crypto;
extern crate core as chain_core;
#[macro_use]
extern crate libproto;
#[macro_use]
extern crate cita_logger as logger;
#[cfg(test)]
#[macro_use]
extern crate quickcheck;
// #[macro_use]
// extern crate serde_derive;
#[cfg(test)]
extern crate tempfile;
#[macro_use]
extern crate util;
extern crate db as cita_db;
extern crate hashable;
use clap::App;
use handler::MsgHandler;
// use handler::PubMessage;
use libproto::router::{MsgType, RoutingKey, SubModules};
use libproto::Message;
use pubsub::channel;
use pubsub::start_pubsub;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
use util::set_panic_handler;
pub mod handler;
include!(concat!(env!("OUT_DIR"), "/build_info.rs"));
fn main() {
// 初始化 test app
let matches = App::new("test")
.version(get_build_info_str(true))
.long_version(get_build_info_str(false))
.author("Cryptape")
.about("CITA Block Chain Node powered by Rust")
.args_from_usage(
"-c, --config=[FILE] 'Sets a custom config file'
-s, --stdout 'Log to console'",
)
.get_matches();
// 开启日志
let stdout = matches.is_present("stdout");
micro_service_init!("cita-test", "CITA:test", stdout);
info!("Version: {}", get_build_info_str(true));
info!("This is a test microservice");
// let config_path = matches.value_of("config").unwrap_or("test.toml");
// Start publish and subcribe message from MQ.
// The CITA system runs in a logic nodes, and it contains some components
// which we called micro-service at their running time.
// All micro-services connect to a MQ, as this design can keep them loose
// coupling with each other.
let (tx_sub, rx_sub) = channel::unbounded();
let (tx_pub, rx_pub) = channel::unbounded();
// 初始化交换器与消息队列的绑定
start_pubsub(
"test",
routing_key!([Jsonrpc >> RequestNet, Auth >> RawBytes, Net >> RawBytes]),
tx_sub,
rx_pub,
);
// 初始化消息handler
let mut msg_handler = MsgHandler::new(rx_sub, tx_pub);
let mainhd = thread::spawn(move || loop { // 创建线程
let intra_message: Message = String::from("test message").into_bytes().into();
let inter_message: Message = String::from("remote node's request").into_bytes().into();
// 发送节点内消息
msg_handler.send_intra_message(intra_message);
// 发送节点外消息
msg_handler.send_inter_message(inter_message);
sleep(Duration::new(3, 0)); // 休眠3秒
// 接收消息
msg_handler.recv_msg();
sleep(Duration::new(2, 0));
});
mainhd.join().unwrap();
}
// handler.rs
use libproto::router::{MsgType, RoutingKey, SubModules};
use libproto::Message;
use libproto::{TryFrom, TryInto};
use pubsub::channel::{Receiver, Sender};
pub struct MsgHandler {
rx_sub: Receiver<(String, Vec<u8>)>,
tx_pub: Sender<(String, Vec<u8>)>,
}
impl MsgHandler {
pub fn new(rx_sub: Receiver<(String, Vec<u8>)>, tx_pub: Sender<(String, Vec<u8>)>) -> Self {
MsgHandler { rx_sub, tx_pub }
}
pub fn send_intra_message(&mut self, msg: Message) {
self.tx_pub
.send((
routing_key!(Auth >> RawBytes).into(),
msg.try_into().unwrap(),
))
.unwrap();
}
pub fn send_inter_message(&mut self, msg: Message) {
self.tx_pub
.send((
routing_key!(Consensus >> RawBytes).into(),
msg.try_into().unwrap(),
))
.unwrap();
}
pub fn recv_msg(&mut self) {
if let Ok((key, payload)) = self.rx_sub.recv() {
if Message::try_from(&payload).is_err() {
error!("Can not get message from payload {:?}", &payload);
return;
}
let mut msg = Message::try_from(&payload).unwrap();
let rounting_key = RoutingKey::from(&key);
trace!("process message key = {}", key);
// 匹配rtkey
match rounting_key {
// 收到JsonRPC类型消息
routing_key!(Jsonrpc >> RequestNet) => {
info!(
"recv message Jsonrpc >> RequestNet {:?}.",
msg.take_request()
);
}
// 收到节点内消息
routing_key!(Auth >> RawBytes) => {
info!(
"recv intra message Test >> TestMessageReq {:?}.",
msg.take_raw_bytes()
);
}
// 收到节点外消息
routing_key!(Net >> RawBytes) => {
info!(
"recv remote message Net >> RawBytes {:?}.",
msg.take_raw_bytes()
);
}
_ => {
error!("receive unexpected message key {}", key);
}
}
}
}
}
# 3.2 微服务注册
左边是原始版本,右边是修改后的版本
- 根目录
Cargo.toml下添加cita-test

scripts/cita.sh添加test
forever.toml中添加cita-test(CITA启动节点先启动forever微服务,然后用forever启动其他微服务)
release.sh中添加cita-test(用于编译打包)
添加空白的
test.toml(test微服务的配置文件)
# 3.3 测试
编译代码
$ sudo ./env.sh make debug创链,启动CITA
$ cd target/install $ sudo ./bin/cita create --super_admin "0x37d1c7449bfe76fe9c445e626da06265e9377601" --nodes "127.0.0.1:4000,127.0.0.1:4001" $ sudo ./bin/cita setup test-chain/0 $ sudo ./bin/cita setup test-chain/1 $ sudo ./bin/cita start test-chain/0 $ sudo ./bin/cita start test-chain/1 $ sudo ./bin/cita top test-chain/0 $ sudo ./bin/cita top test-chain/1编译和启动过程中若出现
no such file or directory错误,重启docker后重新执行命令查看RabbitMQ Management

查看test微服务日志
$ less /test-chain/0/logs/cita-test.log
