This article explains how to integrate Apache Kafka with Exasol.
The Kafka Exasol Connector is an open-source project, officially supported by Exasol, that provides integration between Apache Kafka and Exasol. You can use this connector to import data from a Kafka topic into an Exasol table.
To integrate Apache Kafka with Exasol, you need the following:
The following steps describe how to import data from a Kafka cluster into Exasol.
To store data from a Kafka topic in Exasol, create a table whose column names and types match the field names and types of the Kafka topic's Avro schema.
In addition, add two extra columns at the end of the table. These columns store Kafka metadata and help to keep track of records that have already been imported.
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
    -- These columns match the Kafka topic schema
    SALES_ID    INTEGER,
    POSITION_ID SMALLINT,
    ARTICLE_ID  SMALLINT,
    AMOUNT      SMALLINT,
    PRICE       DECIMAL(9,2),
    VOUCHER_ID  SMALLINT,
    CANCELED    BOOLEAN,
    -- Required for the Kafka import UDF
    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET    DECIMAL(36, 0)
);
Avro supports several primitive and complex types. For information about how Avro types are matched with Exasol, see Avro Data Mapping.
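For illustration only, an Avro value schema that lines up with the example table above might look like the following sketch. The record name and field types here are assumptions; the exact correspondence between Avro types and Exasol types is defined in Avro Data Mapping.

{
  "type": "record",
  "name": "SalesPosition",
  "fields": [
    {"name": "SALES_ID",    "type": "int"},
    {"name": "POSITION_ID", "type": "int"},
    {"name": "ARTICLE_ID",  "type": "int"},
    {"name": "AMOUNT",      "type": "int"},
    {"name": "PRICE",       "type": {"type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 2}},
    {"name": "VOUCHER_ID",  "type": "int"},
    {"name": "CANCELED",    "type": "boolean"}
  ]
}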
If you want to store the whole JSON document in a single column (see AS_JSON_DOC in Optional consumer properties), create a table as shown below:
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
    -- Single column that stores each Kafka topic record as a JSON string
    JSON_DOC_COL VARCHAR(2000000),
    -- Required for the Kafka import UDF
    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET    DECIMAL(36, 0)
);
The last two columns in the example store metadata about the Kafka topic partition and the record offset within a partition.
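Because these two columns are used to keep track of already imported records, you can check the current import progress per partition with a simple query. The following is a minimal sketch against the example table (placeholder names as above):

-- Highest imported offset for each Kafka partition
SELECT KAFKA_PARTITION, MAX(KAFKA_OFFSET) AS LAST_IMPORTED_OFFSET
FROM <schema_name>.<table_name>
GROUP BY KAFKA_PARTITION;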
The following example shows how to import data from a Kafka cluster.
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
    BOOTSTRAP_SERVERS   = '<kafka_bootstrap_servers>'
    SCHEMA_REGISTRY_URL = '<schema_registry_url>'
    TOPIC_NAME          = '<kafka_topic>'
    TABLE_NAME          = '<schema_name>.<table_name>'
    GROUP_ID            = 'exasol-kafka-udf-consumers';
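If you use the single-column JSON table layout described above, the same import statement can be extended with the AS_JSON_DOC consumer property. The following is a sketch only; it assumes that AS_JSON_DOC accepts the value 'true', as described in Optional consumer properties:

IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
    BOOTSTRAP_SERVERS   = '<kafka_bootstrap_servers>'
    SCHEMA_REGISTRY_URL = '<schema_registry_url>'
    TOPIC_NAME          = '<kafka_topic>'
    TABLE_NAME          = '<schema_name>.<table_name>'
    GROUP_ID            = 'exasol-kafka-udf-consumers'
    -- Store each record as a single JSON document (assumed property value)
    AS_JSON_DOC         = 'true';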
For more information about the available security options and Kafka consumer properties, see Secure Connection to Kafka Cluster and Kafka Consumer Properties.
If you want to contribute to the Kafka Connector Extension open source project, see Contributing.