Homemade analytics with Ecto and Elixir

For the Dashbit website, we wanted to avoid tracking users as much as possible. That means no cookies, and unfortunately most analytics tools rely on cookies for tracking and/or fingerprinting. However, we still want to see which pages on our website are frequently accessed. For this purpose, we decided to roll our own analytics system.

In this article, we will cover how we implemented the analytics system with Ecto upserts and how we used Elixir processes and Elixir's Registry to reduce the pressure on the database.

Tracking with upserts

The idea is very simple: every time someone accesses a page, we will store this information in the database. However, we don’t need to track each access at the instant they happen. For us, tracking how many accesses a page had in a day is completely fine. Therefore, every time a page is accessed on a given date, we will attempt to insert an entry in the database. If an entry already exists, we update its counter instead.

Luckily, this can be done with an upsert in Ecto. Let’s first define the schema for the database resource:

defmodule Dashbit.Metrics.Metric do
  use Ecto.Schema

  @primary_key false
  schema "metrics" do
    field :date, :date, primary_key: true
    field :path, :string, primary_key: true
    field :counter, :integer, default: 0
  end
end

It has three fields: a date, the page path, and the counter (number of accesses). The date and path make a composite primary key, which also gives us the unique index our upsert will rely on as its conflict target. Our migration looks like this:

defmodule Dashbit.Repo.Migrations.CreateMetrics do
  use Ecto.Migration

  def change do
    create table(:metrics, primary_key: false) do
      add :date, :date, primary_key: true
      add :path, :string, primary_key: true
      add :counter, :integer, default: 0
    end
  end
end

Now we call the following function whenever we want to count one page access:

defp upsert!(path, counter) do
  import Ecto.Query
  date = Date.utc_today()
  query = from(m in Dashbit.Metrics.Metric, update: [inc: [counter: ^counter]])

  Dashbit.Repo.insert!(
    %Dashbit.Metrics.Metric{date: date, path: path, counter: counter},
    on_conflict: query,
    conflict_target: [:date, :path]
  )
end

The code above performs an upsert, incrementing the number of accesses on a page by the value of counter, which is typically 1. If an entry does not exist, one is immediately created.
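
With the data in place, reading it back is a plain Ecto query. As an illustration, here is a hypothetical reporting helper (not part of our actual module, and the function name is ours to invent) that lists the most accessed pages for a given date:

import Ecto.Query

# Hypothetical helper: the `limit` most accessed pages on `date`.
def top_pages(date \\ Date.utc_today(), limit \\ 10) do
  Dashbit.Repo.all(
    from m in Dashbit.Metrics.Metric,
      where: m.date == ^date,
      order_by: [desc: m.counter],
      limit: ^limit,
      select: {m.path, m.counter}
  )
end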

This is the core of our analytics. It is a very straightforward solution, but it puts a hard requirement on the database: it must accept every one of our writes. While most applications rely heavily on a database, the analytics system is the only part of our website that uses one, so we believe it is important to serve a page, such as this blog post, even if there is an error when talking to the storage layer. To address this, we have decided to move the upserts to separate processes.

Async and batched writes with processes

As laid out in the previous section, we want to move all the database writes done by our analytics code to a separate process. Another concern with our solution so far is how it handles overloads. If there is a huge spike in traffic, could we end up putting too much pressure on the database? In that sense, would it be a good idea to batch our writes?

To be honest, our application will be just fine with spikes. Most of our page loads complete within hundreds of microseconds, thanks to Phoenix, and our database usage is minimal. On the other hand, such a small project is a perfect opportunity to experiment, so we decided to explore what our analytics solution would look like if we performed writes asynchronously and in batches.

Here is what we came up with. Every time a user accesses a page, we will spawn an Elixir process that tracks all accesses to that page. If a process already exists for said page, we will message the existing process instead. The goal of this process is to collect all accesses within a time interval, writing to the database after X seconds.

We are going to call this the Worker process and it starts like this:

defmodule Dashbit.Metrics.Worker do
  use GenServer, restart: :temporary

We define a module for the process and declare it as a GenServer. We also mark the process as :temporary; that is, if it dies, we don't want the supervisor to restart it. That's because we assume that, if the process dies, our logic that dynamically spawns a process for each page will eventually start a new one anyway.

Next we define the init callback of the process:

  @impl true
  def init(path) do
    Process.flag(:trap_exit, true)
    {:ok, {path, _counter = 0}}
  end

The init callback traps exits and sets the process state to {path, 0}. The first element is the page path, the second element is the number of page visits.

Our process should be able to receive a :bump message. This message is sent whenever we need to bump the counter and is handled by the handle_info callback:

  @impl true
  def handle_info(:bump, {path, 0}) do
    schedule_upsert()
    {:noreply, {path, 1}}
  end

  @impl true
  def handle_info(:bump, {path, counter}) do
    {:noreply, {path, counter + 1}}
  end

If we receive a :bump message when the page has had no accesses (i.e. the counter is zero), we bump the counter to 1 and also schedule an upsert event, so those accesses are eventually written to the database. If the counter is above zero, we simply increment it and return the updated state. This means that, for example, five accesses within one interval become a single database write of 5.

The scheduling and upsert code will look like this:

  defp schedule_upsert() do
    Process.send_after(self(), :upsert, Enum.random(10..20) * 1_000)
  end

  @impl true
  def handle_info(:upsert, {path, counter}) do
    upsert!(path, counter)
    {:noreply, {path, 0}}
  end

  defp upsert!(path, counter) do
    # same function as the previous section
  end

The schedule_upsert() function schedules a message to the current process (self()). The message is named :upsert and it will be delivered after a random interval between 10 and 20 seconds. We picked a random value to avoid a scenario where processes for different pages are spawned at the same time and then all write to the database at the same time.

Next we define another handle_info clause, this time to handle the scheduled :upsert message. This clause simply invokes the upsert! function, defined in the previous section, and resets the state back to {path, 0}. This makes it so that, once there is a new bump, we will schedule a new upsert.

Finally, we implement the terminate callback, which will be invoked whenever our application is shutting down:

  @impl true
  def terminate(_, {_path, 0}), do: :ok
  def terminate(_, {path, counter}), do: upsert!(path, counter)
end

If our application is shutting down, we may have pending writes in our worker, so we want to flush them to the database as part of our termination logic. One important thing to remember is that the terminate callback is not guaranteed to be invoked when the process shuts down, unless the process is trapping exits. That's why we called Process.flag(:trap_exit, true) in the init callback.

The process we just implemented delivers all of the requirements we have so far: writes are now asynchronous, as they happen in a separate process, and they are also batched, using intervals between 10s and 20s. The last step we need to implement is to actually spawn those processes on the fly as users navigate through the website.
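
Before we wire everything up, here is an illustrative IEx session for poking at the worker in isolation. It calls GenServer.start_link/2 directly, since the named start_link will only be added in the next section, and it assumes the Repo is up and running:

# Start a worker for a single page and bump it twice.
{:ok, pid} = GenServer.start_link(Dashbit.Metrics.Worker, "/blog")
send(pid, :bump)
send(pid, :bump)
# The state is now {"/blog", 2}. Within 10 to 20 seconds the worker
# upserts 2 accesses for today and resets its counter to 0.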

Dynamic processes with the Elixir registry

In order to spawn and find processes for each page, we are going to use Elixir's Registry. We also need a dynamic supervisor, which is going to be the parent of all worker processes. Let's implement this logic in the overarching Metrics module, alongside our bump(path) function.

Let’s get started with the basics:

defmodule Dashbit.Metrics do
  use Supervisor

  @worker Dashbit.Metrics.Worker
  @registry Dashbit.Metrics.Registry
  @supervisor Dashbit.Metrics.WorkerSupervisor

Our Dashbit.Metrics module is a Supervisor, which will have two children: the registry and the supervisor of all workers. Since the workers are started dynamically, as requests come in, we will use a DynamicSupervisor. We store the worker module and the names of the registry and dynamic supervisor processes in module attributes for convenience.

Next we will define how our supervisor is started and its init callback:

  def start_link(_opts) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    children = [
      {Registry, keys: :unique, name: @registry},
      {DynamicSupervisor, name: @supervisor, strategy: :one_for_one}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end
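
We chose :one_for_all as the strategy because the children depend on each other: if the registry crashes, the workers' registrations are lost with it, so it is safest to restart everything together. The resulting supervision tree looks roughly like this:

Dashbit.Metrics (Supervisor, strategy: :one_for_all)
├── Dashbit.Metrics.Registry (Registry, keys: :unique)
└── Dashbit.Metrics.WorkerSupervisor (DynamicSupervisor)
    ├── Dashbit.Metrics.Worker for "/blog" (started on demand)
    └── Dashbit.Metrics.Worker for "/other" (started on demand)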

With the registry and dynamic supervisor in place, we can write the bump function:

  def bump(path) when is_binary(path) do
    pid =
      case Registry.lookup(@registry, path) do
        [{pid, _}] ->
          pid

        [] ->
          case DynamicSupervisor.start_child(@supervisor, {@worker, path}) do
            {:ok, pid} -> pid
            {:error, {:already_started, pid}} -> pid
          end
      end

    send(pid, :bump)
  end
end

The bump function looks up in the registry if there is a process for the given path and returns its process identifier (pid). If one does not exist, we ask the worker supervisor to start a worker dynamically. We expect two possible outcomes from start_child:

  • {:ok, pid} - the worker was started

  • {:error, {:already_started, pid}} - a worker for the given path already exists

We need the second branch to address a potential race condition, where two users may access a page for the first time at the same time. In this scenario, Registry.lookup/2 returns an empty result for both of them, and both attempt to spawn the worker. One of them succeeds and the other gets the “already started” error. Once we have the pid, we send it the :bump message.
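
To make the race concrete, here is an illustrative snippet (not part of the actual code) that fires two concurrent first-time bumps at the same path:

# Both tasks may see an empty registry and call start_child. One of them
# receives {:ok, pid}, the other {:error, {:already_started, pid}}, and
# both end up sending :bump to the same worker.
1..2
|> Task.async_stream(fn _ -> Dashbit.Metrics.bump("/some/new/path") end)
|> Stream.run()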

We are almost there. There are just two steps left. First, we need to configure the worker to register itself whenever it is started. This is done via the start_link function. Let’s go back to the worker and add this:

  @registry Dashbit.Metrics.Registry

  def start_link(path) do
    GenServer.start_link(__MODULE__, path, name: {:via, Registry, {@registry, path}})
  end
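
As a side note, here is why passing {@worker, path} to DynamicSupervisor.start_child/2 ends up invoking this start_link function: use GenServer, restart: :temporary generates a default child_spec/1, so the tuple expands to roughly the following (a sketch of the generated spec, not code you need to write):

# Roughly what Dashbit.Metrics.Worker.child_spec(path) returns:
%{
  id: Dashbit.Metrics.Worker,
  start: {Dashbit.Metrics.Worker, :start_link, [path]},
  restart: :temporary
}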

Now we just need to start the Dashbit.Metrics supervision tree. This is done in the application supervision tree, typically located in “lib/my_app/application.ex”:

  children = [
    Dashbit.Repo,
    Dashbit.Metrics,
    Dashbit.Endpoint
  ]

And that’s it. Now whenever a user accesses a page, we just need to call Dashbit.Metrics.bump(path), where path is the current page address. In our case, we store just the path (without the host and without the query string). If you are using Plug, it can be built from the conn.path_info field. We also only perform writes if the page was successfully rendered with a 200 status. Overall, our bumping code looks like this:

plug :bump_metric

defp bump_metric(conn, _opts) do
  register_before_send(conn, fn conn ->
    if conn.status == 200 do
      path = "/" <> Enum.join(conn.path_info, "/")
      Dashbit.Metrics.bump(path)
    end

    conn
  end)
end

Summary

In this article we have covered a minimal analytics system, using Ecto, GenServer and Elixir’s Registry, that performs writes asynchronously and in batches. The usage of the Registry to dynamically spawn processes that map to different resources, each with their own life-cycle, can be used in many different scenarios.

One important aspect of our solution is that, once a process for a page is created, it stays alive until there is a new deployment. This works for us because we have fewer than 100 pages, so we know the maximum number of processes is bounded by a very low value.

Although Elixir processes are lightweight thanks to the Erlang VM, if we had a large number of pages, such as millions of pages, we could potentially end up with hundreds of thousands of unused processes. In this case, we would slightly change our solution to terminate each process after its upsert. Something along these lines:

  @impl true
  def handle_info(:upsert, {path, counter}) do
    # We first unregister ourselves, so new lookups no longer find this
    # process and future accesses spawn a fresh worker.
    Registry.unregister(@registry, path)

    # Schedule to stop in 2 seconds, this will give us time to process
    # any late messages.
    Process.send_after(self(), :stop, 2_000)
    {:noreply, {path, counter}}
  end

  @impl true
  def handle_info(:stop, {path, counter}) do
    # Now we just stop. The terminate callback will write all pending writes.
    {:stop, :shutdown, {path, counter}}
  end

That’s it, we hope you have enjoyed the article and learned a thing or two that could be useful in your next project!