Hello!
I hope this is not the wrong section. If so, please move it
I run Trino in a Docker container using this command and passing configuration files:
sudo docker run -p 8080:8080 -v /mnt/c/trino-config-test/catalog/kafka.properties:/etc/trino/catalog/kafka.properties -v /mnt/c/trino-config-test/topic_description/kafka.mario.test.json:/etc/kafka/kafka.mario.test.json trinodb/trino
kafka.properties
connector.name=kafka
kafka.nodes=(hidden):9092
kafka.table-names=mario.test
kafka.hide-internal-columns=false
kafka.mario.test.json
{
"tableName": "test",
"schemaName": "mario",
"topicName": "mario.test",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
}
}
I checked inside the container and files are copied successfully.
I successfully manage to connect Trino to Kafka, in fact I can run queries on the topic “mario.test” using the CLI.
The topic has the following structure:
- The key is type of String but the actual value of this is an Integer, e.g. “12345”
- The value is a Protobuf object
I have two problems:
kafka.mario.test.json
is not taken for whatever reason, in fact I have the column “kafka_key” is not created. It’s not relavant for the moment, but I wanted to mention this anyway.- When I try to parse the Protobuf object, I get an error message.
This is the code for reference:
try (
Connection conn = DriverManager.getConnection(url, properties);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT _key as key, _partition_id as partition, _message as value FROM kafka.mario.test LIMIT 1");
) {
log.info("Iterating on result set...");
// Extract data from result set
while (rs.next()) {
log.info("{}", rs.getBigDecimal("partition"));
log.info("{}", rs.getString("key"));
final String valueString = rs.getString("value");
final ScheduledJob value = ScheduledJob.parseFrom(valueString.getBytes(StandardCharsets.UTF_8)); // EXCEPTION THROWN
log.info("{}", value);
}
} catch (SQLException e) {
e.printStackTrace();
}
The error message is:
Exception in thread "main" com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).
I can’t really find a way to deserialize that object properly and Trino doesn’t offer any support for Protobuf as far as I could read. I hope that someone is able to help me.