Streams: Lazy Enumerables

Learn about streams and how to create them for different purposes in Elixir.

In Elixir, the Enum module is greedy. This means that when we pass it a collection, it potentially consumes all the contents of that collection. It also means the result will typically be another collection. Look at the following pipeline:

[ 1, 2, 3, 4, 5 ]
#=> [ 1, 2, 3, 4, 5 ]

|> Enum.map(&(&1*&1))
#=> [ 1, 4, 9, 16, 25 ]

|> Enum.with_index
#=> [ {1, 0}, {4, 1}, {9, 2}, {16, 3}, {25, 4} ]

|> Enum.map(fn {value, index} -> value - index end) 
#=> [1, 3, 7, 13, 21]

|> IO.inspect 
#=> [1, 3, 7, 13, 21]

The first map function takes the original list and creates a new list of its squares. Using with_index takes this list and returns a list of tuples. The next map then subtracts the index from the value, generating a list that gets passed to IO.inspect.

So, this pipeline generates four lists on its way to outputting the final result.

Let’s look at something different. Here’s some code that reads lines from a file and returns the longest line:

IO.puts File.read!("/usr/share/dict/words")   
        |> String.split
        |> Enum.max_by(&String.length/1)

In this case, we read the whole dictionary into memory, then split it into a list of words before processing it to find the longest.

In both of these examples, our code is suboptimal because each call to Enum is self-contained. Each call takes a collection and returns a collection. What we really want is to process the elements in the collection as we need them. We don’t need to store intermediate results as full collections. We just need to pass the current element from function to function. And that’s what streams do.

Stream is a composable enumerator

Here’s a simple example of creating a stream:

iex> s = Stream.map [1, 3, 5, 7], &(&1 + 1)
#Stream<[enum: [1, 3, 5, 7], funs: [#Function<46.3851/1 in Stream.map/2>] ]>

If we’d called Enum.map, we’d have seen the result [2,4,6,8] come back immediately. Instead, we get back a stream value that contains a specification of what we intended.

How do we get the stream to start giving us results? We treat it as a collection and pass it to a function in the Enum module:

iex> s = Stream.map [1, 3, 5, 7], &(&1 + 1)
#Stream<[enum: [1, 3, 5, 7], funs: [#Function<46.3851/1 in Stream.map/2>] ]> 
iex> Enum.to_list s
[2, 4, 6, 8]

Because streams are enumerable, we can also pass a stream to a stream function. Because of this, we say that streams are composable.

iex> squares = Stream.map [1, 2, 3, 4], &(&1*&1) 
         #Stream<[enum: [1, 2, 3, 4],
         funs: [#Function<32.133702391 in Stream.map/2>] ]>

iex> plus_ones = Stream.map squares, &(&1+1) 
         #Stream<[enum: [1, 2, 3, 4],
         funs: [#Function<32.133702391 in Stream.map/2>,
                #Function<32.133702391 in Stream.map/2>] ]>

iex> odds = Stream.filter plus_ones, fn x -> rem(x,2) == 1 end 
         #Stream<[enum: [1, 2, 3, 4],
         funs: [#Function<26.133702391 in Stream.filter/2>,
                #Function<32.133702391 in Stream.map/2>,
                #Function<32.133702391 in Stream.map/2>] ]>

iex> Enum.to_list odds 
[5, 17]

Of course, in real life we’d have written this as follows:

[1,2,3,4]
|> Stream.map(&(&1*&1))
|> Stream.map(&(&1+1))
|> Stream.filter(fn x -> rem(x,2) == 1 end) 
|> Enum.to_list

Note: We’re never creating intermediate lists. We’re just passing successive elements of each of the collections to the next in the chain.

Streams aren’t only for lists. Now, many Elixir modules support streams. For example, here’s our longest-word code written using streams:

IO.puts File.open!("/usr/share/dict/words") 
        |> IO.stream(:line)
        |> Enum.max_by(&String.length/1)

The magic here is the call to IO.stream, which converts an IO device (in this case, the open file) into a stream that serves one line at a time. In fact, this is such a useful concept that there’s a shortcut:

IO.puts File.stream!("/usr/share/dict/words") |> Enum.max_by(&String.length/1)

The good news is that there’s no intermediate storage. The bad news is that it runs about two times slower than the previous version.

However, consider the case where we were reading data from a remote server. Successive lines might arrive slowly, and they might go on forever. With the Enum implementation, we’d have to wait for all the lines to arrive before we started processing. With streams, we can process them as they arrive.

Infinite streams

Because streams are lazy, there’s no need for the whole collection to be available upfront. For example, suppose we write the following:

iex> Enum.map(1..10_000_000, &(&1+1)) |> Enum.take(5) [2, 3, 4, 5, 6]

It takes about eight seconds before we see the result. Elixir is creating a 10-million-element list, then taking the first five elements from it. Suppose that instead, we write the following:

iex> Stream.map(1..10_000_000, &(&1+1)) 
|> Enum.take(5) [2, 3, 4, 5, 6]

The result comes back instantaneously. The take call just needs five values, which it gets from the stream. Once it has them, there’s no more processing.

Get hands-on with 1200+ tech skills courses.