0%

kafka安全机制

安全机制

kafka有身份认证和权限控制两种安全机制

身份认证

身份认证是指客户端与服务端连接进行身份认证,包括客户端与kafka代理之间的连接认证、代理之间的连接认证、代理与zookeeper之间的连接认证。目前支持SSL、SASL/GSSAPI、SASL/PLAIN、SASL/SCRAM-SHA-256 和SASL/SCRAM-SHA-512、SASL/OAUTHBEARER

使用SASL/PLAIN进行身份认证

  • 修改server.properties,开启SASL认证配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    #配置SASL端口
    listeners=SASL_PLAINTEXT://localhost:9092
    #设置代理之间的通信协议
    security.inter.broker.protocol=SASL_PLAINTEXT
    #启用SASL机制
    sasl.enabled.mechanisms=PLAIN
    #配置SASL机制
    sasl.mechanism.inter.broker.protocol=PLAIN
    #超级管理员
    super.users=User:admin
  • 修改zookeeper.properties,开启sasl认证

    1
    2
    3
    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000
  • 创建zookeeper的JAAS文件,创建”kafka_zk_jaas.conf”

    1
    2
    3
    4
    5
    6
    Server { #zookeeper服务端的配置
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin";
    };

    在每个zookeeper中设置JVM参数 -Djava.security.auth.login.config=kafka_zk_jaas.conf文件所在的位置,在zookeeper-server-start.sh中添加

    1
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka/kafka_zk_jaas.conf"
  • 创建服务端JAAS文件,配置PLAIN。创建一个服务端java验证与授权的JAAS文件”kafka_server_jaas.conf”,在文件中指定认证机制,并配置连接代理的用户名和密码。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    KafkaServer { #KafkaServer用于指定服务端登录配置
    org.apache.kafka.common.security.plain.PlainLoginModule required #指定采用PLAIN机制
    username="admin" #使用username和password指定该代理与集群其他代理初始化连接的用户名和密码
    password="admin"
    user_admin="admin" #以"user_"为前缀后接用户名方式创建连接代理的用户名和密码 用户名为admin密码为admin
    user_morton="kafka";
    };

    Client { #kafka连接zookeeper的配置
    org.apache.kafka.common.security.plain.PlainLoginModule required #指定采用PLAIN机制
    username="admin" #使用username和password指定该代理与集群其他代理初始化连接的用户名和密码
    password="admin"
    }

    在每个broker中设置JVM参数 -Djava.security.auth.login.config=kafka_server_jaas.conf文件所在的位置(这个可以在kafka-server-start.sh中修改)

    1
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka/kafka_server_jaas.conf"
  • 创建和配置客户端JAAS文件。”kafka_client_jaas.conf”在文件中指定客户端消费者和生产者连接KafkaServer的认证机制

    这里使用的用户最好不要使用初始化连接的用户名和密码

    1
    2
    3
    4
    5
    KafkaClient { #KafkaClient用于指定客户端连接kafka服务端登录配置
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin";
    };

    在启动生产者或者消费者时,在kafka-console-producer.sh和kafka-console-consumer.sh中设置JVM参数-Djava.security.auth.login.config=kafka_server_jaas.conf文件所在的位置

    1
    export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka/kafka_client_jaas.conf"

    启动生产者 要带上—producer-property security.protocol=SASL_PLAINTEXT —producer-property sasl.mechanism=PLAIN配置,否则连接不上服务端

    1
    kafka-console-producer --bootstrap-server localhost:9092 --topic acls-test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

    也可以在producer.properties配置文件中添加配置

    1
    2
    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=PLAIN

    此时在使用producer的时候指定配置文件即可

    1
    kafka-console-producer --bootstrap-server localhost:9092 --topic acls-test --producer.config /usr/local/etc/kafka/producer.propertie

权限控制

权限控制是指对客户端的读写操作进行权限控制,包括对于消息或kafka集群操作权限控制。权限控制是可插拔的,并且支持与外部的授权服务进行集成,kafka自带了简单的授权实现类SimpleAclAuthorizer,可以在server.properties文件配置authorizer.class.name指定,如设置authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

权限说明

  • READ 读操作权限,如消费者消费主题的权限,消费组管理时读取相关元信息权限等

  • WRITE 写操作权限,如生产者向主题写消息的权限

  • DELETE 删除主题操作的权限

  • CREATE 创建主题操作的权限

  • ALTER 修改主题及配置操作的权限

  • DESCRIBE 获取主题元数据信息的权限

  • ClusterAction 集群元数据操作权限,如更新集群元数据操作、关闭控制器、停止副本等

  • ALL 所有权限

kafka将权限控制列表ACL存储在zookeeper中,添加权限控制之后,会在zookeeper中创建两个节点:即存储ACL信息的kafka-acl节点自己存储ACL变更信息的kafka-acl-changes节点

Kafka-acls.sh脚本支持查询(list)、添加(add)、移除(remove)这3类权限控制的操作

启用ACL权限控制

需要在server.properties中添加权限控制实现类配置

1
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

使用了该认证之后,生产者消费者都将无法连接,因为在开启权限后,默认条件下除了超级用户之外,所有用户都没有授予任何权限,可以在server.properties中调整

1
allow.everyone.if.no.acl.found=true

权限操作

  • 查询权限

    1
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list
  • 增加权限

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:morton --producer  --topic acls-test
    #输出内容
    Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=acls-test, patternType=LITERAL)`:
    (principal=User:morton, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:morton, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:morton, host=*, operation=CREATE, permissionType=ALLOW)

    Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=acls-test, patternType=LITERAL)`:
    (principal=User:morton, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:morton, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:morton, host=*, operation=CREATE, permissionType=ALLOW)

    可以使用—operation参数指定添加具体的权限

    Describe/DescribeConfigs/Alter/IdempotentWrite/Read/Delete/Create/ClusterAction/All/Write/AlterConfigs

  • 删除权限

    1
    kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --remove --topic acls-test

    可以使用—operation参数指定删除具体的权限

欢迎关注我的其它发布渠道