
Windowing: Tumbling and Hopping Windows

Explore the concepts of tumbling and hopping windows in Kafka Streams for grouping records into fixed-size, time-based windows before aggregation. Understand how tumbling windows create non-overlapping intervals for data processing, while hopping windows allow overlapping intervals. Learn how to control intermediate aggregation results using the suppress operator for effective stream processing.


Windowing in Kafka Streams allows us to group records that share a key into time-based groups (or, for session windows, activity-based groups) before an aggregation operation. Kafka Streams supports four types of windows:

  • Tumbling window

  • Hopping window

  • Session window

  • Sliding window

Tumbling window

A tumbling window is a time-based, fixed-size window. Tumbling windows never overlap, so every event belongs to exactly one window.

Another important property of tumbling windows is that their start time is inclusive and their end time is exclusive. Consider a window that starts at t=5 and has a fixed size of five seconds, so it covers [5, 10). An event with timestamp t=10 belongs to the next window, which starts at t=10, and not to the one that ends at t=10.
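To make the inclusive-start, exclusive-end rule concrete, here is a small plain-Java sketch (no Kafka dependency) that computes which tumbling window a timestamp falls into. It assumes, as Kafka Streams does, that windows are aligned to the epoch, and it only handles non-negative timestamps. `TumblingWindowMath`, `windowStart`, and `inWindow` are hypothetical names used for illustration, not Kafka Streams APIs.

```java
import java.util.List;

// Plain-Java model of tumbling-window assignment (no Kafka dependency).
// Windows are aligned to the epoch and cover [start, start + size).
public class TumblingWindowMath {

    // Start of the single tumbling window containing the timestamp.
    // Assumes a non-negative timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // True if the timestamp lies in [startMs, startMs + sizeMs): start inclusive, end exclusive.
    static boolean inWindow(long timestampMs, long startMs, long sizeMs) {
        return timestampMs >= startMs && timestampMs < startMs + sizeMs;
    }

    public static void main(String[] args) {
        long size = 5_000; // five-second windows, expressed in milliseconds
        for (long ts : List.of(5_000L, 9_999L, 10_000L)) {
            long start = windowStart(ts, size);
            System.out.printf("t=%d ms -> window [%d, %d)%n", ts, start, start + size);
        }
    }
}
```

Running `main` places t=5000 and t=9999 in the window [5000, 10000), while t=10000 starts the window [10000, 15000), mirroring the t=10 example above.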

A tumbling window is created by defining its size:

Java
Duration windowSize = Duration.ofSeconds(5);
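// TimeWindows.of(Duration) is deprecated since Kafka 3.0; ofSizeWithNoGrace is its replacement
TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowSize);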

Let’s run a topology with a tumbling window. Our topology will again reduce a series of words into a sentence, but this time the sentence will not grow without bound: each sentence is contained within its own tumbling window:

package io.github.stavshamir;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class StatefulApplication {

    public static void main(String[] args) {
        createTopics();

        Properties props = buildConfiguration();
        Topology topology = buildTopology();

        var streams = new KafkaStreams(topology, props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        System.out.println("Starting");
        streams.start();
    }

    private static Topology buildTopology() {
        var builder = new StreamsBuilder();

        KStream<String, String> stream = builder.stream("words");

        Duration windowSize = Duration.ofSeconds(5);
        TimeWindows tumblingWindow = TimeWindows.ofSizeWithNoGrace(windowSize);

        stream.peek((k, v) -> System.out.printf("Incoming record - Key: %s, Value: %s%n", k, v))
                .groupByKey()
                .windowedBy(tumblingWindow)
                .reduce((a, b) -> a + " " + b)
                .toStream()
                .peek((k, v) -> System.out.printf("Reduced - Key: %s, Value: %s%n", k, v));

        return builder.build();
    }

    private static Properties buildConfiguration() {
        var properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "stav1");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every update to an aggregate is forwarded downstream immediately
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        return properties;
    }

    private static void createTopics() {
        var properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (var adminClient = AdminClient.create(properties)) {
            adminClient.createTopics(List.of(
                    new NewTopic("words", 1, (short) 1)
            ));
        }
    }

}
Stateful Kafka Streams application with tumbling window

Now let’s publish some records using the CLI command below:

Shell
/kafka/bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic words \
--property key.separator=, \
--property parse.key=true

Publish a few records with the same or a different key:

1,a
1,b
2,c
1,d
Example messages

The output should look similar to this, although the window boundaries depend on when each message was published:

Incoming record - Key: 1, Value: a
Reduced - Key: [1@1689414900000/1689414905000], Value: a
Incoming record - Key: 1, Value: b
Reduced - Key: [1@1689414900000/1689414905000], Value: a b
Incoming record - Key: 2, Value: c
Reduced - Key: [2@1689414900000/1689414905000], Value: c
Incoming record - Key: 1, Value: d
Reduced - Key: [1@1689414905000/1689414910000], Value: d
Example output

There are multiple interesting things to note about this output:

  • The key of a windowed record contains its original key (1, in this case) and the window boundaries in epoch milliseconds.

  • Unlike aggregation without windowing, the reduction is no longer unbounded: values are combined only within the boundaries of a window. For key 1, the first window's result is a b, but d falls into the next window, which starts with a clean slate.

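  • The aggregation result is emitted for every new value added to the window, not only when the window closes (notice that the first three Reduced... logs share the same window boundaries). Because the configuration sets the record cache size to 0, every update is forwarded immediately; with caching enabled, some intermediate updates would be compacted, but results would still be emitted before the window closes.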
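The per-update behavior described in the last point can be reproduced with a small plain-Java model of a windowed reduce (no Kafka dependency). It is a sketch, not Kafka Streams' implementation: `WindowedReduceModel` and its methods are hypothetical, the timestamps are invented so that d lands in the next five-second window, and the output strings only imitate the log format shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of a tumbling-window reduce with record caching
// disabled: each record updates its window's aggregate and the new value is emitted at once.
public class WindowedReduceModel {

    static final long WINDOW_SIZE_MS = 5_000;

    // Current aggregate per "[key@start/end]", imitating how a windowed key is printed.
    private final Map<String, String> aggregates = new LinkedHashMap<>();

    // Updates the record's window and returns a line in the style of the "Reduced" log.
    String process(String key, String value, long timestampMs) {
        long start = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        String windowedKey = "[" + key + "@" + start + "/" + (start + WINDOW_SIZE_MS) + "]";
        // Same reducer as the topology: join values with a space.
        String reduced = aggregates.merge(windowedKey, value, (a, b) -> a + " " + b);
        return "Reduced - Key: " + windowedKey + ", Value: " + reduced;
    }

    // Replays the lesson's four records with hypothetical timestamps.
    static List<String> runExample() {
        var model = new WindowedReduceModel();
        List<String> out = new ArrayList<>();
        out.add(model.process("1", "a", 1_689_414_901_000L));
        out.add(model.process("1", "b", 1_689_414_902_000L));
        out.add(model.process("2", "c", 1_689_414_903_000L));
        out.add(model.process("1", "d", 1_689_414_906_000L)); // falls into the next window
        return out;
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

Every call to `process` produces a result, which is exactly why downstream operators see intermediate values such as a before the final a b.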

Suppression

The third point above is important. Because the aggregation result is emitted for every incoming event, and not only when the window closes, downstream operators receive intermediate, incomplete results.

While this might be acceptable and even desirable for applications optimizing for low latency, it might be undesirable for other use cases, such as using windowing to reduce the number of actions taken for similar events arriving in the same window.

Kafka Streams allows us to control this behavior with the suppress operator, which emits only the final result of each window and suppresses all intermediate results:

Java
stream.peek((k, v) -> System.out.printf("Incoming record - Key: %s, Value: %s%n", k, v))
        .groupByKey()
        .windowedBy(tumblingWindow)
        .reduce((a, b) -> a + " " + b, Materialized.with(Serdes.String(), Serdes.String()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        .peek((k, v) -> System.out.printf("Reduced - Key: %s, Value: %s%n", k, v));

Replace the stream.peek(...) chain in buildTopology() with this code. We call the suppress operator right after the reduce operator, and the argument we pass tells Kafka Streams to hold back all results until the window closes. Suppressed.BufferConfig.unbounded() lets the suppression buffer grow as needed until the window closes, so its memory use grows with the number of open windows.

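If we run the application and publish the same messages as before, we see only one Reduced log per window. Note that a window's final result is emitted only once stream time (the largest record timestamp seen so far) reaches the end of that window, so the result for the most recent window appears only after a later record arrives.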
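The following plain-Java model (no Kafka dependency) sketches what `untilWindowCloses` does with a zero grace period: intermediate aggregates are buffered, and a window's final result is released only once stream time, the highest record timestamp seen so far, reaches the window's end. `SuppressModel` is a hypothetical name, the timestamps are invented, and releasing closed windows in insertion order is a simplification of this model.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Kafka dependency) of a tumbling-window reduce followed by
// suppress(untilWindowCloses(...)) with a zero grace period.
public class SuppressModel {

    static final long WINDOW_SIZE_MS = 5_000;

    private final Map<String, String> buffered = new LinkedHashMap<>(); // windowed key -> aggregate
    private final Map<String, Long> windowEnds = new LinkedHashMap<>(); // windowed key -> window end
    private long streamTime = Long.MIN_VALUE; // highest timestamp seen so far

    // Processes one record and returns any final results that this record releases.
    List<String> process(String key, String value, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long start = timestampMs - (timestampMs % WINDOW_SIZE_MS);
        long end = start + WINDOW_SIZE_MS;
        if (end <= streamTime) {
            return List.of(); // late record for an already-closed window: dropped
        }
        String windowedKey = "[" + key + "@" + start + "/" + end + "]";
        buffered.merge(windowedKey, value, (a, b) -> a + " " + b); // intermediate result stays buffered
        windowEnds.put(windowedKey, end);

        // Release every buffered window whose end has been reached by stream time.
        List<String> released = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = windowEnds.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> window = it.next();
            if (window.getValue() <= streamTime) {
                released.add("Reduced - Key: " + window.getKey() + ", Value: " + buffered.remove(window.getKey()));
                it.remove();
            }
        }
        return released;
    }

    public static void main(String[] args) {
        var model = new SuppressModel();
        // Hypothetical timestamps: a, b, c share a window; d starts the next one.
        model.process("1", "a", 1_689_414_901_000L).forEach(System.out::println);
        model.process("1", "b", 1_689_414_902_000L).forEach(System.out::println);
        model.process("2", "c", 1_689_414_903_000L).forEach(System.out::println);
        model.process("1", "d", 1_689_414_906_000L).forEach(System.out::println);
    }
}
```

With these inputs nothing is emitted until d arrives; d then releases the finished windows for keys 1 and 2, while d's own window stays buffered until an even later record shows up.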

Hopping window

Hopping windows are also time-based, fixed-size windows. The difference between hopping and tumbling windows is that hopping windows can overlap, meaning a single event can belong to more than one window. The overlap is controlled by the advance size: the interval between the start of one window and the start of the next.

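For example, let’s consider a hopping window with a fixed size of three seconds and an advance size of two seconds. The windows cover [0, 3), [2, 5), [4, 7), and so on, so an event at t=2 belongs to both [0, 3) and [2, 5), while an event at t=3 belongs only to [2, 5).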

Defining a hopping window is very similar to defining a tumbling window:

Java
Duration windowSize = Duration.ofSeconds(3);
Duration advanceSize = Duration.ofSeconds(2);
TimeWindows hoppingWindow = TimeWindows
        .ofSizeWithNoGrace(windowSize)
        .advanceBy(advanceSize);

The window defined in this code behaves like the example above: each window spans three seconds, and a new window starts every two seconds.
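To check which windows a timestamp lands in for this configuration, the sketch below reimplements hopping-window assignment in plain Java (no Kafka dependency). The arithmetic is modeled on Kafka Streams' `TimeWindows#windowsFor`, but this is an independent sketch: `HoppingWindowMath` and `windowStartsFor` are hypothetical names, and only non-negative timestamps are handled.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Kafka dependency) of hopping-window assignment.
// Windows are aligned to the epoch, start every advanceMs, and cover [start, start + sizeMs).
public class HoppingWindowMath {

    // Start times of every window that contains the timestamp (non-negative timestamps assumed).
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        // Smallest multiple of advanceMs whose window still extends past the timestamp (clamped at 0).
        long start = (Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs) * advanceMs;
        List<Long> starts = new ArrayList<>();
        while (start <= timestampMs) { // windows starting after the timestamp cannot contain it
            starts.add(start);
            start += advanceMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 3_000;    // three-second windows
        long advance = 2_000; // a new window every two seconds
        for (long ts : List.of(1_000L, 2_000L, 3_000L, 4_500L)) {
            System.out.printf("t=%d ms -> windows starting at %s%n", ts, windowStartsFor(ts, size, advance));
        }
    }
}
```

With a three-second size and a two-second advance, t=2000 ms falls into the windows starting at 0 and 2000, whereas t=3000 ms falls only into the one starting at 2000, because window ends are exclusive. Setting the advance equal to the size turns the same loop into tumbling-window assignment.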