1.概述

在本文中,我们将研究KafkaStreams图书馆

KafkaStreams是由Apache Kafka的创建者设计的这款软件的主要目标是让程序员创建高效、实时、流媒体的应用程序,可以像微服务一样工作。

KafkaStreams使我们能够从Kafka主题消费,分析或转换数据,并潜在地,发送它到另一个Kafka主题。

为了演示KafkaStreams,我们将创建一个简单的应用程序,它从主题中读取句子,计数单词的出现次数并打印每个单词的计数。

值得注意的是KafkaStreams库不是响应式的,也不支持异步操作和反压处理。

2.Maven的依赖

开始写流处理逻辑使用KafkaStreams,我们需要添加一个依赖项kafka-streamskafka-clients

<依赖> < groupId >表示。kafka kafka-streams 1.0.0   org.apache。kafka kafka-clients 1.0.0 

我们还需要安装并启动Apache Kafka,因为我们将使用Kafka主题。这个主题将是我们流作业的数据源。

我们可以下载Kafka和其他必需的依赖官方网站

3.配置KafkaStreams输入

我们要做的第一件事是定义输入的Kafka主题。

我们可以使用融合性的它包含一个Kafka服务器。它还包含kafka-console-producer我们可以使用它向Kafka发布消息。

首先让我们运行我们的Kafka集群:

。/汇合的开始

一旦Kafka启动,我们就可以使用APPLICATION_ID_CONFIG

String inputopic = " inputopic ";
streamsConfiguration = new Properties();streamsConfiguration。把(StreamsConfig。APPLICATION_ID_CONFIG wordcount-live-test);

一个关键的配置参数是BOOTSTRAP_SERVER_CONFIG。这是我们刚刚启动的本地Kafka实例的URL:

private String bootstrapServers = "localhost:9092";streamsConfiguration。把(StreamsConfig。BOOTSTRAP_SERVERS_CONFIG bootstrapServers);

接下来,我们需要传递将被消费的消息的键和值的类型inputTopic:

streamsConfiguration。把(StreamsConfig。DEFAULT_KEY_SERDE_CLASS_CONFIG Serdes.String () .getClass () . getname ());streamsConfiguration。把(StreamsConfig。DEFAULT_VALUE_SERDE_CLASS_CONFIG Serdes.String () .getClass () . getname ());

流处理通常是有状态的。当我们想要保存中间结果时,我们需要指定STATE_DIR_CONFIG参数

在我们的测试中,我们使用了一个本地文件系统:

streamsConfiguration。把(StreamsConfig。STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());

4.构建流拓扑

一旦我们定义了输入主题,我们就可以创建一个流拓扑——这是一个关于事件应该如何处理和转换的定义。

在我们的示例中,我们想实现一个单词计数器。每句话发送到inputTopic,我们想把它分成单词,并计算每个单词的出现次数。

的实例KStreamsBuilder类开始构造我们的拓扑:

KStreamBuilder builder = new KStreamBuilder();KStream textLines = builder.stream(inputopic);Pattern Pattern = Pattern.compile("\\W+", Pattern. unicode_character_class);KTable wordCounts = textLines .flatMapValues(value -> Arrays.asList(pattern.split(value. tolowercase ()))) .groupBy((key, word) -> word) .count();

为了实现单词计数,首先,我们需要使用正则表达式分割值。

split方法返回一个数组。我们使用flatMapValues ()压平它。否则,我们将得到一个数组列表,并且使用这种结构编写代码会很不方便。

最后,我们聚合每个单词的值并调用count ()它将计算特定单词的出现次数。

5.处理结果

我们已经计算了输入消息的字数。现在,让我们使用foreach ()方法:

wordCounts .foreach((w, c) ->系统输出。Println ("word: " + w + " -> " + c));

在生产中,这种流作业通常会将输出发布到另一个Kafka主题。

我们可以用()方法:

String outputopic = "输出主题";Serde stringSerde = Serdes.String();Serde longSerde = Serdes.Long();wordcount。(stringSerde longSerde outputTopic);

Serde类为我们提供了用于将对象序列化为字节数组的Java类型的预配置序列化器。字节数组将被发送到Kafka主题。

我们使用字符串作为我们主题的关键作为实际计数的值。的()方法将结果数据保存到outputTopic

6.KafkaStream开始工作

到目前为止,我们构建了一个可以执行的拓扑。然而,工作还没有开始。

我们需要通过调用start ()方法KafkaStreams实例:

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);streams.start ();thread . sleep (30000);streams.close ();

注意,我们要等待30秒才能完成任务。在真实场景中,该任务将一直运行,处理来自Kafka的事件。

我们可以通过发布一些事件到我们的Kafka主题来测试我们的工作。

让我们开始一个kafka-console-producer并手动发送一些事件到我们的inputTopic:

./kafka-console-producer——topic inputopic——broker-list localhost:9092 >"this is a pony" >"this is a horse and pony"

通过这种方式,我们向Kafka发布了两个事件。我们的应用程序将使用这些事件,并打印以下输出:

Word: -> 1 Word: this -> 1 Word: is -> 1 Word: a -> 1 Word: pony -> 1 Word: -> 2 Word: this -> 2 Word: is -> 2 Word: a -> 2 Word: horse -> 1 Word: and -> 1 Word: pony ->

我们可以看到,当第一个信息到达的时候,这个词小马只发生一次。但当我们发出第二条信息时,那个词小马发生了第二次印刷:字:小马-> 2″

6.结论

本文讨论了如何使用Apache Kafka作为数据源来创建一个主流处理应用程序KafkaStreams库作为流处理库。

所有这些示例和代码片段都可以在GitHub项目-这是一个Maven项目,所以它应该很容易导入和运行。

通用的底部

从Spring 5和Spring Boot 2开始学习的春天课程:

>>看看课程
2评论
最古老的
最新的
内联反馈
查看所有评论
这篇文章的评论已经关闭!