Skip to content

Commit 3988fdd

Browse files
committed
Move "Application" section to MQTT Durable Sessions
1 parent 539fe9b commit 3988fdd

File tree

4 files changed

+75
-79
lines changed

4 files changed

+75
-79
lines changed

en_US/design/durable-storage.md

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ A single EMQX cluster can host multiple DS databases.
5959

6060
#### Shard
6161

62-
A shard is the horizontal partition of a durable storage database. Data is distributed across shards based on the publisher's client ID, enabling parallel processing and high availability. Each EMQX node can host one or more shards, and the total number of shards is determined by [n_shards](./managing-replication.md#number-of-shards) configuration parameter during the initial startup of EMQX.
62+
A shard is the horizontal partition of a durable storage database. Data is distributed across shards based on the publisher's client ID, enabling parallel processing and high availability. Each EMQX node can host one or more shards, and the total number of shards is determined by [n_shards](../durability/managing-replication.md#number-of-shards) configuration parameter during the initial startup of EMQX.
6363

6464
Shards also serve as the fundamental unit of replication. Each shard is replicated across multiple nodes according to the `durable_storage.messages.replication_factor` setting, ensuring that all replicas maintain identical message sets for redundancy and fault tolerance.
6565

@@ -76,7 +76,7 @@ Generations may also differ in how they internally structure and store data, dep
7676

7777
#### Slab
7878

79-
A slab is a physical partition of data identified by both shard ID and generation ID. Each slab acts as a durable container for one or more streams. All data in a slab shares the same encoding schema, eliminating the need for storing extra metadata. Atomicity and consistency properties are guaranteed within a slab.
79+
A slab is a physical partition of data identified by both shard ID and generation ID. Each slab acts as a durable container for one or more durable storage streams. All data in a slab shares the same encoding schema, eliminating the need for storing extra metadata. Atomicity and consistency properties are guaranteed within a slab.
8080

8181
Example: `shard 2, gen 3` represents a distinct slab that stores all streams written during that generation’s time range.
8282

@@ -160,24 +160,6 @@ Both pools group subscribers by stream and topic, reusing resources to serve mul
160160

161161
<img src="./assets/real-time_subscriptions.png" alt="real-time_subscriptions" style="zoom:67%;" />
162162

163-
## Applications: Durable Sessions and Shared Subscriptions
164-
165-
Durable Storage is the backbone for EMQX's advanced reliability features:
166-
167-
### Durable Sessions (EMQX 5+)
168-
169-
Durable sessions are a parallel session implementation that uses DS for message routing.
170-
171-
- **Mechanism:** When a client connects with a session expiry interval greater than zero and subscribes to a topic, the filter is marked as durable. Messages published to matching topics are saved to DS *in addition* to being dispatched.
172-
- **State:** Durable sessions access saved messages via the DS subscription mechanism. Their state includes a set of iterators for each matching stream, allowing them to precisely track their progress. Only one copy of each message is stored per database replica, regardless of how many durable sessions share it.
173-
174-
### Shared Subscriptions (EMQX 6.0)
175-
176-
EMQX 6.0 extended DS to shared subscriptions for enhanced load balancing and reliability.
177-
178-
- **Iterator Management:** The iterator sets for shared subscriptions are managed by a separate entity called the **shared sub leader**.
179-
- **Replay and Rebalancing:** Sessions subscribing to a shared topic communicate with the leader, which **lends them iterators** for message replay. Updated iterators are reported back. If a client disconnects or the group is rebalanced, the leader **revokes the iterators** and redistributes them to other members, ensuring consumption continuity and load distribution.
180-
181163
## Conclusion: The Foundation of High-Reliability MQTT
182164

183165
The Optimized Durable Storage in EMQX 6.0 is the resilient foundation for high-reliability MQTT messaging. By re-engineering RocksDB and embedding concepts like TTVs and Streams, DS provides a purpose-built, highly available, and persistent internal database. This architecture, coupled with sophisticated features like the LTS algorithm and Raft replication, ensures lossless message delivery and optimal retrieval for complex wildcard and shared subscriptions, solidifying EMQX's position as a leading solution for demanding IoT infrastructure.

en_US/durability/durability_introduction.md

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ Before learning the Durable Sessions feature in EMQX, it's essential to understa
1212

1313
**Session**: A session is a lightweight process within EMQX created for every client connection. Sessions implement behaviors prescribed to the broker by MQTT Standard, including initial connection, subscribing and unsubscribing to topics, and message dispatching.
1414

15-
**Durable Storage**: Durable storage is an internal database within EMQX. Sessions may use it to save their state and MQTT messages sent to the topics. Database engine powering durable storage uses [RocksDB](https://rocksdb.org/) to save the data on disk, and [Raft algorithm](https://raft.github.io/) to consistently replicate data across the cluster. It is important not to confuse durable storage with **Durable Sessions**.
15+
**Durable Storage (DS)**: Durable storage is an internal database within EMQX. Sessions may use it to save their state and MQTT messages sent to the topics. Database engine powering durable storage uses [RocksDB](https://rocksdb.org/) to save the data on disk, and [Raft algorithm](https://raft.github.io/) to consistently replicate data across the cluster. It is important not to confuse durable storage with **Durable Sessions**.
1616

1717
### Session Expiry Interval
1818

@@ -161,6 +161,40 @@ Even if durable sessions are not enabled, following steps 2-4 will still retain
161161

162162
Durable Sessions rely on the Durable Storage for persisting session state and messages. To understand how this storage layer is structured and operates, refer to the *Architecture: Backends and Storage Hierarchy* section in [Design for Durable Storage](../design/durable-storage.md).
163163

164+
## How Durable Storage Supports Durable and Shared Subscription Sessions
165+
166+
Durable Storage is the backbone for durable sessions and shared subscription sessions in EMQX.
167+
168+
### Durable Sessions
169+
170+
Durable Sessions are implemented on top of the DS database engine. When a client connects with a **non-zero session expiry interval**, EMQX stores the session state and the messages routed to that session in DS.
171+
172+
- **Message persistence:**
173+
174+
When a durable session subscribes to a topic, matching messages are saved to the DS in addition to being delivered to online clients. This ensures that messages published while the client is offline are available when it reconnects.
175+
176+
- **Progress tracking:**
177+
178+
Durable sessions read messages from DS using *iterators*, lightweight markers that track how far the session has progressed within each durable storage stream. This allows message replay to resume reliably after disconnection or node restart.
179+
180+
- **Efficient storage:**
181+
182+
Messages are stored only once per DS replica, regardless of how many durable sessions subscribe to the topic, minimizing storage overhead.
183+
184+
### Shared Subscription Sessions
185+
186+
Starting from EMQX v6.0, DS also supports the persistence of shared subscription sessions. Shared subscriptions rely on DS to maintain consistent message distribution across a subscriber group.
187+
188+
- **Iterator management:**
189+
190+
A designated shared subscription leader manages iterator sets for the group. It assigns iterators to members to ensure coordinated consumption.
191+
192+
- **Replay and rebalancing:**
193+
194+
Sessions subscribing to a shared topic communicate with the leader, which lends them iterators for message replay. Updated iterators are reported back. If a client disconnects or the group is rebalanced, the leader revokes the iterators and redistributes them to other members, ensuring consumption continuity and load distribution.
195+
196+
These mechanisms ensure load balancing, message ordering, and fault tolerance across the entire subscription group.
197+
164198
## Durable Storage Across Cluster
165199

166200
Each node within an EMQX cluster is assigned a unique *Site ID*, which serves as a stable identifier, independent of the Erlang node name (`emqx@...`). Site IDs are persistent, and they are randomly generated at the first startup of the node. This stability maintains the integrity of the data, especially in scenarios where nodes might undergo name modifications or reconfigurations.

zh_CN/design/durable-storage.md

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ EMQX 提供了两个不依赖外部服务的内嵌后端:
1515

1616
### 数据存储层次结构
1717

18+
赋予 EMQX 内置持久化能力的数据库存储引擎采用分层结构组织数据。下图展示了持久存储数据库在 EMQX 集群中的分布方式:
19+
20+
![emqx_ds_sharding](./assets/emqx_ds_sharding.png)
21+
1822
在内部,DS 将数据组织为一个多层次但对应用透明的结构,以实现**水平扩展****时间分区管理**,确保在分布式环境中的高效数据存取与可维护性。
1923

2024
完整的层次结构如下所示:
@@ -26,7 +30,7 @@ flowchart TB
2630
SH["分片(Shard)"]
2731
GEN(["代(Generation,逻辑上的时间分区)"])
2832
SL["数据块(Slab,物理数据单元)"]
29-
ST["(Stream)"]
33+
ST["持久存储流(Stream)"]
3034
TTV["主题-时间戳-值三元组(TTV)"]
3135
3236
%% Main hierarchy
@@ -42,7 +46,7 @@ flowchart TB
4246

4347
#### 数据库(Database, DB)
4448

45-
数据的顶层逻辑容器。每个数据库都是独立的,可根据需要创建、管理或删除,并负责自身的分片、数据块和流
49+
数据的顶层逻辑容器。每个数据库都是独立的,可根据需要创建、管理或删除,并负责自身的分片、数据块和持久存储流
4650
例如:
4751

4852
- **会话数据库(Sessions DB)**:存储持久会话状态。
@@ -52,30 +56,34 @@ flowchart TB
5256

5357
#### 分片(Shard)
5458

55-
数据库的**水平分区**。不同 MQTT 客户端的数据会被分配到不同的分片中,从而实现并行处理与高可用性。
59+
数据库的**水平分区**。来自不同 MQTT 客户端的数据会被分配到不同的分片中,从而实现并行处理与高可用性。
60+
61+
每个 EMQX 节点可以托管一个或多个分片,分片的总数量由 EMQX 首次启动时的 [`n_shards`](../durability/managing-replication.md#number-of-shards) 配置参数决定。
5662

57-
每个分片在集群中可以拥有多个副本(Replica),从而在节点故障时保持数据冗余与一致性
63+
分片同时也是复制的基本单位。根据 `durable_storage.messages.replication_factor` 的配置,每个分片会在多个节点之间复制,确保所有副本都持有一致的消息集合,从而实现冗余备份和容错能力
5864

5965
#### 代(Generation)
6066

61-
分片内部的数据按时间划分为不同的****。定期创建新代主要有以下目的
67+
代是数据库基于时间的逻辑分区。在不同时间段写入的数据会被归入不同的代中。新消息始终写入当前代,而旧代会变为不可变且只读。EMQX 定期创建新代,主要用于以下目的
6268

63-
1. **向后兼容与数据迁移:** 新数据写入新的代,可采用改进的编码方式;旧代保持不可变并只读
64-
2. **基于时间的数据保留:** 每一代覆盖一个特定时间区间,删除旧代即可快速清理过期数据
69+
1. **向后兼容与数据迁移:** 新数据会写入新的代,并可能使用改进后的编码格式;而旧代保持不可变且只读
70+
2. **基于时间的数据保留:** 由于每一代对应一个特定的时间范围,可以通过直接删除完整的旧代来高效清理过期数据
6571

66-
代是逻辑上的时间分区,并非物理存储单元。它定义了每个分片中 Slab 的时间边界。
72+
虽然在概念上与数据块(Slab)相关,但代并不是物理存储单元。代充当时间边界,用于组织每个分片内的 Slab。
73+
74+
代内部的结构和存储方式可能会因配置的存储布局而有所不同。目前,DS 支持一种主要面向高吞吐通配符订阅与单主题订阅的布局。未来版本将引入适用于不同工作负载的更多布局。新代使用的布局通过 `durable_storage.messages.layout` 参数配置,每种布局引擎都定义了自己的配置选项。
6775

6876
#### 数据块(Slab)
6977

70-
**分片 ID****代 ID** 共同标识的物理数据单元。每个 Slab 是一个持久化的数据容器,包含一个或多个流(Stream)。Slab 中的所有数据共享相同的编码格式,无需额外存储元数据;同时,Slab 内的数据写入具备原子性与一致性保证。
78+
**分片 ID****代 ID** 共同标识的物理数据单元。每个 Slab 是一个持久化的数据容器,包含一个或多个持久存储流(Stream)。Slab 中的所有数据共享相同的编码格式,无需额外存储元数据;同时,Slab 内的数据写入具备原子性与一致性保证。
7179

7280
示例:`shard 2, gen 3` 表示“分片 2、第 3 代”的一个独立数据块,用于承载该分片在该代中的一个或多个流及其数据。
7381

7482
#### 流(Stream)
7583

76-
每个数据块内的逻辑序列与批处理单元。流将主题相似的**主题–时间戳–值(TTV)** 三元组进行分组,使数据能够按时间顺序、确定性地读取
84+
持久存储流是每个 Slab 内用于批处理和序列化的逻辑单位。流将具有相似主题结构的 **Topic–Timestamp–Value(TTV)** 三元组进行分组,使数据能够以时间顺序、确定性的方式成批读取。单个流可以包含来自多个主题的消息,不同的存储布局可能会采用不同策略将主题映射到流中
7785

78-
流也是持久存储中订阅与迭代的基本单位,用于高效处理带有通配符的主题过滤器
86+
在持久存储中,流也是订阅与迭代的基本单位,使系统能够高效处理通配符主题过滤,并实现有序数据的可重复读取。[持久会话](../durability/durability_introduction.md#durable-sessions)会以批处理方式从流中读取消息,批大小可通过 `durable_sessions.batch_size` 配置参数进行控制
7987

8088
#### 主题–时间戳–值(Topic–Timestamp–Value, TTV)
8189

@@ -163,24 +171,6 @@ DS 维护两个订阅者池:
163171

164172
<img src="./assets/real-time_subscriptions.png" alt="real-time_subscriptions" style="zoom:67%;" />
165173

166-
## 应用场景:持久会话与共享订阅
167-
168-
DS 是 EMQX 高可靠性功能的核心基础。
169-
170-
### 持久会话(EMQX 5+)
171-
172-
持久会话是一种基于 DS 的并行会话实现,用于可靠的消息路由。
173-
174-
- **机制:** 当客户端以非零的会话过期间隔(Session Expiry Interval)连接并订阅主题时,该订阅过滤器会被标记为“持久化”。之后,发布到匹配主题的消息会**同时写入 DS** 并分发给在线订阅者。
175-
- **状态:** 持久会话通过 DS 的订阅机制读取保存的消息,其状态包含每个匹配流的迭代器集合,以精确追踪消息消费进度。在数据库每个副本中,同一消息仅存储一份,即使多个持久会话共享它。
176-
177-
### 共享订阅(EMQX 6.0)
178-
179-
在 EMQX 6.0 中,DS 被扩展用于共享订阅,以增强负载均衡与可靠性。
180-
181-
- **迭代器管理:** 共享订阅的迭代器集合由一个称为共享订阅主节点(Shared Sub Leader)的独立实体负责管理。
182-
- **消息重放与重平衡:** 订阅共享主题的会话与 Leader 通信,由其借出迭代器以实现消息重放。当客户端断开或发生组重平衡时,Leader 会回收迭代器并将其重新分配给其他成员,从而保证消息消费的连续性与负载分配的均衡性。
183-
184174
## 结论:高可靠 MQTT 的基石
185175

186176
EMQX 6.0 中的优化的持久存储(DS)为高可靠 MQTT 消息传递提供了坚实的基础。通过重新设计 RocksDB 并引入 TTV 与 Stream 等概念,DS 构建了一个专用的、高可用且持久的内部数据库系统。

0 commit comments

Comments
 (0)