Kafka Connect JDBC Source Connector

Apache Kafka Sep 22, 2020

Getting data from a database into Apache Kafka is certainly one of the most popular use cases of Kafka Connect. Kafka Connect provides a scalable and reliable way to move data in and out of Kafka. As it uses plugins for specific connectors and is driven purely by configuration (without writing code), it is an easy integration point.

Visit the Kafka Connect Basics post if you would like to get an introduction.

1- Running Kafka Cluster

We can use the following docker-compose file to get a Kafka cluster with a single broker up and running.

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

If you would like to use a user interface rather than console tools to manage Kafka, Confluent Control Center is one of the best choices. It is a commercial tool, but it comes with a 30-day trial licence. There is also Landoop UI, which includes a Kafka Connect management interface as well. If you would like to use Confluent Control Center, you can add it as a service to the docker-compose file as follows:

control-center:
    image: confluentinc/cp-enterprise-control-center:5.5.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
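
Assuming the compose file above is saved as docker-compose.yml, the cluster can be brought up and checked with the following commands (a minimal sketch):

# Start the containers in the background and list their status
docker-compose up -d
docker-compose ps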

2- Preparing Connector Library

Download the Kafka Connect JDBC plugin from Confluent Hub and extract the zip file to Kafka Connect's plugin path. When we start Kafka Connect we can specify a plugin path that will be used to access the plugin libraries, for example plugin.path=/usr/local/share/kafka/plugins. Check the Install Connector Manually documentation for details.
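
A minimal sketch of placing the plugin into the plugin path (the archive name and target directory are illustrative):

# Extract the archive downloaded from Confluent Hub into the plugin path
mkdir -p /usr/local/share/kafka/plugins
unzip confluentinc-kafka-connect-jdbc-5.5.1.zip -d /usr/local/share/kafka/plugins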

We also need a JDBC 4.0 driver, as it will be used by the connector to communicate with the database. The PostgreSQL and SQLite drivers are already shipped with the JDBC connector plugin. If you would like to connect to another database system, add its driver to the same folder as the kafka-connect-jdbc jar file. See the Installing JDBC Driver manual.
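
For example, if you wanted to connect to MySQL, a sketch of adding its driver would look like the following (the jar name and paths are illustrative):

# Copy the driver jar next to the kafka-connect-jdbc jar inside the extracted plugin folder
cp mysql-connector-java-8.0.21.jar \
  /usr/local/share/kafka/plugins/confluentinc-kafka-connect-jdbc-5.5.1/lib/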

3- Running Kafka Connect

We can run Kafka Connect with the connect-distributed.sh script located inside the Kafka bin directory. We need to provide a properties file when running this script to configure the worker properties.

connect-distributed.sh <worker properties file>

We can create a connect-distributed.properties file to specify the worker properties as follows:

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
bootstrap.servers=localhost:29092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
config.storage.topic=connect-configs
config.storage.replication.factor=1

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
status.storage.topic=connect-status
status.storage.replication.factor=1

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

plugin.path=/Users/cemalturkoglu/kafka/plugins

Note that plugin.path is the path where we need to place the library that we downloaded.
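
Assuming Kafka is installed under $KAFKA_HOME and the file above is saved as connect-distributed.properties (illustrative paths), the worker can be started as:

# Start a Kafka Connect worker in distributed mode
$KAFKA_HOME/bin/connect-distributed.sh connect-distributed.properties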

After the worker is running, we can confirm that the Kafka Connect REST endpoint is accessible, and that the JDBC connector is in the plugin list, by calling http://localhost:8083/connector-plugins.
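
For example, with curl (the output below is the response):

curl -s http://localhost:8083/connector-plugins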

[{"class":"io.confluent.connect.jdbc.JdbcSinkConnector","type":"sink","version":"5.5.1"},{"class":"io.confluent.connect.jdbc.JdbcSourceConnector","type":"source","version":"5.5.1"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.6.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.6.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]

4- Starting the JDBC Connector

As we operate in distributed mode, we run connectors by calling the REST endpoint with the configuration JSON. We can supply the configuration payload from a file with the curl command. The following command starts the connector.

curl -d @"jdbc-source.json" \
-H "Content-Type: application/json" \
-X POST http://localhost:8083/connectors

The configuration for the connector, stored in the jdbc-source.json file, can be as follows:

{
    "name": "jdbc_source_connector_postgresql_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/demo",
        "connection.user": "postgres",
        "connection.password": "root",
        "topic.prefix": "postgres-01-",
        "poll.interval.ms" : 3600000,
        "mode":"bulk"
    }
}
  • The connector connects to the database using the JDBC URL and connection credentials.
  • It creates one Kafka topic per table. Topics are named as topic.prefix + <table_name>.
  • The data is retrieved from the database at the interval specified by the poll.interval.ms config.
  • The mode configuration specifies the query mode, which is discussed below. Bulk mode loads all the data in every poll.
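
We can also check that the connector and its task are running by querying the Connect REST API (a sketch using the connector name from the configuration above):

# Check the state of the connector and its tasks
curl -s http://localhost:8083/connectors/jdbc_source_connector_postgresql_01/status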

We can see that the 4 tables in my demo database are loaded into 4 Kafka topics.
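
A quick way to verify this from the console is to list the topics through the broker container (a sketch; the container and listener names follow the docker-compose file above):

# List the topics created by the connector
docker exec kafka kafka-topics --bootstrap-server kafka:9092 --list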

And each row in the tables is loaded as a message.
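
We can inspect one of the messages with the console consumer (a sketch; the topic name assumes a table called address):

# Read a single message from the topic created for the address table
docker exec kafka kafka-console-consumer --bootstrap-server kafka:9092 \
  --topic postgres-01-address --from-beginning --max-messages 1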

The message contains the following fields:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int64",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": true,
        "field": "flat"
      },
      {
        "type": "string",
        "optional": true,
        "field": "postal_code"
      },
      {
        "type": "string",
        "optional": true,
        "field": "street"
      },
      {
        "type": "string",
        "optional": true,
        "field": "title"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "city_id"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "user_id"
      }
    ],
    "optional": false,
    "name": "address"
  },
  "payload": {
    "id": 3,
    "flat": "3/1B",
    "postal_code": "ABX501",
    "street": "Street2",
    "title": "work address",
    "city_id": 7,
    "user_id": 3
  }
}

Note that it contains the schema attribute, whose fields describe the columns, and the payload attribute with the actual data.

Selecting Schema and Tables To Copy

We can use catalog.pattern or schema.pattern to filter the schemas to be copied.
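
For example, to copy only the tables of the public schema of a PostgreSQL database, the following could be added to the connector configuration (a sketch):

"schema.pattern": "public"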

By default, all tables are queried and copied. However, we can include or exclude tables with the table.whitelist and table.blacklist configurations. Only one of whitelist or blacklist can be used at a time.

table.whitelist:"Users,Address,City"

table.blacklist:"Groups"

Query Modes

There are alternative incremental query modes to the bulk mode used in the demonstration above. The incremental modes load data only when there is a change, using certain columns to detect whether a table or row has changed.

bulk: In this mode the connector loads all of the selected tables in each iteration. If the poll interval is set to a small value (the default is 5 seconds), it does not make much sense to load all the data repeatedly, as it will produce duplicates. It can be useful for a periodic backup or for dumping the entire database.

incrementing: This mode uses a single column that is unique for each row, ideally an auto-incremented primary key, to detect changes in the table. When a new row with a new ID is added, it is copied to Kafka. However, this mode cannot capture UPDATE operations on a row, since an update does not change the ID. incrementing.column.name is used to configure the column name.

timestamp: Uses a single column that holds the last modification timestamp, and in each iteration queries only for rows that have been modified since the previous poll. As the timestamp is not a unique field, it can miss some updates that share the same timestamp. timestamp.column.name is used to configure the column name.

timestamp+incrementing: The most robust and accurate mode, using both a unique incrementing ID and a timestamp. The only drawback is that a modification timestamp column needs to be added to legacy tables.

query: The connector supports using a custom query to fetch data in each iteration. It is not very flexible in terms of incremental changes, but it can be useful for fetching only the necessary columns from a very wide table, or for fetching a view containing multiple joined tables. If the query gets complex, the load and the performance impact on the database increase.
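
A sketch of a connector configuration using query mode is shown below; the query, table and column names are illustrative. Note that with a custom query, topic.prefix is used as the full topic name rather than a prefix:

{
    "name": "jdbc_source_connector_postgresql_query_01",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/demo",
        "connection.user": "postgres",
        "connection.password": "root",
        "topic.prefix": "postgres-query-01",
        "poll.interval.ms": 3600000,
        "mode": "bulk",
        "query": "SELECT a.id, a.street, a.postal_code, a.user_id FROM address a JOIN users u ON u.id = a.user_id"
    }
}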

Incremental Querying with Timestamp

Using only a unique ID or only a timestamp has pitfalls, as mentioned above. It is a better approach to use them together. The following configuration shows an example of timestamp+incrementing mode:

{
    "name": "jdbc_source_connector_postgresql_02",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/demo-db",
        "connection.user": "postgres",
        "connection.password": "root",
        "topic.prefix": "postgres-02-",
        "table.whitelist": "store,tag,category,address,city",
        "mode":"timestamp+incrementing",
        "timestamp.column.name": "last_modified_date",
        "validate.non.null": false,
        "db.timezone": "Europe/Warsaw"
    }
}

Note that validate.non.null is used because the connector requires the timestamp column to be NOT NULL. We can either set these columns to NOT NULL, or we can disable this validation by setting validate.non.null to false.

When using a timestamp column, the timezone of the database system matters. There might be different behaviour because of time mismatches, so the timezone can be configured with db.timezone.

The table.whitelist configuration limits the copying to the given list, so only these 5 tables are copied to Kafka topics.

As mentioned above, using incrementing mode without a timestamp means UPDATE operations on the table are not captured. With timestamp+incrementing mode, update operations are captured as well.

Final words

The JDBC connector is a great way to start shipping data from relational databases to Kafka. It is easy to set up and use; only a few properties need to be configured to get your data streamed out. However, there are some drawbacks to the JDBC connector as well:

  • It needs to constantly run queries, so it generates some load on the physical database. To avoid performance impacts, the queries should be kept simple, which limits how heavily the connector can be scaled.
  • As an incremental timestamp column is usually needed, working with a legacy datastore requires extra work to add such columns. There can also be cases where it is not possible to update the schema.
  • The JDBC connector cannot capture DELETE operations, as it uses SELECT queries to retrieve data and there is no sophisticated mechanism to detect deleted rows. You would need to implement your own solution to overcome this problem.
