[DRAFT] Debugging FLINK-36808 Issue
Recently I picked up the FLINK-36808 bug in the Apache Flink project. The solution to the bug touches on topics such as SQL lookup joins, the Flink SQL planner, and the Volcano optimizer (provided by the Apache Calcite project).
In this post I describe my experience debugging the issue and the lessons learned along the way. You can already find the pull request with the fix in PR #26514.
The issue was reported internally by my colleague Jun Qin, and the initial investigation was done by my teammate Qingsheng Ren. Let us start by trying to understand the issue.
As mentioned in the ticket, given a union query of two lookup joins on a dimension table, we get wrong results.
-- Data of table `stream`:
-- (1, Alice)
-- (2, Bob)
CREATE TEMPORARY TABLE `stream` (
`id` BIGINT,
`name` STRING,
`txn_time` as proctime(),
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'stream',
'username' = 'postgres',
'password' = 'postgres'
);
-- Data of table `dim`:
-- (1, OK)
-- (2, OK)
CREATE TEMPORARY TABLE `dim` (
`id` BIGINT,
`status` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'dim',
'username' = 'postgres',
'password' = 'postgres'
);
-- Lookup join the two tables twice with different filters, and union the results together
SELECT
s.id,
s.name,
s.txn_time,
d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
`s`.`id` = `d`.`id`
WHERE
`d`.`status` = 'OK'
UNION ALL
SELECT
s.id,
s.name,
s.txn_time,
d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
`s`.`id` = `d`.`id`
WHERE
`d`.`status` = 'NOT_EXISTS';
We expect to get the following results:
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
However, the actual results we got were:
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS
This is obviously wrong, since there are no rows with the status NOT_EXISTS in the `dim` dimension table.
And the SQL plans reported on the ticket seem correct.
Abstract syntax tree:
LogicalUnion(all=[true])
:- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
: +- LogicalFilter(condition=[=($4, _UTF-16LE'OK')])
: +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 2}])
: :- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
: : +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
: +- LogicalFilter(condition=[=($cor0.id, $0)])
: +- LogicalSnapshot(period=[$cor0.txn_time])
: +- LogicalTableScan(table=[[default_catalog, default_database, dim]])
+- LogicalProject(id=[$0], name=[$1], txn_time=[$2], status=[$4])
+- LogicalFilter(condition=[=($4, _UTF-16LE'NOT_EXISTS')])
+- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 2}])
:- LogicalProject(id=[$0], name=[$1], txn_time=[PROCTIME()])
: +- LogicalTableScan(table=[[default_catalog, default_database, stream]])
+- LogicalFilter(condition=[=($cor1.id, $0)])
+- LogicalSnapshot(period=[$cor1.txn_time])
+- LogicalTableScan(table=[[default_catalog, default_database, dim]])
Optimized physical plan:
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
:- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
: +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
: +- Calc(select=[id, name, PROCTIME() AS txn_time])
: +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" AS VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS status])
+- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])
+- Calc(select=[id, name, PROCTIME() AS txn_time])
+- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
And optimized execution plan:
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
:- Calc(select=[id, name, txn_time, CAST('OK' AS VARCHAR(2147483647)) AS status])
: +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id])(reuse_id=[1])
: +- Calc(select=[id, name, PROCTIME() AS txn_time])
: +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- Calc(select=[id, name, txn_time, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
+- Reused(reference_id=[1])
I also ran the same query on our internal Flink engine and with MySQL as an alternative database, but the results were the same. Fixing this bug in open-source Apache Flink is thus important: all enterprise Flink distributions built on it will benefit as well.
Understanding the Issue
From a quick look at the optimized execution plan, we can see that the first lookup join is reused. To make sure that this is not the source of the problem, we can disable the reuse with the following configuration:
SET 'table.optimizer.reuse-sub-plan-enabled' = 'false';
With reuse disabled, we still get the wrong results.
To better debug the issue, it is a good idea to reproduce it in a test. After looking into the planner tests, I came up with a test case using the 'values' connector for both tables. For example, we can define the dimension table as below:
val dimTableId = TestValuesTableFactory.registerData(Seq(row(1L, "OK"), row(2L, "OK")))
val dimTableDDL =
s"""
| CREATE TABLE `dim` (
| `id` BIGINT,
| `status` STRING,
| PRIMARY KEY (`id`) NOT ENFORCED
| ) WITH (
| 'connector' = 'values',
| 'data-id' = '$dimTableId'
| )
|""".stripMargin
tEnv.executeSql(dimTableDDL)
Lo and behold, the issue does not happen! This is a good sign: we can now try to find the reasons for the differences between the connectors.
Comparing Planner Transformations
We know that the query behaves differently with different connectors. To investigate further, let's compare each planner transformation. To do so, enable debug logging or add log statements in the FlinkChainedProgram#optimize method.
(There are many optimization steps in the program; they deserve a post of their own.)
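If you prefer log output over a debugger, a log4j2 test configuration along these lines should surface the per-step plans. The exact logger name is an assumption on my side (the optimizer programs live under the Flink table planner package), so adjust it to your Flink version:

```properties
# Hypothetical snippet for log4j2-test.properties: raise the planner's
# optimizer package to DEBUG so each optimization step logs its plans.
logger.planner.name = org.apache.flink.table.planner.plan.optimize
logger.planner.level = DEBUG
```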
Listing all the optimization steps and comparing each plan (with minor simplifications), we can see the following differences.
Logical Rewrite
The first difference is in the logical rewrite step.
Logical rewrite of the union all query using the JDBC connector:
optimize 'logical_rewrite' cost 29 ms.
original input:
FlinkLogicalUnion(all=[true])
:- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
: :- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
: +- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))])
: +- FlinkLogicalSnapshot(period=[$cor0.txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim]], fields=[id, status])
+- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))])
+- FlinkLogicalSnapshot(period=[$cor1.txn_time])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim]], fields=[id, status])
optimized output:
FlinkLogicalUnion(all=[true])
:- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
: :- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
: +- FlinkLogicalSnapshot(period=[$cor0.txn_time])
: +- FlinkLogicalCalc(select=[id])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))]]], fields=[id, status])
+- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- FlinkLogicalSnapshot(period=[$cor1.txn_time])
+- FlinkLogicalCalc(select=[id])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))]]], fields=[id, status])
Logical rewrite of the union all query using the test 'values' connector:
optimize 'logical_rewrite' cost 29 ms.
original input:
FlinkLogicalUnion(all=[true])
:- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
: :- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
: +- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))])
: +- FlinkLogicalSnapshot(period=[$cor0.txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim]], fields=[id, status])
+- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))])
+- FlinkLogicalSnapshot(period=[$cor1.txn_time])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim]], fields=[id, status])
optimized output:
FlinkLogicalUnion(all=[true])
:- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
: :- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
: +- FlinkLogicalSnapshot(period=[$cor0.txn_time])
: +- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[id, status])
+- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- FlinkLogicalSnapshot(period=[$cor1.txn_time])
+- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[id, status])
Well, what is going on here?
We have the same logical input, but the optimized results are different. After discussing with my colleagues, I realized that the JDBC connector supports the filter push-down capability. Because of this, the Flink SQL planner pushes the filter condition down into the table scan when using the JDBC connector. This is not the case for the test 'values' connector, where the filter (where) condition is kept in the Calc node that is applied after the table scan.
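To make the difference concrete, here is a toy sketch (my own simplification, not Flink's actual SupportsFilterPushDown interface) of what filter pushdown changes: with a pushdown-capable source, the predicate becomes part of the scan itself, while a plain source leaves it to a Calc node applied afterwards.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterPushdownSketch {
    record DimRow(long id, String status) {}

    // Plain source: returns everything; the planner must keep the filter
    // in a Calc node applied after the scan.
    static List<DimRow> plainScan(List<DimRow> data) {
        return data;
    }

    // Pushdown-capable source: the predicate becomes part of the scan
    // itself (and shows up in the scan's plan digest as filter=[...]).
    static List<DimRow> filteringScan(List<DimRow> data, Predicate<DimRow> pushed) {
        return data.stream().filter(pushed).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<DimRow> data = List.of(new DimRow(1, "OK"), new DimRow(2, "OK"));

        // Without pushdown: scan first, apply the where condition afterwards.
        List<DimRow> withoutPushdown = plainScan(data).stream()
                .filter(r -> r.status().equals("OK"))
                .collect(Collectors.toList());

        // With pushdown: the scan already filters; no residual predicate remains.
        List<DimRow> withPushdown = filteringScan(data, r -> r.status().equals("OK"));

        System.out.println(withoutPushdown.equals(withPushdown)); // same rows either way
    }
}
```

Both shapes are semantically equivalent, but they leave the filter in very different places of the plan, which matters for everything the optimizer does afterwards.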
Okay, this is fine. Let’s check the next difference.
Physical Optimization
The second difference happens in the physical optimization step.
Physical optimization of the union all query using the JDBC connector:
optimize 'physical' cost 29 ms.
original input:
FlinkLogicalCalc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- FlinkLogicalUnion(all=[true])
:- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
: :- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
: +- FlinkLogicalSnapshot(period=[$cor0.txn_time])
: +- FlinkLogicalCalc(select=[id])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))]]], fields=[id, status])
+- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- FlinkLogicalSnapshot(period=[$cor1.txn_time])
+- FlinkLogicalCalc(select=[id])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))]]], fields=[id, status])
optimized output:
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
:- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])
: +- Calc(select=[id, name, PROCTIME() AS txn_time])
: +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])
+- Calc(select=[id, name, PROCTIME() AS txn_time])
+- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
Physical optimization of the union all query using the test 'values' connector:
optimize 'physical' cost 29 ms.
original input:
FlinkLogicalCalc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- FlinkLogicalUnion(all=[true])
:- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
: :- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: : +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
: +- FlinkLogicalSnapshot(period=[$cor0.txn_time])
: +- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[id, status])
+- FlinkLogicalCalc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
:- FlinkLogicalCalc(select=[id, name, PROCTIME() AS txn_time])
: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- FlinkLogicalSnapshot(period=[$cor1.txn_time])
+- FlinkLogicalCalc(select=[id], where=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, dim, filter=[]]], fields=[id, status])
optimized output:
Calc(select=[id, name, PROCTIME_MATERIALIZE(txn_time) AS txn_time, status])
+- Union(all=[true], union=[id, name, txn_time, status])
:- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'OK':VARCHAR(2147483647)) AS status])
: +- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[status=_UTF-16LE'OK', id=id], where=[=(status, _UTF-16LE'OK':VARCHAR(2147483647))], select=[id, name, txn_time, id], upsertKey=[[0]])
: +- Calc(select=[id, name, PROCTIME() AS txn_time])
: +- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
+- Calc(select=[id, name, txn_time, CAST(_UTF-16LE'NOT_EXISTS':VARCHAR(2147483647)) AS status])
+- LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[status=_UTF-16LE'NOT_EXISTS', id=id], where=[=(status, _UTF-16LE'NOT_EXISTS':VARCHAR(2147483647))], select=[id, name, txn_time, id], upsertKey=[[0]])
+- Calc(select=[id, name, PROCTIME() AS txn_time])
+- TableSourceScan(table=[[default_catalog, default_database, stream]], fields=[id, name])
This transformation also looks fine, but let's pay closer attention to what is going on here.
This step converts the join into a lookup join and adds additional information, e.g., the joinType, select, and lookup definitions. It adds id as a lookup key in both transformations. However, for the values connector plan (the second one), the lookup definition also includes the status column as a lookup key, and the plan carries the filter condition in a where definition.
The JDBC connector plan (the first one) includes neither a where definition nor status in the lookup definition. This shouldn't be a problem for JDBC or any other source that supports filter pushdown, since the filter condition (here, the status column comparison) is pushed down into the table scan.
Still, it would be helpful to include pushed-down filters in the lookup join definitions in query explanations.
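To build some intuition about why this distinction matters, here is a toy model of a lookup join (again my own simplification, not Flink's runtime code). In the 'values'-style plan, the status filter rides along as a where condition on the join itself; in the JDBC-style plan, the join has no where condition at all because the filter is supposed to live inside the dim scan. So if that pushed-down filter is ever lost or mixed up, nothing downstream will catch the wrong rows:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class LookupJoinSketch {
    record StreamRow(long id, String name) {}
    record DimRow(long id, String status) {}

    // For each stream row, look up matching dim rows by key and apply the
    // residual where condition (if any) to the looked-up row.
    static List<String> lookupJoin(List<StreamRow> stream, List<DimRow> dim,
                                   Predicate<DimRow> where) {
        List<String> out = new ArrayList<>();
        for (StreamRow s : stream) {
            for (DimRow d : dim) {
                if (d.id() == s.id() && where.test(d)) {
                    out.add(s.id() + ", " + s.name() + ", " + d.status());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<StreamRow> stream = List.of(new StreamRow(1, "Alice"), new StreamRow(2, "Bob"));
        List<DimRow> dim = List.of(new DimRow(1, "OK"), new DimRow(2, "OK"));

        // 'values'-style plan: the filter stays on the join as a where condition.
        System.out.println(lookupJoin(stream, dim, d -> d.status().equals("OK")));         // two rows
        System.out.println(lookupJoin(stream, dim, d -> d.status().equals("NOT_EXISTS"))); // empty

        // JDBC-style plan: the join has no where condition of its own; it relies
        // entirely on the dim scan having been filtered. If the pushed filter is
        // lost or shared between branches, the join returns unfiltered matches.
        System.out.println(lookupJoin(stream, dim, d -> true)); // two rows for EITHER branch
    }
}
```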
At this point, I had conducted several debug sessions and discussions with my colleagues, but still could not identify the root cause of the bug.
I thought maybe some other optimization rule was messing up the filter pushdowns, e.g., discarding them. Thus, I tried to create a minimal test case that only applies the relevant physical optimization rules. You can find the example file here.
While debugging and reading other test cases, I noticed that the values connector can simulate filter pushdown by simply adding the filterable-fields property to the table definition.
By updating the dimension table create statement to:
val dimTableId = TestValuesTableFactory.registerData(Seq(row(1L, "OK"), row(2L, "OK")))
val dimTableDDL =
s"""
| CREATE TABLE `dim` (
| `id` BIGINT,
| `status` STRING,
| PRIMARY KEY (`id`) NOT ENFORCED
| ) WITH (
| 'connector' = 'values',
| 'filterable-fields' = 'id;status',
| 'data-id' = '$dimTableId'
| )
|""".stripMargin
tEnv.executeSql(dimTableDDL)
We now have a reproducible test case, with no need to depend on external databases. Good, but we still have no clue about the root cause of the bug.
Continuing to debug, I added a breakpoint at StreamPhysicalUnionRule#440 and noticed that the FlinkLogicalJoin relations for both union inputs are the same object:
this = StreamPhysicalUnionRule @13203
rel = FlinkLogicalUnion @13204
union = FlinkLogicalUnion @13204
  id = 888
  inputs = RegularImmutableList @13218
    0 = RelSubset @13227
      id = 876
      best = FlinkLogicalCalc @13232
        id = 875
        input = RelSubset @13237
          id = 874
          best = FlinkLogicalJoin @13242
            id = 873
            joinType = JoinRelType @13255
            condition = RexCall @13256
            joinInfo = JoinInfo @13258
            left = RelSubset @13259
            right = RelSubset @13260
    1 = RelSubset @13228
      id = 886
      best = FlinkLogicalCalc @13248
        id = 885
        input = RelSubset @13237
          id = 874
          best = FlinkLogicalJoin @13242
            id = 873
            joinType = JoinRelType @13255
            condition = RexCall @13256
            joinInfo = JoinInfo @13258
            left = RelSubset @13259
            right = RelSubset @13260
Somehow the SQL optimizer finds the best (cheapest-cost) plan for the union with both logical joins deemed equivalent: both inputs point to the same RelSubset @13237 and FlinkLogicalJoin @13242.
Indeed, if we print the logical union relation before it is converted to the physical union, e.g., with System.out.println(union.explain()), we get:
Union: FlinkLogicalUnion(all=[true])
FlinkLogicalCalc(subset=[rel#876:RelSubset#19.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id, name, CAST('OK' AS VARCHAR(2147483647)) AS status])
FlinkLogicalJoin(subset=[rel#874:RelSubset#18.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[=($0, $2)], joinType=[inner])
FlinkLogicalTableSourceScan(subset=[rel#867:RelSubset#14.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, stream]], fields=[id, name])
FlinkLogicalSnapshot(subset=[rel#872:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE]], period=[$cor0.txn_time])
FlinkLogicalCalc(subset=[rel#870:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id])
FlinkLogicalTableSourceScan(subset=[rel#868:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, dim, filter=[=(status, _UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]]], fields=[id, status])
FlinkLogicalCalc(subset=[rel#886:RelSubset#24.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id, name, CAST('NOT_EXISTS' AS VARCHAR(2147483647)) AS status])
FlinkLogicalJoin(subset=[rel#874:RelSubset#18.LOGICAL.any.None: 0.[NONE].[NONE]], condition=[=($0, $2)], joinType=[inner])
FlinkLogicalTableSourceScan(subset=[rel#867:RelSubset#14.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, stream]], fields=[id, name])
FlinkLogicalSnapshot(subset=[rel#872:RelSubset#17.LOGICAL.any.None: 0.[NONE].[NONE]], period=[$cor0.txn_time])
FlinkLogicalCalc(subset=[rel#870:RelSubset#16.LOGICAL.any.None: 0.[NONE].[NONE]], select=[id])
FlinkLogicalTableSourceScan(subset=[rel#868:RelSubset#15.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, dim, filter=[=(status, _UTF-16LE'OK':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")]]], fields=[id, status])
We can see that the FlinkLogicalJoin for both inputs is exactly the same, all the way down to the table scan. Both branches only filter the data where the status column equals the string OK.
This is a good finding!
At this point, I spent extra time debugging the LogicalJoin optimization rules, but it didn’t lead to anything.
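One way to picture how the optimizer could end up reusing one branch's plan for both (an assumption worth testing at this point, not a confirmed root cause): Calcite's Volcano-style planner identifies equivalent relational expressions by their string digest. If a pushed-down filter were missing from a scan's digest, two scans with different filters would collapse into the same equivalence class, and one plan would be reused for both. A toy sketch:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DigestSketch {
    record Scan(String table, String pushedFilter) {}

    // Buggy identity: the pushed-down filter is not part of the digest.
    static String buggyDigest(Scan s) {
        return "Scan(table=" + s.table() + ")";
    }

    // Full identity: the filter is part of the digest.
    static String fullDigest(Scan s) {
        return "Scan(table=" + s.table() + ", filter=" + s.pushedFilter() + ")";
    }

    public static void main(String[] args) {
        List<Scan> scans = List.of(
                new Scan("dim", "status = 'OK'"),
                new Scan("dim", "status = 'NOT_EXISTS'"));

        Set<String> buggy = scans.stream().map(DigestSketch::buggyDigest).collect(Collectors.toSet());
        Set<String> full = scans.stream().map(DigestSketch::fullDigest).collect(Collectors.toSet());

        // One equivalence class means the planner sees both scans as the same
        // expression and reuses whichever plan it finds first.
        System.out.println(buggy.size()); // 1
        System.out.println(full.size());  // 2
    }
}
```

This would be consistent with what the debugger shows: both union inputs share the single FlinkLogicalJoin @13242.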
But if both logical joins filter data matching the status OK, why do we get the following results containing NOT_EXISTS status rows?
1, Alice, 2024-11-27 11:52:19.332, OK
2, Bob, 2024-11-27 11:52:19.332, OK
1, Alice, 2024-11-27 11:52:19.333, NOT_EXISTS
2, Bob, 2024-11-27 11:52:19.333, NOT_EXISTS
Shouldn’t all four rows have status as OK?