Skip to content

Commit

Permalink
Merge pull request collabnix#127 from Phantom-Intruder/Kafka-fixes
Browse files Browse the repository at this point in the history
Kafka fixes
  • Loading branch information
collabnix authored Jan 20, 2023
2 parents f118be1 + e24a89e commit dc74933
Showing 1 changed file with 98 additions and 11 deletions.
109 changes: 98 additions & 11 deletions Logging101/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ A topic is simply an ordered list of events. Unlike database records, logs are u

Topics aren't static and can be considered to be a stream of data. This means that topics are being expanded as new events get added in. If you consider a large system, there may be hundreds of such topics that maintain logs for all sorts of events in real-time. I think you can already see how real-time data being appended into a continuous stream can be a gold mine for data scientists or engineers looking to perform analysis and gain insights from the data. So, this naturally means that entire services can be introduced simply to process or simply to visualize and display this data.

## Kafka brokers

Kafka brokers can be likened to worker nodes in a Kubernetes cluster. Each broker is a single Kafka server, and the number of brokers can be scaled up infinitely depending on the amount of data that needs to be streamed. All these brokers together are called a Kafka cluster. While this architecture looks a lot like the Kubernetes architecture, there is one major difference. In a Kubernetes cluster, the client would interact with the Kube API in the master node, which would then get the scheduler to schedule pods. In the case of a cluster, there is no master node, and the client may contact any of the brokers directly. Each broker contains information about all the other brokers and is able to act as an intermediary for the client. Due to this, the brokers are also known as bootstrap servers, which is a command you will be seeing used a lot later in this lesson.

## Kafka Connect

While Kafka was initially released in 2011, it really started gaining popularity in recent years. This means that many large businesses which already had large quantities of data and processes on how the data was handled would have a hard time switching to Kafka. Additionally, some parts of the system may never be converted to Kafka at all. Kafka connect exists to support these kinds of situations.
Expand All @@ -28,61 +32,144 @@ Now, what happens if your use case is so specific that there are no existing con

Since Kafka depends on Java, make sure that you first have Java 8 installed.

Now that you are armed with an overall knowledge of Kafka, let's see about setting it up. Depending on your use case, the way you would get started varies, but of course, the first thing is to [download Kafka](https://www.confluent.io/get-started/?product=software). The Confluent version of Kafka is one of the best options since it is well tested, and has plugins such as a REST proxy, connectors, etc... You can also choose to get the [Apache version](https://www.apache.org/dyn/closer.cgi?path=/kafka/3.2.0/kafka_2.13-3.2.0.tgz) instead and the installation instructions are still the same. Once you have the download, simply unzip it somewhere. Inside the bin folder, you will find Kafka-server-start.sh, which is the entry point of the program. Run it with:
Now that you are armed with an overall knowledge of Kafka, let's see about setting it up. Depending on your use case, the way you would get started varies, but of course, the first thing is to [download Kafka](https://www.confluent.io/get-started/?product=software). The Confluent version of Kafka is one of the best options since it is well tested, and has plugins such as a REST proxy, connectors, etc... You can also choose to get the [Apache version](https://www.apache.org/dyn/closer.cgi?path=/kafka/3.2.0/kafka_2.13-3.2.0.tgz) instead and the installation instructions are still the same. Once you have the download, simply unzip it somewhere. Inside the bin folder, you will find Kafka-server-start.sh, which is the entry point of the program. A full guide on setting up Kafka on Linux systems can be found in a [guide](https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-20-04) provided by DigitalOcean. It's best to follow it until step 5 of the guide. However, note that the version of Kafka used is slightly outdated, so keep that in mind when running this line:

```
$ bin/kafka-server-start.sh config/server.properties
curl "https://downloads.apache.org/kafka/2.6.3/kafka_2.13-2.6.3.tgz" -o ~/Downloads/kafka.tgz
```

Instead of getting kafka version 2.6.3, go to the [Kafka download page](https://kafka.apache.org/downloads) and use the latest version available.

That's all that takes to get a basic Kafka environment up and running. Let's move on to creating topics. As we've discussed, topics are a stream of events and are the most fundamental part of Kafka. You start it with:

```
$ bin/kafka-topics.sh --create --topic <topic-name> --bootstrap-server localhost:9092
~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic collabnix
```

This will create a topic with the name you provide. The bootstrap-server option here is where your resource gets its initial data from, and further configuration related to this server can be found in the ```bootstrap.servers``` value of the ```consumer.properties``` file.
This will create a topic with the name you provide. You can see the bootstrap-server option being used here. This is used to connect to a broker so that metadata about the other brokers can be pulled. Further configuration related to this server can be found in the ```bootstrap.servers``` value of the ```consumer.properties``` file.

List out the topics and ensure that the topic is created:

```
~/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

Now that your topic is up, let's start logging events to the topic. For this, you need to run the producer client that will create a few logs and write them into the topic you specify. The content of these logs can be specified by you:

```
$ bin/kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092
~/kafka/bin/kafka-console-producer.sh --topic collabnix --bootstrap-server localhost:9092
LoggingLab101 first event
LoggingLab101 second event
```

You can continue typing in newline separate events and use Ctrl + c to exit out. Now, your topic has events written into it, and it's time to read these events. Do so with the consumer in the same way as you did with the producer:

```
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
~/kafka/bin/kafka-console-consumer.sh --topic collabnix --from-beginning --bootstrap-server localhost:9092
```

You should see the output:

```
>LoggingLab101 first event
>LoggingLab101 second event
```

Once you have verified that Kafka is working well, it's time to start using Kafka connect. The file you are looking for here is called ```connect-file-<version>.jar```, and this file needs to be added to the ```plugin.path``` variable within the ```config/connect-standalone.properties``` file. If this variable does not exist, create it:
This output is a continous output, and will keep looking for messages produced. To test this out, open a new terminal, log into the Kafka user, and run the producer command again. Everything you enter here will be reflected in the consumer output.

Once you have verified that Kafka is working well, it's time to start using Kafka connect. The file you are looking for here is called ```connect-file-<version>.jar```, and this file needs to be added to the ```plugin.path``` variable within the ```config/connect-standalone.properties``` file. Open up the `config/connect-standalone.properties` file with any text editor and add the location of the jar, like so:

```
echo "plugin.path=libs/connect-file-3.2.0.jar"
plugin.path=libs/connect-file-3.3.1.jar
```

If this variable does not exist, create it.

Now, you need some sample data to test with, and you can simply create a text file for this:

```
echo -e "foo\nbar" > test.txt
```

Next, it's time to fire up the Kafka connectors. We will be using two in this example, and the file you need to execute to start this is the ```bin/connect-standalone.sh```. The arguments that you need to pass to it are the properties file you just modified, the properties file of the data source (input) and the properties file of the data sink (output), both of which are already provided by Kafka:
Next, it's time to fire up the Kafka connectors. We will be using two in this example, and the file you need to execute to start this is the ```bin/connect-standalone.sh```. The arguments that you need to pass to it are the properties file you just modified, the properties file of the data source (input) and the properties file of the data sink (output), both of which are already provided by Kafka. Note that you **must** run the command while in the kafka directory so that the relative paths are available:

```
cd ~/kafka
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
```

The connectors provided by Kafka will now start reading and writing lines via the Kafka topic. In this case, the connection that Kafka connect makes is between the input text file, the Kafka topic, and the output text file. Of course, this is a very basic usage of Kafka connect, and you will most likely be using custom-written connectors that read from all sorts of inputs and are written to any number of outputs, but this small example shows Kafka connectors at work. You can verify that the data was indeed handled properly by looking at the output file (```test.sink.txt```). Since the topic that was used still has the data, you can also go ahead and run the previous command we used to read data from the topic:

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
```

Appending a line to the input txt should also show it in the consumer.

Another topic we briefly touched on was [Kafka streams](https://kafka.apache.org/documentation/streams/), and you can find a [starter example](https://kafka.apache.org/quickstart#quickstart_kafkastreams) provided by Apache that you can look through if you are planning on using this part of Kafka.
## Kafka partitions

A concept that we now need to touch on is Kafka partitions. Imagine you have a Kubernetes cluster that is continuously growing. You start with 1 worker node and soon, it isn't enough to run all your pods. So you provision a second worker node that starts initializing pods in it. In the same way, if you have only one Kafka broker in your Kafka cluster and it isn't enough to handle the number of logs passing through, you can introduce a second broker that will take on the load. Now that the resource problem has been taken care of, a different problem arises with the size of the topics. Log files can be huge, and there are a large number of these files piling in through to a single topic. However, if a topic is constrained to a single broker, then the topic can never exceed the limits of the broker.

This is where partitions come in. Topics are allowed to be distributed across multiple brokers while the part of the topic present in a broker is called a partition. These partitions can then be replicated onto other brokers to provide redundancy. It also means that a single topic can be infinitely large and have as much data flowing in to it from a producer. In the same way, if a consumer wants to read data, it can be read from multiple partitions across different brokers which would significantly improve the read speed.

## Kafka Streams

Now that we have explored Kafka topics as well as the consumer-producer model, let's move on to a different concept: [Kafka streams](https://kafka.apache.org/documentation/streams/).

### What is Kafka Streams?

So far, we've looked at how data can be moved in and out of Kafka. You simply use the producer and consumer to read and write data to and from topics. You also saw how to use connectors which are predefined Java classes that help you move logs around specific data sources and sinks without having to spend time writing the code yourself. Moving data around like this is a common practice, but it isn't the only thing Kafka is good for. What if you wanted to transform the data as it passed through?

Imagine you are running a Kubernetes cluster that regularly creates and destroys pods periodically. Every time a pod is created, metadata about that pod is transferred through Kafka. Now imagine you want to separate out the pods that failed the readiness probe. This information would be passed through Kafka, and having a validation within Kafka itself would be the most efficient way to identify these logs and have them sent to a separate topic for further analysis. If you were to do this simple validation by creating your own validation class, you would first have to create consumer and producer objects, subscribe the consumer to the events, start a loop that runs forever and repeatedly validates each log manually, and do all the error handling by yourself. The resulting class would be about 50 lines long and prone to throwing random errors.

This is a lot of work, but is completely unnecessary thanks to Kafka Streams. Kafka Streams is a Java library that allows you to re-implement the whole thing in a single line stream statement. This is a more declarative way of doing things where you define what you want done without specifying what needs doing every step of the way.

### Kafka Streams lab

Since you already have Kafka running, go ahead an create two new topics. One will be the input topic and the other will be the output topic.

```
~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic collabnix-streams-input
~/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic collabnix-streams-output --config cleanup.policy=compact
```

Make sure that the topics have been created:

```
~/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
```

You can also replace `--list` with `--describe` to get detailed information about each topic.

Now, start the word count application. This is an application created by Apache to demo the Streams library. It reads any data that you pass in from the producer, processes it so that the count of each word passed in is calculated, and outputs the results to the consumer. Start it with:

```
~/kafka/bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
```

Open a new terminal instance and start the producer and specify the topic that you created earlier with:

```
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic collabnix-streams-input
```

Open another terminal instance and start the consumer:

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic collabnix-streams-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
```

Now, go back to the producer terminal and type in a test sentence. If everything has been set up correctly, you should be able to see the words grouped into occurrences printed out in the consumer terminal. Earlier in the lesson, we did the same thing with the consumer and producer but didn't run a class with the streams library. This meant that anything entered into the producer was output as is in the consumer. Now that a class has been introduced in the middle with the Streams library, the input gets transformed before being output.

## Teardown Kafka

Now, since you have a Kafka environment up and running along with Kafka connectors, feel free to play around with the system and see how things work. Once you're done, you can tear down the environment by stopping the producer and consumer clients, Kafka broker, and the Zookeeper server (with ```Ctrl + C```). Delete any leftover events with a recursive delete:

Expand Down

0 comments on commit dc74933

Please sign in to comment.