How to parse a Protobuf object in a Kafka message with Trino

Hello!

I hope this is not the wrong section. If so, please move it :pray:

I run Trino in a Docker container using this command and passing configuration files:

sudo docker run -p 8080:8080  -v /mnt/c/trino-config-test/catalog/kafka.properties:/etc/trino/catalog/kafka.properties -v /mnt/c/trino-config-test/topic_description/kafka.mario.test.json:/etc/kafka/kafka.mario.test.json trinodb/trino

kafka.properties

connector.name=kafka
kafka.nodes=(hidden):9092
kafka.table-names=mario.test
kafka.hide-internal-columns=false

kafka.mario.test.json

{
    "tableName": "test",
    "schemaName": "mario",
    "topicName": "mario.test",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    }
}

I checked inside the container and the files were copied successfully.

I managed to connect Trino to Kafka: I can run queries on the topic “mario.test” using the CLI.

The topic has the following structure:

  • The key is of type String, but its actual value is an integer, e.g. “12345”
  • The value is a Protobuf object

I have two problems:

  1. kafka.mario.test.json is not picked up for whatever reason: the column “kafka_key” is not created. It’s not relevant for the moment, but I wanted to mention it anyway.
  2. When I try to parse the Protobuf object, I get an error message.

This is the code for reference:

try (
    Connection conn = DriverManager.getConnection(url, properties);
    Statement stmt = conn.createStatement();
    ResultSet rs = stmt.executeQuery("SELECT _key as key, _partition_id as partition, _message as value FROM kafka.mario.test LIMIT 1");
) {
    log.info("Iterating on result set...");
    // Extract data from result set
    while (rs.next()) {
        log.info("{}", rs.getBigDecimal("partition"));
        log.info("{}", rs.getString("key"));
        final String valueString = rs.getString("value");
        final ScheduledJob value = ScheduledJob.parseFrom(valueString.getBytes(StandardCharsets.UTF_8)); // EXCEPTION THROWN
        log.info("{}", value);
    }
} catch (SQLException e) {
    e.printStackTrace();
}

The error message is:

Exception in thread "main" com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).

I can’t really find a way to deserialize that object properly and Trino doesn’t offer any support for Protobuf as far as I could read. I hope that someone is able to help me.

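This used to be unsupported. Newer Trino versions support Protobuf encoding and decoding in the Kafka connector: set “dataFormat” to “protobuf” for the message in the topic description file and point “dataSchema” at the .proto file. Check the Kafka connector documentation for details and upgrade your cluster.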
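
As for the exception in the Java code, a likely cause is the round trip through a String. Assuming _message is exposed as a VARCHAR (the message bytes decoded as UTF-8 text), any byte sequence that is not valid UTF-8 is replaced during decoding, and calling getBytes(StandardCharsets.UTF_8) afterwards does not bring the original bytes back. Here is a minimal sketch of that effect in plain Java. The class name and the three-byte payload are made up for illustration and are not taken from the topic in question.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class Utf8RoundTripDemo {

    // Decode bytes as UTF-8 text and encode them again, which is the path
    // the payload takes with rs.getString(...) followed by getBytes(UTF_8).
    static byte[] roundTrip(byte[] payload) {
        String asText = new String(payload, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload: Protobuf field 1 (varint) with value 300.
        // 0xAC is not a valid UTF-8 sequence on its own.
        byte[] payload = {0x08, (byte) 0xAC, 0x02};
        byte[] restored = roundTrip(payload);

        System.out.println("original: " + Arrays.toString(payload));
        System.out.println("restored: " + Arrays.toString(restored));
        // The invalid byte was swapped for the UTF-8 replacement character,
        // so the two arrays no longer match.
        System.out.println("identical: " + Arrays.equals(payload, restored));
    }
}
```

With a real message the parser then works on damaged bytes, which can surface as errors such as the invalid tag above. Letting Trino decode the Protobuf message avoids the text round trip entirely.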