diff --git a/docs/content.zh/docs/dev/table/config.md b/docs/content.zh/docs/dev/table/config.md index 1748fde1b723c..258993043e1e5 100644 --- a/docs/content.zh/docs/dev/table/config.md +++ b/docs/content.zh/docs/dev/table/config.md @@ -116,30 +116,40 @@ Flink SQL> SET 'table.exec.mini-batch.size' = '5000'; {{< /tab >}} {{< /tabs >}} + + ### 执行配置 以下选项可用于优化查询执行的性能。 {{< generated/execution_config_configuration >}} + + ### 优化器配置 以下配置可以用于调整查询优化器的行为以获得更好的执行计划。 {{< generated/optimizer_config_configuration >}} + + ### Planner 配置 以下配置可以用于调整 planner 的行为。 {{< generated/table_config_configuration >}} + + ### Materialized Table 配置 以下配置可以用于调整 Materialized Table 的行为。 {{< generated/materialized_table_config_configuration >}} + + ### SQL Client 配置 以下配置可以用于调整 sql client 的行为。 diff --git a/docs/content.zh/docs/dev/table/tuning.md b/docs/content.zh/docs/dev/table/tuning.md index 7166f44957280..e8ff6d8d2c972 100644 --- a/docs/content.zh/docs/dev/table/tuning.md +++ b/docs/content.zh/docs/dev/table/tuning.md @@ -289,3 +289,60 @@ ON a.id = b.id 默认情况下,对于 regular join 算子来说,mini-batch 优化是被禁用的。开启这项优化,需要设置选项 `table.exec.mini-batch.enabled`、`table.exec.mini-batch.allow-latency` 和 `table.exec.mini-batch.size`。更多详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。 {{< top >}} + +## Delta Joins + +在流作业中,regular join 会维护来自两个输入的所有历史数据,以确保结果的准确性。随着时间的推移,这会导致状态不断增长,从而增加资源的使用,并影响作业的稳定性。 + +为了应对这些挑战,Flink 引入了 delta join 算子。其核心思想是基于双向 lookup join 来替代 regular join 所维护的大状态,直接重用源表中的数据。与传统的 regular join 相比,delta join 显著减少了状态大小,提高了作业的稳定性,并降低了总体的资源消耗。 + +该功能默认启用。当满足以下所有条件时, regular join 将自动优化为 delta join。 + +1. 作业拓扑结构满足优化条件。具体可以查看[支持的功能和限制]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations)。 +2. 
源表所在的外部存储系统提供了可供 delta join 快速查询的索引信息。目前 [Apache Fluss(Incubating)](https://fluss.apache.org/blog/fluss-open-source/) 已支持在 Flink 中提供表级别的索引信息,其上的表可作为 delta join 的源表。具体可参考 [Fluss 文档](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support)。 + +### 工作原理 + +在 Flink 中,regular join 将来自两个输入端的所有输入数据存储在状态中,以确保当对侧的数据到达时,能够正确地匹配对应的记录。 + +相比之下,delta join 利用了外部存储系统的索引功能,并不执行状态查找,而是直接对外部存储发出高效的、基于索引的查询,以获取匹配的记录。该方法消除了 Flink 状态与外部系统之间冗余的数据存储。 + +{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}} + +### 关键参数 + +Delta join 优化默认启用。您可以通过设置以下配置手动禁用此功能: + +```sql +SET 'table.optimizer.delta-join.strategy' = 'NONE'; +``` + +详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#optimizer-options)页面。 + +您还可以配置以下参数来调整优化 delta join 的性能。 + +- `table.exec.delta-join.cache-enabled` +- `table.exec.delta-join.left.cache-size` +- `table.exec.delta-join.right.cache-size` + +详细信息请参见[配置]({{< ref "docs/dev/table/config" >}}#execution-options)页面。 + + + +### 支持的功能和限制 + +目前 delta join 仍在持续演进中,当前版本已支持的功能如下: + +1. 支持 **INSERT-only** 的表作为源表。 +2. 支持不带 **DELETE 操作**的 **CDC** 表作为源表。 +3. 支持源表和 delta join 间包含 **project** 和 **filter** 算子。 +4. Delta join 算子内支持**缓存**。 + +然而,delta join 也存在几个**限制**,包含以下任何条件的作业无法优化为 delta join。 + +1. 表的**索引键**必须包含在 join 的**等值条件**中 +2. 目前仅支持 **INNER JOIN**。 +3. **下游节点**必须能够处理**冗余变更**。例如以 **UPSERT 模式**运行、不带 `upsertMaterialize` 的 sink 节点。 +4. 当消费 **CDC 流**时,**join key** 必须是**主键**的一部分。 +5. 当消费 **CDC 流**时,所有 **filter** 必须应用于 **upsert key** 上。 +6. 所有 project 和 filter 都不能包含**非确定性函数**。 diff --git a/docs/content/docs/dev/table/tuning.md b/docs/content/docs/dev/table/tuning.md index ae372f082966d..b600dd7f90862 100644 --- a/docs/content/docs/dev/table/tuning.md +++ b/docs/content/docs/dev/table/tuning.md @@ -368,3 +368,57 @@ FROM TenantKafka t LEFT JOIN InventoryKafka i ON t.tenant_id = i.tenant_id AND ...; ``` +## Delta Joins + +In streaming jobs, regular joins keep all historical data from both inputs to ensure accuracy. 
Over time, this causes the state to grow continuously, increasing resource usage and degrading job stability. + +To mitigate these challenges, Flink introduces the delta join operator. The key idea is to replace the large state maintained by regular joins with a bidirectional lookup-based join that directly reuses data from the source tables. Compared to traditional regular joins, delta joins substantially reduce state size, improve job stability, and lower overall resource consumption. + +This feature is enabled by default. A regular join is automatically optimized into a delta join when all of the following conditions are met: + +1. The SQL pattern satisfies the optimization criteria. For details, please refer to [Supported Features and Limitations]({{< ref "docs/dev/table/tuning" >}}#supported-features-and-limitations). +2. The external storage system of the source table provides index information that delta joins can use for fast lookups. Currently, [Apache Fluss (Incubating)](https://fluss.apache.org/blog/fluss-open-source/) provides index information at the table level for Flink, allowing such tables to be used as source tables for delta joins. Please refer to the [Fluss documentation](https://fluss.apache.org/docs/0.8/engine-flink/delta-joins/#flink-version-support) for more details. + +### Working Principle + +In Flink, regular joins store all incoming records from both input sides in state so that corresponding records can be matched correctly when data arrives from the opposite side. + +In contrast, delta joins leverage the indexing capabilities of external storage systems. Instead of performing state lookups, delta joins issue efficient index-based queries directly against the external storage to retrieve matching records. This approach eliminates redundant data storage between the Flink state and the external system. 
+ +{{< img src="/fig/table-streaming/delta_join.png" width="70%" height="70%" >}} + +### Important Configurations + +Delta join optimization is enabled by default. You can disable this feature manually by setting the following configuration: + +```sql +SET 'table.optimizer.delta-join.strategy' = 'NONE'; +``` + +See the [Configuration]({{< ref "docs/dev/table/config" >}}#optimizer-options) page for more details. + +To fine-tune the performance of delta joins, you can also configure the following parameters: + +- `table.exec.delta-join.cache-enabled` +- `table.exec.delta-join.left.cache-size` +- `table.exec.delta-join.right.cache-size` + +See the [Configuration]({{< ref "docs/dev/table/config" >}}#execution-options) page for more details. + +### Supported Features and Limitations + +Delta joins are still evolving and currently support the following features: + +1. Support for **INSERT-only** tables as source tables. +2. Support for **CDC** tables without **DELETE operations** as source tables. +3. Support for **projection** and **filter** operations between the source and the delta join. +4. Support for **caching** within the delta join operator. + +However, delta joins also have several **limitations**. A job that violates any of the following conditions cannot be optimized into a delta join: + +1. The **index key** of the table must be included in the join's **equality conditions**. +2. Only **INNER JOIN** is currently supported. +3. The **downstream operator** must be able to handle **duplicate changes**, such as a sink operating in **UPSERT mode** without `upsertMaterialize`. +4. When consuming a **CDC stream**, the **join key** must be part of the **primary key**. +5. When consuming a **CDC stream**, all **filters** must be applied on the **upsert key**. +6. **Non-deterministic functions** are not allowed in filters or projections. 
\ No newline at end of file diff --git a/docs/static/fig/table-streaming/delta_join.png b/docs/static/fig/table-streaming/delta_join.png new file mode 100644 index 0000000000000..6645cef43647c Binary files /dev/null and b/docs/static/fig/table-streaming/delta_join.png differ
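
The options and restrictions documented in the `tuning.md` changes above can be pictured with a short Flink SQL session. This is a minimal sketch, not part of the patch: the tables `orders`, `customers`, and `enriched_orders`, their columns, and the cache sizes are all hypothetical, and it assumes both source tables live in a storage system that exposes an index on `customer_id`, that `customer_id` is part of each source table's primary key, and that the sink runs in upsert mode.

```sql
-- Sketch only: table names, column names, and cache sizes are assumptions.
-- Enable the delta join cache and size it per input side.
SET 'table.exec.delta-join.cache-enabled' = 'true';
SET 'table.exec.delta-join.left.cache-size' = '10000';   -- illustrative value
SET 'table.exec.delta-join.right.cache-size' = '10000';  -- illustrative value

-- A query shaped to respect the documented restrictions:
--   * INNER JOIN only
--   * equality condition on the (assumed) index key customer_id
--   * no non-deterministic functions in the projection, and no filters
--   * hypothetical upsert sink that tolerates duplicate changes
INSERT INTO enriched_orders
SELECT o.order_id, o.customer_id, c.customer_name
FROM orders AS o
INNER JOIN customers AS c
  ON o.customer_id = c.customer_id;

-- To force a regular join instead, set this before submitting the job:
-- SET 'table.optimizer.delta-join.strategy' = 'NONE';
```

Whether the planner actually picks a delta join for such a query depends on the connector and Flink version in use, so checking the plan with `EXPLAIN` before relying on it is advisable.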