Merged
10 changes: 10 additions & 0 deletions docs/content.zh/docs/dev/table/config.md
@@ -116,30 +116,40 @@ Flink SQL> SET 'table.exec.mini-batch.size' = '5000';
{{< /tab >}}
{{< /tabs >}}

<a name="execution-options" />

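### Execution Options

The following options can be used to tune the performance of query execution.

{{< generated/execution_config_configuration >}}

<a name="optimizer-options" />

### Optimizer Options

The following options can be used to adjust the behavior of the query optimizer to obtain a better execution plan.

{{< generated/optimizer_config_configuration >}}

<a name="table-options" />

### Planner Options

The following options can be used to adjust the behavior of the planner.

{{< generated/table_config_configuration >}}

<a name="materialized-table-options" />

### Materialized Table Options

The following options can be used to adjust the behavior of Materialized Tables.

{{< generated/materialized_table_config_configuration >}}

<a name="sql-client-options" />

### SQL Client Options

The following options can be used to adjust the behavior of the SQL client.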
57 changes: 57 additions & 0 deletions docs/content.zh/docs/dev/table/tuning.md
@@ -289,3 +289,60 @@ ON a.id = b.id
By default, the mini-batch optimization is disabled for the regular join operator. To enable it, set the options `table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency`, and `table.exec.mini-batch.size`. See the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.

{{< top >}}

## Delta Joins

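In streaming jobs, a regular join maintains all historical data from both inputs to ensure correct results. Over time, this causes the state to keep growing, which increases resource usage and affects job stability.

To address these challenges, Flink introduces the delta join operator. The core idea is to replace the large state maintained by a regular join with a bidirectional lookup join that directly reuses the data in the source tables. Compared with a traditional regular join, a delta join significantly reduces state size, improves job stability, and lowers overall resource consumption.

This feature is enabled by default. A regular join is automatically optimized into a delta join when all of the following conditions are met:

1. The job topology satisfies the optimization criteria. For details, see [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations).
2. The external storage system that hosts the source tables provides index information that delta joins can use for fast lookups. Currently, [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) provides table-level index information to Flink, so its tables can serve as source tables for delta joins. See the [Fluss documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support) for details.

### Working Principle

In Flink, a regular join stores all input data from both sides in state so that matching records can be found correctly when data arrives from the opposite side.

In contrast, a delta join leverages the indexing capabilities of the external storage system. Instead of performing state lookups, it issues efficient index-based queries directly against the external storage to retrieve matching records. This approach eliminates redundant data storage between Flink state and the external system.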

{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}

### Key Parameters

Delta join optimization is enabled by default. You can disable it manually with the following setting:

```sql
SET 'table.optimizer.delta-join.strategy' = 'NONE';
```

See the [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page for details.

You can also configure the following parameters to tune the performance of delta joins.

- `table.exec.delta-join.cache-enabled`
- `table.exec.delta-join.left.cache-size`
- `table.exec.delta-join.right.cache-size`

See the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for details.

<a name="supported-features-and-limitations" />

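### Supported Features and Limitations

Delta joins are still evolving. The current version supports the following features:

1. **INSERT-only** tables as source tables.
2. **CDC** tables without **DELETE operations** as source tables.
3. **Project** and **filter** operators between the source tables and the delta join.
4. **Caching** inside the delta join operator.

However, delta joins also have several **limitations**. A job can be optimized into a delta join only if it meets all of the following requirements:

1. The **index key** of the table must be included in the **equality conditions** of the join.
2. Only **INNER JOIN** is currently supported.
3. The **downstream node** must be able to handle **redundant changes**, for example a sink node that runs in **UPSERT mode** without `upsertMaterialize`.
4. When consuming a **CDC stream**, the **join key** must be part of the **primary key**.
5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**.
6. No project or filter may contain **non-deterministic functions**.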
54 changes: 54 additions & 0 deletions docs/content/docs/dev/table/tuning.md
@@ -368,3 +368,57 @@ FROM TenantKafka t
LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...;
```

## Delta Joins

In streaming jobs, regular joins keep all historical data from both inputs to ensure accuracy. Over time, this causes the state to grow continuously, increasing resource usage and impacting stability.

To mitigate these challenges, Flink introduces the delta join operator. The key idea is to replace the large state maintained by regular joins with a bidirectional lookup-based join that directly reuses data from the source tables. Compared to traditional regular joins, delta joins substantially reduce state size, enhance job stability, and lower overall resource consumption.

This feature is enabled by default. A regular join will be automatically optimized into a delta join when all the following conditions are met:

1. The SQL pattern satisfies the optimization criteria. For details, see [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations).
2. The external storage system of the source table provides index information that delta joins can use for fast lookups. Currently, [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) provides table-level index information to Flink, allowing its tables to be used as source tables for delta joins. See the [Fluss documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support) for more details.

### Working Principle

In Flink, regular joins store all incoming records from both input sides in the state to ensure that corresponding records can be matched correctly when data arrives from the opposite side.

In contrast, delta joins leverage the indexing capabilities of external storage systems. Instead of performing state lookups, delta joins issue efficient index-based queries directly against the external storage to retrieve matching records. This approach eliminates redundant data storage between the Flink state and the external system.

{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}}

### Important Configurations

Delta join optimization is enabled by default. You can disable this feature manually by setting the following configuration:

```sql
SET 'table.optimizer.delta-join.strategy' = 'NONE';
```

See the [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page for more details.
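
As a rough sketch of how the strategy option might be used in a SQL client session: the snippet below assumes that, besides `NONE`, the option also accepts the values `AUTO` and `FORCE`. These two values and their described behavior are assumptions here, so check the Configuration page for your Flink version before relying on them.

```sql
-- Assumption: AUTO lets the optimizer fall back to a regular join
-- when the delta join criteria are not met.
SET 'table.optimizer.delta-join.strategy' = 'AUTO';

-- Assumption: FORCE makes planning fail instead of silently falling back,
-- which may help to catch a query that would otherwise keep large join state.
SET 'table.optimizer.delta-join.strategy' = 'FORCE';
```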

To fine-tune the performance of delta joins, you can also configure the following parameters:

- `table.exec.delta-join.cache-enabled`
- `table.exec.delta-join.left.cache-size`
- `table.exec.delta-join.right.cache-size`

See the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details.
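
For illustration, the cache options listed above could be set as follows. The values are placeholders rather than recommendations, and the assumption that the sizes count cached entries should be confirmed on the Configuration page.

```sql
-- Illustrative values only; defaults and valid ranges are on the Configuration page.
SET 'table.exec.delta-join.cache-enabled' = 'true';

-- Assumption: each size is a number of cached entries for that side.
SET 'table.exec.delta-join.left.cache-size' = '20000';
SET 'table.exec.delta-join.right.cache-size' = '5000';
```

A larger cache generally trades task memory for fewer lookups against the external storage, so the two sides can be sized independently.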

### Supported Features and Limitations

Delta joins are still evolving. The following features are currently supported:

1. Support for **INSERT-only** tables as source tables.
2. Support for **CDC** tables without **DELETE operations** as source tables.
3. Support for **projection** and **filter** operations between the source and the delta join.
4. Support for **caching** within the delta join operator.

However, delta joins also have several **limitations**. A job can be optimized into a delta join only if it meets all of the following requirements:

1. The **index key** of the table must be included in the join’s **equality conditions**.
2. Only **INNER JOIN** is currently supported.
3. The **downstream operator** must be able to handle **duplicate changes**, such as a sink operating in **UPSERT mode** without `upsertMaterialize`.
4. When consuming a **CDC stream**, the **join key** must be part of the **primary key**.
5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**.
6. **Non-deterministic functions** are not allowed in filters or projections.
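
To make the requirements above concrete, here is a minimal sketch of a query shape that is intended to satisfy them. The table and column names (`orders`, `customers`, `enriched_orders`, `customer_id`, and so on) are hypothetical, and the snippet assumes that both source tables are INSERT-only, that their storage system exposes an index on `customer_id`, and that the sink is an upsert sink with a primary key.

```sql
-- Hypothetical tables and columns, used for illustration only.
-- Assumptions: `orders` and `customers` are INSERT-only sources indexed on
-- `customer_id`; `enriched_orders` is an upsert sink with a primary key.
EXPLAIN PLAN FOR
INSERT INTO enriched_orders
SELECT o.order_id, o.customer_id, o.amount, c.customer_name
FROM orders AS o
INNER JOIN customers AS c              -- only INNER JOIN is supported
  ON o.customer_id = c.customer_id     -- equality condition on the indexed column
WHERE o.amount > 100;                  -- deterministic filter
```

Inspecting the printed plan is one way to check whether the join was planned as a delta join or left as a regular join.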
Binary file added docs/static/fig/table-streaming/delta_join.png