Filtering

Learn how to use the filter operator in a Kafka Streams topology.

Now that our incoming Track objects are properly deserialized, we’ll move on to the second requirement: tracks that have been listened to for less than 30 seconds should be disregarded.

To achieve that, we will use the filter operator. Applying filter to a stream creates a new stream that contains only the records satisfying the condition passed to filter. Records that do not meet the condition are dropped and are not processed any further.

The filter operator

This condition is better known as a predicate: a function that takes the record’s key and value as arguments and returns a boolean. If the predicate returns true, the record is kept; if it returns false, the record is dropped. This works just like the filter operator of the Java 8 Stream API, except that the predicate passed to Kafka Streams’ filter takes two arguments (key and value) instead of one.
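To make the one-argument versus two-argument difference concrete, here is a small sketch that runs on the JDK alone. It uses java.util.function.BiPredicate as a stand-in for Kafka Streams’ org.apache.kafka.streams.kstream.Predicate<K, V>, whose test method takes a key and a value. The class name PredicateComparison, the record keys, and the listening times are made-up illustrations.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative comparison only; no Kafka dependency is needed to run it.
class PredicateComparison {

    // Java 8 Stream API: the predicate sees only the element itself.
    static List<Integer> longListensJdk(List<Integer> seconds) {
        Predicate<Integer> atLeast30 = s -> s >= 30;
        return seconds.stream().filter(atLeast30).collect(Collectors.toList());
    }

    // Kafka Streams style: the predicate receives both key and value.
    // BiPredicate stands in for Kafka's Predicate<K, V> (boolean test(K key, V value)).
    static List<Map.Entry<String, Integer>> longListensKeyValue(List<Map.Entry<String, Integer>> records) {
        BiPredicate<String, Integer> atLeast30 = (key, seconds) -> seconds >= 30;
        return records.stream()
                .filter(r -> atLeast30.test(r.getKey(), r.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(longListensJdk(List.of(12, 30, 95)));
        System.out.println(longListensKeyValue(
                List.of(Map.entry("user-1", 12), Map.entry("user-2", 45))));
    }
}
```

The filtering logic is identical in both methods; only the shape of the predicate changes, which is exactly the difference between the two filter operators.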

Our predicate is simple: it returns true if the track’s secondsListened value is at least 30, so only plays shorter than 30 seconds are dropped. We add the filter right after the peek in line 19, chaining it in fluent style. After adding the filter, the stream should look like this:
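As a minimal sketch of that predicate (not the course’s own listing), assume a String record key and a hypothetical Track class exposing getSecondsListened(). The names TrackFilters, isLongEnough, and MIN_SECONDS_LISTENED are illustrative. The method has the same (key, value) shape as Kafka Streams’ Predicate, so it could be passed to filter as a method reference, as the comment in the code shows, and it can also be exercised without a Kafka cluster.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the course's Track value type; only the
// fields this step needs are modeled here.
class Track {
    private final String title;
    private final int secondsListened;

    Track(String title, int secondsListened) {
        this.title = title;
        this.secondsListened = secondsListened;
    }

    String getTitle() { return title; }
    int getSecondsListened() { return secondsListened; }
}

class TrackFilters {
    // Plays shorter than this are disregarded; exactly 30 seconds is kept.
    static final int MIN_SECONDS_LISTENED = 30;

    // Same (key, value) shape as Kafka Streams' Predicate<K, V>, so in the
    // topology it could be chained after peek, for example:
    //   .peek(...)
    //   .filter(TrackFilters::isLongEnough)
    // The null check is defensive: Kafka records can carry null values.
    static boolean isLongEnough(String key, Track track) {
        return track != null && track.getSecondsListened() >= MIN_SECONDS_LISTENED;
    }

    public static void main(String[] args) {
        List<Track> plays = List.of(
                new Track("intro", 12), new Track("full song", 210), new Track("edge", 30));
        // Local check of the predicate using the JDK Stream API.
        List<String> kept = plays.stream()
                .filter(t -> isLongEnough("user-1", t))
                .map(Track::getTitle)
                .collect(Collectors.toList());
        System.out.println(kept);
    }
}
```

Pulling the rule into a named method is a design choice: it keeps the fluent chain readable and lets the 30-second threshold be tested on its own.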
