Skip to content

如何使用kafka tcp rpc file等异步通道进行数据的发送和消费

Vinllen Chen edited this page Mar 7, 2020 · 3 revisions

本文介绍如何使用MongoShake把数据推到异步通道,主要是kafka以及从kafka里面消费数据,tcp/rcp/file等通道请见文章最后的说明。

在配置文件中tunnel相关的参数采用下面配置可以使得数据写入到kafka:

tunnel = kafka
tunnel.address = [email protected]:9092
tunnel.message = raw或者json或者bson

其中tunnel.address表示kafka的地址,@前面的是topic,tunnel.message表示数据封装的类型,从v2.2.1版本开始支持,共有如下三种类型:

  • raw。默认类型,采用聚合模式写入,一条kafka message里面包括多条oplog,详见下文。用户如果消费需要先拆解头部字段,然后再是body。考虑到用户处理比较复杂,我们从v1.4版本开始提供了receiver,用户可以用receiver对接,从kafka消费数据并解析。receiver里面我们给了示例代码,用户可以参考wiki的说明,对照receiver代码进行修改。
  • json(推荐)。消息为json格式。考虑到用户可能不熟悉golang语言,加上开发receiver的复杂度,我们从v2.2.1开始提供了json格式,每条kafka message就是一条原生的json格式的oplog,用户不需要采用receiver进行对接,而可以直接对接kafka进行消费,消费后以json格式进行解析。注意这种模式的效率会比较低下,所以如果希望比较高的效率,还是需要用raw模式采用receiver进行对接。
  • bson。消息为bson格式。每条kafka message就是一条原生的bson格式的oplog,同样可以直接对接kafka进行消费,消费后以son格式进行解析。

原理解析

在v2.2.1版本之前,kafka的通道中的数据都是带有控制信息的,这主要是为了以前collector和receiver分离模式下,保证远程传输的正确性以及效率而实现的。而在mongoshake开源版本中,我们提供了direct通道进行数据的直接写入,所以这个远程通道基本很少被使用,用户的使用场景大多围绕异构同步/监控/分析等场景而进行。在v1.4版本中,我们提供了receiver工具便于用户从通道层面直接拉取数据,由receiver自身进行控制信息的剥离,但是这样的话,用户需要自己去根据receiver进行对接,然后修改部分代码对接到下游,势必有局限性。所以v2.2.1版本,我们考虑对通道层面进行优化,剥离控制信息,从而解决用户经常反馈的“kafka数据读取乱码”的问题。

v1.4版本开始的实现

目前MongoShake发送到kafka中的数据格式如下图所示,其包含了Header和Body。 Header部分主要为控制信息:

  • checksum。发送端打上crc checksum,接收端收到可以校验是否与checksum一致。
  • Tag。关于本条oplog的tag,比如是否是正常报文,是否是心跳报文,是否要强制刷盘,是否为重发报文。
  • Hash分片。MongoShake对于oplog是默认按表并发的,一个表对应一个"hash分片"。所以这个表示当前这批报文属于的hash分片,用于后面receiver接收到以后进行并发处理。
  • 压缩算法。当前数据是否压缩,采用何种压缩算法。
  • 加密。目前未启用。 Body部分主要是一批聚合的oplog,格式是bson。

kafka_raw

v2.2.1版本的实现

对于写入kafka中的数据,不再是一个批量聚合的数据,也没有Header内容,一条kafka message就是一条oplog,用户可以勾选模式是raw,json,bson。其中raw模式仍然沿用v1.4版本的格式,json表示把数据以json形式打印,bson就是原始的mongodb oplog的格式。 这种处理的优点就是把以前需要由receiver对接的格式,改为直接对接,从而少了一个receiver,也不需要用户额外开发,降低开源用户的使用成本。

  • 原来处理模式:MongoDB->MongoShake->Kafka->receiver->下游组件
  • 目前的模式:MongoDB->MongoShake->Kafka->下游组件。

缺点就是,传输的性能降低了,包括读、写kafka,不过考虑到kafka本身性能还可以,即使用户的流量大,写kafka不太会可能成为瓶颈。

其他通道rpc/tcp/file

其他通道 目前只实现了kafka的优化,对于其他通道如rpc/tcp/file等模式,还是沿用header+body的模式。原因如下:

  • 对于rpc来说,本身就需要借助net.rpc的传输,接收端也必须是golang,所以用户还是需要自己写一个recevier对接(否则会很繁琐)。
  • 对于tcp来说,本身传输就需要受控制,比如最简单的需要要一个Length+Oplog的编码格式,这样用户还是需要自己去解析,所以没有必要。
  • 对于file来说,开源用户的场景会比较少,也有部分用户采用这种模式,也是用的receiver对接。所以暂时不做处理。

总结

raw格式能够最大程度的提高性能,但是需要用户有额外部署receiver的成本。json和bson格式能够降低用户部署成本,直接对接kafka即可消费,相对于raw来说,会有部分性能损耗。

Clone this wiki locally