Apache Kafka
This document describes how to use the Kafka Connect Sink Connector for YugabyteDB to store events from Apache Kafka into YugabyteDB via YSQL and YCQL APIs.
Before you can start using Kafka Connect Sink, ensure that you have the following:
Confluent Platform with free community features.
For more information and installation instructions, see On-Premises Deployments in the Confluent Platform documentation.
YugabyteDB cluster.
For more information and instructions, see YugabyteDB Quick Start Guide.
Configuring Kafka
You configure Kafka Connect Sink as follows:
Download the TAR file using Confluent Platform, extract this file, and then set the
environment variables, as follows:# assuming a filename of confluent-community-6.1.1.tar... tar -xvf confluent-community-6.1.1.tar cd confluent-6.1.1 export CONFLUENT_HOME=/Users/myusername/confluent-6.1.1 export PATH=$PATH:$CONFLUENT_HOME/bin
Use the following command to start ZooKeeper, Kafka, Schema Registry, Kafka Connect REST API, and Kafka Connect:
confluent local services start
Expect the following output:
The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html Using CONFLUENT_CURRENT: /var/folders/_1/ltd94t1x2nsdrwj302jl85vc0000gn/T/confluent.127538 Starting ZooKeeper ZooKeeper is [UP] Starting Kafka Kafka is [UP] Starting Schema Registry Schema Registry is [UP] Starting Kafka REST Kafka REST is [UP] Starting Connect Connect is [UP] Starting ksqlDB Server ksqlDB Server is [UP]
After changing configuration, use the following command to stop Kafka Connect:
confluent local services connect stop
Expect the following output:
The local commands are intended for a single-node development environment only, NOT for production usage. https://docs.confluent.io/current/cli/index.html Using CONFLUENT_CURRENT: /var/folders/_1/ltd94t1x2nsdrwj302jl85vc0000gn/T/confluent.127538 Stopping Connect Connect is [DOWN]
Restart Kafka Connect to trigger loading of the YugabyteDB Sink JAR file.
Building YugabyteDB Kafka Connect Sink
Run the following commands to build the JAR files needed by Kafka Connect Sink:
git clone https://github.com/yugabyte/dsbulk.git
cd dsbulk
mvn clean install -DskipTests
The preceding command is used for the local installation.
git clone https://github.com/yugabyte/messaging-connectors-commons.git
cd messaging-connectors-commons
mvn clean install -DskipTests
The preceding command is used for the local installation.
git clone https://github.com/yugabyte/yb-kafka-sink.git
cd yb-kafka-sink
mvn clean package -DskipTests
Expect the following output:
[INFO] Replacing /home/centos/yb-kafka-sink/dist/target/kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar with /home/centos/yb-kafka-sink/dist/target/kafka-connect-yugabytedb-sink-distribution-1.4.1-SNAPSHOT-shaded.jar
You can copy the kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar
into the Kafka Connect class path.
The next step is to modify the etc/schema-registry/connect-avro-distributed.properties
file to add kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar
to plugin.path
. That is, the line containing plugin.path
should contain the path to kafka-connect-yugabytedb-sink-1.4.1-SNAPSHOT.jar
, as demonstrated by the following example:
The next step is to start Kafka Connect again by executing the following command:
confluent local services connect start
//connect-distributed etc/schema-registry/connect-avro-distributed.properties
To list the connectors plugin and verify if the connectors are loaded, execute the following command:
confluent local services connect plugin
The following is a sample output produced by the preceding command:
"class": "com.datastax.kafkaconnector.DseSinkConnector",
"type": "sink",
"version": "1.4.1-SNAPSHOT"
"class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"type": "sink",
"version": "1.4.1-SNAPSHOT"
"class": "com.yugabyte.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "1.4.1-SNAPSHOT"
"class": "com.yugabyte.jdbc.JdbcSourceConnector",
"type": "source",
"version": "1.4.1-SNAPSHOT"
Verifying the Integration
You can verify the integration by producing a Record in Kafka.
The following produces a record into the orders topic using the Avro producer:
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic orders \
value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"product", "type": "string"}, {"name":"quantity", "type": "int"}, {"name":"price", "type": "float"}]}'
The console producer waits for input.
The next step is to copy and paste the record, such as the following sample, into the terminal and press Enter:
{"id": 999, "product": "foo", "quantity": 100, "price": 50}
Use the Avro consumer to verify that the message is published to the topic, as follows:
./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic orders --from-beginning
Configuring the JDBC Sink Connector
You can configure the JDBC Sink connector as follows:
Create a file named
with the following content:{ "name": "yb-jdbc-sink", "config": { "connector.class": "com.yugabyte.jdbc.JdbcSinkConnector", "tasks.max": "2", "topics": "orders", "connection.urls":"jdbc:postgresql://localhost:5433/yugabyte", "connection.user":"yugabyte", "connection.password":"yugabyte", "mode":"UPSERT", "auto.create":"true" } }
Load the JDBC Sink by executing the following command:
curl -X POST -H "Content-Type: application/json" -d @/Users/me/confluent-6.1.1/etc/kafka/yb-jdbc-sink-quickstart.properties "localhost:8083/connectors"
Query the YugabyteDB database to select all rows from the
table, as follows:yugabyte=# select * from orders;
The following ouput demonstrates that the
table was automatically created and that it contains the record:id | product | quantity | price ----+---------+----------+------- 999 | foo | 100 | 50 (1 row)
Configuring the YCQL Sink Connector
You can configure the YCQL Sink connector as follows:
Create a file named
with the following content:{ "name": "example-cql-sink", "config": { "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector", "tasks.max": "2", "topics": "orders", "contactPoints": "localhost", "loadBalancing.localDc": "datacenter1", "topic.orders.demo.orders.mapping": "id=value.id, product=value.product, quantity=value.quantity, price=value.price", "topic.orders.demo.orders.consistencyLevel": "LOCAL_QUORUM" } }
In YugabyteDB, create a table with following schema:
ycqlsh> create table demo.orders(id int primary key, product varchar, quantity int, price int);
Load the connector by executing the following command:
curl -X POST -H "Content-Type: application/json" -d @/Users/me/development/play/confluent/confluent-6.0.1/yb-cql-sink-connector.properties "localhost:8083/connectors"
Query the YugabyteDB database to select all rows from the
table, as follows:ycqlsh> select * from demo.orders;
Expect to see the following ouput:
id | product | quantity | price ----+---------+----------+------- 999 | foo | 100 | 50 (1 row)
Shutting Down the Cluster
To shut down the YugabyteDB cluster, execute the following commands:
confluent local services stop
confluent local destroy