Skip to main content
 首页 » 编程设计

Java 创建kafka主题

2022年07月19日112yyy_WW

本文简要介绍Apache Kafka,并使用Java编码方式创建、配置kafka主题。

Kafak 介绍

Apache Kafka是强大、高性能、分布式的事件流平台。通常生产者应用程序发布事件到Kafka,消费者订阅这些事件以便读取和处理它们。Kafka使用主题来存储和分类这些事件,例如,在一个电子商务应用程序中,可能有一个“订单”主题。

Kafka主题是分区的,它将数据分布在多个代理上以实现可伸缩性。分区可设置副本,从而使数据具有容错性和高可用性。主题还可设置保留策略,便于后期使用。这些都可以通过Kafka命令行工具或在配置文件以key-value方式配置主题。

除了命令行工具,Kafka还提供了一个Admin API来管理和检查主题,代理以及其他Kafka对象。在我们的示例中,将使用这个API来创建新主题。

依赖管理

为了使用Admin API,需要增加 kafka-clients 依赖:

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka-clients</artifactId> 
    <version>2.8.0</version> 
</dependency> 

在创建新主题之前,至少需要单节点kafka集群。本文使用测试容器实例化kafka服务器,这样可以和测试代码进行集成,并不依赖外部Kafka服务器。因此需要增加Kafka测试容器:

<dependency> 
    <groupId>org.testcontainers</groupId> 
    <artifactId>kafka</artifactId> 
    <version>1.15.3</version> 
    <scope>test</scope> 
</dependency> 

接下来还要添加JUnit-jupiter组件,用于使用JUnit5运行Testcontainer测试:

<dependency> 
    <groupId>org.testcontainers</groupId> 
    <artifactId>junit-jupiter</artifactId> 
    <version>1.15.3</version> 
    <scope>test</scope> 
</dependency> 

依赖准备完毕,下面准备写简单应用程序创建主题。

创建主题示例

创建主题需要使用Admin管理对象,通过一些属性进行实例化Admin对象。首先创建Properties对象,用于保存本地kafka代理的最小化配置:

Properties properties = new Properties(); 
properties.put( 
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers() 
); 

现在实例化Admin对象:

Admin admin = Admin.create(properties) 

create方法接收Properties对象(或map,指定BOOTSTRAP_SERVERS_CONFIG属性),返回线程安全的admin实例。admin管理客户端使用此属性发现kafka集群代理,随后可执行一些管理操作。通常代理地址包含两个或三个,这样可以避免某些实例不可用的。

AdminClientConfig类还包括其他一些客户端配置项。

JUnit5 测试容器创建主题

首先我们通过JUnit5 测试容器进行验证,我们利用kafka模块,它使用Confluent OSS平台的docker镜像:

@Test 
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception { 
    kafkaTopicApplication.createTopic("test-topic"); 
 
    String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list"; 
    String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand) 
      .getStdout(); 
 
    assertThat(stdout).contains("test-topic"); 
} 

这里测试容器在测试执行期间自动实例化和管理Kafka容器。我们只需调用命令行程序,验证主题已经在运行的容器中创建成功。

使用缺省配置创建主题

分区和副本是主题的关键配置,这里示例都设置为1:

try (Admin admin = Admin.create(properties)) { 
    int partitions = 1; 
    short replicationFactor = 1; 
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); 
     
    CreateTopicsResult result = admin.createTopics( 
      Collections.singleton(newTopic) 
    ); 
 
    KafkaFuture<Void> future = result.values().get(topicName); 
    future.get(); 
} 

在这里我们使用了Admin,createTopics方法可以创建带有默认配置的主题集合。由于Admin接口继承了AutoCloseable接口,因此使用了try-resource执行操作,可确保资源得到正常释放。

需要提醒的是,此方法与kafka代理通信并进行异步执行。返回的CreateTopicsResult对象公开了KafkaFuture来访问请求批处理中每个主题的结果。这中方式遵循了Java异步编程模式,允许调用者使用Future获得操作的结果。

对于同步操作可以立即调用这个方法来检索结果,直到操作完成或失败。在失败的情况下,会导致一个ExecutionException,它包括了底层错误信息。

自定义主题配置

除了默认选项,还可以使用Admin的重载方法,并通过CreateTopicsOptions对象提供一些配置选项。当创建新主题时,可以使用这些配置创建主题:

CreateTopicsOptions topicOptions = new CreateTopicsOptions() 
  .validateOnly(true) 
  .retryOnQuotaViolation(false); 
 
CreateTopicsResult result = admin.createTopics( 
  Collections.singleton(newTopic), topicOptions 
); 

将validateOnly选项设置为true,这意味着客户端仅验证主题而不实际创建主题。类似地,retryonquota违规选项被设置为false,以便在配额违规的情况下不会重试操作。

其他主题配置

Kafka有大量的主题配置用于控制主题行为,如数据保留策略和压缩方式等。配置选项包括服务器默认值,也可以根据需要对特定主题进行设置。Admin API中的TopicConfig类包含在创建主题设置相应配置。

// 创建主题并指定压缩选项 'lz4'  
 
Map<String, String> newTopicConfig = new HashMap<>(); 
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT); 
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"); 
 
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor) 
  .configs(newTopicConfig); 

其他主题操作

除了能够创建新主题之外,Admin API还具有删除(delete)、列出(list)和描述(describe)主题的操作。所有这些与主题相关的操作与主题创建操作类似。

这些操作方法都有接受xxxTopicOptions对象作为输入的重载方法,所有这些方法都返回相应的xxxTopicsResult对象,kafkaFuture用于访问异步操作结果。

最后需要说明的是,自Kafka 0.11版本以来,admin api仍在不断进化,当标记了InterfaceStability.Evolving 注解,则暗示在为了版本该API可能会改变,从而影响兼容性。

总结

本文介绍了使用Java admin 客户端创建kafka主题。首先使用默认选项和指定选项创建主题。接着解释了如何使用各种属性配置新主题。最后简要介绍管理其他与主题相关的操作。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/123851187