Apache Kafka integration
This section explains how to integrate Apache Kafka with Exasol.
Apache Kafka Exasol Connector
The Kafka Exasol Connector is an open-source project officially supported by Exasol that provides integration between Apache Kafka and Exasol. You can use this connector to import data from a Kafka topic into an Exasol table.
Prerequisites
To integrate Apache Kafka with Exasol, you need the following:
- An operational Exasol cluster
- An operational Kafka cluster
- A network connection between the Exasol and Kafka clusters
Setting up the UDFs
Do the following to use the UDFs:
- Download the latest jar files from Kafka Connector Extension Releases.
- Upload the jar file to a bucket in BucketFS as described in Upload the JAR Files. For more information about BucketFS, see BucketFS.
- Create the UDF scripts as described in Create UDF Script and as sketched in the example below.
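The exact UDF script definitions for your version are listed in the connector's deployment guide. As a minimal sketch, assuming the jar was uploaded to a bucket named <bucket_name> in the default BucketFS service (the bucket name and jar version below are placeholders; verify the script class names against the release you downloaded), the scripts look like this:

-- Generates the import query (entry point for IMPORT ... FROM SCRIPT)
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_CONSUMER(...) EMITS (...) AS
  %scriptclass com.exasol.cloudetl.kafka.KafkaConsumerQueryGenerator;
  %jar /buckets/bfsdefault/<bucket_name>/exasol-kafka-connector-extension-<version>.jar;
/

-- Reads records from the Kafka topic partitions
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_IMPORT(...) EMITS (...) AS
  %scriptclass com.exasol.cloudetl.kafka.KafkaTopicDataImporter;
  %jar /buckets/bfsdefault/<bucket_name>/exasol-kafka-connector-extension-<version>.jar;
/

-- Reads topic partition metadata to resume from already imported offsets
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA(
  params VARCHAR(2000),
  kafka_partition DECIMAL(18, 0),
  kafka_offset DECIMAL(36, 0)
)
EMITS (partition_index DECIMAL(18, 0), max_offset DECIMAL(36, 0)) AS
  %scriptclass com.exasol.cloudetl.kafka.KafkaTopicMetadataReader;
  %jar /buckets/bfsdefault/<bucket_name>/exasol-kafka-connector-extension-<version>.jar;
/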
Import Data
The following sections show how to import data.
Prepare Exasol Table
Avro Preparation
To create a corresponding table in Exasol that stores the data from a Kafka topic, make sure the table's column names and types match the field names and types of the topic's Avro schema.
Additionally, add two extra columns at the end of the table. These columns store Kafka metadata and keep track of the records that have already been imported.
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
-- These columns match the Kafka topic schema
SALES_ID INTEGER,
POSITION_ID SMALLINT,
ARTICLE_ID SMALLINT,
AMOUNT SMALLINT,
PRICE DECIMAL(9,2),
VOUCHER_ID SMALLINT,
CANCELED BOOLEAN,
-- Required for Kafka import UDF
KAFKA_PARTITION DECIMAL(18, 0),
KAFKA_OFFSET DECIMAL(36, 0)
);
Avro supports several primitive and complex types. To learn how Avro types map to Exasol types, see Avro Data Mapping.
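For illustration, the table above would correspond to a topic value schema along the following lines. This is a hypothetical schema written for this example; the record and field names are assumptions, and the DECIMAL(9,2) column maps to Avro's decimal logical type:

{
  "type": "record",
  "name": "SalesPosition",
  "fields": [
    { "name": "SALES_ID", "type": "int" },
    { "name": "POSITION_ID", "type": "int" },
    { "name": "ARTICLE_ID", "type": "int" },
    { "name": "AMOUNT", "type": "int" },
    { "name": "PRICE", "type": { "type": "bytes", "logicalType": "decimal", "precision": 9, "scale": 2 } },
    { "name": "VOUCHER_ID", "type": "int" },
    { "name": "CANCELED", "type": "boolean" }
  ]
}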
JSON Preparation
If you want to store the whole JSON document in a single column (see AS_JSON_DOC in Optional consumer properties), create a table as shown below:
CREATE OR REPLACE TABLE <schema_name>.<table_name> (
-- Single column as JSON string for Kafka topic record
JSON_DOC_COL VARCHAR(2000000),
-- Required for Kafka import UDF
KAFKA_PARTITION DECIMAL(18, 0),
KAFKA_OFFSET DECIMAL(36, 0)
);
The last two columns in the above examples store metadata about the Kafka topic partition and the record offset within a partition.
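The import UDF uses these columns to continue from the last imported record. After an import has run, you can check how far it has progressed with a query such as the following (a simple sketch against the example table):

-- Highest imported offset per Kafka partition
SELECT
  KAFKA_PARTITION,
  MAX(KAFKA_OFFSET) AS MAX_IMPORTED_OFFSET
FROM <schema_name>.<table_name>
GROUP BY KAFKA_PARTITION
ORDER BY KAFKA_PARTITION;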
Import From Kafka Cluster
The following examples show how to import data from a Kafka cluster.
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<kafka_bootstrap_servers>'
SCHEMA_REGISTRY_URL = '<schema_registry_url>'
TOPIC_NAME = '<kafka_topic>'
TABLE_NAME = '<schema_name>.<table_name>'
GROUP_ID = 'exasol-kafka-udf-consumers';
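For example, to import each record as a single JSON document into the table from the JSON preparation step, you can add the AS_JSON_DOC property to the statement above. This is a sketch; all other values remain placeholders as before:

IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
BOOTSTRAP_SERVERS = '<kafka_bootstrap_servers>'
SCHEMA_REGISTRY_URL = '<schema_registry_url>'
TOPIC_NAME = '<kafka_topic>'
TABLE_NAME = '<schema_name>.<table_name>'
GROUP_ID = 'exasol-kafka-udf-consumers'
AS_JSON_DOC = 'true';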
For more information about what security options and consumer Kafka properties are available, see Secure Connection to Kafka Cluster and Kafka Consumer Properties.
Contribute to the Project
Exasol encourages contributions to the open-source project. To learn how to contribute, see Contributing.