Apache Kafka

From Quickstart to Reality with Ansible and Confluent

By: Ian Whitney


Ever since reading Designing Data-Intensive Applications I’ve looked for any excuse to use Apache Kafka. I experimented with the Kafka quickstart and ran some small-scale proof-of-concept projects. When a real project came along that was a perfect fit for Kafka, I jumped at the chance.

Which is when I learned that there’s a large difference between tinkering with Kafka on my laptop and deploying it to a bunch of real servers. And when I went looking for guides to moving from Quickstart Kafka to Real Kafka, I couldn’t find much. So I wrote my own! The following post covers my use of Confluent and Ansible to configure and automate a multi-host deployment of a Zookeeper ensemble, a Kafka cluster, the Confluent Schema Registry, and a Kafka Connect worker.

If you’re interested in learning about the Whys or the Whens of using Kafka there are other blog posts (or books, such as Designing Data-Intensive Applications) that cover those topics in detail. If you’re interested in a quickstart, then I recommend trying out Confluent’s quickstart. But if you’re looking for how to move beyond experimentation into a real, running Kafka system, read on.

The Preamble

In the steps below I’ll be using Ansible to deploy the Confluent OSS platform to a collection of Red Hat 7 hosts. Most of what I discuss should apply to other operating systems or configuration automation systems.

Feel free to use any code I provide! But be aware that everything is specific to UMN servers and your infrastructure may be different.

The Playbooks

The repo https://github.com/umn-asr/kafka_ansible_demo_playbooks contains the Ansible playbooks that I used.

The inventory file contains the (fake) host names and groups them by playbook.

We’ll discuss the playbooks in the same order they are run.

  1. Confluent
  2. Zookeeper
  3. Kafka
  4. Schema Registry
  5. Connect

I’ll describe what each playbook does and how you can test that it ran successfully.

Confluent

The first playbook is confluent.yml which, unsurprisingly, installs the Confluent OSS Platform.

Confluent provides great instructions for installing Confluent on Red Hat machines, so this playbook was mostly a matter of converting their instructions into Ansible commands.

We’re using the geerlingguy.java role from Ansible Galaxy to get OpenJDK installed on the server.

Testing Confluent

Once run, you should be able to connect to your host(s), run confluent, and see the Confluent usage instructions.

Zookeeper

Kafka requires Zookeeper to run. You can run with a single Zookeeper process, but that defeats the distributed, redundant nature of Zookeeper. A collection of Zookeeper processes is called an “Ensemble”. Ensembles usually contain an odd number of processes. We are starting with a 3-member ensemble.

The zookeeper playbook configures a few things that all Zookeeper processes need.

Open ports 2181, 2888 and 3888

Port 2181 is the default port used by Zookeeper clients, such as Kafka. We need to open this port to all hosts that are running client processes.

Zookeeper uses ports 2888 and 3888 for Leader election. We need to open these ports to all hosts that are running members of our Zookeeper ensemble.

We use a custom Ansible role – ipset – to manage IP Sets. Ansible Galaxy contains a role for managing ipset that can do the same thing, though the syntax will be a little different.

A unique ID number for each Ensemble member

You’ll need to uniquely identify each Zookeeper ensemble member with an integer between 1 and 255. We’ll be using this ID number a bunch, so I made it a host variable in the inventory file.
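For example, the zookeeper group in the inventory might look something like this (the host names match the fake ones used throughout this post):

[zookeeper]
zk-01.umn.edu zookeeper_id=1
zk-02.umn.edu zookeeper_id=2
zk-03.umn.edu zookeeper_id=3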

A Zookeeper Config File

Confluent creates a default Zookeeper configuration file in /etc/kafka/zookeeper.properties. The default configuration won’t run an ensemble, so we have Ansible create a new one, using the template in files/zookeeper/zookeeper.properties.j2

The Zookeeper admin site describes these settings in detail, but here’s a quick summary.

tickTime=2000 means that a single tick is equal to 2000 milliseconds. initLimit and syncLimit set restrictions on connecting to and synchronizing with the Ensemble Leader. The numbers are in ticks, so our initLimit is 5 ticks or 10 seconds.

Zookeeper stores its files in dataDir.

Zookeeper processes will listen for clients on clientPort. We’ve set this to 2181 to match the port we opened in the first step.
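Putting those settings together, here’s a sketch of what files/zookeeper/zookeeper.properties.j2 could contain (the dataDir path and the syncLimit value are my assumptions, not taken from the playbooks):

tickTime=2000
initLimit=5
syncLimit=2
dataDir=/var/lib/zookeeper
clientPort=2181
{% for host in groups['zookeeper'] %}
{% if host == inventory_hostname %}
server.{{ hostvars[host]['zookeeper_id'] }}=0.0.0.0:2888:3888
{% else %}
server.{{ hostvars[host]['zookeeper_id'] }}={{ host }}:2888:3888
{% endif %}
{% endfor %}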

The loop at the end of the template declares all the ensemble members and where they live. For our 3-host ensemble, it’ll look like this when deployed to the 2nd of our 3 Zookeeper hosts:

server.1=zk-01.umn.edu:2888:3888
server.2=0.0.0.0:2888:3888
server.3=zk-03.umn.edu:2888:3888

Each line follows the pattern server.[id]=[host]:[port1]:[port2]

The server.[id] section uses the same zookeeper_id we use to uniquely identify each Zookeeper process.

The [host] section declares the host running that ensemble member.

The [port1] and [port2] declare which ports to use for leader election. We opened these ports earlier in the playbook.

But why is the [host] section 0.0.0.0 for server.2?

Zookeeper documentation uses examples that look like this:

server.1=zk-01.umn.edu:2888:3888
server.2=zk-02.umn.edu:2888:3888
server.3=zk-03.umn.edu:2888:3888

But when I followed that example, Zookeeper would fail. Each Zookeeper process was unable to communicate with the other two.

Some helpful folks at UMN helped me discover that our Zookeeper processes were listening on the correct ports, but only for internal requests. Requests from other servers were being ignored.

After even more help (and a lot of Googling, experimentation and cursing) I found this answer on Stack Overflow.

Once I updated my template to use 0.0.0.0 for the current host, my Zookeeper processes were able to communicate. Yay!

I suspect the root cause of this behavior is something in /etc/hosts, but I’m not sure yet.

A file that contains their ID number

Each Zookeeper process needs a file that contains its ID number so that it knows which member of the ensemble it is. This file is called myid and must live in the Zookeeper dataDir.
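A sketch of the Ansible task, assuming the same dataDir as the template above:

- name: Write this member's ID to the myid file
  copy:
    content: "{{ zookeeper_id }}"
    dest: /var/lib/zookeeper/myid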

Testing Zookeeper

Those steps should be all you need, but we still want to see Zookeeper in action. For each host:
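Connect to the host and start Zookeeper in the foreground so you can watch its output. With Confluent installed, its wrapper script makes this a one-liner:

zookeeper-server-start /etc/kafka/zookeeper.properties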

If Zookeeper is especially misconfigured, it’ll quit. Or it might run while complaining about not being able to connect to the other members of the ensemble. If it does neither of those things, you’re probably good.

Once Zookeeper is running, you can ask each member of the ensemble how it’s doing. Look at the Four Letter Words section of the Zookeeper Admin Guide for details. stat is a particularly helpful command because it will tell you who is the current Leader of the ensemble, allowing you to verify that Leader election is working:

echo stat | nc localhost 2181
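In the output, look for the Mode line: exactly one member of the ensemble should report Mode: leader, and the rest should report Mode: follower.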

Kafka

Like Zookeeper, Kafka can run as a single process. But doing so reduces the redundancy and safety of your system. A collection of Kafka brokers is a “cluster”. We chose to start with a cluster of 3 brokers.

The kafka playbook contains two tasks:

Open ports for Kafka

Kafka brokers communicate over port 9092. We need to open this port to all hosts that will be connecting to our Kafka cluster. Using the same ipset role that we used before does the trick.

Configure Kafka brokers

Confluent creates a default Kafka configuration file in /etc/kafka/server.properties. But the file configures Kafka for local development. Our template file changes the defaults to allow multi-host deployments.
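Two of the important changes are giving each broker a unique ID and advertising a listener that other hosts can actually reach. A sketch of those template lines (kafka_broker_id is a host variable name I’ve invented for illustration):

broker.id={{ kafka_broker_id }}
advertised.listeners=PLAINTEXT://{{ inventory_hostname }}:9092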

Building the zookeeper.connect string is the oddest part of the playbook. We need to turn a collection of hosts like:

zk-01.umn.edu
zk-02.umn.edu
zk-03.umn.edu

Into a comma-separated string of host:port values like:

zk-01.umn.edu:2181,zk-02.umn.edu:2181,zk-03.umn.edu:2181

I found this tricky to do in Ansible/Jinja2. After a lot of searching on Stack Overflow I came up with a two-step approach.

Step one turns the list of hosts into a collection of facts, each containing host:2181:


- set_fact:
    hosts_and_ports: "{{ item }}:2181"
  with_items: "{{ groups['zookeeper'] }}"
  register: zookeeper_servers

Step two is in the template. It takes our zookeeper_servers facts, pulls out each hosts_and_ports value and then joins them into a comma-separated string.

zookeeper.connect={{ zookeeper_servers.results | map(attribute='ansible_facts.hosts_and_ports') | join(',') }}

The final result is what we need:

zookeeper.connect=zk-01.umn.edu:2181,zk-02.umn.edu:2181,zk-03.umn.edu:2181

Testing Kafka

As with Zookeeper we want to see our Kafka brokers run. First, follow the steps to start up your Zookeeper ensemble. Then, on each host:
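Start the broker in the foreground, again using the Confluent wrapper script:

kafka-server-start /etc/kafka/server.properties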

Like Zookeeper, the Kafka brokers will quit if they’re misconfigured badly enough. In other cases they may keep running while spitting out errors. If they do neither, they’re probably fine.

A quick test you can run is to ask each broker its topic list. Run this on each host:

kafka-topics --list --zookeeper zk-01.umn.edu:2181

This will show you the topics that the broker on that host knows about. The topics will be the same for each broker. If they aren’t, something has gone awry.

Schema Registry

Many articles about Kafka in production contain a regretful note that goes something like:

We didn’t start out with Avro schemas, just unstructured JSON. This quickly became a problem and we had to [painfully] move to Avro.

I understood their pain and wanted to avoid it. JSON is hard to govern, validate, and evolve. Avro and the Schema Registry solve those problems. We are starting off with the Confluent Schema Registry and will be using Avro encoding on everything.

Our project is small, so we only need one Schema Registry server. I chose to run it on one of the hosts that is already running Zookeeper/Kafka but running it on its own host also makes sense. If you run it on a separate host be sure to install Confluent on the host first.

The schema-registry playbook performs three tasks:

Open Ports

I chose to run my Schema Registry server on port 8080. The playbook opens that port to all members of the Kafka cluster. ipset to the rescue again.

Upload Schema Registry config file

Confluent creates a default Schema Registry configuration file in /etc/schema-registry/schema-registry.properties. We replace it with our template file. Our template file declares the port of 8080 and the now-familiar Zookeeper connection string.
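The rendered result boils down to something like this (property names are from the Confluent docs of this era; double-check them against your version):

listeners=http://0.0.0.0:8080
kafkastore.connection.url=zk-01.umn.edu:2181,zk-02.umn.edu:2181,zk-03.umn.edu:2181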

Upload an Avro configuration file

This file is actually used by the Connect process, which we’re configuring next. But the contents of the file are only concerned with Schema Registry details, so I chose to configure it in the Schema Registry playbook.

Confluent will create two default Avro config files for you in /etc/schema-registry: connect-avro-standalone.properties and connect-avro-distributed.properties.

Our template file replaces the standalone file, since that’s what we currently use.

The template file is like the others we’ve seen: it tells Connect to use the Avro converters for keys and values and points those converters at our Schema Registry’s URL.
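Sketched out, the heart of it looks like this (the broker and Schema Registry addresses match the hosts and ports we’ve been using):

bootstrap.servers=zk-01.umn.edu:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://zk-02.umn.edu:8080
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://zk-02.umn.edu:8080
offset.storage.file.filename=/tmp/connect.offsets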

But down at the bottom of the file is what seems like a minor thing:

plugin.path=/usr/share/java

This is the directory where Confluent puts a lot of JAR files, and those files are not loaded by default. If you don’t declare this setting you may end up with an error like:

ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches io.confluent.connect.jdbc.JdbcSourceConnector

This error haunted me for a day before I figured it out. Afterwards I found this documentation about plugin.path and its usage. But it is very hard to link the error you get to this setting.

Testing Schema Registry

As with Zookeeper and Kafka we want to see Schema Registry run for real. First, start up your Zookeeper ensemble and Kafka cluster. Then:
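Start the Schema Registry process in the foreground using its wrapper script:

schema-registry-start /etc/schema-registry/schema-registry.properties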

It should say that it’s listening on 8080 (or whatever port you chose).

You can test it by using cURL (or equivalent) from any host that should have access to the Schema Registry.

curl http://zk-02.umn.edu:8080/subjects

You should get [] as the response body, because you haven’t yet put any data into your registry.

Connect

Not all Kafka deployments will need Connect, but it’s a big part of our platform. It was also one of the trickier bits to set up (see the Avro configuration details above). Hopefully I can save someone some time by documenting our steps.

Our connect playbook performs two tasks.

Upload a JDBC Driver

This step is only necessary if you need a JDBC driver that Confluent does not provide. We’re connecting to a SQL Server DB, which means we need to provide Microsoft’s JDBC driver. You can upload it anywhere, but we are putting it in the same directory as the Confluent-provided drivers, which is also part of the plugin.path that we declared in the Avro configuration file.

Configure our Connect Worker

Unlike previous steps, we are not overwriting an existing configuration file here. We are creating our own, using this template file. Every Connect configuration will vary, as this file describes what database to connect to, how to identify new records, which tables to watch, and so on.

More details on configuring a Connect worker are available on the Confluent web site.

Andrew Zenk, awesome member of the UMN Ansible community, added the LastPass lookup feature to Ansible in 2.3. We use it in the playbook to set our database connection variables to the credentials stored in LastPass.
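Putting those pieces together, here’s a sketch of the kind of connector template this produces (the LastPass entry name, database details, table, and topic prefix are all hypothetical examples):

name=our-sqlserver-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver://our-db-host:1433;databaseName=our_db
connection.user={{ lookup('lastpass', 'our-db-entry', field='username') }}
connection.password={{ lookup('lastpass', 'our-db-entry', field='password') }}
mode=incrementing
incrementing.column.name=id
table.whitelist=some_table
topic.prefix=sqlserver-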

Testing Connect

With the configuration in place we want to see the Connect worker in action. First, start your Zookeeper, Kafka and Schema Registry processes. Then:
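Start the worker in standalone mode, passing it both the Avro worker configuration and your connector configuration (the connector file’s path and name here are just examples):

connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka-connect-jdbc/our-sqlserver-source.properties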

It takes about 10 seconds for the Connect worker to get up and running. If it errors it will quit.

You may see an error saying that “no driver can be found”. You may need to declare the plugin.path variable in your Connect configuration as well. Setting it to the same value as the plugin.path in the Avro configuration file may work, or you may need to set it to the exact location (including file name) of your JDBC driver. I’ve had varying results here, so I’m not exactly sure what’s going on.

To see if your Connect worker is really working, you can use the same test we ran when testing our Kafka brokers to see what topics exist. You should see your Connect worker’s topic(s) in the list.

You can also check how many messages are in a topic with:

kafka-run-class kafka.tools.GetOffsetShell --broker-list zk-01.umn.edu:9092 --topic your_topic_name

It will return your_topic_name:0:XXXX, where 0 is the partition number and XXXX is the latest offset in that partition, which (as long as no records have been deleted) is the number of messages.

Conclusion/Next Steps

While it’s very easy to get Confluent running for local development, configuring and deploying a real-world example takes some work. Even after all of the above we’re not done yet. Up next are important operational concerns such as:

And more, I’m sure. ASR Custom Solutions is still in early days with Kafka and there is much more to learn.