1.概述

Kafka是一个围绕分布式消息传递队列构建的消息处理系统。它提供了一个Java库,以便应用程序可以将数据写入数据或读取数据,或从Kafka主题读取数据。

现在,由于大多数业务域逻辑通过单元测试验证,应用程序通常会模拟所有I / O操作在Junit。Kafka还提供了一个模仿Producer.嘲笑生产者申请。

在本教程中,我们首先将实现Kafka Producer应用程序。稍后,我们将实施一个单元测试以验证共同的生产者操作模仿Producer.

2. Maven依赖项

在我们实现生产者应用程序之前,我们将添加Maven依赖项Kafka-Clients.

<依赖>  org.apache.kafka   KAFKA-Clients   2.5.0  

3.模仿Producer.

Kafka-Clients.库包含一个用于在Kafka中发布和消费消息的Java库。Producer应用程序可以使用这些API将键值记录发送给Kafka主题:

公共类kafkaproducer {私有最终生产者制作人;Public Kafkaproducer(Producer  Producer){this.producer = producer;公共将来发送(String键,字符串值){producerrecord记录= new producerrecord(“主题_sports_news”,键,值);返回producer.send(记录);}}

任何Kafka生产者必须实施生产者接口在客户端库中。Kafka还提供了一个Kafkaproducer.类是一种具体实现,该实现对Kafka经纪人执行I / O操作。

此外,Kafka提供了一个模仿Producer.实现相同的生产者界面和模拟所有I / O操作所实施的Kafkaproducer.

@test voidedkeyvalue_whensend_thenverifyhistory(){mockproducer mockproducer = new mockproducer <>(true,new stringserializer(),new stringserializer());Kafkaproducer = New Kafkaproducer(模型产品);未来 RecordmetAtafuture = Kafkaproducer.send(“足球”,“{\”网站\“:\”baeldung \“}”)金宝搏188体育;asserttrue(mockproducer.history()。size()== 1);}

虽然这样的I / O操作也可以嘲笑Mockito.模仿Producer.让我们访问我们需要在模拟之上实施的大量功能。一个这样的特征是历史()方法。模仿Producer.缓存记录发送()被调用,从而允许我们验证生产者的发布行为。

此外,我们还可以验证主题名称,分区,记录密钥或值等元数据:

asserttrue(mockproducer.history()。get(0).key()。EqualsignoreCase(“数据”));asserttrue(RecordmetAdatafuture.get()。分区()== 0);

4.嘲笑KAFKA集群

在我们到目前为止的模拟测试中,我们假设一个只有一个分区的主题。但是,为了实现生产者和消费者线程之间的最大并发,Kafka主题通常被分成多个分区。

这允许生产者将数据写入多个分区。这通常通过基于键和映射特定键对特定分区来分区记录来实现:

公共类偶数分派器扩展DefaultPartitioner {@override public int分区(字符串主题,对象键,byte [] keybytes,对象值,byte [] valugebytes,cluster cluster){if((字符串)键).length()%2 == 0){返回0;}返回1;}}

因此,所有偶数密钥都将发布为分区“0”,同样,奇长的键分区“1”。

模仿Producer.使我们能够通过用多个分区映射Kafka群集来验证此类分区分配算法:

@test voidedkeyvalue_whensendwithpartition_thenverifypAttitionNumber()抛出executionException,InterruptedException {partitioninfo partitioninfo0 = new partitioninfo(topic_name,0,null,null,null);partitioninfo partitioninfo1 = new partitioninfo(topic_name,1,null,null,null);列表列表= new arraylist <>();list.add(partitionInfo0);list.add(partitionInfo1);群集群集=新群集(“kafkab”,new arraylist (),list,amptyset(),amptyset());this.mockproducer = new mockproducer <>(群集,true,new extoddpartitioner(),new stringserializer(),new stringserializer());Kafkaproducer = New Kafkaproducer(模型产品);未来 RecordmetAtaFuture = Kafkaproducer.send(“分区”,“{\”网站\“:\”baeldung \“}”);金宝搏188体育asserttrue(RecordmetAdatafuture.get()。分区()== 1); }

我们嘲笑A.有两个分区,0和1.我们可以验证偶数分派者将记录发布到分区1。

5.嘲笑错误模仿Producer.

到目前为止,我们只嘲笑生产者成功向Kafka主题发送记录。但是,如果在编写记录时出现异常会发生什么?

应用程序通常通过重试或抛出客户端来处理此类异常。

模仿Producer.允许我们在内部模拟异常发送()这样我们就可以验证异常处理代码:

@test voidedkeyvalue_whensend_thenreturnexception(){mockproducer  mockproducer = new mockproducer <>(false,new stringserializer(),new stringserializer())kafkaproducer = new kafkaproducer(模型Procucer);未来 Record = Kafkaproducer.send(“站点”,“{\”网站\“:\”Baeldung \“金宝搏188体育}”);runtimeexception e = new runtimeexception();MockProducer.Errornext(E);尝试{Record.get();catch(executionException | InterruptedException ex){assertequals(e,ex.getcause());} astertrue(record.isdone());}

此代码中有两个值得注意的东西。

首先,我们打电话给了模仿Producer.构造函数自动完成作为错误的。这告诉了这一点模仿Producer.在完成之前等待输入发送()方法。

其次,我们会打电话mockproducer.errornext(e),以便模仿Producer.返回最后一个例外发送()称呼。

6.嘲笑交易写入模仿Producer.

Kafka 0.11在Kafka经纪商,生产者和消费者之间引入了交易。这允许端到端完全是一旦消息传递语义在卡夫卡。简而言之,这意味着事务生产者只能用一个经纪人发布记录两阶段提交协议。

模仿Producer.还支持事务性写入并允许我们验证此行为:

@test voidedkeyvalue_whensendwithtxn_thensendonlyontxncommit(){mockproducer  mockproducer = new mockproducer <>(true,new stringserializer(),new stringserializer())kafkaproducer = new kafkaproducer(模型producer);kafkaproducer.inittransaction();kafkaproducer.begintransaction();未来 Record = Kafkaproducer.send(“数据”,“{\”网站\“:\”baeldung \金宝搏188体育“}”);asserttrue(mockproducer.history()。isempty());kafkaproducer.committransaction();asserttrue(mockproducer.history()。size()== 1);}

自从模仿Producer.还支持与混凝土相同的APIkafkaproducer,它只更新历史一旦我们提交交易。这种嘲笑行为可以帮助应用程序验证commitTransaction()为每个交易调用。

7.结论

在本文中,我们看了模仿Producer.班级Kafka-Client.图书馆。我们讨论了这一点模仿Producer.实现与混凝土相同的层次结构Kafkaproducer.因此,我们可以使用Kafka Broker模拟所有I / O操作。

我们还讨论了一些复杂的模型方案,并且能够使用该方案测试异常,分区和交易模型产品。

与始终一样,所有代码示例都可用在github上

通用底部

使用Spring 5和Spring Boot 2开始,通过学习春天课程:

>>查看课程
评论在本文上关闭!