kafka安全机制(SASL_SCRAM)

作者 : admin 本文共17294个字,预计阅读时间需要44分钟 发布时间: 2024-03-7 共3人阅读

在 Kafka 中,SASL 是一种重要的安全协议,用于提供基于身份验证的访问控制。Kafka 使用 SASL 来支持各种身份验证机制,如:

  • PLAIN(基于用户名和密码的认证)
  • GSSAPI(基于 Kerberos 的认证)
  • SCRAM(Salted Challenge Response Authentication Mechanism)
  • OAUTHBEARER

具体信息可以参考官网:kafka安全机制官网-2.7
这里采用SCRAM用于kafka安全机制的实现,而不是采用其他方式实现,主要有如下原因:

  1. SASL/GSSAPI (Kerberos) – starting at version 0.9.0.0。主要是为 Kerberos 使用,如果当前已有 Kerberos 认证,只需要为集群中每个 Broker 和访问用户申请 Principle ,然后在 Kafka 配置文件中开启 Kerberos 的支持即可,一般用于大型公司。
  2. SASL/PLAIN – starting at version 0.10.0.0。一个简单的用户名和密码身份认证机制,通常与 TLS/SSL 一起用于加密,以实现身份验证。是一种比较容易使用的方式,但是也有一个很明显的缺点,这种方式会把用户账户文件配置到静态文件中,每次想要添加新的账户都需要重启 Kafka 去加载静态文件,才能生效,十分不方便
  3. SASL/SCRAM-SHA-256 和 SASL/SCRAM-SHA-512 – starting at version 0.10.2.0。通过将认证信息保存在 ZooKeeper 里面,从而动态的获取用户信息,相当于把 ZK 用作一个认证中心使用。这种认证可以在使用过程中,使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群,十分方便。
  4. SASL/OAUTHBEARER – starting at version 2.0。 Kafka 引入的新认证机制,主要是为了实现与 OAuth2 框架的集成,Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全 Json Web Token,必须配以 SSL 加密才能在生产环境中使用。

一、环境搭建

1.1 环境准备

  • jdk1.8
  • apache-zookeeper-3.5.9-bin
  • kafka_2.12-2.7.1

安装顺序:jdk–>zookeeper–>kafka

小知识:kafka版本命名约定
kafka: 这部分指的是 Apache Kafka,一个开源的分布式事件流平台。Kafka 提供了一种可靠的、可扩展的发布-订阅消息系统,可以处理大规模的实时数据流。
2.12: 这表示 Scala 的版本。在 Kafka 的情况下,2.12意味着它是使用 Scala 2.12 编译的。Scala 是一种运行在 Java 虚拟机上的多范式编程语言,被用于 Kafka 的实现。
2.7.1: 这是 Kafka 的版本号。在这个例子中,版本号是 2.7.1。版本号通常表示软件的发布版本,新版本通常包含新功能、改进和修复之前版本的 bug。

  • 安装文件解压之后,需要将kafka的libs目录下的如下jar赋值到zookeeper的lib目录下:
cp ${KAFKA_HOME}/libs/kafka-clients-2.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/lz4-java-1.7.1.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/snappy-java-1.1.7.7.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-api-1.7.30.jar ${ZOOKEEPER_HOME}/lib
cp ${KAFKA_HOME}/libs/slf4j-log4j12-1.7.30.jar ${ZOOKEEPER_HOME}/lib
  • 每个kafka对应的zookeeper版本不一样,建议先下载想要的kafka版本,解压之后,查看libs下依赖的zookeeper版本,然后去官网下载对应的版本进行安装!!!

1.2 JDK的安装与配置

Kafka从2.0.0版本开始就不再支持JDK7及以下版本

  1. 到官网下载jdk安装包,并上传至Linux的/opt目录下
  2. 解压安装包
  3. 配置jdk环境变量,修改/etc/profile文件并向其中添加如下配置
export JAVA_HOME=/opt/jdk解压后的文件名
export JRE_HOME=$JAVA_HOME/jre
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib
  1. 生效配置文件,source/etc/profile命令使配置生效
source /etc/profile
  1. 通过java-version命令验证JDK 是否已经安装配置成功
java -version

1.3 Zookeeper的安装与配置

ZooKeeper是安装Kafka集群的必要组件,Kafka通过ZooKeeper来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。

  • ZooKeeper是一个开源的分布式协调服务,是Google Chubby的一个开源实现。
  • 分布式应用程序可以基于ZooKeeper实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master选举、配置维护等功能。
  • 在ZooKeeper中共有3个角色:leader、follower和observer,同一时刻 ZooKeeper集群中只会有一个leader,其他的都是follower和observer。
  • observer不参与投票,默认情况下 ZooKeeper 中只有 leader 和 follower 两个角色。更多相关知识可以查阅ZooKeeper官方网站来获得。

1.3.1 单机模式安装

  1. 到官网下载zookeeper安装包,并上传至Linux的/opt目录下
  2. 解压压缩包
  3. 添加配置,向/etc/profile配置文件中添加如下内容
export ZOOKEEPER_HOME=/opt/zookeeper解压后的文件名
export PATH=$PATH:$ZOOKEEPER_HOME/bin
  1. 执行source/etc/profile命令使配置生效
source /etc/profile
  1. 修改 ZooKeeper 的配置文件。首先进入$ZOOKEEPER_HOME/conf 目录,并将zoo_sample.cfg文件修改为zoo.cfg
  2. 修改zoo.cfg配置文件,zoo.cfg文件的内容参考如下
# ZooKeeper服务器心跳时间,单位为ms
tickTime=2000
# 投票选举新leader的初始化时间
initLimit=10
# leader与follower心跳检测最大客忍时间,响应超过syncLimit*tickTime,leader认为
# fo11ower“死掉”,从服务器列表中别除fol1ower
syncLimit=5
# 数据目录
dataDir=/tmp/zookeeper/data
# 日志目录
dataLogDir=/tmp/zookeeper/log
# ZooKeeper对外服务端口
clientPort=2181
  1. 默认情况下,Linux系统中没有/tmp/zookeeper/data和/tmp/zookeeper/log这两个目录,所以接下来还要创建这两个目录
mkdir -p /tmp/zookeeper/data
mkdir -p /tmp/zookeeper/log
  1. 在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号
  2. 通过zkServer.sh start启动Zookeeper服务
zkServer.sh start
  1. 通过zkServer.sh status查看启动状态
zkServer.sh status

1.3.2 集群模式安装

以上是关于ZooKeeper单机模式的安装与配置,一般在生产环境中使用的都是集群模式,集群模式的配置也比较简单,相比单机模式而言只需要修改一些配置即可。下面以3台机器为例来配置一个ZooKeeper集群。首先在这3台机器的**/etc/hosts文件中添加3台集群的IP地址与机器域名的映射,示例如下(3个IP地址分别对应3台机器)
kafka安全机制(SASL_SCRAM)插图
然后在这3台机器的
zoo.cfg**文件中添加以下配置:
kafka安全机制(SASL_SCRAM)插图(1)

  • 为了便于讲解上面的配置,这里抽象出一个公式,即 server.A=B:C:D。其中:
    • A是一个数字,代表服务器的编号,就是前面所说的myid文件里面的值。集群中每台服务器的编号都必须唯一,所以要保证每台服务器中的myid文件中的值不同。
    • B代表服务器的IP地址
    • C表示服务器与集群中的 leader 服务器交换信息的端口
    • D表示选举时服务器相互通信的端口

1.3.3 Zookeeper安全认证配置

zookeeper和kafka在默认情况下,是没有开启安全认证的,那么任意客户端可以在不需要任何身份认证的情况下访问zookeeper和kafka下的各节点,甚至可以进行节点的增加,修改以及删除的动作。注意,前面的动作是基于客户端能访问服务端所在的网络,如果进行了物理隔绝或者做了防火墙限制,那前述内容就不一定成立。但是,在某些对安全加固要求比较严格的客户或者生产环境中,那就必须开启安全认证才行。除了最基本的身份认证以外,还有针对每个节点的权限访问,但本文不涉及该话题。
进入正题,先从zookeeper开始配置,zookeeper官网提供了认证配置的参考,点击下方官网地址,即可查看详情。配置分两种情况:

  1. 客户端和服务端的双向认证(3.4.0开始引入)
  2. 服务端与服务端的双向认证(2.4.10开始引入)

如果是非集群模式下,仅配置客户端和服务端的双向认证即可。集群模式下,则需要客户端和服务端的认证以及zookeeper服务器之间的双向认证。
Zookeeper 使用的是Java自带的认证和授权服务(简称:JAAS),详细内容请看官网,该链接是 Java 8 的 JAAS 的介绍。这里为zookeeper和kafka分别在对应配置文件下创建jass配置文件为(文件名可以随意):

  • zookeeper:${ZOOKEEPER_HOME}/conf/zoo_jaas.conf
  • kafka:${KAFKA_HOME}/config/kafka-server-jaas.conf

注意:本节中的客户端指的kafka,服务端指的是zookeeper

1.3.3.1 客户端和服务端的双向认证
  1. 配置zookeeper服务端
  • 在zoo_jaas.conf添加如下配置
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="zookeeper"
  password="zookeepersecret”
  user_kafka="kafkasecret";
};
  • 修改zoo.cfg配置
# 强制进行SASL认证
sessionRequireClientSASLAuth=true
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  • 增加jvm参数,在${ZOOKEEPER_HOME}/bin/zkEnv.sh脚本中增加:
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOKEEPER_HOME}/conf/zoo_jaas.conf"
  1. 配置客户端
  • 在kafka-server-jaas.conf中添加如下配置:
Client{
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="kafka"
  password="kafkasecret";
};
  • 修改客户端的启动脚本${KAFKA_HOME}/bin/kafka-server-start.sh,增加jvm参数:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka-server-jaas.conf kafka.Kafka "$@"
1.3.3.2 服务端与服务端的双向认证
  1. 修改zoo.cfg,增加如下配置:
quorum.auth.enableSasl=true # 打开sasl开关, 默认是关的
quorum.auth.learnerRequireSasl=true # ZK做为leaner的时候, 会发送认证信息
quorum.auth.serverRequireSasl=true # 设置为true的时候,learner连接的时候需要发送认证信息,否则拒绝
quorum.auth.learner.loginContext=QuorumLearner # JAAS 配置里面的 Context 名字
quorum.auth.server.loginContext=QuorumServer # JAAS 配置里面的 Context 名字
quorum.cnxn.threads.size=20 # 建议设置成ZK节点的数量乘2
  1. 修改zoo_jaas.conf,增加如下配置:
QuorumServer {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  user_test="test";
};

QuorumLearner {
  org.apache.zookeeper.server.auth.DigestLoginModule required
  username="test"
  password="test";
};

QuorumServer 和 QuorumLearner 都是配置的ZK节点之间的认证配置

1.4 Kafka的安装与配置

1.4.1 单机模式安装

  1. 到官网下载kafka安装包,并上传至Linux的/opt目录下
  2. 解压压缩包
  3. 修改broker的配置文件**$KAFKA_HOME/conf/server.properties**
# broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
# broker对外提供的服务入口地址
listeners=PLAINTEXT://localhost:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,为了方便演示,我们假设Kafka和ZooKeeper都安装在本机
zookeeper.connect=localhost:2181/kafka

1.4.2 集群模式安装

如果是单机模式,那么修改完上述配置参数之后就可以启动服务。如果是集群模式,那么只需要对单机模式的配置文件做相应的修改即可:确保集群中每个broker的broker.id配置参数的值不一样,以及listeners配置参数也需要修改为与broker对应的IP地址或域名,之后就可以各自启动服务。注意,在启动 Kafka 服务之前同样需要确保 zookeeper.connect参数所配置的ZooKeeper服务已经正确启动。

1.4.3 Kafka安全认证配置

  1. 通过kafka-configs.sh脚本生成一个用户admin作为超级管理员
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin

注意:在配置kafka的server.properties对于zookeeper的连接我们采用的是zookeeper的CHROOT,所以上述命令中也需要指定对应路径,不然启动的kafka时会获取不到生成的SCRAM认证信息!!!

小知识:
ZooKeeper 中的 CHROOT 是指将 ZooKeeper 的命名空间限定在一个特定的路径下。这就是说,ZooKeeper 的所有数据和操作都将在指定的路径下进行,而不是整个 ZooKeeper 服务器上。CHROOT 功能允许在一个 ZooKeeper 集群上运行多个独立的 ZooKeeper 实例,每个实例都有自己的命名空间。
在 ZooKeeper 的配置文件 zoo.cfg 中,CHROOT 通过配置项 chroot 来设置。例如:
chroot=/myapp
在这个例子中,ZooKeeper 就会将其根路径设置为 /myapp,而不是默认的根路径。这样,对于 ZooKeeper 中的所有路径,都将以 /myapp 为根进行解释。这就好比把 ZooKeeper 变成了一个容器,其内部的所有路径都相对于 /myapp 这个容器。
CHROOT 的使用场景包括:

  1. 隔离命名空间: 允许多个应用在同一个 ZooKeeper 集群上使用不同的命名空间,防止彼此之间的命名冲突。
  2. 模拟多个独立环境: 允许在同一个 ZooKeeper 集群上模拟多个独立的环境,每个环境有自己的数据和配置。

要注意的是,如果你在使用 CHROOT,ZooKeeper 客户端在连接到 ZooKeeper 服务器时,也需要指定相应的 CHROOT 路径。例如,如果 CHROOT 设置为 /myapp,那么客户端在连接时需要指定 “/myapp” 作为根路径。

总的来说,CHROOT 提供了一种简单而有效的方式,使得在同一个 ZooKeeper 集群上可以支持多个隔离的命名空间。

  • 生成之后可以通过如下命令进行查看:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type users --entity-name admin
  • 也可以添加其他SCRAM认证信息,例如SCRAM-SHA-512:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --add-config 'SCRAM-SHA-512=[password=admin512]' --entity-type users --entity-name admin
  • 也可使用如下命令删除已经添加的认证信息:
bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin
  1. 修改kafka-server-jaas.conf,增加kafka服务的SCRAM认证用户信息
KafkaServer {
  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin"
  user_admin="admin";
};
  1. 修改server.properties,新增如下配置:
# 启用ACL
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 设置本例中admin为超级用户;在Zookeeper的“/kafka/config/users”下存在用户
super.users=User:admin
# 同时启用SCRAM和PLAIN机制
sasl.enabled.mechanisms=SCRAM-SHA-256
# 为broker间通讯开启SCRAM机制,采用SCRAM-SHA-256算法
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# broker间通讯使用PLAINTEXT,本例中不演示SSL配置
security.inter.broker.protocol=SASL_PLAINTEXT
# 配置listeners使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://192.168.64.102:9092
# 配置advertised.listeners
advertised.listeners=SASL_PLAINTEXT://192.168.64.102:9092

如果是集群,上述配置每个节点都应该配置一份!!!

1.4.4 服务启动

  1. 启动Kafka服务,在$KAFKA_HOME目录下执行下面的命令即可
bin/kafka-server-start.sh config/server.properties

如果要在后台运行Kafka服务,那么可以在启动命令中加入-daemon参数或&字符,示例如下:

bin/kafka-server-start.sh -daemon config/server.properties

或者

bin/kafka-server-start.sh config/server.properties &
  1. 通过jps命令查看Kafka服务进程是否已经启动

二、Admin API使用

2.1 SCRAM用户操作

package com.kafka.adminclient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import java.util.Collections;
import java.util.Map;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class KafkaUserOperator {
private final AdminClient adminClient;
public KafkaUserOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
public boolean createScramUser(String username, String password) {
boolean res = false;
//指定一个协议ScramMechanism,迭代次数iterations还没搞清楚干嘛的,设置太小会报错
ScramCredentialInfo scramCredentialInfo = new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 10000);
//创建Scram用户凭证,用户不存在,会先创建用户
UserScramCredentialAlteration userScramCredentialUpsertion = new UserScramCredentialUpsertion(username, scramCredentialInfo, password);
AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialUpsertion));
for (Map.Entry<String, KafkaFuture<Void>> e : alterUserScramCredentialsResult.values().entrySet()) {
KafkaFuture<Void> future = e.getValue();
try {
future.get();
} catch (Exception exc) {
System.err.println("返回信息:" + exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
public boolean deleteScramUser(String username) {
boolean res = false;
//删除Scram用户凭证,删除后用户无权限操作kafka,zk中用户节点还会存在
UserScramCredentialAlteration userScramCredentialDeletion = new UserScramCredentialDeletion(username, ScramMechanism.SCRAM_SHA_256);
AlterUserScramCredentialsResult alterUserScramCredentialsResult = adminClient.alterUserScramCredentials(Collections.singletonList(userScramCredentialDeletion));
for (Map.Entry<String, KafkaFuture<Void>> e : alterUserScramCredentialsResult.values().entrySet()) {
KafkaFuture<Void> future = e.getValue();
try {
future.get();
} catch (Exception exc) {
System.err.println("返回信息:" + exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
}

2.2 主题操作

package com.kafka.adminclient;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class KafkaTopicOperator {
private final Logger logger = LoggerFactory.getLogger(KafkaTopicOperator.class);
private final AdminClient adminClient;
public KafkaTopicOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
/**
* 创建系统对应的topic
*
* @param topicName         主题名称
* @param partitions        分区
* @param replicationFactor 副本
* @param retention         数据有效期
* @return boolean
*/
public boolean createTopic(String topicName, Integer partitions, Integer replicationFactor, Integer retention) {
boolean res = false;
Set<String> topics = getTopicList();
if (!topics.contains(topicName)) {
partitions = partitions == null ? 1 : partitions;
replicationFactor = replicationFactor == null ? 1 : replicationFactor;
NewTopic topic = new NewTopic(topicName, partitions, replicationFactor.shortValue());
long param = retention * 24 * 60 * 60 * 1000;
Map<String, String> configs = new HashMap<>();
configs.put("retention.ms", String.valueOf(param));
topic.configs(configs);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(topic));
for (Map.Entry<String, KafkaFuture<Void>> e : createTopicsResult.values().entrySet()) {
KafkaFuture<Void> future = e.getValue();
try {
future.get();
} catch (Exception exc) {
logger.warn("创建topic参数异常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
} else {
res = true;
logger.warn("该主题已存在,主题名称:{}", topicName);
}
return res;
}
/**
* 修改topic数据有效期
*
* @param topicName 主题名称
* @param retention 天数
* @return boolean
*/
public boolean updateTopic(String topicName, Integer retention) {
if (retention < 0) {
return false;
}
boolean res = false;
Map<ConfigResource, Config> alertConfigs = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
//转换为毫秒
long param = retention * 24 * 60 * 60 * 1000;
ConfigEntry configEntry = new ConfigEntry("retention.ms", String.valueOf(param));
Config config = new Config(Collections.singletonList(configEntry));
alertConfigs.put(configResource, config);
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(alertConfigs);
for (Map.Entry<ConfigResource, KafkaFuture<Void>> e : alterConfigsResult.values().entrySet()) {
KafkaFuture<Void> future = e.getValue();
try {
future.get();
} catch (Exception exc) {
logger.warn("修改topic参数异常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
return res;
}
/**
* 删除topic
*
* @param topicName 主题
* @return boolean
*/
public boolean deleteTopic(String topicName) {
boolean res = false;
Set<String> topics = getTopicList();
if (topics.contains(topicName)) {
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList(topicName));
for (Map.Entry<String, KafkaFuture<Void>> e : deleteTopicsResult.values().entrySet()) {
KafkaFuture<Void> future = e.getValue();
try {
future.get();
} catch (Exception exc) {
logger.warn("删除topic参数异常,返回信息:{}", exc.getMessage());
}
res = !future.isCompletedExceptionally();
}
} else {
logger.info("topic不存在,名称:{}", topicName);
res = true;
}
return res;
}
/**
* 获取主题列表
*
* @return Set
*/
public Set<String> getTopicList() {
Set<String> result = null;
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
result = listTopicsResult.names().get();
} catch (Exception e) {
logger.warn("获取主题列表失败,失败原因:{}", e.getMessage());
e.printStackTrace();
}
return result;
}
}

2.3 ACL操作

package com.kafka.adminclient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateAclsResult;
import org.apache.kafka.clients.admin.DeleteAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.*;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Map;
/**
* @Author: Jiangxx
* @Date: 2023/11/24
* @Description:
*/
public class AclOperator {
private final Logger logger = LoggerFactory.getLogger(AclOperator.class);
private final AdminClient adminClient;
public AclOperator(AdminClient adminClient) {
this.adminClient = adminClient;
}
/**
* 添加权限
*
* @param resourceType 资源类型
* @param resourceName 资源名称
* @param username     用户名
* @param operation    权限名称
*/
public void addAclAuth(String resourceType, String resourceName, String username, String operation) {
ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);
AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);
AclBinding aclBinding = new AclBinding(resource, accessControlEntry);
CreateAclsResult createAclsResult = adminClient.createAcls(Collections.singletonList(aclBinding));
for (Map.Entry<AclBinding, KafkaFuture<Void>> e : createAclsResult.values().entrySet()) {
KafkaFuture<Void> future = e.getValue();
try {
future.get();
boolean success = !future.isCompletedExceptionally();
if (success) {
logger.info("创建权限成功");
}
} catch (Exception exc) {
logger.warn("创建权限失败,错误信息:{}", exc.getMessage());
exc.printStackTrace();
}
}
}
/**
* 删除权限
*
* @param resourceType 资源类型
* @param resourceName 资源名称
* @param username     用户名
* @param operation    权限名称
*/
public void deleteACLAuth(String resourceType, String resourceName, String username, String operation) {
ResourcePattern resource = new ResourcePattern(getResourceType(resourceType), resourceName, PatternType.LITERAL);
AccessControlEntry accessControlEntry = new AccessControlEntry("User:" + username, "*", getOperation(operation), AclPermissionType.ALLOW);
AclBinding aclBinding = new AclBinding(resource, accessControlEntry);
DeleteAclsResult deleteAclsResult = adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter()));
for (Map.Entry<AclBindingFilter, KafkaFuture<DeleteAclsResult.FilterResults>> e : deleteAclsResult.values().entrySet()) {
KafkaFuture<DeleteAclsResult.FilterResults> future = e.getValue();
try {
future.get();
boolean success = !future.isCompletedExceptionally();
if (success) {
logger.info("删除权限成功");
}
} catch (Exception exc) {
logger.warn("删除权限失败,错误信息:{}", exc.getMessage());
exc.printStackTrace();
}
}
}
private AclOperation getOperation(String operation) {
AclOperation aclOperation = null;
switch (operation) {
case "CREATE":
aclOperation = AclOperation.CREATE;
break;
case "WRITE":
aclOperation = AclOperation.WRITE;
break;
case "READ":
aclOperation = AclOperation.READ;
break;
default:
break;
}
return aclOperation;
}
private ResourceType getResourceType(String type) {
ResourceType resourceType = null;
switch (type) {
case "Group":
resourceType = ResourceType.GROUP;
break;
case "Topic":
resourceType = ResourceType.TOPIC;
break;
default:
break;
}
return resourceType;
}
}

三、参考链接

  • kafka、zookeeper配置sasl认证-CSDN博客
  • Zookeeper & Kafka 开启安全认证的配置_kafka认证配置_IT布道的博客-CSDN博客
  • Kafka安全(以SASL+ACL为例)_kafka 安全-CSDN博客
  • Kafka安全认证授权配置_kafka认证配置-CSDN博客
  • Java版 Kafka ACL使用实战_java kafka acl_芒果无忧的博客-CSDN博客
  • kafka官网
本站无任何商业行为
个人在线分享 » kafka安全机制(SASL_SCRAM)
E-->