flink 消费kafka带有kerberos
时间: 2025-01-05 10:27:56 浏览: 68
### 配置 Flink 使用 Kerberos 认证从 Kafka 消费数据
#### 1. 准备 Kerberos 环境
为了使 Flink 能够通过 Kerberos 进行身份验证并连接到 Kafka,需先设置好 Kerberos 客户端环境。这通常涉及安装 Kerberos 库以及配置 `/etc/krb5.conf` 文件来指定 KDC(密钥分发中心)的位置和其他必要的参数。
#### 2. 创建 JAAS 配置文件
创建一个名为 `jaas.conf` 的 Java Authentication and Authorization Service (JAAS) 配置文件,其内容如下:
```plaintext
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
keyTab="/path/to/user.keytab"
principal="flink/[email protected]"
useKeyTab=true
storeKey=true;
};
```
此段定义了用于登录的主体名称和对应的 keytab 文件路径[^1]。
#### 3. 设置 Hadoop 用户组信息
当与其他支持 Kerberos 的服务交互时,如HDFS, 可以利用已有的 login user 来简化认证流程而无需重复提供凭证材料。对于这种情况,在启动 Flink JobManager 和 TaskManagers 前应确保设置了合适的 UserGroupInformation 登录用户[^2]。
#### 4. 修改 Flink-Kafka Connector 参数
在提交作业给 Flink Cluster 执行之前,还需要调整 flink-conf.yaml 或者直接在程序代码里设定一些特定于 Kafka connector 的属性:
```yaml
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.contexts: Client,KafkaClient
security.kerberos.login.keytabs: /path/to/flink.keytab,/path/to/kafka.keytab
security.kerberos.login.principals: flink/[email protected],kafka/[email protected]
```
这些选项指定了如何加载 Kerberos 凭据,并允许应用程序同时持有多个不同的 Kerberos 主体身份以便与不同类型的服务器通信。
#### 5. 编写 Flink 数据源应用
最后一步是在编写实际的应用逻辑部分,这里给出一段简单的例子展示怎样读取来自受保护主题的消息流:
```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
properties.setProperty("group.id", "test-group");
// Enable security protocol for SASL_PLAINTEXT or other secure protocols.
properties.put("security.protocol", "SASL_PLAINTEXT");
// Specify the location of jaas configuration file.
System.setProperty("java.security.auth.login.config", "/path/to/jaas.conf");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"test",
new SimpleStringSchema(),
properties);
env.addSource(consumer).print().setParallelism(1);
env.execute("Flink Kafka Consumer Example");
}
}
```
这段代码展示了如何初始化消费者实例并与安全模式下的 Kafka 实例建立连接,从而实现消息的订阅和处理功能[^3]。
阅读全文
相关推荐


















