Building a new MySQL adapter for Ecto, Part III: DBConnection Integration
Welcome to the “Building a new MySQL adapter for Ecto” series:
- Part I: Hello World
- Part II: Encoding/Decoding
- Part III: DBConnection Integration (you’re here!)
- Part IV: Ecto Integration
In the first two articles of the series, we learned the basic building blocks for interacting with a MySQL server using its binary protocol over TCP. To have a production-quality driver, however, there’s more work to do. Namely, we need to think about:
- maintaining a connection pool to talk to the DB efficiently from multiple processes
- not overloading the DB
- attempting to re-connect to the DB if the connection is lost
- supporting common DB features like prepared statements, transactions, and streaming
In short, we need reliability, performance, and first-class support for common DB features. This is where DBConnection comes in.
DBConnection
DBConnection is a behaviour module for implementing efficient database connection client processes, pools and transactions. It was created by Elixir and Ecto Core Team member James Fish and was introduced in Ecto v2.0.
The DBConnection documentation describes how it addresses the concerns mentioned above:
DBConnection handles callbacks differently to most behaviours. Some callbacks will be called in the calling process, with the state copied to and from the calling process. This is useful when the data for a request is large and means that a calling process can interact with a socket directly.
A side effect of this is that query handling can be written in a simple blocking fashion, while the connection process itself will remain responsive to OTP messages and can enqueue and cancel queued requests.
If a request or series of requests takes too long to handle in the client process a timeout will trigger and the socket can be cleanly disconnected by the connection process.
If a calling process waits too long to start its request it will timeout and its request will be cancelled. This prevents requests building up when the database cannot keep up.
If no requests are received for a period of time the connection will trigger an idle timeout and the database can be pinged to keep the connection alive.
Should the connection be lost, attempts will be made to reconnect with (configurable) exponential random backoff to reconnect. All state is lost when a connection disconnects but the process is reused.
The DBConnection.Query protocol provides utility functions so that queries can be prepared or encoded, and results decoded, without blocking the connection or pool.
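To make “exponential random backoff” more concrete, here is a minimal sketch of how such reconnect delays could be computed. It is not DBConnection’s implementation: the BackoffSketch module and the millisecond values below are hypothetical. (DBConnection exposes its own settings for this, such as the backoff_type, backoff_min and backoff_max options; check its documentation for the defaults.)

```elixir
defmodule BackoffSketch do
  # Hypothetical helper illustrating exponential backoff with random jitter.
  # This is NOT DBConnection's internal algorithm.

  # Delay (in ms) before reconnect attempt number `attempt` (0-based).
  # The upper bound doubles on every attempt, starting at `min_ms` and
  # capped at `max_ms`; the actual delay is picked uniformly at random
  # between `min_ms` and that bound.
  def delay(attempt, min_ms, max_ms)
      when is_integer(attempt) and attempt >= 0 and is_integer(min_ms) and min_ms > 0 and
             is_integer(max_ms) and max_ms >= min_ms do
    upper = min(max_ms, min_ms * Integer.pow(2, attempt))
    # :rand.uniform(n) returns an integer in 1..n
    min_ms + :rand.uniform(upper - min_ms + 1) - 1
  end

  # The first `n` delays of a reconnect schedule.
  def schedule(n, min_ms, max_ms) when is_integer(n) do
    Enum.map(0..(n - 1)//1, &delay(&1, min_ms, max_ms))
  end
end
```

Randomizing the delay keeps many pooled connections that dropped at the same moment from all retrying in lockstep against a database that is trying to recover.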
Let’s see how we can use it!
DBConnection Integration
We will first create a module responsible for implementing DBConnection callbacks:
defmodule MyXQL.Protocol do
  use DBConnection
end
When we compile it, we’ll get a bunch of warnings about callbacks that we haven’t implemented yet.
Let’s start with the connect/1 callback and, while at it, add some supporting code:
defmodule MyXQL.Error do
  defexception [:message]
end

defmodule MyXQL.Protocol do
  @moduledoc false
  use DBConnection
  import MyXQL.Messages

  defstruct [:sock]

  @impl true
  def connect(opts) do
    # Read configuration, falling back to sane defaults
    hostname = Keyword.get(opts, :hostname, "localhost")
    port = Keyword.get(opts, :port, 3306)
    timeout = Keyword.get(opts, :timeout, 5000)
    username = Keyword.get(opts, :username, System.get_env("USER")) || raise "username is missing"
    sock_opts = [:binary, active: false]

    case :gen_tcp.connect(String.to_charlist(hostname), port, sock_opts, timeout) do
      {:ok, sock} ->
        handshake(username, timeout, %__MODULE__{sock: sock})

      {:error, reason} ->
        {:error, %MyXQL.Error{message: "error when connecting: #{inspect(reason)}"}}
    end
  end

  @impl true
  def checkin(state) do
    {:ok, state}
  end

  @impl true
  def checkout(state) do
    {:ok, state}
  end

  @impl true
  def ping(state) do
    {:ok, state}
  end

  defp handshake(username, timeout, state) do
    with {:ok, data} <- :gen_tcp.recv(state.sock, 0, timeout),
         initial_handshake_packet() = decode_initial_handshake_packet(data),
         data = encode_handshake_response_packet(username),
         :ok <- :gen_tcp.send(state.sock, data),
         {:ok, data} <- :gen_tcp.recv(state.sock, 0, timeout),
         ok_packet() <- decode_handshake_response_packet(data) do
      {:ok, state}
    else
      # The server rejected the handshake (e.g. bad credentials)
      err_packet(message: message) ->
        {:error, %MyXQL.Error{message: "error when performing handshake: #{message}"}}

      # A socket error; for now we only wrap it in an exception
      {:error, reason} ->
        {:error, %MyXQL.Error{message: "error when performing handshake: #{inspect(reason)}"}}
    end
  end
end

defmodule MyXQL do
  @moduledoc "..."

  @doc "..."
  def start_link(opts) do
    DBConnection.start_link(MyXQL.Protocol, opts)
  end
end
That’s a lot to unpack, so let’s break this down:
- per the documentation, connect/1 must return {:ok, state} on success and {:error, exception} on failure. Our connection state for now will be just the socket, wrapped in a struct. (In a complete driver we’d also use the state to keep track of prepared statement references, transaction status, etc.) On error, we return an exception.
- we extract configuration from the opts keyword list and provide sane defaults
- we try to connect to the TCP server and, if successful, perform the handshake
- as we’ve learned in Part I, the handshake goes like this: after connecting to the socket, we receive the “Initial Handshake Packet”. Then, we send the “Handshake Response” packet. At the end, we receive the response and decode the result, which can be an “OK Packet” or an “ERR Packet”. We don’t do anything special about socket errors for now; we’ll talk about handling them better later on.
- we provide minimal implementations for the checkin/1, checkout/1 and ping/1 callbacks
- finally, we introduce a public MyXQL.start_link/1 function that is the entry point to the driver
It’s worth taking a step back to look at our overall design:
- the MyXQL module exposes a small public API and calls into an internal module
- MyXQL.Protocol implements the DBConnection behaviour and is the place where all side-effects are handled
- MyXQL.Messages implements pure functions for encoding and decoding packets
This separation is really important. By keeping protocol data separate from protocol interaction code, we have a codebase that’s much easier to understand and maintain.
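To show the kind of pure function that belongs in MyXQL.Messages, here is a minimal sketch of MySQL packet framing: every packet starts with a 3-byte little-endian payload length followed by a 1-byte sequence id. The PacketSketch module and its function names are hypothetical, and the sketch ignores payloads of 16 MB or more, which the protocol splits across several packets.

```elixir
defmodule PacketSketch do
  # Hypothetical pure helpers for MySQL packet framing. No sockets are
  # involved, so they can be tested in isolation.

  # Largest payload length that fits in the 3-byte length field.
  @max_payload 0xFFFFFF

  # Prepends the 4-byte header: payload length (3 bytes, little-endian)
  # and sequence id (1 byte).
  def encode_packet(payload, sequence_id)
      when is_binary(payload) and byte_size(payload) < @max_payload and
             sequence_id in 0..255 do
    <<byte_size(payload)::little-integer-size(24), sequence_id::8, payload::binary>>
  end

  # Splits one complete packet off the front of `data`. Returns
  # {:ok, payload, sequence_id, rest}, or :more if `data` is incomplete.
  def decode_packet(<<len::little-integer-size(24), sequence_id::8, rest::binary>>)
      when byte_size(rest) >= len do
    <<payload::binary-size(len), rest::binary>> = rest
    {:ok, payload, sequence_id, rest}
  end

  def decode_packet(data) when is_binary(data), do: :more
end
```

Because these functions only transform binaries, they can be unit-tested without a running MySQL server, leaving all socket interaction to the DBConnection callbacks.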
Prepared Statements
Let’s take a look at the handle_prepare/3 and handle_execute/4 callbacks that are used to handle prepared statements:
iex> b DBConnection.handle_prepare
@callback handle_prepare(query(), opts :: Keyword.t(), state :: any()) ::
{:ok, query(), new_state :: any()}
| {:error | :disconnect, Exception.t(), new_state :: any()}
Prepare a query with the database. Return {:ok, query, state} where query is a
query to pass to execute/4 or close/3, {:error, exception, state} to return an
error and continue or {:disconnect, exception, state} to return an error and
disconnect.
This callback is intended for cases where the state of a connection is needed
to prepare a query and/or the query can be saved in the database to call later.
This callback is called in the client process.
iex> b DBConnection.handle_execute
@callback handle_execute(query(), params(), opts :: Keyword.t(), state :: any()) ::
{:ok, query(), result(), new_state :: any()}
| {:error | :disconnect, Exception.t(), new_state :: any()}
Execute a query prepared by c:handle_prepare/3. Return {:ok, query, result,
state} to return altered query query and result result and continue, {:error,
exception, state} to return an error and continue or {:disconnect, exception,
state} to return an error and disconnect.
This callback is called in the client process.
Notice the callbacks reference types like query(), result() and params(). Let’s take a look at them too:
iex> t DBConnection.result
@type result() :: any()
iex> t DBConnection.params
@type params() :: any()
iex> t DBConnection.query
@type query() :: DBConnection.Query.t()
As far as DBConnection is concerned, result() and params() can be any term (it’s up to us to define these), and query() must implement the DBConnection.Query protocol. DBConnection.Query is used for preparing queries, encoding their params, and decoding their results. Let’s define query and result structs as well as a minimal protocol implementation:
defmodule MyXQL.Result do
  defstruct [:columns, :rows]
end

defmodule MyXQL.Query do
  defstruct [:statement, :statement_id]

  defimpl DBConnection.Query do
    def parse(query, _opts), do: query
    def describe(query, _opts), do: query
    def encode(_query, params, _opts), do: params
    def decode(_query, result, _opts), do: result
  end
end
Let’s define the first callback, handle_prepare/3:
defmodule MyXQL.Protocol do
  # ...

  @impl true
  def handle_prepare(%MyXQL.Query{} = query, _opts, state) do
    data = encode_com_stmt_prepare(query.statement)

    with :ok <- sock_send(data, state),
         {:ok, data} <- sock_recv(state),
         com_stmt_prepare_ok(statement_id: statement_id) <- decode_com_stmt_prepare_response(data) do
      query = %{query | statement_id: statement_id}
      {:ok, query, state}
    else
      # The server rejected the query; the connection itself is fine
      err_packet(message: message) ->
        {:error, %MyXQL.Error{message: "error when preparing query: #{message}"}, state}

      # Socket error; let DBConnection disconnect and reconnect
      {:error, reason} ->
        {:disconnect, %MyXQL.Error{message: "error when preparing query: #{inspect(reason)}"}, state}
    end
  end

  # :gen_tcp.send/2 takes no timeout argument; recv waits indefinitely
  # because DBConnection enforces its own timeouts
  defp sock_send(data, state), do: :gen_tcp.send(state.sock, data)
  defp sock_recv(state), do: :gen_tcp.recv(state.sock, 0, :infinity)
end
The callback receives query, opts (which we ignore), and state. We encode the query statement into a COM_STMT_PREPARE packet, send it, receive the response, decode the COM_STMT_PREPARE Response, and put the retrieved statement_id into our query struct.
If we receive an ERR Packet, we put the error message into our MyXQL.Error exception and return that.
The only places we could get an {:error, reason} tuple from are the :gen_tcp send and recv calls; if we get an error there, it means something might be wrong with the socket. By returning {:disconnect, _, _}, we let DBConnection take care of closing the socket and attempting to re-connect with a new one.
Note that we set the timeout to :infinity on our recv calls. That’s because DBConnection manages the process these calls are executed in and maintains its own timeouts. (And if we hit these timeouts, it cleans up the socket automatically.)
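The split between {:error, ...} and {:disconnect, ...} can be exercised without a database. The sketch below mirrors the with/else shape of handle_prepare/3 above, but everything in it is hypothetical: the PrepareSketch module, its records, and the injected send/recv functions stand in for MyXQL.Messages and :gen_tcp, and the decoder only understands two made-up payloads.

```elixir
defmodule PrepareSketch.Error do
  defexception [:message]
end

defmodule PrepareSketch do
  require Record
  alias PrepareSketch.Error

  # Hypothetical stand-ins for the records defined in MyXQL.Messages.
  Record.defrecord(:com_stmt_prepare_ok, [:statement_id])
  Record.defrecord(:err_packet, [:message])

  # Fake decoder: a real one would parse the COM_STMT_PREPARE Response.
  def decode(<<"OK", id>>), do: com_stmt_prepare_ok(statement_id: id)
  def decode(<<"ERR", message::binary>>), do: err_packet(message: message)

  # `transport` is a map of send/recv functions, injected so the sketch
  # runs without a socket.
  def prepare(statement, transport, state) do
    with :ok <- transport.send.(statement),
         {:ok, data} <- transport.recv.(),
         com_stmt_prepare_ok(statement_id: id) <- decode(data) do
      {:ok, id, state}
    else
      # The server rejected the query; the connection can be reused.
      err_packet(message: message) ->
        {:error, %Error{message: "error when preparing query: #{message}"}, state}

      # The transport failed; ask the caller to disconnect.
      {:error, reason} ->
        {:disconnect, %Error{message: "error when preparing query: #{inspect(reason)}"}, state}
    end
  end
end
```

Query-level errors keep the connection usable, while transport-level errors hand the decision to reconnect back to DBConnection.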
Let’s now define the handle_execute/4 callback:
defmodule MyXQL.Protocol do
  # ...

  @impl true
  def handle_execute(%{statement_id: statement_id} = query, params, _opts, state)
      when is_integer(statement_id) do
    data = encode_com_stmt_execute(statement_id, params)

    with :ok <- sock_send(data, state),
         {:ok, data} <- sock_recv(state),
         resultset(columns: columns, rows: rows) <- decode_com_stmt_execute_response(data) do
      # Keep only the column names from the column definitions
      columns = Enum.map(columns, fn column -> column_definition(column, :name) end)
      result = %MyXQL.Result{columns: columns, rows: rows}
      {:ok, query, result, state}
    else
      err_packet(message: message) ->
        {:error, %MyXQL.Error{message: "error when executing query: #{message}"}, state}

      {:error, reason} ->
        {:disconnect, %MyXQL.Error{message: "error when executing query: #{inspect(reason)}"}, state}
    end
  end
end

defmodule MyXQL.Messages do
  # ...

  # https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-ProtocolText::Resultset
  defrecord :resultset, [:column_count, :columns, :row_count, :rows, :warning_count, :status_flags]

  # https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnDefinition41
  defrecord :column_definition, [:name, :type]

  def decode_com_stmt_execute_response(data) do
    # ... decodes `data` into resultset(...) on success, or err_packet(...) on error
  end
end
Let’s break this down. handle_execute/4 receives an already prepared query, the params to encode, opts, and the state. Similarly to handle_prepare/3, we encode the COM_STMT_EXECUTE packet, send it, receive the response, decode the COM_STMT_EXECUTE Response into a resultset record, and finally build the result struct.
Same as last time, if we get an ERR Packet, we return an {:error, _, _} response; on socket problems, we simply disconnect and let DBConnection handle re-connecting at a later time.
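The last step of handle_execute/4, turning the decoded resultset record into a result struct, is pure and easy to try in isolation. This sketch is hypothetical: it redefines simplified resultset and column_definition records and a ResultSketch struct locally instead of using MyXQL.Messages and MyXQL.Result.

```elixir
defmodule ResultSketch do
  require Record

  # Simplified local copies of the records from MyXQL.Messages.
  Record.defrecord(:resultset, [:column_count, :columns, :row_count, :rows, :warning_count, :status_flags])
  Record.defrecord(:column_definition, [:name, :type])

  defstruct [:columns, :rows]

  # Keeps only the column names and the rows, as handle_execute/4 does.
  def to_result(resultset(columns: columns, rows: rows)) do
    names = Enum.map(columns, fn column -> column_definition(column, :name) end)
    %__MODULE__{columns: names, rows: rows}
  end
end
```

For a resultset with a single column named "? + ?" and rows [[5]], to_result/1 returns %ResultSketch{columns: ["? + ?"], rows: [[5]]}.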
We’ve mentioned that the DBConnection.Query protocol is used to prepare queries; in fact, we could also encode params and decode results in its encode/3 and decode/3 implementation functions. We’ve left that part out for brevity.
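As a rough idea of what moving that work into protocol functions could look like, here is a sketch. So that it runs without DBConnection, it defines a hypothetical QueryCodecSketch protocol whose encode/3 and decode/3 have the same shape as DBConnection.Query’s; the validation rule (counting ? placeholders) and turning rows into maps are illustrative choices, not what MyXQL does.

```elixir
defprotocol QueryCodecSketch do
  # Hypothetical protocol mirroring the encode/3 and decode/3 shape of
  # DBConnection.Query.
  def encode(query, params, opts)
  def decode(query, result, opts)
end

defmodule SketchQuery do
  defstruct [:statement]
end

defmodule SketchResult do
  defstruct [:columns, :rows]
end

defimpl QueryCodecSketch, for: SketchQuery do
  # Naive check: counts every "?" in the statement, so a "?" inside a
  # string literal would be miscounted.
  def encode(%SketchQuery{statement: statement}, params, _opts) do
    expected = statement |> String.graphemes() |> Enum.count(&(&1 == "?"))

    if length(params) == expected do
      params
    else
      raise ArgumentError, "expected #{expected} params, got #{length(params)}"
    end
  end

  # Turns each row into a map keyed by column name.
  def decode(_query, %SketchResult{columns: columns, rows: rows}, _opts) do
    Enum.map(rows, fn row -> columns |> Enum.zip(row) |> Map.new() end)
  end
end
```

Since this kind of work runs in the client process rather than in the connection, it does not hold up other users of the pool.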
Finally, let’s add a public function that users of the driver will use:
defmodule MyXQL do
  # ...

  def prepare_execute(conn, statement, params, opts) do
    query = %MyXQL.Query{statement: statement}
    DBConnection.prepare_execute(conn, query, params, opts)
  end
end
Now let’s see it all working:
iex> {:ok, pid} = MyXQL.start_link([])
iex> MyXQL.prepare_execute(pid, "SELECT ? + ?", [2, 3], [])
{:ok, %MyXQL.Query{statement: "SELECT ? + ?", statement_id: 1},
 %MyXQL.Result{columns: ["? + ?"], rows: [[5]]}}
Arguments to MyXQL.start_link are passed down to DBConnection.start_link/2, so starting a pool of 2 connections is as simple as:
iex> {:ok, pid} = MyXQL.start_link(pool_size: 2)
{:ok, #PID<0.264.0>}
Conclusion
In this article, we’ve taken a sneak peek at integrating with the DBConnection library. It gave us many benefits:
- a battle-tested connection pool without writing a single line of pooling code
- we can use blocking :gen_tcp functions without worrying about OTP messages and timeouts; DBConnection handles these
- automatic re-connection, backoff, etc.
- a way to structure our code
With this, we’re almost done with our adapter series. In the final article we’ll use our driver as an Ecto adapter. Stay tuned!
P.S.: This post was originally published on Plataformatec’s blog.