What's new in Flow v0.14

One of the benefits of our Elixir Development Subscription service is that we can work with companies that are using our projects and gather direct feedback, which in turn we use to improve our tools and the overall community.

With this in mind, Flow v0.14 was recently released with more fine-grained control over data emission. We will start with a brief recap of Flow and then go over the new features.

Quick introduction to Flow

Flow is a library for computational parallel flows in Elixir. It is built on top of GenStage, which specifies how Elixir processes should communicate with back-pressure.

Flow is inspired by the MapReduce and Apache Spark models. It is a sibling to our Broadway project, but with a focus on data aggregation. It aims to use all cores of your machines efficiently.

The “hello world” of data processing is a word counter. Here is how we would count the words in a file with <code>Flow</code>:

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

If you have a machine with 4 cores, the example above will create 9 lightweight Elixir processes that run concurrently:

  • 1 process for reading from the file (Flow.from_enumerable/1)
  • 4 processes for performing map operations (everything before Flow.partition/2)
  • 4 processes for performing reduce operations (everything after Flow.partition/2)
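
The arithmetic behind that count can be sketched in plain Elixir. The <code>ProcessCount</code> module is hypothetical, and the sketch assumes that each layer runs one stage per core, which is the number <code>System.schedulers_online/0</code> usually reports:

```elixir
# Back-of-the-envelope count for the word counter above. Assumes the mapper
# and reducer layers each run one stage per core. Flow itself is not used.
defmodule ProcessCount do
  def for_cores(cores) do
    producer = 1      # reads from the file
    mappers = cores   # everything before the partition
    reducers = cores  # everything after the partition
    producer + mappers + reducers
  end
end

IO.inspect(ProcessCount.for_cores(4), label: "4 cores")
IO.inspect(ProcessCount.for_cores(System.schedulers_online()), label: "this machine")
```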

The key operation in the example above is the <code>partition/2</code> call. Since we want to count words, we need to make sure that the same word is always routed to the same partition, so that all occurrences of a word end up in a single place instead of being scattered across partitions.
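
To make the routing idea concrete, here is a minimal plain-Elixir sketch that does not use Flow at all. It uses <code>:erlang.phash2/2</code> as a stand-in for whatever hash function the partition applies, and the <code>PartitionSketch</code> module is hypothetical:

```elixir
# Plain-Elixir sketch of hash-based routing; Flow itself is not used here.
defmodule PartitionSketch do
  # Route a word to one of `partitions` buckets. :erlang.phash2/2 is
  # deterministic, so equal words always get the same bucket index.
  def route(word, partitions), do: :erlang.phash2(word, partitions)

  # Group words by bucket, then count inside each bucket independently,
  # the way each reducing stage would keep its own map.
  def count_per_partition(words, partitions) do
    words
    |> Enum.group_by(&route(&1, partitions))
    |> Map.new(fn {partition, routed} -> {partition, Enum.frequencies(routed)} end)
  end
end

words = String.split("the quick fox jumps over the lazy dog the end")
per_partition = PartitionSketch.count_per_partition(words, 4)

# Each word shows up in exactly one partition, so merging the per-partition
# maps never has to reconcile two counts for the same word.
merged = per_partition |> Map.values() |> Enum.reduce(%{}, &Map.merge/2)

IO.inspect(per_partition, label: "per partition")
IO.inspect(merged, label: "merged")
```

Because no word is split across buckets, each bucket can be counted in parallel without any coordination.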

The other insight here is that map operations can always stream the data, as they simply transform it. The <code>reduce</code> operation, on the other hand, needs to accumulate the data until all input data is fully processed. If the Flow is unbounded (i.e. it never finishes), then you need to specify windows and triggers to checkpoint the data (for example, every minute, after 100_000 entries, or on some condition specified by business rules).
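
The idea of checkpointing an unbounded source can be illustrated with the standard library alone. The sketch below emulates a count-based trigger with <code>Stream.chunk_every/2</code>. It is not how Flow implements windows, and the tiny checkpoint size is chosen only to keep the example readable:

```elixir
# Plain-Elixir emulation of a count-based trigger; this is not how Flow
# implements windows, only an illustration of checkpointing.
checkpoint_every = 4

# An unbounded source: the same three words repeated forever.
unbounded = Stream.cycle(["a", "b", "a"])

checkpoints =
  unbounded
  # Close a "window" after every `checkpoint_every` entries.
  |> Stream.chunk_every(checkpoint_every)
  # Reduce each window on its own, starting from an empty accumulator.
  |> Stream.map(fn entries ->
    Enum.reduce(entries, %{}, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
  end)
  # The source never ends, so only the first two checkpoints are taken here.
  |> Enum.take(2)

IO.inspect(checkpoints)
```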

My ElixirConf 2016 keynote also provides an introduction to Flow (tickets to ElixirConf 2018 are also available!).

With this in mind, let’s see what Flow v0.14 brings.

Explicit control over reducing stages

Flow v0.14 gives more explicit control over how the reducing stage works. Let’s see a practical example. Imagine you want to connect to Twitter’s firehose and count the number of mentions of all users on Twitter. Let’s start by adapting our word counter example:

SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn mention, acc ->
  Map.update(acc, mention, 1, & &1 + 1)
end)
|> Enum.to_list()

We changed our code to use a fictional Twitter client that streams tweets and then retrieved the mentions in each tweet. The mentions are routed to partitions, which count them. If we attempted to run the code above, it would run until the machine eventually ran out of memory, as the Twitter firehose never finishes.

A possible solution is to use a window that controls the data accumulation. We will say that we want to accumulate the data for one minute. When the minute is over, the “reduce” operation will emit its accumulator, which we will persist to some storage:

window = Flow.Window.periodic(1, :minute, :discard)

SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition(window: window)
|> Flow.reduce(fn -> %{} end, fn mention, acc ->
  Map.update(acc, mention, 1, & &1 + 1)
end)
|> Flow.each_state(fn acc -> MyDb.persist_count_so_far(acc) end)
|> Flow.start_link()

The first change is in the first line. We create a window that lasts 1 minute and discards any accumulated state before starting the next window. We pass the window as an argument to <code>Flow.partition/2</code>.

The remaining changes come after <code>Flow.reduce/3</code>. Whenever the current window terminates, a trigger is emitted. This trigger means that the <code>reduce/3</code> stage will stop accumulating data and invoke the next functions in the Flow. One of these functions is <code>each_state/2</code>, which receives the state accumulated so far and persists it to a database.

Finally, since the flow is infinite, we are no longer calling <code>Enum.to_list/1</code> at the end of the flow, but rather <code>Flow.start_link/1</code>, allowing it to run permanently as part of a supervision tree.

While the solution above is fine, it unfortunately relies on two implicit decisions:

  • each_state only runs when the window finishes (i.e. a trigger is emitted) but this relationship is not clear in the code
  • The control of the accumulator is kept in multiple places: the window definition says the accumulator must be discarded after each_state while reduce controls its initial value

Flow v0.14 introduces a new function named <code>on_trigger/2</code> to make these relationships clearer. As the name implies, <code>on_trigger/2</code> is invoked with the reduced state whenever there is a trigger. The callback given to <code>on_trigger/2</code> must return a tuple with a list of the events to emit and the new accumulator. Let’s rewrite our example:

window = Flow.Window.periodic(1, :minute)

SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition(window: window)
|> Flow.reduce(fn -> %{} end, fn mention, acc ->
  Map.update(acc, mention, 1, & &1 + 1)
end)
|> Flow.on_trigger(fn acc ->
  MyDb.persist_count_so_far(acc)
  {[], %{}} # Nothing to emit, reset the accumulator
end)
|> Flow.start_link()

As you can see, the window no longer controls when data is discarded. <code>on_trigger/2</code> gives developers full control over how to change the accumulator and which events to emit. For example, you may choose to keep part of the accumulator for the next window. Or you could process the accumulator to pick only the most mentioned users to emit to the next step in the flow.
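
Since the callback is a plain function that returns <code>{events, new_accumulator}</code>, both ideas can be sketched and exercised without starting a flow. The <code>TriggerSketch</code> module and its <code>top</code> and <code>carry_over_at</code> parameters are made up for this illustration:

```elixir
# The callback is a plain function, so it can be exercised without Flow.
# `top` and `carry_over_at` are hypothetical parameters for this sketch.
defmodule TriggerSketch do
  def on_trigger(acc, top \\ 2, carry_over_at \\ 5) do
    # Events to emit: the `top` most mentioned users as {user, count} tuples.
    events =
      acc
      |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
      |> Enum.take(top)

    # New accumulator: keep only the users busy enough to carry their
    # counts over into the next window.
    new_acc =
      acc
      |> Enum.filter(fn {_user, count} -> count >= carry_over_at end)
      |> Map.new()

    {events, new_acc}
  end
end

acc = %{"@jose" => 7, "@ana" => 3, "@bob" => 5, "@eve" => 1}
{events, new_acc} = TriggerSketch.on_trigger(acc)

IO.inspect(events, label: "emitted")
IO.inspect(new_acc, label: "kept for the next window")
```

In a real flow, a function with this shape would be what gets passed to <code>Flow.on_trigger/2</code>, for example as <code>&TriggerSketch.on_trigger/1</code>.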

Flow v0.14 also introduces an <code>emit_and_reduce/3</code> function that allows you to emit data while reducing. Let’s say we want to track popular users in two ways:

  1. whenever a user reaches 100 mentions, we immediately send it to the next stage for processing and reset its counter
  2. for the remaining users, we will get the top 10 most mentioned per partition and send them to the next stage

We can perform this as:

window = Flow.Window.periodic(1, :minute)

SomeTwitterClient.stream_tweets!()
|> Flow.from_enumerable()
|> Flow.flat_map(fn tweet -> tweet["mentions"] end)
|> Flow.partition(window: window)
|> Flow.emit_and_reduce(fn -> %{} end, fn mention, acc ->
  counter = Map.get(acc, mention, 0) + 1

  if counter == 100 do
    {[mention], Map.delete(acc, mention)}
  else
    {[], Map.put(acc, mention, counter)}
  end
end)
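|> Flow.on_trigger(fn acc ->
  most_mentioned =
    acc
    # Sort users by mention count, highest first
    |> Enum.sort(fn {_, count1}, {_, count2} -> count1 >= count2 end)
    |> Enum.take(10)
    # Emit only the user, matching what emit_and_reduce/3 emits
    |> Enum.map(fn {mention, _count} -> mention end)

  {most_mentioned, %{}}
end)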
|> Flow.shuffle()
|> Flow.map(fn mention -> IO.puts(mention) end)
|> Flow.start_link()

In the example above, we changed <code>reduce/3</code> to <code>emit_and_reduce/3</code>, so we can emit events as we process them. Then we changed <code>Flow.on_trigger/2</code> to also emit the most mentioned users.

Finally, we have added a call to <code>Flow.shuffle/1</code>, which receives all of the events emitted by <code>emit_and_reduce/3</code> and <code>on_trigger/2</code> and shuffles them into a series of new stages for further parallel processing.
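
The reducer given to <code>emit_and_reduce/3</code> can be tried out in isolation. <code>Enum.flat_map_reduce/3</code> from the standard library expects the same <code>{events, accumulator}</code> return shape, so it serves here as a single-process stand-in for one reducing stage. The threshold is lowered from 100 to 3 to keep the input short:

```elixir
# Plain-Elixir simulation: Enum.flat_map_reduce/3 expects the same
# {events, acc} return shape, so it can stand in for a single reducing stage.
# The threshold is lowered from 100 to 3 for illustration.
threshold = 3

reducer = fn mention, acc ->
  counter = Map.get(acc, mention, 0) + 1

  if counter == threshold do
    # Emit the user right away and reset its counter.
    {[mention], Map.delete(acc, mention)}
  else
    # Nothing to emit yet, keep counting.
    {[], Map.put(acc, mention, counter)}
  end
end

mentions = ["@jose", "@ana", "@jose", "@jose", "@ana", "@jose"]
{emitted, acc} = Enum.flat_map_reduce(mentions, %{}, reducer)

IO.inspect(emitted, label: "emitted while reducing")
IO.inspect(acc, label: "left in the accumulator")
```

Whatever is left in the accumulator is what the trigger callback would later receive.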

If you are familiar with data processing pipelines, you may be aware of two pitfalls in the solution above: first, we are using processing time for handling events; second, instead of a periodic window, it would probably be best to process events on sliding windows. For the former, you can learn more about the pitfalls of processing time vs event time in Flow’s documentation. For the latter, we note that Flow does not support sliding windows out of the box, but they are straightforward to implement on top of <code>reduce/3</code> and <code>on_trigger/2</code>.
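
One possible shape for such a sliding window is sketched below in plain Elixir. This is an assumption about how it could be done, not a recipe from Flow’s documentation: the accumulator holds one count map per period, the reducer only touches the newest period, and the trigger callback emits totals over the last few periods before rotating. The <code>SlidingSketch</code> module is hypothetical and the flow is simulated with <code>Enum.flat_map_reduce/3</code>:

```elixir
# Hypothetical sliding-window sketch; Flow is not used, one trigger is
# simulated at the end of each period.
defmodule SlidingSketch do
  # Accumulator: per-period count maps, newest first.
  def init, do: [%{}]

  # Reducer: count the mention in the newest period only.
  def reduce(mention, [current | older]) do
    [Map.update(current, mention, 1, &(&1 + 1)) | older]
  end

  # Trigger callback: emit totals over the periods currently held, then open
  # a new empty period and keep the `size - 1` most recent ones next to it.
  def on_trigger(buckets, size) do
    totals =
      Enum.reduce(buckets, %{}, fn bucket, sum ->
        Map.merge(sum, bucket, fn _user, a, b -> a + b end)
      end)

    {[totals], [%{} | Enum.take(buckets, size - 1)]}
  end
end

# Three periods of mentions, with a window that spans two periods.
periods = [["@jose", "@ana"], ["@jose"], ["@ana", "@ana"]]

{emitted, _acc} =
  Enum.flat_map_reduce(periods, SlidingSketch.init(), fn mentions, acc ->
    acc = Enum.reduce(mentions, acc, &SlidingSketch.reduce/2)
    SlidingSketch.on_trigger(acc, 2)
  end)

IO.inspect(emitted, label: "totals per sliding window")
```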

At the end of the day, the new functionality in Flow v0.14 gives developers more control over their flows while also making the code clearer. There are other additions in v0.14, such as through_stages/3, which complements from_stages/2 and into_stages/3 in making it easier to integrate Flow with existing GenStage pipelines.

P.S.: This post was originally published on Plataformatec’s blog.