38 changes: 38 additions & 0 deletions docs/09-streams/01RocketMQStreamsOverview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
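# RocketMQ Streams Overview
RocketMQ Streams is a lightweight stream processing engine built on RocketMQ. An application depends on it as an SDK, so it gains stream processing capability without deploying a complex stream processing server.
As a result, it consumes few resources, scales well, and supports a rich set of stream processing operators.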

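## Overall Architecture
![Overall architecture](../picture/33rocketmq-streams/总体-1.png)

RocketMQ Streams consumes data from RocketMQ, processes it, and finally writes the result back to RocketMQ.

![Overall architecture](../picture/33rocketmq-streams/总体-2.png)

Data is consumed by a RocketMQ Consumer and enters the processing topology, where operators process it. If the stream processing job contains the keyBy operator, the data must be grouped by key, and the grouped data is written to a shuffle topic. Downstream operators then consume from the shuffle topic.
If the job also involves a stateful operator such as count, the computation reads and writes a state topic. When the computation finishes, the result is written back to RocketMQ.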


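## Consumption Model

![img_2.png](../picture/33rocketmq-streams/消费模型.png)

A compute instance is essentially a client that depends on the RocketMQ Streams SDK, so the message queues (MQs) it consumes are assigned by RocketMQ rebalance.
The total number of compute instances must not exceed the total number of MQs being consumed. Otherwise some compute instances stay idle and receive no data.

One compute instance can consume multiple MQs, and each instance holds exactly one compute topology.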
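The limit on the number of instances can be seen in a minimal sketch in plain Java. It does not call the RocketMQ client; the class `AllocationSketch`, the method `allocate`, and the round-robin rule are assumptions made for illustration, and the real assignment is decided by RocketMQ rebalance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of the RocketMQ Streams SDK.
final class AllocationSketch {

    // Hand out queue ids 0..queueCount-1 to the instances in round-robin order.
    static Map<String, List<Integer>> allocate(List<String> instances, int queueCount) {
        if (instances.isEmpty()) {
            throw new IllegalArgumentException("at least one instance is required");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String instance : instances) {
            result.put(instance, new ArrayList<>());
        }
        for (int queue = 0; queue < queueCount; queue++) {
            String owner = instances.get(queue % instances.size());
            result.get(owner).add(queue);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three instances but only two queues: the third instance gets nothing.
        System.out.println(allocate(Arrays.asList("instance1", "instance2", "instance3"), 2));
        // prints {instance1=[0], instance2=[1], instance3=[]}
    }
}
```

With more queues than instances, the same method gives some instances several queues, which matches the statement that one instance can consume multiple MQs.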

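## State
![img_3.png](../picture/33rocketmq-streams/状态存储.png)

A stateful operator such as count requires the data to be grouped first; only then can each group be counted. The grouping operator keyBy writes the data back to RocketMQ by grouping key, so that records with the same key land in the same partition (this process is called shuffle).
This guarantees that records with the same key are consumed by the same consumer. State is kept locally in RocksDB for fast reads and persisted remotely in RocketMQ.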
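The routing idea behind shuffle can be sketched in plain Java. This is only an illustration under stated assumptions: the class `ShuffleSketch`, the method `queueFor`, and the hash-modulo rule are hypothetical and are not taken from the RocketMQ Streams SDK, whose actual partitioning may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not part of the RocketMQ Streams SDK.
final class ShuffleSketch {

    // Map a key to one of queueCount queues; the same key always yields the same index.
    static int queueFor(String key, int queueCount) {
        return Math.floorMod(key.hashCode(), queueCount);
    }

    // Group keys by the queue they would be written to during shuffle.
    static Map<Integer, List<String>> shuffle(List<String> keys, int queueCount) {
        Map<Integer, List<String>> queues = new TreeMap<>();
        for (String key : keys) {
            queues.computeIfAbsent(queueFor(key, queueCount), q -> new ArrayList<>()).add(key);
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("to", "be", "or", "not", "to", "be");
        // Every occurrence of "to" ends up in one queue, so one consumer sees all of them.
        shuffle(keys, 4).forEach((queue, records) ->
            System.out.println("queue " + queue + " -> " + records));
    }
}
```

Because the queue index depends only on the key, a stateful operator downstream can keep the count for a key in one place.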


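## Scaling Out and In

![img.png](../picture/33rocketmq-streams/RocketMQ-streams扩缩容.png)

When the number of compute instances is scaled in from 3 to 2, the rebalance feature of RocketMQ's clustering consumption mode redistributes the consumed MQs among the remaining compute instances. MQ2 and MQ3, previously consumed on Instance1, are assigned to Instance2 and Instance3,
and the state data of these two MQs must also be migrated to Instance2 and Instance3. This implies that state data is stored per source MQ. Scaling out is exactly the reverse process.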
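The scale-in case above can be sketched in plain Java. The class `ScaleInSketch`, the method `scaleIn`, the round-robin reassignment, and the sample state values are assumptions for illustration; RocketMQ rebalance and the SDK's state migration are not modeled here.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of the RocketMQ Streams SDK.
final class ScaleInSketch {

    // Reassign every queue owned by `removed` to the remaining instances, round-robin.
    static Map<String, String> scaleIn(Map<String, String> ownerByQueue,
                                       String removed, List<String> remaining) {
        Map<String, String> next = new LinkedHashMap<>();
        int i = 0;
        for (Map.Entry<String, String> entry : ownerByQueue.entrySet()) {
            boolean orphaned = entry.getValue().equals(removed);
            next.put(entry.getKey(),
                orphaned ? remaining.get(i++ % remaining.size()) : entry.getValue());
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, String> owners = new LinkedHashMap<>();
        owners.put("MQ2", "Instance1");
        owners.put("MQ3", "Instance1");

        // State is keyed by source queue (sample values are made up), so it follows the queue.
        Map<String, Map<String, Integer>> stateByQueue = new LinkedHashMap<>();
        stateByQueue.put("MQ2", Collections.singletonMap("to", 3));
        stateByQueue.put("MQ3", Collections.singletonMap("be", 2));

        Map<String, String> after = scaleIn(owners, "Instance1", Arrays.asList("Instance2", "Instance3"));
        after.forEach((queue, owner) ->
            System.out.println(queue + " -> " + owner + " with state " + stateByQueue.get(queue)));
    }
}
```

Keying state by queue rather than by instance is what lets the new owner pick up where the old one stopped.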

65 changes: 65 additions & 0 deletions docs/09-streams/02RocketMQStreamsConcept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
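# RocketMQ Streams Core Concepts

## Domain Model

### StreamBuilder
![img_2.png](../picture/33rocketmq-streams/领域模型-1.png)

* A StreamBuilder instance has 1 to N pipelines; a pipeline represents one data processing path.
* A pipeline can contain 1 to N processing nodes (GroupNode).
* A StreamBuilder instance has one TopologyBuilder, which builds the data processors (processor).
* One JobId corresponds to one StreamBuilder instance.

### RocketMQStream
![img_2.png](../picture/33rocketmq-streams/领域模型-2.png)

* A RocketMQStream instance has one TopologyBuilder.
* A RocketMQStream instance can instantiate 1 to N worker threads.
* Each WorkerThread instance contains one engine.
* An engine contains all the logic for processing data and holds one consumer instance, one producer instance, and one StateStore instance.

### Stream Processing Instance
A stream processing instance is a process that runs RocketMQ Streams.

* A stream processing instance contains one StreamBuilder, one RocketMQStream, one topology, and one or more pipelines.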


## StreamBuilder
+ ```StreamBuilder(jobId)``` creates an instance;
+ ```<OUT> RStream<OUT> source(topicName, deserializer) ``` defines the source topic and how its messages are deserialized;


## RStream
+ ```<K> GroupedStream<K, T> keyBy(selectAction)``` groups the data by a given field;
+ ```<O> RStream<O> map(mapperAction)``` transforms the data one-to-one;
+ ```RStream<T> filter(predictor)``` filters the data;
+ ```<VR> RStream<T> flatMap(mapper)``` transforms the data one-to-many;
+ ```<T2> JoinedStream<T, T2> join(rightStream)``` joins two streams;
+ ```sink(topicName, serializer)``` writes the result to the given topic;


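## GroupedStream
Operates on data that shares the same key.
+ ```<OUT> GroupedStream<K, Integer> count(selectAction)``` counts the records that contain a given field;
+ ```GroupedStream<K, V> min(selectAction)``` computes the minimum of a field;
+ ```GroupedStream<K, V> max(selectAction)``` computes the maximum of a field;
+ ```GroupedStream<K, ? extends Number> sum(selectAction)``` computes the sum of a field;
+ ```GroupedStream<K, V> filter(predictor)``` filters on a field;
+ ```<OUT> GroupedStream<K, OUT> map(valueMapperAction)``` transforms the data one-to-one;
+ ```<OUT> GroupedStream<K, OUT> aggregate(accumulator)``` aggregates the data. Two-phase aggregation is supported, for example an operator that adds data while the window has not fired and computes the result when the window fires;
+ ```WindowStream<K, V> window(windowInfo)``` assigns the data to windows;
+ ```GroupedStream<K, V> addGraphNode(name, supplier)``` low-level interface that adds a custom operator to the stream processing topology;
+ ```RStream<V> toRStream()``` converts to an RStream. Only the interface type changes; no operation is performed on the data;
+ ```sink(topicName, serializer)``` writes the result to a topic using a custom serializer;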


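## WindowStream
Operates on data that has been assigned to windows.
+ ```WindowStream<K, Integer> count()``` counts the records in the window;
+ ```WindowStream<K, V> filter(predictor)``` filters the records in the window;
+ ```<OUT> WindowStream<K, OUT> map(mapperAction)``` transforms the records in the window one-to-one;
+ ```<OUT> WindowStream<K, OUT> aggregate(aggregateAction)``` transforms the records in the window many-to-one;
+ ```<OUT> WindowStream<K, OUT> aggregate(accumulator)``` aggregates the data. Two-phase aggregation is supported, for example an operator that adds data while the window has not fired and computes the result when the window fires;
+ ```void sink(topicName, serializer)``` writes the result to a topic using a custom serializer;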
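
The two-phase aggregation mentioned for `aggregate(accumulator)` can be sketched in plain Java. The interface `SimpleAccumulator` and its methods `addValue` and `result` are hypothetical names chosen for this sketch; they are not the SDK's accumulator interface, whose exact shape should be checked in the RocketMQ Streams source.

```java
// Hypothetical illustration only: not part of the RocketMQ Streams SDK.
final class AccumulatorSketch {

    // Phase 1 collects values while the window is open; phase 2 produces the result.
    interface SimpleAccumulator<V, OUT> {
        void addValue(V value);
        OUT result();
    }

    // Averages integers: keeps a running sum and count, divides only when asked for the result.
    static final class Average implements SimpleAccumulator<Integer, Double> {
        private long sum;
        private long count;

        @Override
        public void addValue(Integer value) {
            sum += value;
            count++;
        }

        @Override
        public Double result() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    public static void main(String[] args) {
        Average average = new Average();
        // Window not yet fired: records are only added.
        average.addValue(2);
        average.addValue(4);
        average.addValue(9);
        // Window fires: the result is computed once.
        System.out.println(average.result()); // prints 5.0
    }
}
```

Splitting the work this way keeps the per-record step cheap and defers the final computation until the window fires.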


162 changes: 162 additions & 0 deletions docs/09-streams/03RocketMQStreamsQuickStart.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
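# RocketMQ Streams Quick Start

## Running Inside the RocketMQ Streams Project
The programs in the rocketmq-streams-examples module of the RocketMQ Streams project can be run directly. To run an example:
* Start RocketMQ 5.0 or later locally;
* Use mqAdmin to create the source topic used by the example;
* Start the example;
* Write suitable data to the source topic in RocketMQ (what is suitable depends on the example);

## Using RocketMQ Streams as an SDK Dependency
### Prerequisites
- 64-bit JDK 1.8 or later
- Maven 3.2 or later
- RocketMQ running locally, see the [quick start guide](https://rocketmq.apache.org/docs/quick-start/)

### Building RocketMQ Streams


### Adding the pom Dependency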

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-streams</artifactId>
        <!-- Adjust the version as needed -->
        <version>1.1.0</version>
    </dependency>
</dependencies>
```

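### Writing the Stream Processing Program
```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.rocketmq.common.MixAll;
// StreamBuilder, TopologyBuilder, RocketMQStream, ValueMapperAction and Pair must also be
// imported from the rocketmq-streams SDK; check the package names in the version you depend on.

public class WordCount {
    public static void main(String[] args) {
        StreamBuilder builder = new StreamBuilder("wordCount");

        // Read raw bytes from sourceTopic and decode each message body as a UTF-8 string.
        builder.source("sourceTopic", total -> {
                    String value = new String(total, StandardCharsets.UTF_8);
                    return new Pair<>(null, value);
                })
                // Split each line into lower-case words.
                .flatMap((ValueMapperAction<String, List<String>>) value -> {
                    String[] splits = value.toLowerCase().split("\\W+");
                    return Arrays.asList(splits);
                })
                // Group by the word itself, then count occurrences per word.
                .keyBy(value -> value)
                .count()
                .toRStream()
                .print();

        TopologyBuilder topologyBuilder = builder.build();

        Properties properties = new Properties();
        properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");

        RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);

        final CountDownLatch latch = new CountDownLatch(1);

        // Stop the stream and release the main thread when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {
            @Override
            public void run() {
                rocketMQStream.stop();
                latch.countDown();
            }
        });

        try {
            rocketMQStream.start();
            latch.await();
        } catch (final Throwable e) {
            // Report the failure before exiting instead of swallowing it.
            e.printStackTrace();
            System.exit(1);
        }
        System.exit(0);
    }
}
```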

### Writing Data to sourceTopic and Observing the Result
Suppose the following data is written to sourceTopic, with each line sent as one message:
```text
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"And by opposing end them?--To die,--to sleep,--",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks",
"That flesh is heir to,--'tis a consummation",
```
The program counts how often each word occurs and produces the following output:
```text
(key=to, value=1)
(key=be, value=1)
(key=or, value=1)
(key=not, value=1)
(key=to, value=2)
(key=be, value=2)
(key=that, value=1)
(key=is, value=1)
(key=the, value=1)
(key=whether, value=1)
(key=tis, value=1)
(key=nobler, value=1)
(key=mind, value=1)
(key=against, value=1)
(key=troubles, value=1)
(key=slings, value=1)
(key=die, value=1)
(key=natural, value=1)
(key=flesh, value=1)
(key=sea, value=1)
(key=fortune, value=1)
(key=shocks, value=1)
(key=consummation, value=1)
(key=to, value=3)
(key=to, value=4)
(key=to, value=5)
(key=say, value=1)
(key=end, value=1)
(key=end, value=2)
(key=to, value=6)
(key=to, value=7)
(key=to, value=8)
(key=or, value=2)
(key=them, value=1)
(key=take, value=1)
(key=arms, value=1)
(key=of, value=1)
(key=and, value=1)
(key=of, value=2)
(key=and, value=2)
(key=by, value=1)
(key=sleep, value=1)
(key=and, value=3)
(key=by, value=2)
(key=sleep, value=2)
(key=and, value=4)
(key=that, value=2)
(key=arrows, value=1)
(key=heir, value=1)
(key=question, value=1)
(key=is, value=2)
(key=the, value=2)
(key=suffer, value=1)
(key=a, value=1)
(key=the, value=3)
(key=no, value=1)
(key=a, value=2)
(key=opposing, value=1)
(key=the, value=4)
(key=the, value=5)
(key=a, value=3)
(key=in, value=1)
(key=more, value=1)
(key=heartache, value=1)
(key=outrageous, value=1)
(key=we, value=1)
(key=thousand, value=1)
(key=tis, value=2)
```
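
The same word appears several times in the output because a new count is emitted for every incoming record. The following plain-Java sketch reproduces that behavior for a single message. The class `RunningWordCount` and the method `process` are hypothetical and do not use the SDK; the sketch is single-threaded, so it keeps input order, whereas the output above interleaves records from different queues. Skipping empty tokens is an addition of this sketch, not something the WordCount program does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: not part of the RocketMQ Streams SDK.
final class RunningWordCount {

    // Emit one "(key=..., value=...)" line per word, carrying the count so far.
    static List<String> process(List<String> messages) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String message : messages) {
            for (String word : message.toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue; // split() yields an empty first token if the message starts with punctuation
                }
                int count = counts.merge(word, 1, Integer::sum);
                output.add("(key=" + word + ", value=" + count + ")");
            }
        }
        return output;
    }

    public static void main(String[] args) {
        for (String line : process(Collections.singletonList("To be, or not to be"))) {
            System.out.println(line);
        }
    }
}
```

For the input `To be, or not to be`, the sketch emits `to` and `be` twice each, first with value 1 and then with value 2.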
