Implementing a back-pressure queuing system in Elixir
At PSPDFKit, we build reliable and performant distributed systems with Elixir. In such systems, we often have to call external services, and asynchronous messaging allows clients to make these calls and not have to wait for the response. Messages that can’t be handled right away will be queued up and handled later, but what happens when the queue gets overloaded? Since we don’t want our system to crash, we have to use a back-pressure mechanism, which prevents the queue from growing indefinitely. This blog post explains how you can apply a back-pressure mechanism to your Elixir application with the sbroker library.
Using the sbroker Erlang Library in an Elixir Application
The sbroker Erlang library provides building blocks for creating a pool and/or a load regulator. It uses the broker pattern, where the communication to the service worker is handled by a broker that is responsible for the coordination between workers and the calls to them.
A Simple Example
Let’s look at a simple example of how to use the sbroker library in an Elixir application.
First we run the following command in the command line:
mix new example
This will create an example project with the name “example” in the current directory. In the example, we will mock a call to external service in a worker and handle the communication by a broker. To do this, we edit the example/mix.exs
file in order to add the sbroker library to our application:
defmodule Example.Mixfile do use Mix.Project def project do [ app: :example, version: "0.1.0", elixir: "~> 1.5", start_permanent: Mix.env == :prod, deps: deps() ] end def application do [ applications: [:sbroker], extra_applications: [:logger], mod: {Example, []} ] end defp deps do [{:sbroker, "~> 1.0-beta"}] end end
We added the sbroker library to our dependencies and applications above. We also referenced our application module on line 19 with mod: {Example, []}
, which we will create later. Now we are ready to add a broker, so we create our broker module in example/lib/example/broker.ex
:
defmodule Example.Broker do @behaviour :sbroker def start_link() do start_link(timeout: 10000) end def start_link(opts) do :sbroker.start_link({:local, __MODULE__}, __MODULE__, opts, []) end def init(opts) do # See `DBConnection.Sojourn.Broker`. # Make the "left" side of the broker a FIFO queue that drops the request after the timeout is reached. client_queue = {:sbroker_timeout_queue, %{ out: :out, timeout: opts[:timeout], drop: :drop, min: 0, max: 128 }} # Make the "right" side of the broker a FIFO queue that has no timeout. worker_queue = {:sbroker_drop_queue, %{ out: :out_r, drop: :drop, timeout: :infinity }} {:ok, {client_queue, worker_queue, []}} end end
The module above implements the sbroker behavior. We start the broker in line 9 and set the timeout in the options to 10 seconds. This means that calls get dropped when they stay in the queue for more than 10 seconds while waiting for a worker. In the init\1
function, we define the client and the worker queue for the broker. After we define the broker module, we need to define the worker module — which is responsible for defining a worker — and ask the broker for jobs. We define the worker module in example/lib/example/worker.ex
:
defmodule Example.Worker do use GenServer alias Example.{Broker} def start_link() do GenServer.start_link(__MODULE__, []) end # # GenServer callbacks # def init([]) do state = ask(%{ tag: make_ref() }) {:ok, state} end def handle_info({tag, {:go, ref, {pid, {:fetch, [params]}}, _, _}}, %{tag: tag} = s) do send(pid, {ref, fetch_from_external_resource(params)}) {:noreply, ask(s)} end # When sbroker has found a match, it'll send us `{tag, {:go, ref, req, _, _}}`. defp ask(%{tag: tag} = s) do {:await, ^tag, _} = :sbroker.async_ask_r(Broker, self(), {self(), tag}) s end defp fetch_from_external_resource(params) do # Pretend to do work Process.sleep(1000) {:ok, "External service called with #{inspect(params)}"} end end
The fetchfrom_external_resource/1
function is a simple mocking function that will make the process wait for one second and then return {:ok, "External service called with #{inspect(params)}"}
. This function will be called when the worker GenServer receives the {tag, {:go, ref, {pid, {:fetch, [params]}},_, _}}
message. The tag variable in this tuple is a unique identifier that is needed to identify the worker and is saved in the GenServer’s state.
After the worker has fetched the data, it asks for a new job from the broker. Because we have defined the broker and the worker module, we can now define a supervisor, which should start the broker and a pool of workers. The supervisor is defined in example/lib/example/supervisor.ex
:
defmodule Example.Supervisor do use Supervisor alias Example.{Broker, Worker} def start_link() do Supervisor.start_link(__MODULE__, []) end def init(_args) do pool_size = 5 broker = worker(Broker, [], id: :broker) workers = for id <- 1..pool_size do worker(Worker, [], id: id) end worker_sup_opts = [strategy: :one_for_one, max_restarts: pool_size] worker_sup = supervisor(Supervisor, [workers, worker_sup_opts], id: :workers) supervise([broker, worker_sup], strategy: :one_for_one) end end
In this example, our worker pool consists of five workers. We are almost done with it, but we still need to create the application module in example/lib/example.ex
:
defmodule Example do use Application alias Example.{Broker} def start(_type, _args) do Example.Supervisor.start_link() end def fetch_from_external_resource(params) do perform({:fetch, [params]}) |> inspect() |> IO.puts() end defp perform({action, args} = params) do case :sbroker.ask(Broker, {self(), params}) do {:go, ref, worker, _, _queue_time} -> monitor = Process.monitor(worker) receive do {^ref, result} -> Process.demonitor(monitor, [:flush]) result {:DOWN, ^monitor, _, _, reason} -> exit({reason, {__MODULE__, action, args}}) end {:drop, _time} -> {:error, :overload} end end end
This module starts the supervisor and has a function, fetch_from_external_resource\1
, which will ask the broker for a worker and sends the {:fetch, [params]}
message to the worker when the broker can assign a worker to our call. When the broker cannot assign a worker, the response will be {:drop, time}
, and our private perform\1
function will return {:error, :overload}
. The fetch_from_external_resource\1
function will also print either the response from the worker or {:error, :overload}
if the broker dropped the request.
We can now test this example in iex
by running this:
iex -S mix run
Then we can fetch data from the external resource by running the following:
Example.fetch_from_external_resource("test")
This will print the following output to iex
after one second:
{:ok, "External service called with \"test\""} :ok
To simulate and test more calls, we can call Example.fetch_from_external_resource("test")
multiple times in parallel by running this:
Enum.each(1..500, fn _ -> Task.start(fn -> Example.fetch_from_external_resource("test") end) end)
This will print the same line and increase the count by five lines at a time, because our example worker pool consists of five workers. We will also get {:error, :overload}
responses because the broker could not assign a worker and the task was waiting in the queue for too long. The {:error, :overload}
responses are examples of back-pressure that has been applied to prevent an overload to the external service. Our system could now, for example, reply to the clients requesting the service with HTTP/1.1 429 Too Many Requests
, and it would not crash because of overload.
Conclusion
Using asynchronous processes with worker pools and queues is a great way to scale a system, but because we don’t want our system to crash, we should also think of ways to handle overload. One way is to apply a back-pressure mechanism to our application. In an Elixir application, this can be done easily using the sbroker library.