Kafka

Apache Kafka is a high-throughput distributed messaging system.

Overview

At first, system architectures start simple: we have a source system, a target system, and data is passed from source to target:

As requirements increase, the system gets increasingly complex and more source and target systems arise. In the worst case scenario, we have n source systems and n target systems where each source system must communicate with every target system. Fundamentally, this requires n^2 integrations:

Moreover, each integration comes with difficulties:

  • Protocol: how the data is transported (TCP, HTTP, REST, FTP, JDBC…)
  • Data format: how the data is parsed (Binary, CSV, JSON, Avro…)
  • Data schema and evolution: how the data is shaped and may change

Ultimately, this is error prone, time consuming, and not scalable.

Apache Kafka helps decouple data streams and systems. Going back to the previous example: using a Kafka-centered architecture, if we still had n source systems and n target systems where each source system has to communicate with every target system, we would only need n + n integrations:

About Kafka:

  • Created by LinkedIn, now open source and mainly maintained by Confluent
  • Distributed, resilient architecture, fault tolerant
  • Horizontal scalability:
    • Can scale to 100s of brokers
    • Can scale to millions of messages per second
  • High performance (latency under 10 ms – i.e., real time)

Use cases:

  • Messaging system
  • Activity tracking
  • Metric gathering from various locations
  • Application log gathering
  • Stream processing (e.g., with the Kafka streams API or Spark)
  • De-coupling of system dependencies
  • Integration with Spark, Flink, Storm, Hadoop, and other Big Data technologies

Kafka Theory

Topics, partitions and offsets

In Kafka, a topic is a particular stream of data. Assuming storage cost isn’t an issue, you can have as many topics as you want. Each topic is identified by a name. Topics can be split into one or more partitions (similar to database sharding). Partitions are numbered (e.g., partition 0, partition 1, partition 2, etc.). Each message within a partition gets an incremental id, called an offset.

Note that an offset on its own has no meaning outside of its partition: to identify a specific message (or “record”) in Kafka, you need to know its topic, partition, and offset.

Example scenario:

Say you have a fleet of trucks and each truck reports its GPS position to Kafka. In this scenario, we can have a topic truck_gps that contains the positions of all trucks. Each truck will send a message to this Kafka topic every 20 seconds. The message is composed of: truck ID, latitude, longitude. We can choose to create the topic with 10 partitions (completely arbitrary in this case). Having this topic, services can subscribe to it and use its data. For example, a location dashboard can use this data to show the trucks’ positions. A notification service might use this data to notify when a truck has been driving too long or might be low on gas.
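
As a rough sketch of the producer side for this scenario (the topic name truck_gps is from the example above, while the truck-42 key and the "lat,long" value encoding are just illustrative assumptions), keying each message by truck ID means all positions of a given truck land in the same partition, in order:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TruckGpsProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // key = truck ID, value = "latitude,longitude" (placeholder encoding)
            producer.send(new ProducerRecord<>("truck_gps", "truck-42", "37.7749,-122.4194"));
        } // close() flushes anything still buffered
    }
}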

Food for thought: why use Kafka in this scenario? Why not use RabbitMQ? What about AWS SQS?

Order in a Kafka topic is only guaranteed within a partition (not across partitions).

Data is kept only for a limited time (the default is one week; however, the retention period can be modified). When the retention period is reached and data is cleared, offsets remain (i.e., they DO NOT go back to 0).

After data is written to a partition, it cannot be changed (immutability).

Data is assigned randomly to a partition unless a key is provided.

Brokers and Topics

A Kafka cluster is composed of multiple brokers (servers). Each broker is identified by an ID (integer). Each broker contains some topic partitions but not all of them (Kafka is a distributed system after all). After connecting to one broker from a cluster (called a bootstrap broker), you will be connected to the entire cluster. A good number to get started is 3 brokers (we will explain why 3 later). However, there are cases where clusters can reach over 100 brokers.

When you create a topic, Kafka will automatically distribute it across all your brokers. For example, if we have three brokers (e.g., broker 101, 102, 103) and we create a topic “A” with three partitions, then each broker will hold one partition. So broker 101 will hold topic A partition 0, broker 102 will hold topic A partition 1, and broker 103 will hold topic A partition 2.

Continuing the previous example, let’s say we also create a topic “B”, this time with two partitions. Then broker 101 could hold topic B partition 1 and broker 102 could hold topic B partition 0, while broker 103 holds no partition of topic B. The idea is that partitions are distributed across brokers.

Topic Replication

The idea behind having multiple brokers and topic replication is to guarantee fault tolerance. Fault tolerance is the property that enables a system to continue operating properly in the event of the failure of some of its components.

With Kafka, we want to guarantee that the system continues to run as expected even if some brokers go down. It is recommended that the topic replication factor be greater than 1 (usually 2 or 3), with 3 being the “gold standard”.

Topic replication is when a topic (and all of its partitions) is replicated across brokers. This way, if any one broker goes down, another broker can still serve the data.

The replication factor determines the number of replicas each partition has. This allows Kafka to automatically fail over to these replicas when a server in the cluster fails, so that messages remain available.

A broker can only host a single replica for a given topic partition – it gains us nothing to hold two copies of the same partition on the same broker.

As mentioned before, partition replicas are distributed across brokers, and since a broker keeps at most one replica of a given partition, we can’t have more replicas than brokers.

So if our cluster has 3 brokers, the maximum replication factor we can have is 3.

Therefore: max replication factor <= number of brokers

This also bounds min.insync.replicas, which must always be less than or equal to the replication factor: min.insync.replicas <= replication factor

min.insync.replicas decides how many replicas must acknowledge a message when a producer sends it with acks=all. min.insync.replicas increases the durability of your data, since once the producer gets an acknowledgement you can be certain the data is stored on the configured number of brokers. However, this setting might completely stop your service during a rolling restart. If you have a cluster with three nodes and you set min.insync.replicas to three, your producers will require acks from three brokers, but this is not possible while one broker is down, which it is during the restart. You will not be able to produce any messages until the restart has finished. In these cases a regular restart (where you take down all brokers at the same time) is faster than a rolling restart (source).

While deciding on a replication factor, consider the following points as well:

  • Broker disk size: the replication factor directly impacts the overall broker disk usage, so a high replication factor requires more disk space
  • Large number of partitions: when a large number of partitions is replicated, extra latency is added

Concept of Partition Leader

  • At any one time only ONE broker can be a leader for a given partition
  • Only that leader can receive and serve data for a partition
  • The other brokers will synchronize the data
  • Therefore, each partition has one leader and multiple ISR (in-sync replicas)

ZooKeeper helps decide which broker is the leader for each topic partition. If a broker goes down, “elections” are performed to choose new leaders for the affected topic partitions (and, if the failed broker was the controller, ZooKeeper helps elect a new controller for the cluster as well).

Producers and Message Keys

  • Producers write data to topics (which may be broken up into multiple partitions)
  • Producers automatically know which broker and partition to write to for a given topic
  • In case of a broker failure, producers will automatically recover
  • If a producer publishes a record without a key (key=null), then Kafka will write messages round-robin to the available partitions of the given topic (simple, automatic load balancing done by Kafka)
  • If the producer specifies a key when writing to a topic, then Kafka will always write records with this key to the same partition for this topic (Kafka uses a hashing mechanism based on the given key and the number of topic partitions to determine which partition to write to – similar to the hashing concept in a hashtable data structure)
  • key can be a string or a number
  • Producers can choose to receive acknowledgement of data writes (a small config fragment follows this list):
    • acks=0 : producer won’t wait for acknowledgement (possible data loss)
    • acks=1 : producer will wait for partition leader acknowledgement (limited data loss)
    • acks=all : partition leader + replicas acknowledgement (no data loss)
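
As a small illustrative fragment (it only builds a Properties object using the standard Java client config constants; acks=all is just one possible choice), the acknowledgement mode is a single producer property:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // acks=0   : fire and forget (possible data loss)
        // acks=1   : wait for the partition leader only (limited data loss)
        // acks=all : wait for the leader + in-sync replicas (strongest durability)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        System.out.println(props);
    }
}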

Very important: you only need to connect to one broker (any broker) and just provide the topic name you want to write to. Kafka Clients will route your data to the appropriate brokers and partitions for you!

Consumers

  • Consumers read data from a topic
  • Consumers automatically know which broker to read from (this is done for us “auto-magically” by Kafka)
  • In case of broker failures, consumers know how to recover
  • Data is read in order within each partition but read order is NOT guaranteed across partitions

Very important: you only need to connect to one broker (any broker) and just provide the topic name you want to read from. Kafka will route your calls to the appropriate brokers and partitions for you!

Consumer Groups

  • Consumers read data in consumer groups
  • A consumer can read data from multiple partitions, but a partition can only be read by one consumer within a consumer group
  • If you have more consumers than partitions, some consumers will be inactive
  • Consumer groups are what enable parallel processing in Kafka
  • Because of consumer groups, Kafka can be used either as a pub-sub system or as a RabbitMQ-like queue
  • If we want our Kafka system to process each message only once (similar to a queue architecture), then we can have only one consumer group per topic. If we want a pub-sub broadcast architecture, then we can have multiple consumer groups per topic, and each consumer group will separately consume messages from that topic in parallel (see Hussein Nasser’s video on Kafka; a minimal group-member sketch follows this list)
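
A minimal sketch of a group member, assuming the standard Java consumer client (the topic and group names are placeholders); starting several copies of this program with the same group.id makes Kafka split the topic’s partitions among them:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMember {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-consumer-group"); // every instance with this group.id shares the partitions
        props.put("auto.offset.reset", "earliest"); // where to start when the group has no committed offsets yet
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("first_topic")); // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // each record comes from one of the partitions assigned to this group member
                    System.out.printf("partition %d, offset %d: %s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}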

Consumer Offsets

  • Kafka stores the offsets at which a consumer group has been reading
  • The offsets are committed live to a Kafka topic named __consumer_offsets
  • When a consumer in a group has processed data received from Kafka, it should be committing the offsets
  • If a consumer dies, it will be able to read back from where it left off thanks to the committed consumer offsets

Delivery Semantics for Consumers

  • Consumers choose when to commit offsets
  • At most once
    • Offsets are committed as soon as the message is received
    • If the processing goes wrong, the message will be lost
  • At least once
    • Offsets are committed after the message is processed
    • If the processing goes wrong, the message will be read again
    • This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e., processing the same message again won’t impact the result/outcome of your system); a manual-commit sketch follows this list
  • Exactly once
    • Can be achieved using Kafka Streams API
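
A minimal at-least-once sketch, assuming the standard Java consumer client (topic and group names are placeholders): auto-commit is disabled and offsets are committed only after processing succeeds:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-demo");  // placeholder group id
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");     // commit manually, after processing
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("first_topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);       // keep this idempotent: records may be re-read after a crash
                }
                consumer.commitSync();     // offsets are committed only once processing succeeded
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("offset %d: %s%n", record.offset(), record.value());
    }
}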

Kafka Broker Discovery

  • Every Kafka broker is also called a bootstrap broker
  • This means you only need to connect to one broker and you will be connected to the entire cluster
  • Each broker knows about all brokers, topics, and partitions (metadata)

So the workflow is as follows:

  1. The consumer submits a connection + metadata request to some broker in the cluster
  2. The consumer receives a list of all brokers
  3. The consumer can then connect to the needed broker, i.e., the partition leader for the topic partition it wants to read from

Zookeeper

  • Zookeeper manages brokers (keeps a list of them)
  • Zookeeper helps in performing leader elections for partitions
  • Zookeeper sends notifications to Kafka in case of changes (e.g., new topic, broker dies, broker comes up, delete topics, etc…)
  • Zookeeper by design operates with an odd number of servers (3, 5, 7, …)
  • Zookeeper has a leader (handles writes); the rest of the servers are followers (handle reads)

Kafka Guarantees

  • Messages are appended to a topic-partition in the order they are sent
  • Consumers read messages in the order stored in a topic-partition
  • With a replication factor of N, producers and consumers can tolerate up to N-1 brokers being down (provided that min.insync.replicas < N – if min.insync.replicas were N, we would always need all N replicas up to acknowledge a write)
  • Having a replication factor of 3 and a min.insync.replicas of 2 would allow us to:
    • Allow for one broker to be taken down for maintenance
    • Allow for one broker to be taken down unexpectedly
  • As long as the number of partitions remains constant for a topic (no new partitions are added), the same key will always go to the same partition (due to hashing – with the default partitioner, record partition = hash(key) % number of partitions, where the hash is murmur2 over the serialized key; see the sketch below)
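
As a sketch of that hashing, the Java client’s default partitioner essentially does the following (this reuses the Utils helpers that ship with kafka-clients; the key and partition count are arbitrary):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class DefaultPartitionForKey {
    public static void main(String[] args) {
        int numPartitions = 3;                       // assumed partition count
        String key = "truck-42";                     // placeholder key

        // Mirror of the default partitioner for keyed records:
        // murmur2-hash the serialized key, force it positive, then modulo the partition count.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

        System.out.println("key '" + key + "' -> partition " + partition);
    }
}

Adding partitions to the topic changes the modulus, which is why the same key can map to a different partition afterwards.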

Kafka CLI

Apache ZooKeeper is a required Kafka dependency.

To start ZooKeeper

In the Kafka installation directory (e.g., /usr/local/kafka_2.12-2.5.0), run:

zookeeper-server-start.sh config/zookeeper.properties

To start Kafka

Assuming the Kafka binaries are in your $PATH, run:

kafka-server-start.sh config/server.properties

If the Kafka binaries are not in $PATH, you need to navigate to the Kafka installation directory. In my case, I have Kafka installed here: /usr/local/kafka_2.12-2.5.0

To create a Kafka topic

kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --create --partitions 3 --replication-factor 1

(Note: the replication factor must always be less than or equal to the number of brokers in our Kafka cluster – a replication factor greater than the number of Kafka brokers would mean that at least two replicas of the same partition sit on the same broker, which gives no benefit and is a waste of space, so Kafka refuses to create such a topic)

Explanation:

  • --bootstrap-server localhost:9092 points to a Kafka broker in our cluster. By connecting to any one of the available Kafka brokers we are also connecting to the entire Kafka cluster. (Any broker in our cluster can be considered a “bootstrap broker”)
  • --topic first_topic --create tells Kafka to create a topic named first_topic
  • --partitions 3 tells Kafka to split first_topic into 3 partitions (i.e., “shards”)
  • --replication-factor 1 requests a replication factor of 1

Again, note: the replication factor cannot be greater than the number of brokers. If the replication factor were greater than the number of brokers, one or more in-sync replicas would end up on the same broker as the leader partition. This is counterproductive: the purpose of a replica is to remain available if the leader partition dies, and if they sit on the same broker (i.e., machine/server), then we lose both anyway.

To describe a Kafka topic

kafka-topics.sh --bootstrap-server localhost:9092 --topic second_topic --describe

To list all Kafka topics

kafka-topics.sh --bootstrap-server localhost:9092 --list

To purge (i.e., clean up) Kafka topic of records

Apache Kafka comes with the bin/kafka-configs.sh script, which provides many useful options to modify Kafka configurations. We will use the --alter and --add-config retention.ms options to temporarily change the retention policy to 1 second, which will delete all existing messages from a topic:

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --add-config retention.ms=1000 --entity-name first_topic

Now, let’s verify that the retention period value was updated:

kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --describe --entity-name first_topic

or we can also use this command:

kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --describe

Running the command above produces output like the following:

Topic: first_topic	PartitionCount: 1	ReplicationFactor: 1	Configs: cleanup.policy=delete,segment.bytes=1073741824,retention.ms=1000
	Topic: first_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

This confirms retention.ms was correctly updated to 1 second.

Wait for a few seconds and the topic should be cleared of all existing records. We can confirm this by trying to consume data from the beginning of the topic:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning

If data existed in this topic and our commands from above worked correctly, then the consumer should not have anything left to read.

Now remove the retention override using the --delete-config option, which will set it back to the default:

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --delete-config retention.ms --entity-name first_topic

Read (this) for more information on deleting a topic or cleaning a topic manually.

Kafka Producer

This section covers using the Kafka Client API in Java to write a simple producer.

Notes:

  • If you don’t call close() on the producer, then depending on the implementation/language you might end up with resource/memory leaks (the sketch below closes the producer via try-with-resources)
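
A minimal sketch of such a producer (the topic name, key, and message are placeholders, and acks=all is just one possible choice); try-with-resources guarantees close() is called:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for leader + in-sync replicas

        // try-with-resources ensures close() runs, flushing any buffered records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("first_topic", "some_key", "hello kafka"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.printf("written to partition %d at offset %d%n",
                                    metadata.partition(), metadata.offset());
                        }
                    });
        }
    }
}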

Kafka Producer Configurations

This section analyzes the various Kafka Producer configurations and what they mean (WIP).

About batch.size

The producer will attempt to batch records together into fewer requests when records are being sent to the same partition (e.g., records with the same key). Again, batching applies only to a given partition: records produced to different partitions cannot be batched together, though they can form multiple batches. This helps reduce the number of requests the producer has to make and decreases network traffic.

This configuration controls the default batch size in bytes.

No attempt will be made to batch records larger than this size.

Considerations:

  • A small batch size will make batching less common. Small batch size implies more network calls which implies additional latency and increased network traffic
  • A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records

Conclusion: batch.size needs to be tuned according to application requirements and expected behaviour. Basically, it is a trade-off between the number of requests and throughput.

About linger.ms

The Kafka producer groups together any records that arrive in between request transmissions into a single batched request. In some circumstances the producer may want to reduce the number of requests being sent across the wire. This setting accomplishes this by adding a small amount of artificial delay. So, rather than immediately sending out a record, the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This is analogous to Nagle’s algorithm in TCP.

This setting gives an upper bound on the delay for batching. So, once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will “linger” for the specified time waiting for more records to show up.

So, linger.ms introduces a small amount of latency to increase the chances of batching. By introducing this small delay, we can send more messages per batch, thereby improving throughput, since fewer requests are sent over to Apache Kafka (source).

About buffer.memory

The total number of bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer will block for up to max.block.ms, after which it will throw an exception.

This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

Some confusion may arise regarding the difference between batch.size and buffer.memory:

batch.size only defines the buffer size the producer will use for batching records going to the same partition (e.g., ones with the same key)

buffer.memory defines the total memory the producer will use, which among other things includes the batching buffers

About max.request.size

The maximum size of a Producer request in bytes. This is effectively a cap on the maximum record batch size.
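
Pulling the settings above together, a hedged example configuration might look like the following (the numeric values are arbitrary illustrations, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTuningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);            // 32 KB per-partition batch buffer
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);                    // wait up to 20 ms for a batch to fill
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024); // 64 MB total buffering memory
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024);    // 1 MB cap per request

        System.out.println(props);
    }
}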

Misc

In Kafka Producer configurations, what is the expected behavior if linger.ms is set to 0 and batch.size to a non-zero value?

In this case, since linger.ms is set to 0, some may expect that the Producer will wait until batch.size is reached. However, this is not true.

By default (linger.ms=0), the Producer will send messages as soon as there is a sender thread available to send them, even if there’s just one message in the batch.

So let’s say we have defined a batch size of 10 KB. Messages can be buffered in this batch until either 10 KB is reached or a sender thread becomes available. If a sender thread becomes available at 5 KB, it will just send those 5 KB worth of messages; otherwise, at 10 KB it will send the batch anyway. If we had set linger.ms, then the producer would wait up to that amount of time for more records, even if a sender thread became available earlier, as long as the batch size had not been reached.

Kafka Controller

In a Kafka cluster, each Kafka broker is a bootstrap broker; however, there is only one Kafka controller. A Kafka controller is responsible for managing the states of partitions and replicas and for performing administrative tasks like reassigning partitions. If the Kafka controller goes down, then ZooKeeper performs elections to reassign a different broker as the controller (read more here).

Good reads:

  • When to use RabbitMQ vs Kafka: (link)
  • https://www.sderosiaux.com/articles/2017/08/07/looking-at-kafka-s-consumers-offsets/
  • https://dzone.com/articles/kafka-internals-consumer
  • https://stackoverflow.com/questions/42015158/what-is-the-difference-in-kafka-between-a-consumer-group-coordinator-and-a-consu#:~:text=The%20group%20coordinator%20is%20nothing,coordinator%20will%20trigger%20a%20rebalance.
  • https://stackoverflow.com/questions/32390265/what-determines-kafka-consumer-offset

My Understanding of Kafka __consumer_offsets

The purpose of the __consumer_offsets topic is to keep the latest consumed offset per (groupId, topic, partition), which is why the key is the combination of them. Because of compaction, only the latest value will be saved in Kafka’s data; the past offsets are not needed.
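
One way to see this (group, topic, partition) → offset mapping without reading the internal topic directly is the AdminClient, sketched below with a placeholder group id:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ShowCommittedOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Latest committed offset per (topic, partition) for this group id
            Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets("my-consumer-group")   // placeholder group id
                    .partitionsToOffsetAndMetadata()
                    .get();
            offsets.forEach((tp, om) ->
                    System.out.printf("%s partition %d -> offset %d%n",
                            tp.topic(), tp.partition(), om.offset()));
        }
    }
}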

Kafka scales topic consumption by distributing partitions among a consumer group, which is a set of consumers sharing a common group identifier. The following diagram depicts a single topic with three partitions and a consumer group with two members:

(include diagram here)

For each consumer group, a broker is chosen as a group coordinator.

The consumer group coordinator is one of the brokers, while the group leader is one of the consumers in the consumer group.

When a consumer wants to join a consumer group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and are therefore considered alive) and it is responsible for assigning a subset of partitions to each consumer. It uses an implementation of the PartitionAssignor interface to decide which partitions should be handled by which consumer.

After deciding on the partition assignment, the consumer leader sends the list of assignments to the GroupCoordinator, which sends this information to all the consumers. Each consumer only sees its own assignment - the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.

The group coordinator is responsible for:

  • Managing consumer group state (e.g., checking that all consumers are healthy)

My understanding is that once the consumer group state changes, the group coordinator informs the group leader of the change and the leader proceeds to perform group rebalancing. Examples of such state changes:

  • A new consumer is spawned
  • An old consumer goes down
  • Topic metadata changes

The process of reassigning partitions to consumers is called consumer group rebalancing.

When a group is first connected to a broker:

  • Consumers start reading from either the earliest or latest offset in each partition based on the configuration auto.offset.reset
  • Messages in each partition are then read sequentially.
  • The consumer commits the offsets of messages it has successfully processed

In the following figure, the consumer’s position is at offset 6 and its last committed offset is at offset 1:

(include diagram here)

When a consumer group is rebalanced, a new consumer is assigned to a partition:

  • It starts reading from the last committed offset
  • It reprocesses some messages if the old consumer has processed some messages but crashed before committing the offset of the processed messages

In the above diagram, if the current consumer crashes, then the new consumer starts consuming from offset 1 and reprocesses messages up to offset 6. Other markings in the above diagram are:

  • Log end offset is the offset of the last message written to the partition
  • High watermark is the offset of the last message that was successfully replicated to all in-sync replicas of the partition

Kafka ensures that the consumer can read only up to the high watermark so that we don’t consume data that may be lost if some broker goes down.

Example:

  1. Assume we have a topic foo with three partitions
  2. Two consumers come up (A and B) and both are in same consumer group (groupId: fooConsumers)
  3. A broker is chosen as the group coordinator for this consumer group
  4. A and B communicate with the group coordinator broker, send a JoinGroup request, and pass health checks
  5. A, the first consumer to join the group, becomes the group leader
  6. A performs group rebalancing and assigns partitions for each consumer (so far just A and B). A will consume from partition 1 and B from partitions 2 and 3
  7. After deciding on the partition assignment, the consumer leader (A) sends the list of assignments to the group coordinator broker
  8. Group coordinator broker received this information and sends it to all consumers
  9. Each consumer only sees his own assignment
  10. A starts consuming data
  11. A commits its offset data to __consumer_offsets. This is a record with key containing: (groupId, topic, partition) and data:
{
    "topic":"mytopic1",
    "partition":11,
    "group":"console-consumer-26549",
    "version":1,
    "offset":95,
    "metadata":"",
    "commitTimestamp":1501542796444,
    "expireTimestamp":1501629196444
}

So the key could be: (fooConsumers, foo, 1).

  12. B starts consuming data from partitions 2 and 3
  13. B commits its offset data to __consumer_offsets. Since B is consuming from two partitions, this could be two records with keys:
    • (fooConsumers, foo, 2)
    • (fooConsumers, foo, 3)
  14. Now assume a new consumer (C) enters the consumer group
  15. The group coordinator will capture this state change and notify the group leader
  16. The group leader will perform a group rebalance
  17. Now A will continue to consume from offsets defined in the __consumer_offsets topic record with key (fooConsumers, foo, 1), B will get (fooConsumers, foo, 2), and C will get (fooConsumers, foo, 3)

When you run a consumer with auto.offset.reset=latest and no committed offsets exist, the consumer will start consuming from the end of the log. See more here: https://stackoverflow.com/questions/56172742/consumer-id-and-group-id-in-kafka-what-makes-two-consumers-the-same and https://stackoverflow.com/questions/34550873/difference-between-groupid-and-consumerid-in-kafka-consumer

How to kill ZooKeeper:

  1. Get the PID with lsof -i tcp:2181
  2. kill -9 <PID>
  3. Or, in one command: kill -9 $(lsof -ti tcp:2181)

To delete a topic:

  1. Set delete.topic.enable=true in Kafka server.properties
  2. Run kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic <topic_name>
  3. Read more: https://contactsunny.medium.com/manually-delete-apache-kafka-topics-424c7e016ff3

To alter partition count of an existing topic:

  1. kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic my_topic_name --partitions 3
  2. Read more: https://stackoverflow.com/questions/33677871/is-it-possible-to-add-partitions-to-an-existing-topic-in-kafka-0-8-2

Consume from the beginning of partition 0 of the PRICING_FEED topic with a String key and a Double value, and print the key:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic PRICING_FEED --partition 0 --value-deserializer "org.apache.kafka.common.serialization.DoubleDeserializer" --from-beginning --property print.key=true --key-deserializer "org.apache.kafka.common.serialization.StringDeserializer"

Written on June 1, 2020