Skip to content

Commit 3432252

Browse files
committed
[FLINK-38611][doc] Add doc for delta join
1 parent c3bf061 commit 3432252

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

docs/content.zh/docs/dev/table/tuning.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,3 +289,55 @@ ON a.id = b.id
289289
默认情况下,对于 regular join 算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled``table.exec.mini-batch.allow-latency``table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
290290

291291
{{< top >}}
292+
293+
## Delta Joins
294+
295+
在流作业中,regular join 算子存储了两侧输入历史上所有的数据,以保证计算结果的正确性。在收到一条输入数据时,regular join 将会去另一侧的状态中查询能够关联上的记录进行下发,同时更新本侧的状态。
296+
然而,随着作业的长时间运行和输入数据的不断增加,regular join 节点的状态会逐渐增大,这可能导致计算资源成为瓶颈,从而影响作业的整体性能,并引发一系列稳定性问题。
297+
298+
为此,我们引入了全新的 delta join 算子。其核心思想是利用双向 lookup join 来复用源表中的数据,以此替代 regular join 所需的状态。与传统的 regular join 相比,delta join 算子极大的减少了状态大小,提高作业的稳定性的同时,也降低了计算资源。
299+
300+
目前该功能默认启用,会在同时满足以下条件时,将拓扑中的 regular join 算子优化为 delta join 算子。
301+
302+
1. 作业拓扑结构满足优化条件。具体可以查看[支持的功能和限制]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)。
303+
2. 源表所在的外部存储系统提供了可供 delta join 快速查询的索引信息。目前 [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) 已支持在 Flink 中提供表级别的索引信息,其上的表可作为 delta join 的源表。具体可参考 [Fluss 文档](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)
304+
305+
### 工作原理
306+
307+
Regular join 需要将两端输入的数据完整地保存在状态中,当对端的数据到来时,从状态中匹配数据。相比之下,delta join 借助外部存储系统提供的索引能力,将查询状态中数据的行为,转换为利用索引键高效查询外部存储系统中的数据,从而避免了外部存储系统和状态中重复存储相同的数据。
308+
309+
{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
310+
311+
### 关键参数
312+
313+
目前,delta join 的优化功能默认启用,您可以通过设置下面的参数,来强制关闭启用 delta join。详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#optimizer-options)页面。
314+
315+
```sql
316+
SET 'table.optimizer.delta-join.strategy' = 'NONE';
317+
```
318+
319+
另外,您也可以通过设置下面这些参数,来调整 delta join 的性能。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。
320+
321+
- `table.exec.delta-join.cache-enabled`
322+
- `table.exec.delta-join.left.cache-size`
323+
- `table.exec.delta-join.right.cache-size`
324+
325+
<a name="supported-features-and-limitations" />
326+
327+
### 支持的功能和限制
328+
329+
目前 delta join 仍在持续演进中,当前版本已支持的功能如下:
330+
331+
1. 支持 INSERT-ONLY 的表作为 delta join 的源表。
332+
2. 支持不带 DELETE 的 CDC 表作为 delta join 的源表。
333+
3. 支持源表和 delta join 算子之间包含 project 和 filter 算子。
334+
4. Delta join 算子内支持 cache。
335+
336+
当前版本,delta join 有如下限制,包含下面任一条件的作业将无法优化为 delta join。
337+
338+
1. join 的等值条件中,必须包含源表提供的索引信息。
339+
2. join 类型必须为 INNER join。
340+
3. 下游节点必须能够消费 delta join 产生的冗余数据。如支持 UPSERT 模式、不带 upsertMaterialize 的 sink 节点。
341+
4. 当消费 CDC 源表时,join key 必须是源表主键的一部分。
342+
5. 当消费 CDC 源表时,所有的 filter 条件必须应用于 upsert key 上。
343+
6. 所有 project 和 filter 都不包含非确定性函数。

docs/content/docs/dev/table/tuning.md

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,52 @@ FROM TenantKafka t
368368
LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
369369
```
370370

371+
## Delta Joins
372+
373+
In stream jobs, regular joins store all historical data from both sides of the input to ensure the accuracy of the computation results. When an input record is received, regular joins query the state of the other side to find matching records to output, while simultaneously updating its own state.
374+
However, as the job runs for a long term and the input data increases, the state of regular joins will gradually grow larger. This may lead to computational resources becoming a bottleneck, impacting the overall performance of the job and potentially causing a series of stability issues.
375+
376+
To address this, we have introduced a new delta join operator. The core idea is to leverage a bidirectional lookup join approach to reuse data from source tables, replacing the state required by regular joins. Compared to traditional regular joins, delta joins significantly reduce the state size, improve the stability of the job, and also decrease the demand for computational resources.
377+
378+
This feature is currently enabled by default and regular join will be optimized into delta join when the following conditions are simultaneously met:
379+
380+
1. The sql pattern satisfies the optimization criteria. For details, please refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)
381+
2. The external storage system of the source table provides index information for fast querying for delta joins. Currently, [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) has provided index information at the table level for Flink, allowing such tables to be used as source tables for delta joins. Please refer to the [Fluss documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support) for more details.
382+
383+
### Working Principle
384+
385+
In Flink, regular joins require completely storing incoming data from both sides in the state, matching that data when the opposite side's data arrives. In contrast, delta joins utilize the indexing capabilities provided by external storage systems to convert the behavior of querying state data into efficient queries against data in the external storage system using index keys. This approach avoids the need for duplicate storage of the same data in both the external storage system and the state.
386+
387+
{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}
388+
389+
### Important Configurations
390+
391+
Currently, the optimization for delta joins is enabled by default. You can disable this feature manually by setting the following configuration. Please see [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page for more details.
392+
393+
```sql
394+
SET 'table.optimizer.delta-join.strategy' = 'NONE';
395+
```
396+
397+
Additionally, you can adjust the performance of delta joins by configuring the following configurations. Please see [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
398+
399+
- `table.exec.delta-join.cache-enabled`
400+
- `table.exec.delta-join.left.cache-size`
401+
- `table.exec.delta-join.right.cache-size`
402+
403+
### Supported Features and Limitations
404+
405+
Delta joins are continuously evolving, and supports the following features currently.
406+
407+
1. Support INSERT-ONLY tables as source tables for delta join.
408+
2. Support CDC tables without DELETE as source tables for delta join.
409+
3. Support project and filter between source and delta join.
410+
4. Support cache in delta join.
411+
412+
However, delta joins has the following limitations. Any job containing one of these conditions cannot be optimized into a delta join.
413+
414+
1. The index key of the tables must be included as part of the equivalence conditions in the join.
415+
2. The join must be a INNER join.
416+
3. The downstream nodes of the join can accept duplicate changes, such as a sink that provides UPSERT mode without `upsertMaterialize`.
417+
4. When consuming a CDC stream, the join key used in the delta join must be part of the primary key.
418+
5. When consuming a CDC stream, all filters must be applied on the upsert key.
419+
6. Neither filters nor projections should contain non-deterministic functions.
87.5 KB
Loading

0 commit comments

Comments
 (0)