Building a new MySQL adapter for Ecto, Part III: DBConnection Integration

Welcome to the “Building a new MySQL adapter for Ecto” series.

In the first two articles of the series we have learned the basic building blocks for interacting with a MySQL server using its binary protocol over TCP. To have a production-quality driver, however, there’s more work to do. Namely, we need to think about:

  • maintaining a connection pool to talk to the DB efficiently from multiple processes
  • not overloading the DB
  • attempting to re-connect to the DB if connection is lost
  • supporting common DB features like prepared statements, transactions, and streaming

In short, we need: reliability, performance, and first-class support for common DB features. This is where DBConnection comes in.

DBConnection

DBConnection is a behaviour module for implementing efficient database connection client processes, pools and transactions. It was created by Elixir and Ecto Core Team member James Fish and was introduced in Ecto v2.0.

The DBConnection documentation explains how it addresses the concerns mentioned above:

DBConnection handles callbacks differently to most behaviours. Some callbacks will be called in the calling process, with the state copied to and from the calling process. This is useful when the data for a request is large and means that a calling process can interact with a socket directly.

A side effect of this is that query handling can be written in a simple blocking fashion, while the connection process itself will remain responsive to OTP messages and can enqueue and cancel queued requests.

If a request or series of requests takes too long to handle in the client process a timeout will trigger and the socket can be cleanly disconnected by the connection process.

If a calling process waits too long to start its request it will timeout and its request will be cancelled. This prevents requests building up when the database cannot keep up.

If no requests are received for a period of time the connection will trigger an idle timeout and the database can be pinged to keep the connection alive.

Should the connection be lost, attempts will be made to reconnect with (configurable) exponential random backoff to reconnect. All state is lost when a connection disconnects but the process is reused.

The DBConnection.Query protocol provide utility functions so that queries can be prepared or encoded and results decoding without blocking the connection or pool.
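To make the “exponential random backoff” from the quoted documentation a bit more concrete, here is a minimal sketch of the general idea. The module name, function name and default values are made up for illustration; this is not DBConnection’s own implementation, whose exact strategy and options may differ.

```elixir
defmodule BackoffSketch do
  @moduledoc false
  # Hypothetical helper, not part of DBConnection: computes reconnect delays
  # whose upper bound grows exponentially and which are picked at random.

  # Returns a delay in milliseconds for a zero-based attempt number.
  # The upper bound starts at `base`, doubles on every attempt and is capped at `cap`.
  def delay(attempt, base \\ 1_000, cap \\ 30_000)
      when is_integer(attempt) and attempt >= 0 and cap >= base do
    # Clamp the exponent so the power stays small even after many attempts
    ceiling = min(cap, base * Integer.pow(2, min(attempt, 20)))

    # :rand.uniform(n) returns an integer in 1..n, so the result is in base..ceiling
    base + :rand.uniform(ceiling - base + 1) - 1
  end
end
```

Randomizing the delay keeps many clients that lost their connections at the same moment from all retrying at the same moment too.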

Let’s see how we can use it!

DBConnection Integration

We will first create a module responsible for implementing DBConnection callbacks:

defmodule MyXQL.Protocol do
  use DBConnection
end

When we compile it, we’ll get a bunch of warnings about callbacks that we haven’t implemented yet.

Let’s start with the connect/1 callback and while at it, add some supporting code:

defmodule MyXQL.Error do
  defexception [:message]
end

defmodule MyXQL.Protocol do
  @moduledoc false
  use DBConnection
  import MyXQL.Messages
  defstruct [:sock]

  @impl true
  def connect(opts) do
    hostname = Keyword.get(opts, :hostname, "localhost")
    port = Keyword.get(opts, :port, 3306)
    timeout = Keyword.get(opts, :timeout, 5000)
    username = Keyword.get(opts, :username, System.get_env("USER")) || raise "username is missing"
    sock_opts = [:binary, active: false]

    # The handshake's outcome is handled here too: besides {:ok, state} it can
    # produce a socket error tuple or an ERR Packet record.
    with {:ok, sock} <- :gen_tcp.connect(String.to_charlist(hostname), port, sock_opts),
         {:ok, state} <- handshake(username, timeout, %__MODULE__{sock: sock}) do
      {:ok, state}
    else
      {:error, reason} ->
        {:error, %MyXQL.Error{message: "error when connecting: #{inspect(reason)}"}}

      err_packet(message: message) ->
        {:error, %MyXQL.Error{message: "error when performing handshake: #{message}"}}
    end
  end

  @impl true
  def checkin(state) do
    {:ok, state}
  end

  @impl true
  def checkout(state) do
    {:ok, state}
  end

  @impl true
  def ping(state) do
    {:ok, state}
  end

  defp handshake(username, timeout, state) do
    with {:ok, data} <- :gen_tcp.recv(state.sock, 0, timeout),
         initial_handshake_packet() = decode_initial_handshake_packet(data),
         data = encode_handshake_response_packet(username),
         :ok <- :gen_tcp.send(state.sock, data),
         {:ok, data} <- :gen_tcp.recv(state.sock, 0, timeout),
         ok_packet() <- decode_handshake_response_packet(data) do
      {:ok, state}
    end
  end
end

defmodule MyXQL do
  @moduledoc "..."

  @doc "..."
  def start_link(opts) do
    DBConnection.start_link(MyXQL.Protocol, opts)
  end
end

That’s a lot to unpack so let’s break this down:

  • per the documentation, connect/1 must return {:ok, state} on success and {:error, exception} on failure. Our connection state for now will be just the socket. (In a complete driver we’d use the state to manage prepared statement references, status of transaction etc.) On error, we return an exception.

  • we extract configuration from keyword list opts and provide sane defaults

  • we try to connect to the TCP server and if successful, perform the handshake.

  • as we’ve learned in part I, the handshake goes like this: after connecting to the socket, we receive the “Initial Handshake Packet”. Then, we send the “Handshake Response” packet. At the end, we receive the response and decode the result, which can be an “OK Packet” or an “ERR Packet”. We don’t do anything special about socket errors for now; we’ll talk about handling them better later on.

  • finally, we introduce a public MyXQL.start_link/1 that is an entry point to the driver

  • we also provide minimal implementations for checkin/1, checkout/1 and ping/1 callbacks

It’s worth taking a step back and looking at our overall design:

  • MyXQL module exposes a small public API and calls into an internal module

  • MyXQL.Protocol implements DBConnection behaviour and is the place where all side-effects are being handled

  • MyXQL.Messages implements pure functions for encoding and decoding packets

This separation is really important. By keeping protocol data separate from protocol interactions code we have a codebase that’s much easier to understand and maintain.
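One practical payoff of this separation is that the pure side can be exercised without a server or a socket. As a small illustration, here is a sketch of the kind of function that could live in a module like MyXQL.Messages. It frames a payload using the MySQL packet header (a 3-byte little-endian payload length followed by a 1-byte sequence id). The module and function names are hypothetical and are not taken from the driver.

```elixir
defmodule FramingSketch do
  @moduledoc false
  # Hypothetical module: pure encoding and decoding, no sockets involved.

  # Wraps a payload in a MySQL packet header: 3-byte little-endian payload
  # length followed by a 1-byte sequence id.
  def encode_packet(payload, sequence_id) when is_binary(payload) do
    <<byte_size(payload)::little-integer-size(24), sequence_id::8, payload::binary>>
  end

  # Splits one packet off the front of a buffer and returns the remaining bytes.
  def decode_packet(
        <<size::little-integer-size(24), sequence_id::8, payload::binary-size(size),
          rest::binary>>
      ) do
    {:ok, sequence_id, payload, rest}
  end

  # The buffer does not hold a complete packet yet.
  def decode_packet(_buffer), do: :incomplete
end
```

Because both functions only transform binaries, a round trip such as `FramingSketch.decode_packet(FramingSketch.encode_packet("abc", 0))` can be checked in a plain unit test.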

Prepared Statements

Let’s take a look at handle_prepare/3 and handle_execute/4 callbacks that are used to handle prepared statements:

iex> b DBConnection.handle_prepare
@callback handle_prepare(query(), opts :: Keyword.t(), state :: any()) ::
            {:ok, query(), new_state :: any()}
            | {:error | :disconnect, Exception.t(), new_state :: any()}
Prepare a query with the database. Return {:ok, query, state} where query is a
query to pass to execute/4 or close/3, {:error, exception, state} to return an
error and continue or {:disconnect, exception, state} to return an error and
disconnect.
This callback is intended for cases where the state of a connection is needed
to prepare a query and/or the query can be saved in the database to call later.
This callback is called in the client process.
iex> b DBConnection.handle_execute
@callback handle_execute(query(), params(), opts :: Keyword.t(), state :: any()) ::
            {:ok, query(), result(), new_state :: any()}
            | {:error | :disconnect, Exception.t(), new_state :: any()}
Execute a query prepared by c:handle_prepare/3. Return {:ok, query, result,
state} to return altered query query and result result and continue, {:error,
exception, state} to return an error and continue or {:disconnect, exception,
state} to return an error and disconnect.
This callback is called in the client process.

Notice the callbacks reference types like: query(), result() and params(). Let’s take a look at them too:

iex> t DBConnection.result
@type result() :: any()
iex> t DBConnection.params
@type params() :: any()
iex> t DBConnection.query
@type query() :: DBConnection.Query.t()

As far as DBConnection is concerned, result() and params() can be any term (it’s up to us to define these) and the query() must implement the DBConnection.Query protocol.

DBConnection.Query is used for preparing queries, encoding their params, and decoding their results. Let’s define query and result structs as well as a minimal protocol implementation.

defmodule MyXQL.Result do
  defstruct [:columns, :rows]
end

defmodule MyXQL.Query do
  defstruct [:statement, :statement_id]

  defimpl DBConnection.Query do
    def parse(query, _opts), do: query

    def describe(query, _opts), do: query

    def encode(_query, params, _opts), do: params

    def decode(_query, result, _opts), do: result
  end
end

Let’s define the first callback, handle_prepare/3:

defmodule MyXQL.Protocol do
  # ...

  @impl true
  def handle_prepare(%MyXQL.Query{statement: statement} = query, _opts, state) do
    data = encode_com_stmt_prepare(statement)

    with :ok <- sock_send(data, state),
         {:ok, data} <- sock_recv(state),
         com_stmt_prepare_ok(statement_id: statement_id) <- decode_com_stmt_prepare_response(data) do
      query = %{query | statement_id: statement_id}
      {:ok, query, state}
    else
      err_packet(message: message) ->
        {:error, %MyXQL.Error{message: "error when preparing query: #{message}"}, state}

      {:error, reason} ->
        {:disconnect, %MyXQL.Error{message: "error when preparing query: #{inspect(reason)}"}, state}
    end
  end

  # :gen_tcp.send/2 takes no timeout argument
  defp sock_send(data, state), do: :gen_tcp.send(state.sock, data)

  # Length 0 means "return whatever bytes are available"
  defp sock_recv(state), do: :gen_tcp.recv(state.sock, 0, :infinity)
end

The callback receives query, opts (which we ignore), and state. We encode the query statement into COM_STMT_PREPARE packet, send it, receive response, decode the COM_STMT_PREPARE Response, and put the retrieved statement_id into our query struct.

If we receive an ERR Packet, we put the error message into our MyXQL.Error exception and return that.

The only places we could get an {:error, reason} tuple from are the :gen_tcp send and recv calls. If we get an error there, it means there might be something wrong with the socket. When we return {:disconnect, _, _}, DBConnection will take care of disconnecting and will attempt to re-connect with a new socket.

Note that we don’t put a deadline on our send/recv calls (recv is given an :infinity timeout). That’s because DBConnection is managing the process these calls will be executed in and it maintains its own timeouts. (And if we hit these timeouts, it cleans up the socket automatically.)
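The split between the two failure paths can be tried out in isolation. The sketch below is not the driver’s code: the module name is made up, the transport and decoder are passed in as functions so it runs without a database, the ERR Packet is represented by a plain {:err_packet, message} tuple, and RuntimeError stands in for MyXQL.Error.

```elixir
defmodule ErrorFlowSketch do
  @moduledoc false
  # Hypothetical stand-in for a callback such as handle_prepare/3.

  def request(send_fun, recv_fun, decode_fun, state) do
    with :ok <- send_fun.(),
         {:ok, data} <- recv_fun.(),
         {:ok, reply} <- decode_fun.(data) do
      {:ok, reply, state}
    else
      # The server answered, but with an error: the connection is still usable.
      {:err_packet, message} ->
        {:error, %RuntimeError{message: message}, state}

      # The socket itself failed: ask for a disconnect.
      {:error, reason} ->
        {:disconnect, %RuntimeError{message: inspect(reason)}, state}
    end
  end
end

ok = fn -> :ok end

# A server-side error keeps the connection and yields an :error tuple
{:error, %RuntimeError{message: "syntax error"}, :state} =
  ErrorFlowSketch.request(ok, fn -> {:ok, "raw"} end, fn _ -> {:err_packet, "syntax error"} end, :state)

# A socket failure yields a :disconnect tuple
{:disconnect, %RuntimeError{message: ":closed"}, :state} =
  ErrorFlowSketch.request(ok, fn -> {:error, :closed} end, fn _ -> {:ok, :unused} end, :state)
```

The pattern matches at the bottom double as a quick check: they raise if the returned tuples have a different shape.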

Let’s now define the handle_execute/4 callback:

defmodule MyXQL.Protocol do
  # ...

  @impl true
  def handle_execute(%{statement_id: statement_id} = query, params, _opts, state)
      when is_integer(statement_id) do
    data = encode_com_stmt_execute(statement_id, params)

    with :ok <- sock_send(data, state),
         {:ok, data} <- sock_recv(state),
         resultset(columns: columns, rows: rows) <- decode_com_stmt_execute_response(data) do
      columns = Enum.map(columns, &column_definition(&1, :name))
      result = %MyXQL.Result{columns: columns, rows: rows}
      {:ok, query, result, state}
    else
      err_packet(message: message) ->
        {:error, %MyXQL.Error{message: "error when executing query: #{message}"}, state}

      {:error, reason} ->
        {:disconnect, %MyXQL.Error{message: "error when executing query: #{inspect(reason)}"}, state}
    end
  end
end

defmodule MyXQL.Messages do
  # ...

  # https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::Resultset
  defrecord :resultset, [:column_count, :columns, :row_count, :rows, :warning_count, :status_flags]

  def decode_com_stmt_execute_response(data) do
    # ...
    resultset(...)
  end

  # https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnDefinition41
  defrecord :column_definition, [:name, :type]
end

Let’s break this down.

handle_execute/4 receives an already prepared query, params to encode, opts, and the state.

Similarly to handle_prepare/3, we encode the COM_STMT_EXECUTE packet, send it and receive a response, decode the COM_STMT_EXECUTE Response into a resultset record, and finally build the result struct.

Same as last time, if we get an ERR Packet we return an {:error, _, _} response; on socket problems, we simply disconnect and let DBConnection handle re-connecting at a later time.

We’ve mentioned that the DBConnection.Query protocol is used to prepare queries, and in fact we could perform the encoding of params and the decoding of the result in its implementation functions. We’ve left that part out for brevity.
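To give a rough idea of what such implementation functions could look like, here is a sketch that runs without the db_connection dependency. It defines a local stand-in protocol with the same encode/decode shape as DBConnection.Query. Everything in it is hypothetical: the protocol and struct names, the boolean conversion, and the choice to return rows as maps are illustrations only, not what the driver does.

```elixir
# Local stand-in mirroring the encode/decode functions of DBConnection.Query.
defprotocol QuerySketch do
  def encode(query, params, opts)
  def decode(query, result, opts)
end

defmodule SketchQuery do
  @moduledoc false
  defstruct [:statement]
end

defimpl QuerySketch, for: SketchQuery do
  # Hypothetical encoding step: booleans become 1/0 before being sent.
  def encode(_query, params, _opts) do
    Enum.map(params, fn
      true -> 1
      false -> 0
      other -> other
    end)
  end

  # Hypothetical decoding step: pair every row with the column names.
  def decode(_query, %{columns: columns, rows: rows}, _opts) do
    Enum.map(rows, fn row -> columns |> Enum.zip(row) |> Map.new() end)
  end
end
```

With the real protocol, the same bodies would sit inside `defimpl DBConnection.Query`, next to parse/2 and describe/2.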

Finally, let’s add a public function that users of the driver will use:

defmodule MyXQL do
  # ...

  def prepare_execute(conn, statement, params, opts) do
    query = %MyXQL.Query{statement: statement}
    DBConnection.prepare_execute(conn, query, params, opts)
  end
end

and see it all working:

iex> {:ok, pid} = MyXQL.start_link([])
iex> MyXQL.prepare_execute(pid, "SELECT ? + ?", [2, 3], [])
{:ok, %MyXQL.Query{statement: "SELECT ? + ?", statement_id: 1},
%MyXQL.Result{columns: ["? + ?"], rows: [[5]]}}

Arguments to MyXQL.start_link are passed down to DBConnection.start_link/2, so starting a pool of 2 connections is as simple as:

iex> {:ok, pid} = MyXQL.start_link(pool_size: 2)
{:ok, #PID<0.264.0>}

Conclusion

In this article, we’ve seen a sneak peek of integration with the DBConnection library. It gave us many benefits:

  • a battle-tested connection pool without writing a single line of pooling code
  • we can use blocking :gen_tcp functions without worrying about OTP messages and timeouts; DBConnection will handle these
  • automatic re-connection, backoff etc.
  • a way to structure our code

With this, we’re almost done with our adapter series. In the final article we’ll use our driver as an Ecto adapter. Stay tuned!

P.S.: This post was originally published on Plataformatec’s blog.