Group and Sort

Explore how to use Elixir's Flow library to group and sort large datasets concurrently. Learn methods like Flow.group_by, Flow.take_sort, and how to optimize processing pipelines. Understand how these tools help you organize data effectively and improve performance in concurrent data workflows.

Group

Like Enum.group_by/2, Flow.group_by/2 groups data by the given criterion. Both are built on a reduce operation behind the scenes, so each Flow process accumulates its own groups. For that reason, we keep Flow.partition/2 in the pipeline to route events to the correct process. This ensures that all airports from the same country are grouped by the same process.
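To make the "group_by is a reduce" point concrete, here is a minimal sketch that uses only the standard library's Enum module, not Flow. The sample airports are hypothetical stand-ins for the maps our pipeline builds from the CSV file.

```elixir
# Hypothetical sample data; the real pipeline builds these maps from the CSV rows.
airports = [
  %{name: "Heathrow", country: "GB"},
  %{name: "Gatwick", country: "GB"},
  %{name: "Schiphol", country: "NL"}
]

# Grouping with the built-in helper.
grouped = Enum.group_by(airports, & &1.country)

# The same grouping written as an explicit reduce over a map accumulator.
# Items are prepended, so each list is reversed afterwards to restore input order.
reduced =
  airports
  |> Enum.reduce(%{}, fn airport, acc ->
    Map.update(acc, airport.country, [airport], &[airport | &1])
  end)
  |> Map.new(fn {country, items} -> {country, Enum.reverse(items)} end)

# Counting the items in each group, as the pipeline does after grouping.
counts = Map.new(grouped, fn {country, items} -> {country, length(items)} end)

IO.inspect(grouped == reduced)
IO.inspect(counts)
```

Because the accumulator lives inside whichever process runs the reduce, two processes that each see some "GB" airports would build two separate "GB" groups. Partitioning by country avoids that split.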

Let’s replace Flow.reduce/3 with Flow.group_by/2:

# file path -> airports/lib/airports.ex
def open_airports() do
  airports_csv()
  |> File.stream!()
  |> Flow.from_enumerable()
  |> Flow.map(fn row ->
    [row] = CSV.parse_string(row, skip_headers: false)

    %{
      id: Enum.at(row, 0),
      type: Enum.at(row, 2),
      name: Enum.at(row, 3),
      country: Enum.at(row, 8)
    }
  end)
  |> Flow.reject(&(&1.type == "closed"))
  # Route all events for the same country to the same process
  |> Flow.partition(key: {:key, :country})
  # Group the airports by country within each partition
  |> Flow.group_by(& &1.country)
  # Replace each group's items with their count
  |> Flow.map(fn {country, data} -> {country, Enum.count(data)} end)
  |> Enum.to_list()
end

The result of group_by/2 is a list of tuples, where the first element is the group key and the second is the list of items in that group. We add ...