Back-Pressure Queuing System in Elixir

    At PSPDFKit, we build reliable and performant distributed systems with Elixir. In such systems, we often have to call external services, and asynchronous messaging allows clients to make these calls without having to wait for the response. Messages that can’t be handled right away are queued up and handled later, but what happens when the queue gets overloaded? Since we don’t want our system to crash, we have to use a back-pressure mechanism, which prevents the queue from growing indefinitely. This blog post explains how you can apply a back-pressure mechanism to your Elixir application with the sbroker library.

    Using the sbroker Erlang Library in an Elixir Application

    The sbroker Erlang library provides building blocks for creating a pool and/or a load regulator. It uses the broker pattern, in which communication with the service workers goes through a broker that coordinates between the workers and the calls made to them.
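
    In sbroker terms, clients join one side of the broker and workers join the other; whenever both sides have an entry, the broker matches a pair and replies with a {:go, ...} tuple to both. The following sketch only illustrates that matching. It assumes a broker such as the Example.Broker module we define below is already started and registered under its module name; the complete, working example follows in the next section.

    # Illustration only: assumes a broker like Example.Broker (defined later in
    # this post) is already started and registered under its module name.

    # A worker offers itself on the "right" side of the broker and waits for a client.
    Task.start(fn ->
      {:go, ref, client_pid, _relative_time, _sojourn_time} =
        :sbroker.ask_r(Example.Broker, self())

      # Both sides of a match receive the same `ref`, so the client can tell
      # which reply belongs to its request.
      send(client_pid, {ref, :work_done})
    end)

    # A client joins the "left" side and is matched with the waiting worker,
    # or receives `{:drop, sojourn_time}` if it waits longer than the queue allows.
    case :sbroker.ask(Example.Broker, self()) do
      {:go, ref, _worker_pid, _relative_time, _sojourn_time} ->
        receive do
          {^ref, :work_done} -> :ok
        end

      {:drop, _sojourn_time} ->
        {:error, :overload}
    end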

    A Simple Example

    Let’s look at a simple example of how to use the sbroker library in an Elixir application.

    First we run the following command in the command line:

    mix new example

    This will create a project named “example” in the current directory. In the example, we will mock a call to an external service in a worker and handle the communication through a broker. To do this, we edit the example/mix.exs file to add the sbroker library to our application:

    defmodule Example.Mixfile do
      use Mix.Project

      def project do
        [
          app: :example,
          version: "0.1.0",
          elixir: "~> 1.5",
          start_permanent: Mix.env == :prod,
          deps: deps()
        ]
      end

      def application do
        [
          applications: [:sbroker],
          extra_applications: [:logger],
          mod: {Example, []}
        ]
      end

      defp deps do
        [{:sbroker, "~> 1.0-beta"}]
      end
    end

    We added the sbroker library to our dependencies and applications above, and we referenced our application module with mod: {Example, []}, which we will create later. After adding the dependency, we run mix deps.get to fetch it. Now we are ready to add a broker, so we create our broker module in example/lib/example/broker.ex:

    defmodule Example.Broker do
      @behaviour :sbroker

      def start_link() do
        start_link(timeout: 10_000)
      end

      def start_link(opts) do
        :sbroker.start_link({:local, __MODULE__}, __MODULE__, opts, [])
      end

      def init(opts) do
        # See `DBConnection.Sojourn.Broker`.
        # Make the "left" side of the broker a FIFO queue that drops a request
        # once it has waited longer than the timeout.
        client_queue =
          {:sbroker_timeout_queue,
           %{
             out: :out,
             timeout: opts[:timeout],
             drop: :drop,
             min: 0,
             max: 128
           }}

        # Make the "right" side of the broker a queue with no timeout that hands
        # out the most recently queued worker first (`out: :out_r`).
        worker_queue =
          {:sbroker_drop_queue,
           %{
             out: :out_r,
             drop: :drop,
             timeout: :infinity
           }}

        {:ok, {client_queue, worker_queue, []}}
      end
    end

    The module above implements the sbroker behavior. In start_link/0, we start the broker and set the timeout option to 10 seconds, which means a call is dropped when it has been waiting in the queue for a worker for more than 10 seconds. The max: 128 setting additionally caps the client queue at 128 waiting requests, so the queue cannot grow without bound. In the init/1 function, we define the client queue and the worker queue for the broker. With the broker module in place, we need a worker module, which does the actual work and asks the broker for jobs. We define the worker module in example/lib/example/worker.ex:

    defmodule Example.Worker do
      use GenServer

      alias Example.{Broker}

      def start_link() do
        GenServer.start_link(__MODULE__, [])
      end

      #
      # GenServer callbacks
      #

      def init([]) do
        state =
          ask(%{
            tag: make_ref()
          })

        {:ok, state}
      end

      def handle_info({tag, {:go, ref, {pid, {:fetch, [params]}}, _, _}}, %{tag: tag} = s) do
        send(pid, {ref, fetch_from_external_resource(params)})
        {:noreply, ask(s)}
      end

      # When sbroker has found a match, it'll send us `{tag, {:go, ref, req, _, _}}`.
      defp ask(%{tag: tag} = s) do
        {:await, ^tag, _} = :sbroker.async_ask_r(Broker, self(), {self(), tag})
        s
      end

      defp fetch_from_external_resource(params) do
        # Pretend to do work.
        Process.sleep(1000)
        {:ok, "External service called with #{inspect(params)}"}
      end
    end

    The fetch_from_external_resource/1 function is a simple mock that makes the process wait for one second and then return {:ok, "External service called with #{inspect(params)}"}. It is called when the worker GenServer receives the {tag, {:go, ref, {pid, {:fetch, [params]}}, _, _}} message. The tag in this tuple is a unique reference that identifies the worker’s request to the broker and is kept in the GenServer’s state.

    After the worker has fetched the data, it asks the broker for a new job. With the broker and the worker modules defined, we can now define a supervisor that starts the broker and a pool of workers. The supervisor is defined in example/lib/example/supervisor.ex:

    defmodule Example.Supervisor do
      use Supervisor

      alias Example.{Broker, Worker}

      def start_link() do
        Supervisor.start_link(__MODULE__, [])
      end

      def init(_args) do
        pool_size = 5

        broker = worker(Broker, [], id: :broker)

        workers =
          for id <- 1..pool_size do
            worker(Worker, [], id: id)
          end

        worker_sup_opts = [strategy: :one_for_one, max_restarts: pool_size]
        worker_sup = supervisor(Supervisor, [workers, worker_sup_opts], id: :workers)

        supervise([broker, worker_sup], strategy: :one_for_one)
      end
    end

    In this example, our worker pool consists of five workers. We are almost done, but we still need to create the application module in example/lib/example.ex:

    defmodule Example do
      use Application

      alias Example.{Broker}

      def start(_type, _args) do
        Example.Supervisor.start_link()
      end

      def fetch_from_external_resource(params) do
        perform({:fetch, [params]})
        |> inspect()
        |> IO.puts()
      end

      defp perform({action, args} = params) do
        case :sbroker.ask(Broker, {self(), params}) do
          {:go, ref, worker, _, _queue_time} ->
            monitor = Process.monitor(worker)

            receive do
              {^ref, result} ->
                Process.demonitor(monitor, [:flush])
                result

              {:DOWN, ^monitor, _, _, reason} ->
                exit({reason, {__MODULE__, action, args}})
            end

          {:drop, _time} ->
            {:error, :overload}
        end
      end
    end

    This module starts the supervisor and defines fetch_from_external_resource/1, which asks the broker for a worker and passes {:fetch, [params]} along with the request; when the broker assigns a worker, it hands that request to the worker. We also monitor the assigned worker so that, if it crashes while handling our request, we exit instead of waiting on a reply that will never come. When the broker cannot assign a worker in time, the response is {:drop, time} and our private perform/1 function returns {:error, :overload}. Finally, fetch_from_external_resource/1 prints either the worker’s response or {:error, :overload} if the broker dropped the request.

    We can now test this example in iex by running this:

    iex -S mix run

    Then we can fetch data from the external resource by running the following:

    Example.fetch_from_external_resource("test")

    This will print the following output to iex after one second:

    {:ok, "External service called with \"test\""}
    :ok

    To simulate and test more calls, we can call Example.fetch_from_external_resource("test") multiple times in parallel by running this:

    Enum.each(1..500, fn _ ->
      Task.start(fn ->
        Example.fetch_from_external_resource("test")
      end)
    end)

    This will print the success line in batches of five, because our example worker pool consists of five workers and each call takes about a second. We will also get {:error, :overload} responses for calls that waited in the queue too long without being assigned a worker. These {:error, :overload} responses are the back-pressure being applied to prevent overloading the external service. Our system could now, for example, reply to the clients requesting the service with HTTP/1.1 429 Too Many Requests instead of crashing under the load.
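
    As a sketch of that last idea, the snippet below is hypothetical and not part of the example project: it assumes a fetch/1 function that returns the result of perform/1 (that is, {:ok, body} or {:error, :overload}) instead of printing it, and maps that result to an HTTP status a web layer could send back.

    # Hypothetical sketch, not part of the example project. It assumes a
    # `fetch/1` function that returns the result of `perform/1` (that is,
    # `{:ok, body}` or `{:error, :overload}`) instead of printing it.
    defmodule Example.HTTP do
      # Map the broker-backed result to an HTTP status code and response body.
      def response({:ok, body}), do: {200, body}
      def response({:error, :overload}), do: {429, "Too Many Requests"}
    end

    A Plug or Phoenix handler could then return response(fetch(params)) to the client, and the application keeps serving what it can instead of falling over.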

    Conclusion

    Using asynchronous processes with worker pools and queues is a great way to scale a system, but because we don’t want our system to crash, we should also think of ways to handle overload. One way is to apply a back-pressure mechanism to our application. In an Elixir application, this can be done easily using the sbroker library.

    Maximilian Störchle
