JoinPushdown not working when joining tables on varchar datatype

Hello,

Trino version: 377 and Teradata version is 16.
We have developed our custom Teradata connector.
JoinPushdown works fine when joining 2 tables on integer datatype columns. But it does not work when joining 2 tables on varchar() datatype columns.

Details given below.

trino:ap_trino> desc t1;
 Column |    Type     | Extra | Comment
--------+-------------+-------+---------
 c1     | integer     |       |
 c2     | char(25)    |       |
 c3     | varchar(25) |       |

trino:ap_trino> desc t2;
 Column |    Type     | Extra | Comment
--------+-------------+-------+---------
 c1     | integer     |       |
 c2     | varchar(13) |       |
 c3     | date        |       |

Find below explain analyze output for integer based column join.

trino:ap_trino> explain analyze select a.c1, a.c2, a.c3, b.c1, b.c2, b.c3
             -> from t1 a inner join t2 b on (a.c1=b.c1);

---------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [SOURCE]
     CPU: 15.14ms, Scheduled: 106.36ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output:
     Output layout: [c3, c1, c2, c2_1, c3_2]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     TableScan[ttd:Query[SELECT l."c1" AS "c1_0", l."c2" AS "c2_1", l."c3" AS "c3_2", r."c1" AS "c1_3", r."C2" AS "C2_4", r."C3" AS "C3_5" FROM (SELECT "
         Layout: [c3:varchar(25), c1:integer, c2:char(25), c2_1:varchar(13), c3_2:date]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
         CPU: 14.00ms (100.00%), Scheduled: 80.00ms (100.00%), Blocked: 0.00ns (?%), Output: 0 rows (0B)
         Input avg.: 0.00 rows, Input std.dev.: ?%
         c3 := c3_2:varchar(25):VARCHAR
         c2_1 := C2_4:varchar(13):VARCHAR
         c1 := c1_0:integer:INTEGER
         c3_2 := C3_5:date:DATE
         c2 := c2_1:char(25):CHAR


(1 row)

Query 20220828_173520_00007_hjrk8, FINISHED, 1 node
http://localhost:8080/ui/query.html?20220828_173520_00007_hjrk8
Splits: 34 total, 34 done (100.00%)
CPU Time: 0.0s total,     0 rows/s,     0B/s, 14% active
Per Node: 0.0 parallelism,     0 rows/s,     0B/s
Parallelism: 0.0
Peak Memory: 1.31KB
2.90 [0 rows, 0B] [0 rows/s, 0B/s]

Find below explain analyze output for varchar() based column join.

trino:ap_trino> explain analyze select a.c1, a.c2, a.c3, b.c1, b.c2, b.c3
             -> from t1 a inner join t2 b on (a.c3=b.c2);
                                                                                      Query Plan
---------------------------------------------------------------------------------------------------------------------------------------------------------
 Fragment 1 [HASH]
     CPU: 554.02ms, Scheduled: 1.59s, Blocked 32.26s (Input: 18.36s, Output: 0.00ns), Input: 73416 rows (3.10MB); per task: avg.: 73416.00 std.dev.: 0.00
     Output layout: [c1, c2, c3, c1_3, c2_4, c3_5]
     Output partitioning: SINGLE []
     Stage Execution Strategy: UNGROUPED_EXECUTION
     InnerJoin[("c3" = "expr")][$hashvalue, $hashvalue_7]
     │   Layout: [c1:integer, c2:char(25), c3:varchar(25), c1_3:integer, c2_4:varchar(13), c3_5:date]
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
     │   CPU: 508.00ms (63.58%), Scheduled: 1.53s (75.52%), Blocked: 1.41s (4.37%), Output: 0 rows (0B)
     │   Left (probe) Input avg.: 0.06 rows, Input std.dev.: 556.78%
     │   Right (build) Input avg.: 2294.19 rows, Input std.dev.: 200.03%
     │   Collisions avg.: 0.00 (0.00% est.), Collisions std.dev.: ?%
     │   Distribution: PARTITIONED
     │   dynamicFilterAssignments = {expr -> #df_319}
     ├─ RemoteSource[2]
     │      Layout: [c1:integer, c2:char(25), c3:varchar(25), $hashvalue:bigint]
     │      CPU: 0.00ns (0.00%), Scheduled: 3.00ms (0.15%), Blocked: 3.91s (12.11%), Output: 2 rows (64B)
     │      Input avg.: 0.06 rows, Input std.dev.: 556.78%
     └─ LocalExchange[HASH][$hashvalue_7] ("expr")
        │   Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25), $hashvalue_7:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
        │   CPU: 30.00ms (3.75%), Scheduled: 33.00ms (1.63%), Blocked: 12.52s (38.77%), Output: 73414 rows (3.10MB)
        │   Input avg.: 2294.19 rows, Input std.dev.: 278.47%
        └─ RemoteSource[3]
               Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25), $hashvalue_8:bigint]
               CPU: 2.00ms (0.25%), Scheduled: 2.00ms (0.10%), Blocked: 14.45s (44.75%), Output: 73414 rows (3.10MB)
               Input avg.: 2294.19 rows, Input std.dev.: 278.47%

 Fragment 2 [SOURCE]
     CPU: 20.66ms, Scheduled: 97.57ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 2 rows (0B); per task: avg.: 2.00 std.dev.: 0.00, Output: 2
     Output layout: [c1, c2, c3, $hashvalue_6]
     Output partitioning: HASH [c3][$hashvalue_6]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     ScanFilterProject[table = ttd:ap_trino.t1 AP_trino.t1, grouped = false, filterPredicate = true, dynamicFilters = {"c3" = #df_319}]
         Layout: [c1:integer, c2:char(25), c3:varchar(25), $hashvalue_6:bigint]
         Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, netwo
         CPU: 20.00ms (2.50%), Scheduled: 73.00ms (3.61%), Blocked: 0.00ns (0.00%), Output: 2 rows (64B)
         Input avg.: 2.00 rows, Input std.dev.: 0.00%
         $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("c3"), 0))
         c3 := c3:varchar(25):VARCHAR
         c1 := c1:integer:INTEGER
         c2 := c2:char(25):CHAR
         Input: 2 rows (0B), Filtered: 0.00%
         Dynamic filters:
             - df_319, [ SortedRangeSet[type=varchar(25), ranges=8, {[approved], ..., [wip]}] ], collection time=535.85ms

 Fragment 3 [SOURCE]
     CPU: 237.82ms, Scheduled: 383.87ms, Blocked 0.00ns (Input: 0.00ns, Output: 0.00ns), Input: 73414 rows (0B); per task: avg.: 73414.00 std.dev.: 0.00,
     Output layout: [c1_3, c2_4, c3_5, expr, $hashvalue_9]
     Output partitioning: HASH [expr][$hashvalue_9]
     Stage Execution Strategy: UNGROUPED_EXECUTION
     Project[]
     │   Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25), $hashvalue_9:bigint]
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
     │   CPU: 38.00ms (4.76%), Scheduled: 53.00ms (2.62%), Blocked: 0.00ns (0.00%), Output: 73414 rows (3.10MB)
     │   Input avg.: 73414.00 rows, Input std.dev.: 0.00%
     │   $hashvalue_9 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr"), 0))
     └─ ScanProject[table = ttd:ap_trino.t2 AP_trino.T2 columns=[c1:integer:INTEGER, C2:varchar(13):VARCHAR, C3:date:DATE], grouped = false]
            Layout: [c1_3:integer, c2_4:varchar(13), c3_5:date, expr:varchar(25)]
            Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
            CPU: 201.00ms (25.16%), Scheduled: 331.00ms (16.37%), Blocked: 0.00ns (0.00%), Output: 73414 rows (2.47MB)
            Input avg.: 73414.00 rows, Input std.dev.: 0.00%
            expr := CAST("c2_4" AS varchar(25))
            c2_4 := C2:varchar(13):VARCHAR
            c3_5 := C3:date:DATE
            c1_3 := c1:integer:INTEGER
            Input: 73414 rows (0B), Filtered: 0.00%


(1 row)

Query 20220828_174832_00010_hjrk8, FINISHED, 1 node
http://localhost:8080/ui/query.html?20220828_174832_00010_hjrk8
Splits: 131 total, 131 done (100.00%)
CPU Time: 0.8s total, 89.2K rows/s,     0B/s, 37% active
Per Node: 0.3 parallelism, 24.3K rows/s,     0B/s
Parallelism: 0.3
Peak Memory: 3.7MB
3.02 [73.4K rows, 0B] [24.3K rows/s, 0B/s]

Please help us to identify root cause of this issue.

Hi,

We have fixed this issue by implementing Table function in our custom Teradata connector.
Also, we have upgraded Trino version to 394.

Now, we are running all TD queries using below syntax.

SELECT * FROM TABLE(teradata.system.query(query => 'SELECT * from tab1'));

Thanks and regards,
Mahebub Sayyed

hello, I have the same problem when I use elasticsearch connector,could you please give more detail about the solution?

There is new feature in Trino called table functions. When you use it, all the complete queries gets pushed down to the source irrespective of any join, datatype , etc.
But you need to check if this table function is available for Elasticsearch connector or not.