本文简要介绍Apache Kafka,并使用Java编码方式创建、配置kafka主题。
Kafak 介绍
Apache Kafka是强大、高性能、分布式的事件流平台。通常生产者应用程序发布事件到Kafka,消费者订阅这些事件以便读取和处理它们。Kafka使用主题来存储和分类这些事件,例如,在一个电子商务应用程序中,可能有一个“订单”主题。
Kafka主题是分区的,它将数据分布在多个代理上以实现可伸缩性。分区可设置副本,从而使数据具有容错性和高可用性。主题还可设置保留策略,便于后期使用。这些都可以通过Kafka命令行工具或在配置文件以key-value方式配置主题。
除了命令行工具,Kafka还提供了一个Admin API来管理和检查主题,代理以及其他Kafka对象。在我们的示例中,将使用这个API来创建新主题。
依赖管理
为了使用Admin API,需要增加 kafka-clients
依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
在创建新主题之前,至少需要单节点kafka集群。本文使用测试容器实例化kafka服务器,这样可以和测试代码进行集成,并不依赖外部Kafka服务器。因此需要增加Kafka测试容器:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
接下来还要添加JUnit-jupiter组件,用于使用JUnit5运行Testcontainer测试:
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>
依赖准备完毕,下面准备写简单应用程序创建主题。
创建主题示例
创建主题需要使用Admin管理对象,通过一些属性进行实例化Admin对象。首先创建Properties对象,用于保存本地kafka代理的最小化配置:
Properties properties = new Properties();
properties.put(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);
现在实例化Admin对象:
Admin admin = Admin.create(properties)
create方法接收Properties对象(或map,指定BOOTSTRAP_SERVERS_CONFIG属性),返回线程安全的admin实例。admin管理客户端使用此属性发现kafka集群代理,随后可执行一些管理操作。通常代理地址包含两个或三个,这样可以避免某些实例不可用的。
AdminClientConfig类还包括其他一些客户端配置项。
JUnit5 测试容器创建主题
首先我们通过JUnit5 测试容器进行验证,我们利用kafka模块,它使用Confluent OSS平台的docker镜像:
@Test
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
kafkaTopicApplication.createTopic("test-topic");
String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
.getStdout();
assertThat(stdout).contains("test-topic");
}
这里测试容器在测试执行期间自动实例化和管理Kafka容器。我们只需调用命令行程序,验证主题已经在运行的容器中创建成功。
使用缺省配置创建主题
分区和副本是主题的关键配置,这里示例都设置为1:
try (Admin admin = Admin.create(properties)) {
int partitions = 1;
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic)
);
KafkaFuture<Void> future = result.values().get(topicName);
future.get();
}
在这里我们使用了Admin,createTopics方法可以创建带有默认配置的主题集合。由于Admin接口继承了AutoCloseable接口,因此使用了try-resource执行操作,可确保资源得到正常释放。
需要提醒的是,此方法与kafka代理通信并进行异步执行。返回的CreateTopicsResult对象公开了KafkaFuture来访问请求批处理中每个主题的结果。这中方式遵循了Java异步编程模式,允许调用者使用Future获得操作的结果。
对于同步操作可以立即调用这个方法来检索结果,直到操作完成或失败。在失败的情况下,会导致一个ExecutionException,它包括了底层错误信息。
自定义主题配置
除了默认选项,还可以使用Admin的重载方法,并通过CreateTopicsOptions对象提供一些配置选项。当创建新主题时,可以使用这些配置创建主题:
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
.validateOnly(true)
.retryOnQuotaViolation(false);
CreateTopicsResult result = admin.createTopics(
Collections.singleton(newTopic), topicOptions
);
将validateOnly选项设置为true,这意味着客户端仅验证主题而不实际创建主题。类似地,retryonquota违规选项被设置为false,以便在配额违规的情况下不会重试操作。
其他主题配置
Kafka有大量的主题配置用于控制主题行为,如数据保留策略和压缩方式等。配置选项包括服务器默认值,也可以根据需要对特定主题进行设置。Admin API中的TopicConfig类包含在创建主题设置相应配置。
// 创建主题并指定压缩选项 'lz4'
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
.configs(newTopicConfig);
其他主题操作
除了能够创建新主题之外,Admin API还具有删除(delete)、列出(list)和描述(describe)主题的操作。所有这些与主题相关的操作与主题创建操作类似。
这些操作方法都有接受xxxTopicOptions对象作为输入的重载方法,所有这些方法都返回相应的xxxTopicsResult对象,kafkaFuture用于访问异步操作结果。
最后需要说明的是,自Kafka 0.11版本以来,admin api仍在不断进化,当标记了InterfaceStability.Evolving 注解,则暗示在为了版本该API可能会改变,从而影响兼容性。
总结
本文介绍了使用Java admin 客户端创建kafka主题。首先使用默认选项和指定选项创建主题。接着解释了如何使用各种属性配置新主题。最后简要介绍管理其他与主题相关的操作。
本文参考链接:https://blog.csdn.net/neweastsun/article/details/123851187