0%

kafka连接器

连接器

kafka自带了对连接器应用的脚本,用于将数据从外部系统导入到kafka或从kafka中导出到外部系统。kafka连接器有独立模式和分布式模式两种工作模式。kafka自带脚本connect-standalone.sh和connect-distributed.sh分别对应kafka连接器的两种工作模式。

独立模式

kafka自带脚本connect-standalone.sh用于以独立模式启动kafka连接器。通过该脚本将文件中的数据导入到kafka以及将kafka中的数据导出到文件。

脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] connect-standalone.properties"
exit 1
fi

base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectStandalone'}

COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac

exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectStandalone "$@"

执行该脚本时需要指定两个配置文件,一个是worker运行时相关配置的配置文件,称为WorkConfig,在该文件中指定与kafka建立连接的配置(bootstrap.servers)、数据格式转化类(key.converter/value.converter)、保存偏移量的文件路径(offset.storage.file.filename)、提交偏移量的频率(offset.flush.interval.ms)等。另一个是指定source连接器或是sink连接器配置的文件,可同时指定多个连接器配置,每个连接器配置文件对应一个连接器,因此要保证连接器名称全局唯一,连接器名通过name属性指定。

connect-standalone.properties

1
2
3
4
5
6
7
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

Source连接器

Source连接器用于将外部数据导入到kafka相应主题中。kafka自带的connect-file-source.properties文件配置了一个读取文件的Source连接器

1
2
3
4
5
name=local-file-source  # 连接器名称
connector.class=FileStreamSource #Source连接器执行类
tasks.max=1 # SourceTask数量
file=test.txt #该连接器数据源文件
topic=test-connect #数据导入的目标主题

该连接器运行一个task,执行将test.txt文件中的数据导入到一个名为”test-connect”的主题中

1
connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties

连接器启动后会在logs目录下创建一个connectStandalone.out日志文件,记录连接器运行时相关的日志。

Source连接器是通过多个SourceTask共享一个KafkaProducer将数据发送到kafka,在Source连接器启动时,会加载ProduceConfig相关的配置信息。可以在connect-standalone.properties文件中通过”producer.”前缀来指定生产者级别的配置

Sink连接器

kafka使用配置connect-file-sink.properties导出数据

1
2
3
4
5
name=local-file-sink #连接器名称
connector.class=FileStreamSink #Sink连接器执行类
tasks.max=1 #SinkTask数量
file=test-sink.txt #数据导出后输出的目标文件路径
topics=test-connect #导出数据源对应的主题名称,可指定多个主题

启动Sink连接器

1
connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties

Sink连接器是通过KafkaConsumer从指定的主题消费消息,会加载ConsumerConfig的配置信息。在启动Sink连接器时可以在connect-standalone.properties中以”consumer.”为前缀来指定Consumer级别的配置。FileStreamSink默认是以Sink连接器名作为group.id的,且不同的连接要求名称全局唯一

可以同时启动多个Sink连接器

1
connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-sink.properties ../config/connect-file-sink2.properties

连接器接口

kafka提供了一套基于rest风格的API接口来管理连接器,默认端口为8083,可以在WorkConfig配置文件中修改rest.port

  • GET / 查看kafka版本信息

  • GET /connectors 查看当前活跃的连接器列表

  • POST /connectors 根据指定配置,创建一个新的连接器

  • GET /connectors/{connector} 查看指定连接器的信息

  • GET /connectors/{connector}/config 查看指定连接器的配置信息

  • PUT /connectors/{connector}/config 修改指定连接器的配置

  • GET /connectors/{connector}/status 查看指定连接器的状态

  • POST /connectors/{connector}/restart 重启指定连接器

  • PUT /connectors/{connector}/pause 暂停指定连接器

  • PUT /connectors/{connector}/resume 恢复所指定的被暂停的连接器

  • GET /connectors/{connector}/tasks 查看指定连接器正在运行的task

  • POST /connectors/{connector}/tasks 修改task配置,即覆盖现有task,只支持分布式模式

  • GET /connectors/{connector}/tasks/{task}/status 查看指定连接器指定task的状态

  • POST /connectors/{connector}/tasks/{task}/restart 重启指定连接器指定task

  • DELETE /connectors/{connector} 删除指定连接器

  • GET /connector-plugins 查看已配置的连接器,显示连接器实例类完整路径

  • PUT /{connectorType}/config/validate 验证指定的配置,返回各配置

{connector}指待查看的连接器名

{task}指待查看的Task的taskId

{connectorType}指连接器配置文件中connector.class指定的值

访问时需要加请求头 User-Agent kafka-connect

分布式模式

使用connect-distributed脚本用于分布式模式进行连接器,该脚本只需指定workConfig类型的配置文件即可,不支持在启动时通过加载连接器配置文件创建一个连接器,而只能通过访问上述接口来创建连接器

脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] connect-distributed.properties"
exit 1
fi

base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/connect-log4j.properties"
fi

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xms256M -Xmx2G"
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name connectDistributed'}

COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac

exec $(dirname $0)/kafka-run-class.sh $EXTRA_ARGS org.apache.kafka.connect.cli.ConnectDistributed "$@"

workConfig配置

connect-distributed.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bootstrap.servers=localhost:9092
#连接器Cluster的唯一标识
group.id=connect-cluster

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

#用于存储Source连接器读取数据对应偏移量的主题,与存储消费者提交偏移量的内部主题作用相同,建议手动创建
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#offset.storage.partitions=25

#用于存储连接器相关配置信息的主题,包括创建连接的配置信息以及连接的task信息。要指定该主题拥有一个分区和多个副本,需要手动创建
config.storage.topic=connect-configs
config.storage.replication.factor=1

# 用于存储连接器每个task状态的主题,需要手动创建
status.storage.topic=connect-status
status.storage.replication.factor=1
#status.storage.partitions=5

# 连接器task提交偏移量的时间间隔
offset.flush.interval.ms=10000

分布式模式启动

  • 首先需要创建配置文件中对应的三个主题

  • 启动分布式模式

    1
    connect-distributed.sh ../config/connect-distributed.properties

    在连接器启动时会加载ProducerConfig以及ConsumerConfig中定义的配置项,所以在配置文件中可以设置以”producer.”为前缀的生产者级别配置以及以”consumer.”为前缀的消费者级别配置

  • 创建一个FileStreamSource连接器

    使用rest请求创建连接器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    {
    "name":"test-source",
    "config":{
    "topic":"connect-distributed",
    "connector.class":"FileStreamSource",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "converter.internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "converter.internal.value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "file":"/tmp/input/connection-distributed.txt"
    }
    }
  • 创建一个FileStreamSink连接器

    使用rest请求创建连接器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    {
    "name":"test-sink",
    "config":{
    "topic":"connect-distributed",
    "connector.class":"FileStreamSink",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "converter.internal.key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "converter.internal.value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "file":"/tmp/output/connection-distributed.txt"
    }
    }

欢迎关注我的其它发布渠道