Apache Kafka Integration

This section describes how to integrate Apache Kafka with Exasol. The integration is provided by an open-source project that is officially supported by Exasol.

Apache Kafka Exasol Connector

The Kafka Exasol Connector is an open-source project that provides an integration between Apache Kafka and Exasol. You can use this connector to import data from a Kafka topic into an Exasol table.

Prerequisites

To integrate Kafka with Exasol, you need the following:

  • An operational Exasol cluster
  • An operational Kafka cluster
  • Network connectivity between the Exasol and Kafka clusters

Setting up the UDFs

To set up the UDFs, do the following:

  1. Download the latest JAR file from Kafka Connector Extension Releases.
  2. Upload the JAR file to a bucket in BucketFS as described in Upload the JAR Files. For more information about BucketFS, see BucketFS.
  3. Create the UDF scripts as described in Create UDF Script.
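
As a rough illustration of step 3, the script definitions might look like the sketch below. Only the script name KAFKA_CONSUMER is confirmed by the import statement later in this section. The schema name KAFKA_EXTENSION, the KAFKA_IMPORT and KAFKA_METADATA scripts, the class names, the parameter lists, and the bucket path are assumptions recalled from the connector's user guide and may differ between releases, so treat Create UDF Script as the authoritative source.

```sql
-- Hypothetical schema that holds the connector scripts.
CREATE SCHEMA KAFKA_EXTENSION;
OPEN SCHEMA KAFKA_EXTENSION;

-- (...) is Exasol's syntax for dynamic input and output parameters.
-- <bucket> and <version> are placeholders; the path assumes the
-- default BucketFS service "bfsdefault".
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_CONSUMER(...) EMITS (...) AS
  %scriptclass com.exasol.cloudetl.kafka.KafkaConsumerQueryGenerator;
  %jar /buckets/bfsdefault/<bucket>/exasol-kafka-connector-extension-<version>.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT KAFKA_IMPORT(...) EMITS (...) AS
  %scriptclass com.exasol.cloudetl.kafka.KafkaTopicDataImporter;
  %jar /buckets/bfsdefault/<bucket>/exasol-kafka-connector-extension-<version>.jar;
/

-- Assumed helper that reports the highest imported offset per partition.
CREATE OR REPLACE JAVA SET SCRIPT KAFKA_METADATA(
  params          VARCHAR(2000),
  kafka_partition DECIMAL(18, 0),
  kafka_offset    DECIMAL(36, 0)
)
EMITS (partition_index DECIMAL(18, 0), max_offset DECIMAL(36, 0)) AS
  %scriptclass com.exasol.cloudetl.kafka.KafkaTopicMetadataReader;
  %jar /buckets/bfsdefault/<bucket>/exasol-kafka-connector-extension-<version>.jar;
/
```

All scripts should point to the same JAR file that you uploaded in step 2.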

Import Data

The following sections show how to import data.

Prepare Exasol Table

Avro Preparation

To store the data from a Kafka topic in an Exasol table, create a table whose column names and types match the field names and types of the topic's Avro schema.

Additionally, add two extra columns at the end of the table. These columns store Kafka metadata and help keep track of the records that have already been imported.

CREATE OR REPLACE TABLE <schema_name>.<table_name> (
    -- These columns match the Kafka topic schema
    SALES_ID    INTEGER,
    POSITION_ID SMALLINT,
    ARTICLE_ID  SMALLINT,
    AMOUNT      SMALLINT,
    PRICE       DECIMAL(9,2),
    VOUCHER_ID  SMALLINT,
    CANCELED    BOOLEAN,
    -- Required for Kafka import UDF
    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET    DECIMAL(36, 0)
);

Avro supports several primitive and complex types. For information about how Avro types are mapped to Exasol types, see Avro Data Mapping.

JSON Preparation

To store the whole JSON document in a single column (see AS_JSON_DOC in Optional consumer properties), create a table as shown below:

CREATE OR REPLACE TABLE <schema_name>.<table_name> (
    -- Single column as JSON string for Kafka topic record
    JSON_DOC_COL    VARCHAR(2000000),
    -- Required for Kafka import UDF
    KAFKA_PARTITION DECIMAL(18, 0),
    KAFKA_OFFSET DECIMAL(36, 0)
);

The last two columns in the example above store the Kafka topic partition of each record and the record's offset within that partition.
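
To see how far an import has progressed, you can query the two metadata columns directly. The following sketch uses only standard SQL and the column names from the table definitions above; the alias LAST_IMPORTED_OFFSET is illustrative. It assumes that the highest stored offset per partition marks the last record imported from that partition.

```sql
-- Highest offset stored so far for each Kafka partition.
SELECT
    KAFKA_PARTITION,
    MAX(KAFKA_OFFSET) AS LAST_IMPORTED_OFFSET
FROM <schema_name>.<table_name>
GROUP BY KAFKA_PARTITION
ORDER BY KAFKA_PARTITION;
```

An empty result means that no records have been imported into the table yet.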

Import From Kafka Cluster

The following examples show how to import data from a Kafka cluster.

IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
  BOOTSTRAP_SERVERS   = '<kafka_bootstrap_servers>'
  SCHEMA_REGISTRY_URL = '<schema_registry_url>'
  TOPIC_NAME          = '<kafka_topic>'
  TABLE_NAME          = '<schema_name>.<table_name>'
  GROUP_ID            = 'exasol-kafka-udf-consumers';
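
For a table prepared as described in JSON Preparation, the import would probably use JSON-related consumer properties instead of a schema registry URL. The following is a sketch under that assumption: the property RECORD_VALUE_FORMAT and the values 'json' and 'true' do not appear in this section and should be checked against Kafka Consumer Properties for your connector version.

```sql
-- Imports each Kafka record as one JSON string into JSON_DOC_COL.
-- RECORD_VALUE_FORMAT and the property values are assumptions.
IMPORT INTO <schema_name>.<table_name>
FROM SCRIPT KAFKA_CONSUMER WITH
  BOOTSTRAP_SERVERS   = '<kafka_bootstrap_servers>'
  RECORD_VALUE_FORMAT = 'json'
  AS_JSON_DOC         = 'true'
  TOPIC_NAME          = '<kafka_topic>'
  TABLE_NAME          = '<schema_name>.<table_name>'
  GROUP_ID            = 'exasol-kafka-udf-consumers';
```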

For more information about the available security options and Kafka consumer properties, see Secure Connection to Kafka Cluster and Kafka Consumer Properties.

Contribute to the Project

Exasol encourages contributions to this open-source project. For information about how to contribute, see Contributing.