1.概述

在本教程中,我们将探索模仿Consumer.,之一Kafka.s消费者实现。

首先,我们将讨论在测试Kafka时需要考虑的主要问题消费者。然后,我们将看看如何使用模仿Consumer.实现测试。

2.测试一个卡夫卡消费者

消费来自Kafka的数据包括两个主要步骤。首先,我们必须订阅主题或手动分配主题分区。其次,我们轮询了使用的批量记录轮询方法。

轮询通常是在无限循环中完成的。这是因为我们通常希望连续地使用数据。

例如,让我们考虑一个简单的消费逻辑,包括订阅和轮询循环:

void consume() {try {consumer.subscribe(Arrays. subscribe);asList(“foo”、“酒吧”));while (true) {userrecords  records = consumer.poll(Duration.ofSeconds(1));记录。forEach(记录- > processRecord(记录);}} catch (WakeupException ex){//忽略shutdown} catch (RuntimeException ex){//异常处理}finally {consumer.close();}}

看看上面的代码,我们可以看到有一些东西可以测试:

  • 订阅
  • 轮询循环
  • 例外处理
  • 如果消费者正确关闭

我们有多个选项来测试消费逻辑。

我们可以使用内存中的Kafka实例。但是,这种方法有一些缺点。通常,内存中的Kafka实例会使测试非常繁重和缓慢。此外,设置它不是一项简单的任务,可能导致不稳定的测试。

或者,我们可以使用mock框架来mock消费者。虽然使用这种方法使测试轻量级,但设置它可能有点棘手。

最终选择,也许是最好的,就是使用模仿Consumer.,这是一个消费者实施意味着测试。它不仅帮助我们构建轻量级测试,而且还很容易设置

让我们看看它提供的特性。

3.使用模仿Consumer.

模仿Consumer.实施消费者接口,kafka-clients库提供所以,它嘲笑真实的全部行为消费者如果没有我们需要编写大量代码

让我们来看看一些使用的例子模仿Consumer.。特别是,我们将采取一些常见的场景,我们可能会在测试消费者应用程序时遇到,并使用它来实现它们模仿Consumer.

对于我们的示例,让我们考虑一个应用程序,该应用程序从Kafka主题中消耗国家/地区人口更新。更新仅包含国家/地区的名称及其当前人口:

class CountryPopulation {private String country;私人整数人口;//标准构造函数}

我们的消费者只是使用Kafka的更新轮询消费者实例,处理它们,最后使用commitSync方法:

公共类oparyypulationconsumer {私人消费者消费者;private java.util.function.consumer 异常consumer;private java.util.function.consumer  oparyypulationconsumer;//标准构造函数void startbysubscribing(String Topic){cancume(() - > confeceer.subscribe(collections.singleton(主题)));void startbyassigning(字符串主题,int分区){canumume(() - > consumer.assign(collections.singleton(新主题partition(主题,分区)))));私有void消费(可逃离的PreaspollingTask){try {preaspollingtask.run();while(true){consumerRecords  Records = Confifer.Poll(持续时间.OfMillis(1000));streamsupport.stream(Records.spliterator(),false).map(记录 - >新招财(Record.key(),Record.Value())).Foreach(ConnentypulationConsumer);confuler.commitsync();catch(wakeupException e){system.out.println(“关闭......”); } catch (RuntimeException ex) { exceptionConsumer.accept(ex); } finally { consumer.close(); } } public void stop() { consumer.wakeup(); } }

3.1。创建一个模仿Consumer.实例

接下来,让我们看看我们如何创建一个实例模仿Consumer.:

@beforeeach void setup(){fuginer = new mockconsumer <>(offsetresetstrategy.earliest);更新= new arraylist <>();CountyPopulationConsumer = New CollowypulationConsumer(消费者,EX  - > This.Pollexception = EX,更新:: ADD);}

基本上,我们需要提供的就是偏移复位策略。

注意我们使用更新收集记录COMPOXPOPULULULULCONSUMER.会收到。这将有助于我们断言预期结果。

同样的,我们用Pollexception.收集并断言异常。

对于所有的测试用例,我们将使用上面的setup方法。现在,让我们看看消费者应用程序的一些测试用例。

3.2。分配主题分区

首先,让我们创建一个测试startbyassign.方法:

@Test void whenStartingByAssigningTopicPartition_thenExpectUpdatesAreConsumedCorrectly(){//给定消费者. schedulepolltask(() ->消费者。记录(TOPIC, PARTITION, "Romania", 19_410_000));consumer.schedulePollTask (() - > countryPopulationConsumer.stop ());HashMap startOffsets = new HashMap<>();TopicPartition tp = new TopicPartition(TOPIC, PARTITION);startoffsets.put(tp,0l);consumer.updateBeginningOffsets (startOffsets);/ /当countryPopulationConsumer。startbyassign.(TOPIC, PARTITION); // THEN assertThat(updates).hasSize(1); assertThat(consumer.closed()).isTrue(); }

首先,我们设置MockConsumer。方法向使用者添加一条记录addrecord.方法

首先要记住的是在分配或订阅主题之前,我们无法添加记录。这就是为什么我们使用schedulepolltask.方法我们计划的任务将在获取记录之前在第一次轮询上运行。因此,记录的添加将在分配发生之后发生。

同样重要的是我们不能添加到模仿Consumer.不属于主题和分配给该主题的分区的记录

然后,为了确保消费者不会无限期地运行,我们将其配置为在第二次轮询时关闭。

此外,我们必须设置开始偏移。我们这样做使用了updateBeginningOffsets方法。

最后,我们检查是否正确地使用了更新,以及消费者是关闭的。

3.3。订阅主题

现在,让我们为startBySubscribing方法:

@test void whinstartingbysubscribingtotopic_thenexpectupdatesareconsumedcorrectly(){//给定confuler.schedulepolltask(() - > {conticeer.rebalance(collections.singletronlist(new touppartitory(主题,0))); consumer.addrecord(记录(“罗马尼亚”,1000,主题,0));});consumer.schedulePollTask (() - > countryPopulationConsumer.stop ());HashMap startOffsets = new HashMap<>();TopicPartition tp = new TopicPartition(TOPIC, 0);startoffsets.put(tp,0l);consumer.updateBeginningOffsets (startOffsets);/ /当countryPopulationConsumer.startBySubscribing(主题);/ /然后为了(更新).hasSize (1);为了(consumer.closed ()) .isTrue ();}

在这种情况下,在添加记录之前的第一件事是重新平衡。我们通过致电重新平衡方法,模拟重新平衡。

其余的与之相同startbyassign.测试用例。

3.4。控制轮询循环

我们可以多种方式控制轮询循环。

第一个选择是安排轮询任务正如我们在上面的测试中所做的那样。我们通过SchedulePolltask,一个可运行的作为参数。我们调用时,我们计划的每项任务将运行轮询方法。

我们的第二个选择是调用醒来方法。通常,我们就是这样打断长时间的投票电话的。实际上,这就是我们如何实现停止方法opardypulationconsumer。

最后,我们可以属性设置要抛出的异常setPollException方法:

@Test void whenstartingbysubscribingtotopicandexceptionoccurred(){//给定消费者. schedulepolltask(() ->消费者。setPollException(新KafkaException(“调查异常”)));consumer.schedulePollTask (() - > countryPopulationConsumer.stop ());HashMap startOffsets = new HashMap<>();TopicPartition tp = new TopicPartition(TOPIC, 0);startoffsets.put(tp,0l);consumer.updateBeginningOffsets (startOffsets);/ /当countryPopulationConsumer.startBySubscribing(主题);/ /然后为了(pollException) .isInstanceOf (KafkaException.class)。hasMessage(“调查异常”);为了(consumer.closed ()) .isTrue ();}

3.5。嘲笑结束偏移和分区信息

如果我们的消费逻辑是基于终端偏移量或分区信息的,我们也可以使用模仿Consumer.

当要模拟结束偏移量时,可以使用addEndOffsetsupdateEndOffsets方法。

如果我们想模拟分区信息,我们可以使用UpdatePartitions.方法。

4。结论

在本文中,我们探索了如何使用模仿Consumer.测试KAFKA消费者应用程序。

首先,我们看了一个消费者逻辑的例子,以及哪些是需要测试的基本部分。然后,我们测试了一个简单的Kafka消费者应用使用模仿Consumer.

一路上,我们看着这个功能模仿Consumer.以及如何使用它。

与往常一样,所有这些代码示例都是可用的在github上

通用底部

开始使用Spring 5和Spring Boot 2,通过学习春天课程:

>>查看课程
对这篇文章的评论关闭!