连接器
kafka自带了对连接器应用的脚本,用于将数据从外部系统导入到kafka或从kafka中导出到外部系统。kafka连接器有独立模式和分布式模式两种工作模式。kafka自带脚本connect-standalone.sh和connect-distributed.sh分别对应kafka连接器的两种工作模式。
独立模式
kafka自带脚本connect-standalone.sh用于以独立模式启动kafka连接器。通过该脚本将文件中的数据导入到kafka以及将kafka中的数据导出到文件。
脚本
1 | if [ $# -lt 1 ]; |
执行该脚本时需要指定两个配置文件,一个是worker运行时相关配置的配置文件,称为WorkConfig,在该文件中指定与kafka建立连接的配置(bootstrap.servers)、数据格式转化类(key.converter/value.converter)、保存偏移量的文件路径(offset.storage.file.filename)、提交偏移量的频率(offset.flush.interval.ms)等。另一个是指定source连接器或是sink连接器配置的文件,可同时指定多个连接器配置,每个连接器配置文件对应一个连接器,因此要保证连接器名称全局唯一,连接器名通过name属性指定。
connect-standalone.properties
1 | localhost:9092 = |
Source连接器
Source连接器用于将外部数据导入到kafka相应主题中。kafka自带的connect-file-source.properties文件配置了一个读取文件的Source连接器
1 | name=local-file-source # 连接器名称 |
该连接器运行一个task,执行将test.txt文件中的数据导入到一个名为”test-connect”的主题中
1 | connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties |
连接器启动后会在logs目录下创建一个connectStandalone.out日志文件,记录连接器运行时相关的日志。
Source连接器是通过多个SourceTask共享一个KafkaProducer将数据发送到kafka,在Source连接器启动时,会加载ProduceConfig相关的配置信息。可以在connect-standalone.properties文件中通过”producer.”前缀来指定生产者级别的配置
Sink连接器
kafka使用配置connect-file-sink.properties导出数据
1 | name=local-file-sink #连接器名称 |
启动Sink连接器
1 | connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties |
Sink连接器是通过KafkaConsumer从指定的主题消费消息,会加载ConsumerConfig的配置信息。在启动Sink连接器时可以在connect-standalone.properties中以”consumer.”为前缀来指定Consumer级别的配置。FileStreamSink默认是以Sink连接器名作为group.id的,且不同的连接要求名称全局唯一
可以同时启动多个Sink连接器
1 | connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties ../config/connect-file-sink2.properties |
连接器接口
kafka提供了一套基于rest风格的API接口来管理连接器,默认端口为8083,可以在WorkConfig配置文件中修改rest.port
GET / 查看kafka版本信息
GET /connectors 查看当前活跃的连接器列表
POST /connectors 根据指定配置,创建一个新的连接器
GET /connectors/{connector} 查看指定连接器的信息
GET /connectors/{connector}/config 查看指定连接器的配置信息
PUT /connectors/{connector}/config 修改指定连接器的配置
GET /connectors/{connector}/status 查看指定连接器的状态
POST /connectors/{connector}/restart 重启指定连接器
PUT /connectors/{connector}/pause 暂停指定连接器
PUT /connectors/{connector}/resume 恢复所指定的被暂停的连接器
GET /connectors/{connector}/tasks 查看指定连接器正在运行的task
POST /connectors/{connector}/tasks 修改task配置,即覆盖现有task,只支持分布式模式
GET /connectors/{connector}/tasks/{task}/status 查看指定连接器指定task的状态
POST /connectors/{connector}/tasks/{task}/restart 重启指定连接器指定task
DELETE /connectors/{connector} 删除指定连接器
GET /connector-plugins 查看已配置的连接器,显示连接器实例类完整路径
PUT /{connectorType}/config/validate 验证指定的配置,返回各配置
{connector}指待查看的连接器名
{task}指待查看的Task的taskId
{connectorType}指连接器配置文件中connector.class指定的值
访问时需要加请求头 User-Agent kafka-connect
分布式模式
使用connect-distributed脚本用于分布式模式进行连接器,该脚本只需指定workConfig类型的配置文件即可,不支持在启动时通过加载连接器配置文件创建一个连接器,而只能通过访问上述接口来创建连接器
脚本
1 | if [ $# -lt 1 ]; |
workConfig配置
connect-distributed.properties
1 | localhost:9092 = |
分布式模式启动
首先需要创建配置文件中对应的三个主题
启动分布式模式
1
connect-distributed.sh ../config/connect-distributed.properties
在连接器启动时会加载ProducerConfig以及ConsumerConfig中定义的配置项,所以在配置文件中可以设置以”producer.”为前缀的生产者级别配置以及以”consumer.”为前缀的消费者级别配置
创建一个FileStreamSource连接器
使用rest请求创建连接器
1
2
3
4
5
6
7
8
9
10
11
12{
"name":"test-source",
"config":{
"topic":"connect-distributed",
"connector.class":"FileStreamSource",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"converter.internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
"converter.internal.value.converter":"org.apache.kafka.connect.storage.StringConverter",
"file":"/tmp/input/connection-distributed.txt"
}
}创建一个FileStreamSink连接器
使用rest请求创建连接器
1
2
3
4
5
6
7
8
9
10
11
12{
"name":"test-sink",
"config":{
"topic":"connect-distributed",
"connector.class":"FileStreamSink",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"converter.internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
"converter.internal.value.converter":"org.apache.kafka.connect.storage.StringConverter",
"file":"/tmp/output/connection-distributed.txt"
}
}