1.概述

Apache卡夫卡是一种强大的分布式容错流处理系统。在上一个教程中,我们学到了如何与春天和kafka合作

在本教程中,我们将在上一个上构建并学习如何编写不依赖于外部Kafka服务器运行的可靠的自包含的集成测试

首先,我们将开始,但查看如何使用和配置Kafka的嵌入式实例。然后我们会看到我们如何利用流行的框架TestContainers.从我们的测试中。

2.依赖关系

当然,我们需要添加标准春天kafka.依赖性对我们的pom.xml:

<依赖> < groupId > org.springframework。Kafka. spring-kafka 2.6.3.RELEASE 

然后我们需要两个专门用于我们的测试的依赖项。首先,我们将添加Spring-Kafka-Test文物:

<依赖> < groupId > org.springframework。Kafka. spring-kafka-test 2.6.3.RELEASE test 

最后,我们将添加TestContainers Kafka依赖,也可以通过Maven Central.:

<依赖>  org.tiestContainers   KAFKA   1.15.0   test  

现在我们拥有配置的所有必要依赖项,我们可以使用Kafka编写一个简单的Spring启动应用程序。

3.一个简单的Kafka生产者-消费者应用程序

在本教程中,我们测试的重点将是一个简单的生产者-消费者Spring Boot Kafka应用程序。

让我们首先定义我们的应用程序入口点:

@springbootapplication @enableautoconfiguration公共类kafkaproducerconsumerapplication {public静态void main(string [] args){springapplication.run(kafkaproducerconsumerapplication.class,args);}}

正如我们所看到的,这是一个标准的Spring引导应用程序。在可能的情况下,我们希望使用默认配置值。考虑到这一点,我们利用@EnableAutoConfiguration.注释以自动配置我们的应用程序。

3.1。制片人设置

接下来,让我们考虑一个生产者bean,我们将使用它向给定的Kafka主题发送消息:

@Component公共类Kafkaproducer {私有静态最终记录器Logger = loggerFactory.getLogger(Kafkaproducer.class);@autowired私有kafkatemplate  kafkatemplate;public void发送(字符串主题,字符串有效载荷){logger.info(“发送payload ='{}'主题='{}'”,有效载荷,主题);kafkatemplate.send(主题,有效载荷);}}

我们的Kafkaproducer.上面定义的豆类仅仅是周围的包装KafkaTemplate班级。此类提供高级线程安全操作,例如向提供的主题发送数据,这正是我们在我们的内容发送方法

3.2。消费者设置

同样,我们现在将定义一个简单的消费豆,它将收听Kafka主题并接收消息:

@Component public class KafkaConsumer {private static final Logger Logger = LoggerFactory.getLogger(KafkaConsumer.class);private CountDownLatch latch = new CountDownLatch(1);private String payload = null;@KafkaListener(主题= "${test.topic}") public void receive(ConsumerRecord

我们简单的消费者使用@KafkaListener注释的收到方法来监听关于给定主题的消息。稍后我们将看到如何配置test.topic.从我们的测试中。

此外,接收方法将消息内容存储在我们的bean中并递减计数门闩变量。这个变量很简单线程安全的计数器字段,我们将在稍后的测试中使用该字段,以确保我们成功地接收到消息

现在我们使用Spring Boot实现了我们的简单Kafka应用程序让我们看看我们如何编写集成测试。

4.测试中的一个词

一般来说,在编写干净的集成测试时,我们不应该依赖于我们可能无法控制或可能突然停止工作的外部服务。这可能会对我们的测试结果产生不利影响。

同样,如果我们依赖于外部服务,在这种情况下,我们可能无法将其设置为,控制它并以我们想要的方式撕下它。

4.1。应用属性

我们将从测试中使用一组非常简单的应用程序配置属性。我们将在我们的SRC /测试/资源/ application.yml文件:

Spring:Kafka:消费者:自动偏移 - 重置:最早的Group-ID:Baeldung测试:主题:嵌入式 - 测试主题金宝搏188体育

这是我们在处理嵌入的Kafka实例或本地代理时需要的最小属性集。

其中大多数是不言自明的,但我们应该强调特别重要的是消费者财产自动偏移 - 重置:最早。此属性确保我们的消费者组获得我们发送的消息,因为容器可能在发送完成后启动。

另外,我们用这个值配置topic属性嵌入式测试主题,这是我们将在测试中使用的主题。

5.使用嵌入式Kafka进行测试

在这一节中,我们将看看如何使用内存中的Kafka实例来运行我们的测试。这也被称为嵌入式Kafka。

的依赖Spring-Kafka-Test我们添加以前包含一些有用的实用程序来帮助测试我们的应用程序。最值得注意的是,它包含EmbeddedKafkaBroker班级

考虑到这一点,让我们前进并编写我们的第一个集成测试:

@SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"listener =PLAINTEXT://localhost:9092", "port=9092"})类EmbeddedKafkaIntegrationTest {@Autowired private kafkaconconsumer consumer;@Autowired private KafkaProducer producer;@Value("${test.topic}") private String topic;@Test public void givenembeddedkafkabroker_whensendingtosimpleproducer_thenmessagerreceived () throws Exception{生产者。发送(主题,“发送与自己的简单KafkaProducer”);fourceer.getlatch()。等待(10000,TimeUnit.milliseconds);为了(consumer.getLatch () .getCount(),等于l (0));为了(consumer.getPayload (), containsString (embedded-test-topic "));}}

让我们过一遍测试的关键部分。首先,我们用两个标准的Spring注释来装饰我们的测试类:

  • @springboottest.注释将确保我们的测试引导Spring应用程序上下文
  • 我们也使用了@dirtiescontext.注释,这将确保在不同的测试之间清除并重置此上下文

关键的部分来了,我们用@EmbeddedKafka注释要注入一个实例EmbeddedKafkaBroker进入我们的测试。此外,还有几个属性可用我们可以用于配置嵌入式Kafka节点:

  • 分区-这是每个主题使用的分区数。为了保持良好和简单,我们只希望从测试中使用一个
  • 经纪商- Kafka代理的附加属性。同样,为了保持简单,我们指定了一个纯文本侦听器和端口号

接下来,我们自动链接我们的消费者生产者类并配置一个主题以使用我们的值application.properties

对于拼图的最后一块,我们只需向我们的测试主题发送消息,并验证已收到消息并包含我们的测试主题的名称

我们运行我们的测试时,我们将在详细的弹簧输出中看到:

。。。12:45:35.099 [main] INFO c.b.kafka.embedded.KafkaProducer - sending payload='Sending with our own simple KafkaProducer' to topic='embedded-test-topic' ... 12:45:35.103 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO c.b.kafka.embedded.KafkaConsumer - received payload= 'ConsumerRecord(topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, serialized key size = -1, serialized value size = 41, headers = RecordHeaders(headers = [], isReadOnly = false),key = null, value = send with our own simple KafkaProducer)'

这证实了我们的测试正常工作。惊人的!我们现在有一种方法可以使用内存kafka经纪人编写自包含的独立集成测试

6.使用TestContainers测试Kafka

有时我们可能会在专门为测试目的专门提供的服务的嵌入式内存实例之间看到嵌入式内存实例之间的小差异。虽然不太可能,但也可能是我们测试中使用的端口可能被占用,导致失败

记住这一点,在本节中,我们将看到前面使用Testcontainers框架。我们将看到如何实例化和管理托管在a中的外部Apache Kafka代理Docker容器从我们的集成测试。

让我们定义另一个集成测试,这与我们在上一节中看到的另一个集成测试会非常相似:

@RunWith(SpringRunner.class)@Import(com.b金宝搏188体育aeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class)@SpringBootTest(类= KafkaProducerConsumerApplication.class)@DirtiesContext公共类KafkaTestContainersLiveTest {@ClassRule公共静态KafkaContainer卡夫卡=新KafkaContainer(DockerImageName.parse(“ConfluentInc / CP-Kafka:5.4.3”));@Autowired私人Kafkaconsumer消费者;@Autowired private KafkaProducer producer;@Value("${test.topic}") private String topic;@Test public void gendkafkadockercontainer_whensingtosimpleproducer_thenmessagereceive()抛出异常{producer.send(主题,“使用自己的控制器发送”);fourceer.getlatch()。等待(10000,TimeUnit.milliseconds);为了(consumer.getLatch () .getCount(),等于l (0));为了(consumer.getPayload (), containsString (embedded-test-topic "));}}

让我们来看看这次差异。我们宣布了这一点Kafka.领域,这是标准junit.@classrule.该字段是一个实例Kafkacontainer.课程将准备和管理我们运行Kafka的集装箱的生命周期。

为了避免端口冲突,Testcontainers在docker容器启动时动态分配一个端口号。由于这个原因,我们使用这个类提供了一个定制的消费者和生产者工厂配置KafkatestContainersConfiguration:

@Bean公共映射 ConfeceConfigs(){Map  Props = new hashmap <>();props.put(confecerconfig.bootservers_config,kafka.getbootstrapservers());props.put(confecerconfig.auto_offset_reset_config,“最早”);props.put(confecerconfig.group_id_config,“b金宝搏188体育aeldung”);//更多标准配置返回道具;@bean公共producerfactory  producerfactory(){map  configprops = new hashmap <>();configprops.put(producerconfig.bootstrap_servers_config,kafka.getbootstrapservers());//更多标准配置返回New DefaultKafKaproducerFactory <>(ConfigProps);}

然后我们通过@ import在我们的测试开始时注释。

这样做的原因是我们需要一种方法可以将服务器地址注入我们的应用程序,如前所述,是动态生成的。我们通过致电来实现这一目标getBootstrapservers()方法,它将返回Bootstrap Server位置:

bootstrap.servers = [plaintext:// localhost:32789]

现在,当我们运行我们的测试时,我们应该看到TestContainers做了几件事:

  • 检查我们的本地Docker设置。
  • 拉这一点Confluentinc / CP-Kafka:5.4.3码头图像如有必要
  • 开始一个新的容器,等待它准备好了
  • 最后,在测试完成后关闭并删除容器

同样,通过检查测试输出来确认这一点:

13:33:10.396 [主要]信息?[ConfluentInc / CP-Kafka:5.4.3]  - 为图片创建容器:Confluentinc / CP-Kafka:5.4.3 13:33:10.454 [主要]信息?[Confluentinc / CP-Kafka:5.4.3]  - 带ID的启动容器:B22B752CEE2E9E9E6ADE38E46D0C6D881AD941D17223BDA073AFE4D2FE0559C3 13:33:10.785 [主要]信息?[Confluentinc / CP-Kafka:5.4.3]  -  ContainsfuluentInc / CP-Kafka:5.4.3开始:B22B752CEE2E9E9E6ADE38E46D0C6D881AD941D17223BDA073AFE4D2FE0559C3

Presto!使用KAFKA Docker容器进行工作集成测试。

7.结论

在本文中,我们已经了解了有关使用Spring Boot测试Ka金宝搏官网188befka应用程序的几种方法。在第一种方法中,我们了解如何配置和使用本地内存Kafka代理。

然后我们看到了如何使用Testcontainers来建立一个运行在docker容器中的外部Kafka代理。

一如既往,本文的完整源代码是可用的在github上

通用底部

通过。开始使用Spring 5和Spring Boot 2学习春天课程:

>>查看课程
评论在本文上关闭!