Introducing reducees
Elixir provides the concept of collections, which may be in-memory data structures, as well as events, I/O resources and more. Those collections are supported by the Enumerable protocol, which is an implementation of an abstraction we call “reducees”.
In this article, we will outline the design decisions that led to this abstraction, exploring ideas from Haskell, Clojure and Scala along the way, focusing especially on the constraints and performance characteristics of the Erlang Virtual Machine.
Recursion and Elixir
Elixir is a functional programming language that runs on the Erlang VM. All the examples in this article are written in Elixir, although we will introduce the concepts bit by bit.
Elixir provides linked lists. Lists can hold many items and, with pattern matching, it is easy to extract the head (the first item) and the tail (the rest) of a list:
```elixir
iex> [h|t] = [1, 2, 3]
[1, 2, 3]
iex> h
1
iex> t
[2, 3]
```
An empty list won’t match the pattern `[h|t]`:

```elixir
iex> [h|t] = []
** (MatchError) no match of right hand side value: []
```
Suppose we want to traverse every element in the list, multiplying each element by 2. Let’s write a `double` function:
```elixir
defmodule Recursion do
  def double([h | t]) do
    [h * 2 | double(t)]
  end

  def double([]) do
    []
  end
end
```
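Trying it in `iex` confirms the behaviour:

```elixir
iex> Recursion.double([1, 2, 3])
[2, 4, 6]
```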
The function above recursively traverses the list, doubling the head at each step and invoking itself with the tail. We could define a similar function if we wanted to triple every element in the list but it makes more sense to abstract our current implementation. Let’s define a function called `map` that applies a given function to each element in the list:
```elixir
defmodule Recursion do
  def map([h | t], fun) do
    [fun.(h) | map(t, fun)]
  end

  def map([], _fun) do
    []
  end
end
```
`double` could now be defined in terms of `map` as follows:
```elixir
def double(list) do
  map(list, fn x -> x * 2 end)
end
```
Manually recursing the list is straightforward but it doesn’t really compose. Imagine we would like to implement other functional operations like `filter`, `reduce`, `take` and so on for lists. Then we introduce sets, dictionaries, and queues into the language and we would like to provide the same operations for all of them.
Instead of manually implementing all of those operations for each data structure, it is better to provide an abstraction that allows us to define each operation only once, so it works across all data structures.
That’s our next step.
Introducing iterators
The idea behind iterators is that we ask the data structure for the next item until it no longer has items to emit.
Let’s implement iterators for lists. This time, we will be using Elixir documentation and doctests to detail how we expect iterators to work:
```elixir
defmodule Iterator do
  @doc """
  Each step returns a tuple containing the next
  element and a payload that will be passed back
  to `next/1` the next time around.

      iex> next([1, 2, 3])
      {1, [2, 3]}
      iex> next([2, 3])
      {2, [3]}
      iex> next([3])
      {3, []}
      iex> next([])
      :done

  """
  def next([h|t]) do
    {h, t}
  end

  def next([]) do
    :done
  end
end
```
We can implement `map` on top of `next`:
```elixir
def map(collection, fun) do
  map_next(next(collection), fun)
end

defp map_next({h, t}, fun) do
  [fun.(h) | map_next(next(t), fun)]
end

defp map_next(:done, _fun) do
  []
end
```
Since `map` uses the `next` function, as long as we implement `next` for a new data structure, `map` (and all future functions) should work out of the box. This brings the polymorphism we desired but it has some downsides.
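To illustrate the polymorphism, here is a hedged sketch of extra `next` clauses for a hypothetical range encoded as a `{first, last}` tuple (this representation is our own, not part of the original design); with them in place, the `map` above works on ranges unchanged:

```elixir
# Hypothetical: next/1 clauses for a range encoded as {first, last}.
# Emit the lower bound and advance it until the range is empty.
def next({first, last}) when first <= last do
  {first, {first + 1, last}}
end

def next({first, last}) when first > last do
  :done
end
```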
Besides not having ideal performance, it is quite hard to make iterators work with resources (events, I/O, etc), leading to messy and error-prone code.
The trouble with resources is that, if something goes wrong, we need to tell the resource that it should be closed. After all, we don’t want to leave file descriptors or database connections open. This means we need to extend our `next` contract to introduce at least one other function called `halt`.

`halt` should be called if the iteration is interrupted suddenly, either because we are no longer interested in the next items (for example, if someone calls `take(collection, 5)` to retrieve only the first five items) or because an error happened. Let’s start with `take`:
```elixir
def take(collection, n) do
  take_next(next(collection), n)
end

# Invoked on every step
defp take_next({h, t}, n) when n > 0 do
  [h | take_next(next(t), n - 1)]
end

# If we reach this, the collection finished
defp take_next(:done, _n) do
  []
end

# If we reach this, we took all we cared about before finishing
defp take_next(value, 0) do
  halt(value) # Invoke halt as a "side-effect" for resources
  []
end
```
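For plain lists there is nothing to release, so a minimal `halt` can be a no-op; a sketch, keeping in mind that a resource-backed iterator would instead close its descriptor here:

```elixir
# In-memory lists hold no external resources, so halting releases
# nothing. A file-backed iterator would close its file descriptor
# or database connection in its own halt/1 clause.
def halt(_value) do
  :ok
end
```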
Implementing `take` is somewhat straightforward. However, we also need to modify `map` since every step in the user-supplied function can fail. Therefore we need to make sure we call `halt` on every possible step in case of failures:
```elixir
def map(collection, fun) do
  map_next(next(collection), fun)
end

defp map_next({h, t}, fun) do
  [try do
     fun.(h)
   rescue
     e ->
       # Invoke halt as a "side-effect" for resources
       # in case of failures and then re-raise
       halt(t)
       raise(e)
   end | map_next(next(t), fun)]
end

defp map_next(:done, _fun) do
  []
end
```
This is neither elegant nor performant. Furthermore, it is very error-prone: if we forget to call `halt` at some particular point, we can end up with a dangling resource that may never be closed.
Introducing reducers
Not long ago, Clojure introduced the concept of reducers.
Since Elixir protocols were heavily inspired by Clojure protocols, I was very excited to see their take on collection processing. Instead of imposing a particular mechanism for traversing collections as in iterators, reducers are about sending computations to the collection so the collection applies the computation on itself. From the announcement: “the only thing that knows how to apply a function to a collection is the collection itself”.
Instead of using a `next` function, reducers expect a `reduce` implementation. Let’s implement this `reduce` function for lists:
```elixir
defmodule Reducer do
  def reduce([h|t], acc, fun) do
    reduce(t, fun.(h, acc), fun)
  end

  def reduce([], acc, _fun) do
    acc
  end
end
```
With `reduce`, we can easily calculate the sum of a collection:
```elixir
def sum(collection) do
  reduce(collection, 0, fn x, acc -> x + acc end)
end
```
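Assuming `sum` lives alongside `reduce` in the `Reducer` module above:

```elixir
iex> Reducer.sum([1, 2, 3])
6
```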
We can also implement `map` in terms of `reduce`. The list, however, will be reversed at the end, requiring us to reverse it back:
```elixir
def map(collection, fun) do
  reversed = reduce(collection, [], fn x, acc -> [fun.(x) | acc] end)
  # Call Erlang reverse (implemented in C for performance)
  :lists.reverse(reversed)
end
```
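Other operations follow the same recipe. For instance, here is a minimal sketch of `filter` (our own example, not from the original post) built the same way:

```elixir
# filter/2 on top of reduce/3: keep only the elements for which
# pred returns a truthy value, then reverse the accumulated list.
def filter(collection, pred) do
  reversed =
    reduce(collection, [], fn x, acc ->
      if pred.(x), do: [x | acc], else: acc
    end)

  :lists.reverse(reversed)
end
```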
Reducers provide many advantages:

- They are conceptually simpler and faster
- Operations like `map`, `filter`, etc. are easier to implement than their iterator counterparts, since the recursion is pushed into the collection instead of being part of every operation
- They open the door to parallelism, as the operations are no longer serial (in contrast to iterators)
- No conceptual changes are required to support resources as collections
The last bullet is the most important for us. Because the collection is the one applying the function, we don’t need to change `map` to support resources; all we need to do is implement `reduce` itself. Here is a pseudo-implementation of reducing a file line by line:
```elixir
def reduce(file, acc, fun) do
  descriptor = File.open(file)

  try do
    reduce_next(IO.readline(descriptor), acc, fun)
  after
    File.close(descriptor)
  end
end

defp reduce_next({line, descriptor}, acc, fun) do
  reduce_next(IO.readline(descriptor), fun.(line, acc), fun)
end

defp reduce_next(:done, acc, _fun) do
  acc
end
```
Even though our file reducer uses something that looks like an iterator, because that’s the best way to traverse the file, from the `map` function’s perspective we don’t care which operation is used internally. Furthermore, it is guaranteed that the file is closed after reducing, regardless of success or failure.
There are, however, two issues with bringing reducers as proposed in Clojure into Elixir.
First of all, some operations like `take` cannot be implemented in a purely functional way. For example, Clojure relies on reference types in its `take` implementation. This may not be an issue depending on the language/platform (it certainly isn’t in Clojure) but it is an issue in Elixir, as side-effects would require us to spawn new processes every time `take` is invoked, or to use the process dictionary, which is generally considered poor practice.
Another drawback of reducers is that, because the collection is the one controlling the reduction, we cannot implement operations like `zip` that require taking one item from a collection, then suspending the reduction, then taking an item from another collection, suspending it, and starting again by resuming the first one, and so on. Again, at least not in a purely functional way.
With reducers, we achieve the goal of a single abstraction that works efficiently with in-memory data structures and resources. However, it limits the number of operations we can support efficiently in a purely functional way, so we had to continue looking.
Introducing iteratees
It was at Code Mesh 2013 that I first heard about iteratees. I attended a talk by Jessica Kerr and, in the first minutes, she described exactly where my mind was at the moment: iterators and reducers indeed have their limitations, but they have been solved in scalaz-stream.
After the talk, Jessica and I started to explore how scalaz-stream solves those problems, eventually leading us to the Monad.Reader issue that introduces iteratees. After some experiments, we had a prototype of iteratees working in Elixir.
With iteratees, we have “instructions” going “up and down” between the source and the reducing function, telling each side what the next step in the collection processing is:
```elixir
defmodule Iteratee do
  @doc """
  Enumerates the collection with the given instruction.

  If the instruction is a `{:cont, fun}` tuple, the given
  function will be invoked with `{:some, h}` if there is
  an entry in the collection, otherwise `:done` will be
  given.

  If the instruction is `{:halt, acc}`, it means there is
  nothing to process and the collection should halt.
  """
  def enumerate([h|t], {:cont, fun}) do
    enumerate(t, fun.({:some, h}))
  end

  def enumerate([], {:cont, fun}) do
    fun.(:done)
  end

  def enumerate(_, {:halt, acc}) do
    {:halted, acc}
  end
end
```
With `enumerate` defined, we can write `map`:
```elixir
def map(collection, fun) do
  {:done, acc} = enumerate(collection, {:cont, mapper([], fun)})
  :lists.reverse(acc)
end

defp mapper(acc, fun) do
  fn
    {:some, h} -> {:cont, mapper([fun.(h) | acc], fun)}
    :done -> {:done, acc}
  end
end
```
`enumerate` is called with `{:cont, mapper}` where `mapper` will receive `{:some, h}` or `:done`, as defined by `enumerate`. The `mapper` function then either returns `{:cont, mapper}`, with a new `mapper` function, or `{:done, acc}` once the collection has signalled that no new items will be emitted.
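Assuming `map` and `mapper` sit inside the `Iteratee` module above, the result matches the earlier implementations:

```elixir
iex> Iteratee.map([1, 2, 3], fn x -> x * 2 end)
[2, 4, 6]
```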
The Monad.Reader publication defines iteratees as teaching fold (reduce) new tricks. This is precisely what we have done here. For example, while `map` only returns `{:cont, mapper}`, it could have returned `{:halt, acc}`, and that would have told the collection to halt. That’s how `take` could be implemented with iteratees: we would send `cont` instructions until we are no longer interested in new elements, finally returning `halt`.
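A minimal sketch of that idea, using a hypothetical `taker` helper analogous to `mapper` (not shown in the original post):

```elixir
# take/2 with iteratees: taker/2 closes over the remaining count and
# returns {:halt, acc} once enough items have been taken.
def take(collection, n) when n > 0 do
  {_, acc} = enumerate(collection, {:cont, taker([], n)})
  :lists.reverse(acc)
end

defp taker(acc, n) do
  fn
    {:some, h} when n > 1 -> {:cont, taker([h | acc], n - 1)}
    {:some, h} -> {:halt, [h | acc]}
    :done -> {:done, acc}
  end
end
```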
So while iteratees allow us to teach reduce new tricks, they are much harder to grasp conceptually. Not only that, functions implemented with iteratees were 6 to 8 times slower in Elixir when compared to their reducer counterparts.
In fact, it is even harder to see how iteratees are actually based on reduce, since the accumulator is hidden inside a closure (the `mapper` function, in this case). This is also the cause of the performance issues in Elixir: for each mapped element in the collection, we need to generate a new closure, which becomes very expensive when mapping, filtering or taking items multiple times.
That’s when we asked: what if we could keep what we have learned with iteratees while maintaining the simplicity and performance characteristics of reduce?
Introducing reducees
Reducees are similar to iteratees. The difference is that they clearly map to a reduce operation and do not create closures as we traverse the collection. Let’s implement a reducee for lists:
```elixir
defmodule Reducee do
  @doc """
  Reduces the collection with the given instruction,
  accumulator and function.

  If the instruction is a `{:cont, acc}` tuple, the given
  function will be invoked with the next item and the
  accumulator.

  If the instruction is `{:halt, acc}`, it means there is
  nothing to process and the collection should halt.
  """
  def reduce([h|t], {:cont, acc}, fun) do
    reduce(t, fun.(h, acc), fun)
  end

  def reduce([], {:cont, acc}, _fun) do
    {:done, acc}
  end

  def reduce(_, {:halt, acc}, _fun) do
    {:halted, acc}
  end
end
```
Our reducee implementation maps cleanly to the original reduce implementation. The only differences are that the accumulator is always wrapped in a tuple containing the next instruction, plus the addition of a `halt` checking clause.
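To make that mapping concrete, here is a minimal sketch of `sum` (shown earlier for reducers) rewritten against the reducee contract:

```elixir
# sum/1 on top of the reducee contract: the only change from the
# reducer version is wrapping the accumulator in {:cont, ...} tuples.
def sum(collection) do
  {:done, acc} =
    reduce(collection, {:cont, 0}, fn x, acc -> {:cont, x + acc} end)

  acc
end
```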
Implementing `map` only requires us to send those instructions as we reduce:
```elixir
def map(collection, fun) do
  {:done, acc} =
    reduce(collection, {:cont, []}, fn x, acc ->
      {:cont, [fun.(x) | acc]}
    end)

  :lists.reverse(acc)
end
```
Compared to the original reduce implementation:
```elixir
def map(collection, fun) do
  reversed = reduce(collection, [], fn x, acc -> [fun.(x) | acc] end)
  :lists.reverse(reversed)
end
```
The only difference between the implementations is the accumulator wrapped in tuples. We have effectively replaced the closures in iteratees with two-item tuples in reducees, which provides a considerable speed-up.
The tuple approach allows us to teach new tricks to reducees too. For example, our initial implementation already supports passing `{:halt, acc}` instead of `{:cont, acc}`, which we can use to implement `take` on top of reducees:
```elixir
def take(collection, n) when n > 0 do
  {_, {acc, _}} =
    reduce(collection, {:cont, {[], n}}, fn
      x, {acc, count} -> {take_instruction(count), {[x | acc], count - 1}}
    end)

  :lists.reverse(acc)
end

defp take_instruction(1), do: :halt
defp take_instruction(_n), do: :cont
```
The accumulator given to `reduce` now holds a list, to collect results, as well as the number of elements we still need to take from the collection. Once we reach the last item we care about (`count == 1`), we `halt` the collection.
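A quick check, assuming the functions above live in the `Reducee` module:

```elixir
iex> Reducee.take([1, 2, 3, 4], 2)
[1, 2]
```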
At the end of the day, this is the abstraction that ships with Elixir. It solves all requirements outlined so far: it is simple, fast, works with both in-memory data structures and resources as collections, and it supports both `take` and `zip` operations in a purely functional way.
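Supporting `zip` relies on one instruction beyond the ones sketched here: the `Enumerable` protocol that ships with Elixir also accepts `{:suspend, acc}`, which pauses the reduction and hands back a continuation that can be resumed later. For lists, the extra clause looks roughly like this:

```elixir
# The suspend clause pauses the reduction and returns a continuation
# that resumes it from where it stopped; alternating such
# continuations between two collections is what makes zip possible.
def reduce(list, {:suspend, acc}, fun) do
  {:suspended, acc, &reduce(list, &1, fun)}
end
```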
Eager vs Lazy
Elixir developers mostly do not need to worry about the underlying reducees abstraction. Developers work directly with the `Enum` module, which provides a series of operations that work with any collection. For example:
```elixir
iex> Enum.map([1, 2, 3], fn x -> x * 2 end)
[2, 4, 6]
```
All functions in `Enum` are eager. The `map` operation above receives a list and immediately returns a list. Nonetheless, it didn’t take long for us to add lazy variants of those operations:
```elixir
iex> Stream.map([1, 2, 3], fn x -> x * 2 end)
#Stream<...>
```
All the functions in `Stream` are lazy: they only store the computation to be performed, traversing the collection just once after all desired computations have been expressed.
In addition, the `Stream` module provides a series of functions for abstracting resources, generating infinite collections and more.
In other words, in Elixir we use the same abstraction to provide both eager and lazy operations, one that accepts both in-memory data structures and resources as collections, all conveniently encapsulated in the `Enum` and `Stream` modules. This allows developers to migrate from one mode of operation to the other as needed.
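For example, moving a pipeline from eager to lazy is just a matter of swapping the module:

```elixir
iex> [1, 2, 3] |> Enum.map(fn x -> x * 2 end) |> Enum.take(2)
[2, 4]
iex> [1, 2, 3] |> Stream.map(fn x -> x * 2 end) |> Enum.take(2)
[2, 4]
```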
An enormous thank you to Jessica Kerr for introducing me to iteratees and pairing with me at Code Mesh. Also, thanks to Jafar Husain for the conversations at Code Mesh and to the team behind Rx, which we are exploring next. Finally, thank you to James Fish, Peter Hamilton, Eric Meadows-Jönsson and Alexei Sholik for the countless reviews, feedback and prototypes regarding Elixir’s future.
P.S.: This post was originally published on Plataformatec’s blog.