Hello,
Trino version: 377 and Teradata version is 16.
We have developed our custom Teradata connector.
JoinPushdown works fine when joining 2 tables on integer datatype columns. But it does not work when joining 2 tables on varchar() datatype columns.
Details given below.
trino:ap_trino> desc t1;
Column | Type | Extra | Comment
--------+-------------+-------+---------
c1 | integer | |
c2 | char(25) | |
c3 | varchar(25) | |
trino:ap_trino> desc t2;
Column | Type | Extra | Comment
--------+-------------+-------+---------
c1 | integer | |
c2 | varchar(13) | |
c3 | date | |
Find below explain analyze output for integer based column join.
trino:ap_trino> explain analyze select a.c1, a.c2, a.c3, b.c1, b.c2, b.c3
-> from t1 a inner join t2 b on (a.c1=b.c1);
---------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 1 [SOURCE]
CPU: 15.14ms, Scheduled: 106.36ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output:
Output layout: [c3, c1, c2, c2_1, c3_2]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
TableScan[ttd:Query[SELECT l."c1" AS "c1_0", l."c2" AS "c2_1", l."c3" AS "c3_2", r."c1" AS "c1_3", r."C2" AS "C2_4", r."C3" AS "C3_5" FROM (SELECT "
Layout: [c3:varchar(25), c1:integer, c2:char(25), c2_1:varchar(13), c3_2:date]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
CPU: 14.00ms (100.00%), Scheduled: 80.00ms (100.00%), Blocked: 0.00ns (?%), Output: 0 rows (0B)
Input avg.: 0.00 rows, Input std.dev.: ?%
c3 := c3_2:varchar(25):VARCHAR
c2_1 := C2_4:varchar(13):VARCHAR
c1 := c1_0:integer:INTEGER
c3_2 := C3_5:date:DATE
c2 := c2_1:char(25):CHAR
(1 row)
Query 20220828_173520_00007_hjrk8, FINISHED, 1 node
http://localhost:8080/ui/query.html?20220828_173520_00007_hjrk8
Splits: 34 total, 34 done (100.00%)
CPU Time: 0.0s total, 0 rows/s, 0B/s, 14% active
Per Node: 0.0 parallelism, 0 rows/s, 0B/s
Parallelism: 0.0
Peak Memory: 1.31KB
2.90 [0 rows, 0B] [0 rows/s, 0B/s]
Find below explain analyze output for varchar() based column join.
trino:ap_trino> explain analyze select a.c1, a.c2, a.c3, b.c1, b.c2, b.c3
-> from t1 a inner join t2 b on (a.c3=b.c2);
Query Plan
---------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 1 [HASH]
CPU: 554.02ms, Scheduled: 1.59s, Blocked 32.26s (Input: 18.36s, Output: 0.00ns), Input: 73416 rows (3.10MB); per task: avg.: 73416.00 std.dev.: 0.00
Output layout: [c1, c2, c3, c1_3, c2_4, c3_5]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
InnerJoin[("c3" = "expr")][$hashvalue, $hashvalue_7]
│ Layout: [c1:integer, c2:char(25), c3:varchar(25), c1_3:integer, c2_4:varchar(13), c3_5:date]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ CPU: 508.00ms (63.58%), Scheduled: 1.53s (75.52%), Blocked: 1.41s (4.37%), Output: 0 rows (0B)
│ Left (probe) Input avg.: 0.06 rows, Input std.dev.: 556.78%
│ Right (build) Input avg.: 2294.19 rows, Input std.dev.: 200.03%
│ Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?%
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {expr -> #df_319}
├─ RemoteSource[2]
│ Layout: [c1:integer, c2:char(25), c3:varchar(25), $hashvalue:bigint]
│ CPU: 0.00ns (0.00%), Scheduled: 3.00ms (0.15%), Blocked: 3.91s (12.11%), Output: 2 rows (64B)
│ Input avg.: 0.06 rows, Input std.dev.: 556.78%
└─ LocalExchange[HASH][$hashvalue_7] ("expr")
│ Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25), $hashvalue_7:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
│ CPU: 30.00ms (3.75%), Scheduled: 33.00ms (1.63%), Blocked: 12.52s (38.77%), Output: 73414 rows (3.10MB)
│ Input avg.: 2294.19 rows, Input std.dev.: 278.47%
└─ RemoteSource[3]
Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25), $hashvalue_8:bigint]
CPU: 2.00ms (0.25%), Scheduled: 2.00ms (0.10%), Blocked: 14.45s (44.75%), Output: 73414 rows (3.10MB)
Input avg.: 2294.19 rows, Input std.dev.: 278.47%
Fragment 2 [SOURCE]
CPU: 20.66ms, Scheduled: 97.57ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 2 rows (0B); per task: avg.: 2.00 std.dev.: 0.00, Output: 2
Output layout: [c1, c2, c3, $hashvalue_6]
Output partitioning: HASH [c3][$hashvalue_6]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = ttd:ap_trino.t1 AP_trino.t1, grouped = false, filterPredicate = true, dynamicFilters = {"c3" = #df_319}]
Layout: [c1:integer, c2:char(25), c3:varchar(25), $hashvalue_6:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, netwo
CPU: 20.00ms (2.50%), Scheduled: 73.00ms (3.61%), Blocked: 0.00ns (0.00%), Output: 2 rows (64B)
Input avg.: 2.00 rows, Input std.dev.: 0.00%
$hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("c3"), 0))
c3 := c3:varchar(25):VARCHAR
c1 := c1:integer:INTEGER
c2 := c2:char(25):CHAR
Input: 2 rows (0B), Filtered: 0.00%
Dynamic filters:
- df_319, [ SortedRangeSet[type=varchar(25), ranges=8, {[approved], ..., [wip]}] ], collection time=535.85ms
Fragment 3 [SOURCE]
CPU: 237.82ms, Scheduled: 383.87ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 73414 rows (0B); per task: avg.: 73414.00 std.dev.: 0.00,
Output layout: [c1_3, c2_4, c3_5, expr, $hashvalue_9]
Output partitioning: HASH [expr][$hashvalue_9]
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25), $hashvalue_9:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
│ CPU: 38.00ms (4.76%), Scheduled: 53.00ms (2.62%), Blocked: 0.00ns (0.00%), Output: 73414 rows (3.10MB)
│ Input avg.: 73414.00 rows, Input std.dev.: 0.00%
│ $hashvalue_9 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
└─ ScanProject[table = ttd:ap_trino.t2 AP_trino.T2 columns=[c1:integer:INTEGER, C2:varchar(13):VARCHAR, C3:date:DATE], grouped = false]
Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25)]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
CPU: 201.00ms (25.16%), Scheduled: 331.00ms (16.37%), Blocked: 0.00ns (0.00%), Output: 73414 rows (2.47MB)
Input avg.: 73414.00 rows, Input std.dev.: 0.00%
expr := CAST("c2_4" AS varchar(25))
c2_4 := C2:varchar(13):VARCHAR
c3_5 := C3:date:DATE
c1_3 := c1:integer:INTEGER
Input: 73414 rows (0B), Filtered: 0.00%
(1 row)
Query 20220828_174832_00010_hjrk8, FINISHED, 1 node
http://localhost:8080/ui/query.html?20220828_174832_00010_hjrk8
Splits: 131 total, 131 done (100.00%)
CPU Time: 0.8s total, 89.2K rows/s, 0B/s, 37% active
Per Node: 0.3 parallelism, 24.3K rows/s, 0B/s
Parallelism: 0.3
Peak Memory: 3.7MB
3.02 [73.4K rows, 0B] [24.3K rows/s, 0B/s]
Please help us to identify root cause of this issue.