Get Snapshots from Slow-running or Infinite Flows

Learn how to slow down our file stream so that we can capture progress snapshots while events are still being processed.

Slow-running or infinite flows

So far, we’ve used a CSV file as our primary data source, which Elixir processes almost immediately. To demonstrate how windows and triggers work, we’ll intentionally slow down the file stream events using Process.sleep/1. We’ll still group all data by country code, but we’ll modify our business logic so that it captures a progress snapshot every one thousand events.

Slow down the stream of events

Let’s start by slowing the stream of events:

airports_csv()
  |> File.stream!()
  |> Stream.map(fn event ->
    Process.sleep(Enum.random([0, 0, 0, 1]))
    event
  end)
  |> Flow.from_enumerable()

Since we have tens of thousands of rows in our CSV file, we delay only some of the events, chosen at random, by one millisecond. Three of the four values in the list are 0, so roughly one event in four is delayed and the rest go through without delay. This means that the flow will still complete (eventually), but it won’t be as quick as before.
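To get a feel for the effect of the delay without the CSV file, here is a minimal sketch that applies the same Stream.map/2 step to a plain range and times it. The range of 400 items and the variable names are assumptions made for illustration, and the measured time depends on the machine, since Process.sleep/1 can sleep slightly longer than requested.

```elixir
# Stand-in for the CSV rows: a plain range instead of File.stream!/1.
rows = 1..400

slow_stream =
  Stream.map(rows, fn event ->
    # Three of the four choices are 0, so about one event in four sleeps.
    Process.sleep(Enum.random([0, 0, 0, 1]))
    event
  end)

# :timer.tc/1 returns {elapsed_microseconds, result}.
{elapsed_us, result} = :timer.tc(fn -> Enum.to_list(slow_stream) end)

IO.puts("Processed #{length(result)} events in #{div(elapsed_us, 1000)} ms")
```

The events come out unchanged and in order. Only the time it takes to produce them grows.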

We can try running this version of the code. It should take about a minute to complete.

Define the window variable

Then, we define the window variable at the top of the function in the following way:

window = Flow.Window.trigger_every(Flow.Window.global(), 1000)

Now, we can pass it to Flow.partition/2 using the :window option:

|> Flow.partition(window: window, key: {:key, :country})

Finally, let’s add the trigger logic:

|> Flow.group_by(& &1.country)
|> Flow.on_trigger(fn acc, _partition_info, {_type, _id, trigger} ->
  # Show progress in IEx, or use the data for something else.
  events =
    acc
    |> Enum.map(fn {country, data} -> {country, Enum.count(data)} end)
    |> IO.inspect(label: inspect(self()))

  case trigger do
    :done ->
      {events, acc}

    {:every, 1000} ->
      {[], acc}
  end
end)
|> Enum.to_list()

We pass a three-argument callback function to Flow.on_trigger/2 so that we can access all three arguments: the accumulator, the partition information, and the window and trigger details. Within the callback function, we can use the snapshot data in whichever way we want. For example, we can update our database to persist the events or send them elsewhere for processing. Here, we use IO.inspect/2 to print the events list, which contains each country code and its number of airports. This way, we see some progress in IEx as the processes do their work. We use the :label option with IO.inspect/2 to print the PID of the reducer process. We’ll see why we’re doing this soon.
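As a rough illustration of what a snapshot looks like, the sketch below applies the same Enum.map/2 and IO.inspect/2 steps to a hand-built accumulator. The map contents are hypothetical; they only mimic the country-to-list shape that Flow.group_by/2 builds, and the sort is added here just to make the printed order predictable.

```elixir
# Hypothetical accumulator, shaped like the map that Flow.group_by/2 builds:
# each country code points to the list of events seen so far.
acc = %{
  "US" => [%{name: "Airport A", country: "US"}, %{name: "Airport B", country: "US"}],
  "CA" => [%{name: "Airport C", country: "CA"}]
}

snapshot =
  acc
  |> Enum.map(fn {country, data} -> {country, Enum.count(data)} end)
  # Sorting is not part of the lesson's code; it only fixes the order here.
  |> Enum.sort()
  # IO.inspect/2 prints the value with the label and returns it unchanged.
  |> IO.inspect(label: inspect(self()))
```

Because IO.inspect/2 returns its argument, it can sit in the middle of the pipeline without changing the value bound to snapshot.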

We use pattern matching to handle the :done trigger separately from the {:every, 1000} trigger. The latter occurs because we configure the window using Flow.Window.trigger_every/2 with a value of 1000. In both cases, the callback returns a tuple containing the events to emit and the accumulator to carry forward. When the :done trigger fires, we emit the final result as a list of tuples. In the case of {:every, 1000}, we do not emit anything since we’re not finished processing yet.
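The same trigger handling can be tried on a small scale. The following is a minimal sketch, not the lesson's final code: it assumes it is run as a standalone script with network access so that Mix.install/1 can fetch Flow, it uses a hypothetical in-memory list instead of the CSV file, and it triggers every 3 events rather than every 1000.

```elixir
# Assumption: run as a script with network access so Mix.install/1 can fetch Flow.
Mix.install([{:flow, "~> 1.0"}])

# Hypothetical in-memory events standing in for the parsed CSV rows.
events = [
  %{name: "A", country: "US"},
  %{name: "B", country: "CA"},
  %{name: "C", country: "US"},
  %{name: "D", country: "MX"},
  %{name: "E", country: "US"},
  %{name: "F", country: "CA"},
  %{name: "G", country: "MX"}
]

# Same kind of window as in the lesson, but with a trigger every 3 events.
window = Flow.Window.trigger_every(Flow.Window.global(), 3)

result =
  events
  |> Flow.from_enumerable()
  |> Flow.partition(window: window, key: {:key, :country})
  |> Flow.group_by(& &1.country)
  |> Flow.on_trigger(fn acc, _partition_info, {_type, _id, trigger} ->
    counts = Enum.map(acc, fn {country, data} -> {country, Enum.count(data)} end)

    case trigger do
      # Final trigger: emit this partition's totals downstream.
      :done ->
        {counts, acc}

      # Intermediate snapshot: print it, emit nothing, keep accumulating.
      {:every, 3} ->
        IO.inspect(counts, label: "snapshot from #{inspect(self())}")
        {[], acc}
    end
  end)
  |> Enum.sort()

IO.inspect(result, label: "final")
```

Which snapshots get printed depends on how the country keys are distributed across partitions, so that part of the output can differ between machines. The final list is stable because each country is routed to exactly one partition and only the :done trigger emits events.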

The final version of the open_airports function looks like the following:
