
An introduction to Redpanda: Creating a chat application in Python in less than 100 lines of code.

A great challenge engineers — especially beginners — face is learning how to use a new piece of tech. This article is here to help!
A Red Panda developing software, as imagined by DALL-E and completely unrelated to Redpanda, the streaming data platform

One of the biggest challenges software engineers, especially beginners, face is learning how to use a new piece of technology. This is especially true for streaming data platforms, which are generally considered an advanced topic.

In this tutorial, we will create a simple terminal-based chat application in Python using Redpanda. We will cover the basics of Redpanda and how to use Python to create a producer, a consumer, and even topics, which will be used as the central log to store our messages.

To communicate with Redpanda from Python, we will use the kafka-python library. Because Redpanda offers an Apache Kafka®-compatible API, we can use any Kafka client to interact with it.

Our final chat application demonstrated in two side-by-side terminals

All the code shown in the article is available in this repository. Let’s dive in!

The Broker

At the center of Redpanda is the broker. It is responsible for storing messages in topics, which are further divided into partitions. A partition is a sequence of messages that are appended to and read from in order. A topic can have one or more partitions. A cluster is a group of brokers that work together to provide redundancy and high availability of the partitions. Each partition is replicated to one or more nodes in the cluster. This way, if a node goes down, the data is still available for consumption and production, without skipping a beat.
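To make the terminology concrete, here is a toy, in-memory model of a topic. It is only an illustration under simplifying assumptions (no disk storage, no replication, no brokers), and the class name ToyTopic is hypothetical. Each partition is an append-only list, and a message's offset is its position in that list.

```python
from typing import List


class ToyTopic:
    """Toy model of a topic: every partition is an append-only list of messages.

    Real Redpanda partitions are stored on disk and replicated across brokers;
    this sketch only illustrates ordering and offsets.
    """

    def __init__(self, name: str, num_partitions: int = 1) -> None:
        self.name = name
        self.partitions: List[List[bytes]] = [[] for _ in range(num_partitions)]

    def append(self, partition: int, value: bytes) -> int:
        """Append a message to the end of a partition and return its offset."""
        log = self.partitions[partition]
        log.append(value)
        return len(log) - 1

    def read(self, partition: int, offset: int) -> List[bytes]:
        """Return all messages of a partition from `offset` onwards, in order."""
        return self.partitions[partition][offset:]
```

Offsets are counted per partition, so the first message in every partition gets offset 0. Ordering is only guaranteed within a single partition, not across the whole topic.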

The easiest way to start a Redpanda cluster (of one broker, which is enough for development) is to use rpk, a command line tool for configuring, managing, and tuning Redpanda clusters.

Make sure you have Docker installed and running on your machine before continuing.

Redpanda is loved by developers and architects for its ability to start up a local cluster very quickly with just the following command:

rpk container start -n 1

After a few seconds (rpk might need to download the latest Redpanda image) you should see a message that says:

Starting cluster
Waiting for the cluster to be ready...
  NODE ID  ADDRESS
  0        127.0.0.1:57232

Cluster started and exposed on the ip:port! You may use `rpk` to interact with it outside of your docker container. E.g:
rpk cluster info --brokers 127.0.0.1:57232

The cluster is now ready to use. As the message says, you can check the status of the cluster by running:

rpk cluster info --brokers 127.0.0.1:57232

This should return:

BROKERS
=======
ID    HOST       PORT
0*    127.0.0.1  57232

Great! Now that we have a cluster up and running, let’s see how to interact with it from Python. Make note of the HOST and PORT values. We'll need them later in order to connect to our cluster.

The Python client

As we mentioned earlier, we'll use the kafka-python library to interact with Redpanda from Python. To install it, run:

pip install kafka-python

By design, kafka-python is very similar to the official Java client, with a few Pythonic additions such as iterators for consumers (we'll see this in action in The Consumer section!).

The Producer

A producer is a client application that sends messages to a Redpanda cluster, where they are stored in topics.

For more advanced producer configuration options, see this producer deep dive article.

With the kafka-python library, we can create a simple producer like this:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="127.0.0.1:57232")

The bootstrap_servers parameter tells the producer which brokers to contact first when connecting to the cluster. In a large cluster, you only need to provide one or a few broker addresses: the producer learns about all the other brokers from them. Specifying several bootstrap servers simply adds redundancy in case one of them is down. In our case we only have one broker, so we can use the HOST and PORT values we noted down earlier.
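In kafka-python, bootstrap_servers accepts either a single "host:port" string or a list of them. The following sketch assumes the addresses come from a single comma-separated string, such as a configuration value. The helper name parse_bootstrap_servers and the extra ports 57233 and 57234 are hypothetical, since our cluster has only one broker.

```python
from typing import List


def parse_bootstrap_servers(raw: str) -> List[str]:
    """Split a comma-separated "host:port" string into a list of broker addresses."""
    servers = [part.strip() for part in raw.split(",") if part.strip()]
    for server in servers:
        host, sep, port = server.rpartition(":")
        # Every entry needs a host and a numeric port
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {server!r}")
    return servers


# Hypothetical three-broker cluster; only 127.0.0.1:57232 exists in this tutorial.
servers = parse_bootstrap_servers("127.0.0.1:57232, 127.0.0.1:57233,127.0.0.1:57234")
# producer = KafkaProducer(bootstrap_servers=servers)
```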

If you run this snippet, the output will be nothing, which is expected. The producer is created but it doesn’t do much yet.

Let’s send a message to a topic. We can do this by calling the send method on the producer:

producer.send(topic='my-topic', value=b'Hello, world!')

The value parameter is self-explanatory: it's the actual payload that we want to store in Redpanda. It has to be of type bytes (or be serializable to bytes by a custom serializer, but that is out of the scope of this tutorial, so let's stick with a byte string for now).

But wait, what is my-topic? We haven't created any topics yet. Let's do that now. We could do it from the command line using rpk, but we can also do it from Python, which is the more interesting part for us!

The Topics

A topic is a log of messages that are stored in Redpanda. Each topic has a unique name and can have one or more partitions.

To create a topic in Python, we can use the KafkaAdminClient class from kafka-python:

from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(bootstrap_servers='127.0.0.1:57232')
admin_client.create_topics(new_topics=[NewTopic(name='my-topic', num_partitions=1, replication_factor=1)])

There are a few things to note here. First, we are using the KafkaAdminClient class to create the topic. It sends administrative requests, such as creating topics, to the cluster over the Kafka API. The NewTopic class is used to define the topic: name is the name of the topic, num_partitions is the number of partitions the topic will have, and replication_factor is the number of replicas each partition will have.

Let’s run the snippet and quickly verify that the topic was created:

rpk topic list --brokers 127.0.0.1:57232

This should return:

NAME      PARTITIONS  REPLICAS
my-topic  1           1

The Producer, again

Alright, now that we have a topic, let’s try sending a message again:

producer.send(topic='my-topic', value=b'Hello, world!')

This time, our producer will send the message to Redpanda, which will store it in the my-topic topic. But how do we know that it worked? In the next section we'll see how to consume messages from a topic.
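In kafka-python, send() is asynchronous. It returns a future right away, and the message is delivered in the background. The sketch below shows one way to confirm a single write, assuming you want to block until the broker acknowledges it. The helper name send_and_confirm is hypothetical.

```python
from typing import Any


def send_and_confirm(producer: Any, topic: str, value: bytes, timeout: float = 10.0) -> Any:
    """Send one message and block until the broker acknowledges it.

    `producer` is expected to be a kafka.KafkaProducer. Its send() returns a
    future; get() waits for the result and raises a KafkaError subclass if the
    write fails or does not complete within `timeout` seconds.
    """
    future = producer.send(topic, value=value)
    metadata = future.get(timeout=timeout)
    return metadata  # RecordMetadata with .topic, .partition and .offset


# Usage against the tutorial's cluster:
# metadata = send_and_confirm(producer, "my-topic", b"Hello, world!")
# print(metadata.topic, metadata.partition, metadata.offset)
```

If you only need to make sure everything buffered has been sent, for example before a script exits, producer.flush() blocks until all pending messages are delivered.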

The Consumer

A consumer is a client application that reads messages from one or more topics. Setting one up is similar to setting up a producer, but there are a few different configuration options.

To create a basic consumer, we can do this:

from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:57232')

Again, we are using the bootstrap_servers parameter to connect to the cluster. On its own, though, the consumer will not do much: we also have to tell it which topics to read data from. This is called subscribing to a topic.

consumer.subscribe(topics=['my-topic'])

After subscribing to a topic, we can start consuming messages. The consumer object is an iterator, so we can use it in a for loop:

for message in consumer:
    print(message)

If we run this snippet, we will see that it will block and wait for new messages to arrive. This is because we haven’t specified that we want to consume from the beginning of the topic. By default, the consumer will only read new messages.

Let’s modify the consumer to read from the beginning of the topic:

consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:57232', auto_offset_reset='earliest')

The auto_offset_reset parameter specifies where the consumer should start reading from. In this case, we are telling it to read from the beginning of the topic.
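Here is a toy model of that decision. It assumes a consumer without a consumer group, like ours, so no offsets are committed. It also assumes a partition whose retained messages span offsets from log_start_offset up to, but not including, log_end_offset. The function name starting_offset is hypothetical.

```python
def starting_offset(log_start_offset: int, log_end_offset: int, auto_offset_reset: str = "latest") -> int:
    """Toy model: the first offset a consumer without committed offsets reads.

    kafka-python's default for auto_offset_reset is "latest".
    """
    if auto_offset_reset == "earliest":
        return log_start_offset  # replay everything still stored in the partition
    if auto_offset_reset == "latest":
        return log_end_offset  # skip history; only messages produced from now on
    raise ValueError(f"unsupported auto_offset_reset: {auto_offset_reset!r}")
```

With "earliest", a consumer re-reads the whole retained history every time it starts. The chat application relies on this later to "reconnect" to a conversation.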

Now, if we run the consumer snippet again, we should see the message that we sent earlier:

ConsumerRecord(topic='my-topic', partition=0, offset=2, timestamp=1663068430936, timestamp_type=0, key=None, value=b'Hello, world!', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=13, serialized_header_size=-1)

The ConsumerRecord object contains all the information about the message. The value field contains the actual message.

Try sending a few more messages from a different terminal window, and you should see them appear in the consumer in real-time!

Building the chat application

Chat away!

We now know how to send and receive messages from Redpanda, but we haven’t really done anything interesting yet. Let’s build a chat application!

What we want to achieve is a simple command line utility that allows two users to send and receive messages. Redpanda sits at the center of the architecture and stores the messages. Users interact with it through the kafka-python library that we have already seen.

Sending messages will be done by a producer client while receiving messages will be done by a consumer client.

We can reuse the broker from the previous part of the article for our app.

Once the user starts the application, it should be able to handle three things:

  1. “Connect” to another user
  2. Send messages to the other user
  3. Receive messages from the other user

Only one-to-one communication is supported for now, but we can extend this to support multiple users in the future.

We also use the click library (install it with pip install click) for the command line interface. It makes it easy to parse command line arguments and offers helpful utilities such as clearing the terminal screen.

Connecting to a user

The first thing we need to do is connect to another user. We do this by asking the user for the username of the target person. From the two usernames, we derive the name of a topic that will hold the messages of both parties.

This is the function that we will use to do all of this:

from hashlib import md5

from kafka.admin import KafkaAdminClient, NewTopic

BOOTSTRAP_SERVERS = "127.0.0.1:57232"  # the broker address we noted down earlier


def create_chat_room(current_username: str, target_username: str) -> str:
    # Sort the user pair alphabetically to ensure that the same topic is used for both users
    chat_topic_name = md5(
        "-".join(sorted([current_username, target_username])).encode()
    ).hexdigest()

    admin_client = KafkaAdminClient(
        bootstrap_servers=BOOTSTRAP_SERVERS, client_id="chat-app"
    )

    # Check if the topic already exists first; if not, create it!
    existing_topics = admin_client.list_topics()
    if chat_topic_name not in existing_topics:
        admin_client.create_topics(
            [NewTopic(chat_topic_name, num_partitions=1, replication_factor=1)]
        )
    admin_client.close()
    return chat_topic_name

So, what does this function do? First, we sort the two usernames alphabetically, join them with a hyphen, and hash the result with MD5. Sorting ensures that both users end up with the same topic name, no matter who starts the chat. The transformation is also deterministic. If we “log in” later with the same username and target username, the function generates the same topic name, so we can “reconnect” to the chat and see all the previous messages. We then use the KafkaAdminClient to check whether the topic already exists. If it doesn't, we create it. Finally, we return the name of the topic.
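Because the naming step is a pure function, it is easy to check in isolation, without a running broker. This minimal sketch extracts it from create_chat_room; the name chat_topic_name_for is hypothetical.

```python
from hashlib import md5


def chat_topic_name_for(user_a: str, user_b: str) -> str:
    """Derive the shared chat topic name for a pair of users."""
    # Sorting makes the result independent of who starts the chat;
    # hashing turns arbitrary usernames into a fixed-length hex string.
    pair = "-".join(sorted([user_a, user_b]))
    return md5(pair.encode()).hexdigest()
```

Joining with a hyphen has one caveat: the pairs ("a-b", "c") and ("a", "b-c") both produce "a-b-c" and therefore the same topic. Disallowing "-" in usernames, or joining with a character that usernames cannot contain, avoids the clash.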

Message structure, serialization, and deserialization

Data in Redpanda (as in Kafka) lives as bytes. When we produce messages, we always have to convert them into bytes. When we consume them, we do the inverse operation to turn them back into something more interpretable.

In this tutorial, each message is a basic Python dictionary, which keeps things simple because we can serialize it with Python's built-in json module. If you wrap your messages in a more complex format, for example a custom class with fields of various types, you might need to write your own serializer and deserializer functions!
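Here is a sketch of a JSON serializer and deserializer pair for such dictionaries; the function names are hypothetical. kafka-python's KafkaProducer and KafkaConsumer accept value_serializer and value_deserializer callables, so functions like these can be plugged in instead of encoding and decoding by hand, which is what the chat application does.

```python
import json
from typing import Any, Dict


def serialize(message: Dict[str, Any]) -> bytes:
    """Turn a message dictionary into UTF-8 encoded JSON bytes."""
    return json.dumps(message).encode("utf-8")


def deserialize(raw: bytes) -> Dict[str, Any]:
    """Inverse of serialize(): decode UTF-8 JSON bytes back into a dictionary."""
    return json.loads(raw.decode("utf-8"))


# kafka-python can apply these automatically on every send and receive:
# producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS, value_serializer=serialize)
# consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS, value_deserializer=deserialize)
```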

Sending messages

Now that we have a topic, we can start sending messages. We will use the KafkaProducer class to do this:

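import json
from datetime import datetime

from kafka import KafkaProducer


def send_message(
    producer: KafkaProducer,
    message_text: str,
    chat_topic_name: str,
    current_username: str,
) -> None:
    # Wrap the text in a dictionary and serialize it to JSON
    payload = json.dumps(
        {
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "message": message_text,
        }
    )
    # The key identifies the sender; both key and value must be bytes
    producer.send(
        topic=chat_topic_name,
        key=current_username.encode("utf-8"),
        value=payload.encode("utf-8"),
    )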

The function receives a KafkaProducer, initialized with the bootstrap_servers parameter we already know. The value is the actual message we want to send: a JSON document containing a timestamp and the message text, encoded to bytes. We also specify a key for the message, the current user's username, which identifies the sender.

Receiving messages

Receiving messages is done by the KafkaConsumer class:

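from typing import List

import click
from kafka import KafkaConsumer


def read_messages(topic: str, message_history: List[str]) -> None:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=BOOTSTRAP_SERVERS,
    )
    # Redraw the whole conversation on every new record;
    # parse_message() (not shown in this article) formats a record as text
    for message in consumer:
        click.clear()
        message_history.append(parse_message(message))
        for _msg in message_history:
            click.echo(_msg)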

The KafkaConsumer is initialized with the same bootstrap_servers parameter as the producer, so it connects to the same broker. We also pass the name of the chat topic created by create_chat_room as the first argument, which subscribes the consumer to it.

The consumer object is an iterator, so we can use it in a for loop. In the loop, we parse the message and add it to the message history. Finally, we clear the terminal and print the full message history. This can be done in a more efficient way (without rendering the whole history on every new message), but for the sake of simplicity, we will leave it like this.
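The article does not show parse_message. The hypothetical version below assumes records produced by send_message: the key holds the sender's username, and the value holds UTF-8 JSON with timestamp and message fields. The output format is an illustrative choice, not necessarily what the repository does.

```python
import json
from typing import Any


def parse_message(message: Any) -> str:
    """Hypothetical formatter for one chat record (e.g. a kafka-python ConsumerRecord).

    Expects .key to be the sender's username as bytes (or None) and .value to be
    UTF-8 encoded JSON with "timestamp" and "message" fields.
    """
    sender = message.key.decode("utf-8") if message.key is not None else "unknown"
    payload = json.loads(message.value.decode("utf-8"))
    return f"[{payload['timestamp']}] {sender}: {payload['message']}"
```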

Putting it all together

Now that we have all the pieces, we can put them together and build the actual application:

import threading

import click
from kafka import KafkaProducer


@click.command()
@click.option("--current-username", prompt="Your username")
@click.option(
    "--target-username", prompt="Target username", help="The person to chat with."
)
def chat(current_username: str, target_username: str) -> None:
    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    chat_topic_name = create_chat_room(current_username, target_username)

    # Get message history
    message_history = get_message_history(chat_topic_name)

    # Start a background thread to read messages from the topic.
    # daemon=True lets the program exit when the input loop below is interrupted.
    thread = threading.Thread(
        target=read_messages,
        args=(chat_topic_name, message_history),
        daemon=True,
    )
    thread.start()

    # Prompt for messages and send them until the user exits (e.g. with Ctrl+C)
    while True:
        message_input = click.prompt("")
        send_message(producer, message_input, chat_topic_name, current_username)


if __name__ == "__main__":
    chat()

The chat function is the entry point of the application. It takes two arguments: the current username and the target username. We then initialize the producer and create the chat room. We also get the message history and start a thread to read messages from the chat topic. The reason we want to do this in a separate thread is that we want to be able to read messages while we are sending messages. Finally, we start a loop that will prompt the user for a message and send it to the chat. This loop will run until the user exits the application.
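Here is the same pattern in isolation, as a minimal sketch. Any iterable stands in for the KafkaConsumer, and a callback stands in for the redraw logic of read_messages; the helper start_background_reader is hypothetical. Marking the thread as a daemon lets the process exit when the input loop ends, even though iterating over a KafkaConsumer never finishes on its own.

```python
import threading
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def start_background_reader(source: Iterable[T], handle: Callable[[T], None]) -> threading.Thread:
    """Consume `source` on a separate daemon thread, calling `handle` for each item."""

    def run() -> None:
        for item in source:
            handle(item)

    # A daemon thread does not keep the interpreter alive after the main thread exits
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread
```

In the chat application, source would be the KafkaConsumer and handle would append to the message history and redraw the screen, while the main thread stays free to prompt for input.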

The decorator @click.command() tells the click library that this function is the entry point of the application. The @click.option() decorator is used to define command line arguments. In this case, we define two arguments: current-username and target-username. Current username is the username of the user that is running the application. Target username is the username of the user that we want to chat with.

The prompt parameter tells click to prompt the user for the value of the argument. The help parameter is used to display a help message when the user runs the application with the --help flag.

The get_message_history function is used to get the message history from the chat topic. It is implemented as follows:

from typing import List

from kafka import KafkaConsumer


def get_message_history(topic: str) -> List[str]:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        auto_offset_reset="earliest",
        consumer_timeout_ms=1000,  # Exit if no messages after 1 second
    )
    messages = []

    for message in consumer:
        parsed_message = parse_message(message)
        messages.append(parsed_message)

    consumer.close()
    return messages

We use the KafkaConsumer class to read messages from the topic. Setting auto_offset_reset to earliest means that we read all messages from the beginning of the topic. We also set consumer_timeout_ms to 1000, so the for loop ends once no new message has arrived for one second. Without it, iterating over the consumer would block forever, and the function would never return the history.
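consumer_timeout_ms works as an idle timeout. In kafka-python, the clock restarts on every step of the iteration. A long history is therefore still read completely, and iteration only stops after a full second with nothing new. The standard library's queue.Queue.get(timeout=...) also applies its timeout per call, which makes it a convenient stand-in to illustrate the idea. drain_until_idle is a hypothetical helper, not part of kafka-python.

```python
import queue
from typing import Any, List


def drain_until_idle(source: queue.Queue, idle_timeout: float = 1.0) -> List[Any]:
    """Collect items until none arrives within `idle_timeout` seconds.

    The wait restarts after every item, so only idle time is limited,
    not the total time spent reading.
    """
    items: List[Any] = []
    while True:
        try:
            items.append(source.get(timeout=idle_timeout))
        except queue.Empty:
            return items
```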

Running the application

Now that we have the application, we can run it and see how it works. We will start two instances of the app, one for each user:

First terminal:

$ python chat.py
Your username: Dani
Target username: Marci

Second terminal:

$ python chat.py
Your username: Marci
Target username: Dani

We can now send messages from one user to the other, and we can see that the messages are received in real-time:

Still of the chat application in action

Another neat feature of Redpanda is that we can read messages from the beginning of the topic. This means that we can read all the messages that were sent before we started the application, for example, when “reconnecting” to the chat.

The end of WhatsApp and iMessage?

Probably not, but now you know how easy it is to start building with Redpanda and Python, so go ahead and try it out! If you are looking to get your hands dirty, here are a few feature ideas to extend this application:

  1. Allow multiple users to join a single chat.
  2. Introduce schemas for the records sent and received in the application. This will help document the structure of the messages and will make updating them a lot easier in the future! For a great guide on the details, make sure to check out Ben Pope’s article about the native Schema Registry of Redpanda.
  3. Implement some user authentication!