认识Confluent
Kafka用得最多的是他的大吞吐量的消息队列,很多公司利用这个特点构建了自己的数据管道系统。数据管道是大数据系统的灵魂。
由LinkedIn创建的Kafka在apache开源后并没有停下脚步,三位作者离开LinkedIn创建了Confluent公司,并且开发并开源了一些新的玩具。
我在工作中遇到一个任务,要为第三方用户构建一个数据查询系统。不能直接拿OLTP数据库来给第三方用,需要把业务数据导入到这个查询系统里,如果能保持时效性就更好了。
研究了一下Kafka的主页,发现了一个新玩具蛮好用的,现在就介绍给大家。
Kafka connect
Kafka connect就是这个新玩具,可以将数据和文件实时导入到Kafka中,然后再从Kafka中导入到下游系统进行存储。下面我会介绍两个例子,把MySQL source数据库导入到sink数据库,
重点是connect配置文件的用法。
快速上手
安装
需要安装Java>=1.7和最新版本Kafka,可以参考官方手册,在这里就不多说了
启动
安装完成后,默认的bin文件以及被copy到了系统路径中,分别启动ZooKeeper, Kafka, Schema Registry,为了方便调试,我都在前台运行1
2
3
4
5
6
7
8# Start ZooKeeper. Run this command in its own terminal.
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
# Start Kafka. Run this command in its own terminal.
$ ./bin/kafka-server-start ./etc/kafka/server.properties
# Start Schema Registry. Run this command in its own terminal.
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
Kafka connect只包含了PostgreSQL和SQLite的JDBC驱动,没有MySQL驱动,我理解可能因为MySQL是商业公司的产品,所以没有包含进来,没有关系,可以自己下载。
下载MySQL驱动,把jar文件解压到/usr/share/java/kafka-connect-jdbc
就可以了。
在/etc/kafka-connect-jdbc/
下面有一些配置文件的模板,先来复制一个source模板,然后在这基础上添加我们自己的参数:1
2
3
4
5
6
7
8name=mysql-myprj-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/myprj_online?user=root&password=mypassword
table.whitelist=main_orderinfo
mode=incrementing
incrementing.column.name=right_id
topic.prefix=mysql-
然后再复制一个sink模板,修改如下:1
2
3
4
5
6name=sink-mysql-myprj
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=mysql-main_orderinfo
connection.url=jdbc:mysql://localhost:3306/myprj_copy?user=root&password=mypassword
auto.create=true
这里我把同一个节点上的online数据库复制到了另一个copy数据库中。
接下来运行执行standalone模式的worker:1
2# 第一个参数是worker的配置文件,后面可以跟无数个connector[connector1, connector2,...]
connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/source-mysql-myprj.properties /etc/kafka-connect-jdbc/sink-mysql-myprj.properties
然后去查看copy数据库,可以看到数据都已经同步过来了,在source库中插入一条数据,可以看到copy库中也出现了这条记录。
遇到的几个小问题
- Kafka中包含了多个slf4j,运行时可能会报错,只要删除其中一个即可
- hostname可能会出现不能识别的问题,这时只要把hostname在
/etc/hosts
中映射到127.0.0.1即可 - 运行worker是可以跟着多个connect配置文件的,我之前错误的启动了两个standalone worker,一直报错说REST server找不到,是因为
/etc/schema-registry/connect-avro-standalone.properties
只能对应一个worker, 如果启动多个standalone worker,需要针对worker实例分别对offset.storage.file.filename
和rest.port
进行配置
第二个例子会深入介绍一下connect配置文件的用法,下一篇再讲吧,happy big data~