Milton Mazzarri
14 min read

In this article, I will continue talking about Oban, but I’ll focus on how to test your workers and your configuration.

This article is the second in a series about Oban, an Elixir library for background job processing.

Testing the implementation of the Oban.Worker behaviour

Before implementing an Oban Worker, I start by writing the unit test that will insert and execute the job. These tests are usually short because your workers should be as lean as possible. I tend to treat Workers like Resolvers or Controllers: they orchestrate a series of actions that, most of the time, call functions that are already tested and belong to the business logic layer. It also helps to validate, in these unit tests, that the job runs its side effects.

defmodule MyApp.Migrations.MyDataMigrationWorkerTest do
  use ExUnit.Case, async: true 
  use Oban.Testing, repo: MyApp.Repo

  alias MyApp.Migrations.MyDataMigrationWorker 

  describe "perform/1" do
    test "performs data migration" do
      # prepare your test
      assert {:ok, result} = perform_job(MyDataMigrationWorker, %{"my_arg" => 1})
      # assert the side-effects of the Oban Worker
    end
  end
end

In the previous test, we used the perform_job/2 helper from the Oban.Testing module. This helper constructs a job and executes it with the given worker module. Apart from reducing the ceremony of constructing jobs for unit tests, one of the neat things about perform_job/2 is that it makes some assertions for us.

From the docs, we have the following:

  • That the worker implements the Oban.Worker behaviour
  • That the options provided build a valid job
  • That the return is valid, e.g. :ok, {:ok, value}, {:error, value}, etc.

If all of the assertions pass, the function returns the result of perform/1 for you to make additional assertions.
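To make the third check more concrete, here is a simplified sketch of what validating a worker's return value could look like. The `ReturnCheck` module and its `valid?/1` function are hypothetical names used only for illustration, and the list of accepted shapes is an assumption limited to a few return values that Oban documents for `perform/1`; it is not Oban's actual implementation, and your Oban version may accept more shapes.

```elixir
defmodule ReturnCheck do
  # Hypothetical helper, not part of Oban. It accepts a subset of the
  # return shapes documented for perform/1 (assumed list, verify it
  # against the Oban version you use).
  def valid?(:ok), do: true
  def valid?({:ok, _value}), do: true
  def valid?({:error, _reason}), do: true
  def valid?({:snooze, seconds}) when is_integer(seconds) and seconds > 0, do: true

  # Anything else, such as a bare value, is rejected.
  def valid?(_other), do: false
end

IO.inspect(ReturnCheck.valid?({:ok, %{migrated: 10}}), label: "tagged tuple")
IO.inspect(ReturnCheck.valid?(%{migrated: 10}), label: "bare map")
```

A bare map fails the check, which is the kind of mistake that is easy to miss when calling `perform/1` directly in a test.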

Apart from perform_job/2,3 there are other testing helpers such as: assert_enqueued/2,3, all_enqueued/2, refute_enqueued/2,3. For more information please check the Oban.Testing documentation.

It is important to note that if you want to test a worker included in Oban Pro like the Batch, Chunk, or Workflow, you should import the module Oban.Pro.Testing and use the process_job/2,3 helper instead:

import Oban.Pro.Testing

alias MyApp.MyBatchWorker

describe "process/1" do
  test "archives the given thing" do
    # prepare your test
    assert :ok == process_job(MyBatchWorker, %{thing_id: thing.id})
    # assert the side-effects of the Oban Batch Worker
  end
end

One important thing about testing: don’t forget to test your Oban configuration. Let’s talk about it next.

Testing your Oban Configuration

Do not wait until you deploy your application to your staging environment to find errors in your Oban configuration; try to catch them as soon as possible. You can start by doing:

defmodule MyApp.ObanConfigTest do
  use ExUnit.Case, async: true

  test "check oban configuration" do
    [config, prod] =
      Enum.map(["config/config.exs", "config/prod.exs"], fn path ->
        path
        |> Config.Reader.read!(imports: [], env: :prod, target: :host)
        |> get_in([:my_app, Oban])
      end)

    assert %Oban.Config{plugins: _plugins} =
             config
             |> Keyword.merge(prod)
             |> Oban.Config.new()
  end
end

NOTE: In the previous test, I merged the Oban config that comes from config/config.exs and config/prod.exs; you should adapt this test based on your scenario.
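One detail worth checking when you adapt the test: Keyword.merge/2 is a shallow merge, while configuration files combined through import_config are deep-merged by Elixir, so the two can disagree for nested options such as queues. The sketch below shows the difference using Config.Reader.merge/2; the repo and queue names are placeholders invented for this example.

```elixir
# Two configs shaped like what Config.Reader.read!/2 returns.
# MyApp.Repo and the queue names are placeholders for illustration.
base = [my_app: [{Oban, [repo: MyApp.Repo, queues: [default: 10, mailers: 5]]}]]
prod = [my_app: [{Oban, [queues: [default: 50]]}]]

# Shallow merge: the :queues value from prod replaces the one from base.
shallow = Keyword.merge(get_in(base, [:my_app, Oban]), get_in(prod, [:my_app, Oban]))

# Deep merge: nested keyword lists are merged key by key.
deep = base |> Config.Reader.merge(prod) |> get_in([:my_app, Oban])

IO.inspect(Keyword.keys(shallow[:queues]), label: "shallow queues")
IO.inspect(Enum.sort(Keyword.keys(deep[:queues])), label: "deep queues")
```

With the shallow merge, the mailers queue disappears from the merged result; with the deep merge, it is kept. Which one matches your application depends on how your config files are loaded.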

But, for the previous unit test to make sense, you need to know a bit about the internal implementation of Oban. The gist is this: when you try to start the supervision tree for Oban, it first tries to create an Oban.Config struct via Oban.Config.new/1, and if you supply the wrong configuration, it will blow up.

For example, let’s assume you made a typo in the repo option:

$ mix test test/my_app/oban_config_test.exs

1) test check oban configuration (MyApp.ObanConfigTest)
   test/my_app/oban_config_test.exs:4
   ** (ArgumentError) expected :repo to be an Ecto.Repo, got: MyApp.Rep0
   code: |> Oban.Config.new()
   stacktrace:
     ...

Once you fix the typo, you get:

$ mix test test/my_app/oban_config_test.exs
.
Finished in 0.02 seconds (0.02s async, 0.00s sync)
1 test, 0 failures

It is important to note that the Oban.Config.new/1 function doesn’t validate the internal configuration for the plugins; it only validates that the given plugin is an atom, is loaded, the plugin in question exports an init/1 function, and also the given opts is a keyword list. We will explore a workaround for this limitation in the following sub-section.
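The four plugin checks described above can be approximated with plain Elixir. The following is only a sketch: `PluginCheck` and `DemoPlugin` are hypothetical modules written for this example so that it runs without Oban, and the code does not reproduce how Oban performs these checks internally.

```elixir
defmodule PluginCheck do
  # Hypothetical helper mirroring the four checks listed above; it is a
  # sketch for illustration, not Oban's implementation.
  def valid?({plugin, opts}) do
    is_atom(plugin) and
      Code.ensure_loaded?(plugin) and
      function_exported?(plugin, :init, 1) and
      Keyword.keyword?(opts)
  end

  # A bare module name is treated as a plugin without options.
  def valid?(plugin) when is_atom(plugin), do: valid?({plugin, []})
  def valid?(_other), do: false
end

defmodule DemoPlugin do
  # Stand-in plugin, only here so the example runs without Oban.
  def init(opts), do: {:ok, opts}
end

# A well-formed plugin tuple
IO.inspect(PluginCheck.valid?({DemoPlugin, interval: 25}), label: "well-formed")
# A typo in the module name: the module cannot be loaded
IO.inspect(PluginCheck.valid?({DemoPlugn, interval: 25}), label: "typo in name")
# Options given as a map instead of a keyword list
IO.inspect(PluginCheck.valid?({DemoPlugin, %{interval: 25}}), label: "map options")
```

Note that nothing here looks inside the options themselves, which is exactly the limitation discussed above: a wrong option name or value still passes.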

Testing your plugins configuration

Previously, I mentioned that you should test your Oban configuration. Still, the previous approach has limitations; mainly, Oban.Config.new/1 doesn’t validate the internal configuration that you pass to each plugin listed under the plugins option.

test "check oban configuration" do
  # ...
  assert %Oban.Config{plugins: _plugins} =
           config
           |> Keyword.merge(prod)
           |> Oban.Config.new()
end

So, if you want to go further and validate the given options to the Oban.Plugins.Cron plugin, for example, you need to know a bit about the internal implementation and use Oban.Plugins.Cron.validate!/1 and assert that the returned value is :ok.

I like to validate the cron plugin configuration because it constantly evolves. Doing the following makes it easy to catch typos in the cron worker module names and also validates the cron expressions:

assert %Oban.Config{plugins: plugins} =
         config
         |> Keyword.merge(prod)
         |> Oban.Config.new()

{_, cron_opts} = Enum.find(plugins, &match?({Oban.Plugins.Cron, _}, &1))

assert :ok = Oban.Plugins.Cron.validate!(cron_opts)

A word of caution about the previous unit test segment: although Oban.Plugins.Cron.validate!/1 is a public function, it is marked with @doc false, which usually means the function is meant for internal use. There could be unannounced breaking changes in the future, but you gain the following checks:

  • that the value for the crontab option is a list.
  • if you use the timezone option, that the given value is a known timezone

For example, let’s assume that you’re using an invalid timezone:

$ mix test test/my_app/oban_config_test.exs

1) test check oban configuration (MyApp.ObanConfigTest)
   test/my_app/oban_config_test.exs:4
   ** (ArgumentError) expected :timezone to be a known timezone
   code: assert :ok = Oban.Plugins.Cron.validate!(cron_opts)
   stacktrace:
     ...

In the specific case of the cron plugin, the validate!/1 function also parses the crontab expressions; if an expression is invalid, the validation will fail. It also validates that the given module is loaded and implements the perform/1 callback, so it will catch possible typos in the worker names, as in the following run:

$ mix test test/my_app/oban_config_test.exs

1) test check oban configuration (MyApp.ObanConfigTest)
   test/my_app/oban_config_test.exs:4
   ** (ArgumentError) MyApp.DailWorker not found or can't be loaded
   code: assert :ok = Oban.Plugins.Cron.validate!(cron_opts)

In the previous output, you can see that we tried to load MyApp.DailWorker, but the correct name for that module is MyApp.DailyWorker.

I think it is worth taking the risk of using a non-documented public function in this case; you gain more in your daily routine because it will allow you to catch errors in your Oban Configuration as soon as possible.

After talking with Parker about the previous challenges, he suggested opening the following issue in the Oban repository: Improve the testing experience for Oban and its plugins

Testing workers included in Oban Pro

In a previous section, I mentioned that when you’re testing a worker included in Oban Pro, like the Batch, Chunk, or Workflow, you should import the module Oban.Pro.Testing and use the process_job/2,3 helper instead.

Possibly, the most challenging worker to test is the BatchWorker, especially if you want to try the whole cycle, including its handle callbacks. Let’s start with a dummy Batch Worker; then we can talk about the details of its tests.

So, let’s imagine the following worker:

defmodule MyApp.MyBatchWorker do
  use Oban.Pro.Workers.Batch,
    queue: :default

  require Logger

  @impl Batch
  def process(_) do
    :ok
  end

  @impl Batch
  def handle_exhausted(%Job{} = _job) do
    Logger.info("exhausted callback")
  end
end

The body of the module MyApp.MyBatchWorker is very similar to the usual Oban.Worker. The exception is that you need to handle your work in process/1 instead of perform/1; the latter is used internally. One of the excellent additions of the Oban.Pro.Workers.Batch behaviour is that it allows you to define a few callbacks, which are well explained in the behaviour documentation. In this example, I’m using the handle_exhausted/1 callback, which is called after all jobs in the batch are in either a completed or discarded state. To keep the example as simple as possible, I’m just logging that the callback was called, but you can do whatever you want here.

Okay, now the exciting part, let’s see how to test this worker.

defmodule MyApp.MyBatchWorkerTest do
  use ExUnit.Case

  import Oban.Pro.Testing

  alias MyApp.MyBatchWorker

  describe "process/1" do
    test "processes the background job" do
      assert :ok = process_job(MyBatchWorker, %{})
    end
  end
end

Here we’re testing the “bulk” of the worker. Now, let’s check one of the handler callbacks as a unit:

  use Oban.Testing, repo: MyApp.Repo
  
  describe "handle_exhausted/1" do
    test "handle exhausted behaves correctly" do
      assert :ok = perform_job(MyBatchWorker, %{}, meta: %{"callback" => "exhausted"})
    end
  end

As far as I know, the test helper process_job/2,3 will not work in the previous test because it ends up calling our MyApp.MyBatchWorker.process/1, whereas we actually want to exercise MyApp.MyBatchWorker.perform/1, which coordinates the execution of the callbacks. Again, you need to know how Oban works internally here. That’s why I ended up using perform_job/3 and passing the meta option. Be aware, though, that perform_job/3 will not complain if the last expression of your handle callback returns something like {:ok, value} or {:error, value}, even though the handle callbacks contract specifies that you must return :ok. A possible solution being considered is to include a perform_callback/3 helper in the Oban.Pro.Testing module.
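The difference between the two entry points can be pictured with a toy model. The sketch below is an assumption about the general shape of the dispatch, written with plain maps instead of Oban.Job structs; `CallbackDispatchSketch` is a hypothetical module, and the real Oban Pro code may be organized differently.

```elixir
defmodule CallbackDispatchSketch do
  # Simplified, hypothetical model of a batch worker. Jobs are plain maps
  # here, and the real Oban Pro implementation may differ.

  # A job whose meta carries the "exhausted" callback goes to the handler.
  def perform(%{meta: %{"callback" => "exhausted"}} = job), do: handle_exhausted(job)

  # Any other job is a regular batch job and goes to process/1.
  def perform(job), do: process(job)

  # Distinct return values make the chosen path visible in this sketch.
  def process(_job), do: :processed
  def handle_exhausted(_job), do: :exhausted_handled
end

regular_job = %{args: %{}, meta: %{}}
callback_job = %{args: %{}, meta: %{"callback" => "exhausted"}}

IO.inspect(CallbackDispatchSketch.perform(regular_job), label: "regular job")
IO.inspect(CallbackDispatchSketch.perform(callback_job), label: "callback job")
```

In this model, a helper that calls process/1 directly never reaches the handler, while going through perform/1 with the right meta does.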

Now, let’s assume that we want to go further in our test and check the whole cycle. In that case, we do:

defmodule MyApp.MyBatchWorkerTest do
  use ExUnit.Case

  import ExUnit.CaptureLog
  
  alias MyApp.MyBatchWorker

  require Logger
  
  # ...

  setup do
    oban_name = start_supervised_oban!(queues: [default: 3])
    Logger.put_module_level(MyBatchWorker, :all)

    on_exit(fn ->
      Logger.delete_module_level(MyBatchWorker)
    end)

    %{oban_name: oban_name}
  end

  test "testing our batch worker", %{oban_name: oban_name} do
    batch =
      ["[email protected]", "[email protected]"]
      |> Enum.map(&%{email: &1})
      |> MyBatchWorker.new_batch()

    log =
      capture_log(fn ->
        Oban.insert_all(oban_name, batch)
        Oban.drain_queue(oban_name, queue: :default)
      end)
      
    assert log =~ "exhausted callback"
  end

  defp start_supervised_oban!(opts) do
    default_opts = [
      name: make_ref(),
      repo: MyApp.Repo,
      plugins: [
        {Oban.Pro.Plugins.BatchManager, debounce_interval: 5},
        {Oban.Plugins.Repeater, interval: 25}
      ],
      poll_interval: :timer.minutes(10),
      shutdown_grace_period: 25
    ]

    opts = Keyword.merge(default_opts, opts)
    name = opts[:name]

    start_supervised!({Oban, opts})

    name
  end
end

Wow, I know this is a lot to digest at once, so let’s split up the previous snippet.

If you saw Parker’s talk Testing Oban Jobs From the Inside Out, you may remember that he calls this kind of testing “inline testing”. It should be used when you absolutely must run jobs normally during your tests (e.g., LiveView, browser testing) or, as in my case, when you want to test the whole path in your Batch Workers. So, let’s start by analyzing the start_supervised_oban!/1 helper:

  defp start_supervised_oban!(opts) do
    default_opts = [
      name: make_ref(),
      repo: MyApp.Repo,
      plugins: [
        {Oban.Pro.Plugins.BatchManager, debounce_interval: 5},
        {Oban.Plugins.Repeater, interval: 25}
      ],
      poll_interval: :timer.minutes(10),
      shutdown_grace_period: 25
    ]

    opts = Keyword.merge(default_opts, opts)
    name = opts[:name]

    start_supervised!({Oban, opts})

    name
  end

Here, with the help of start_supervised!/2, which comes with ExUnit, we’re setting up an Oban supervisor with some default options; the essential thing in the previous snippet is the plugins. So first, Oban.Pro.Plugins.BatchManager is needed because that’s the plugin that will track the execution of the Oban jobs within a batch and enqueue the callback jobs. So, yes, the callbacks are also Oban Jobs.

Then, we have the Oban.Plugins.Repeater plugin, which will poll every 25 milliseconds to look for new jobs. This plugin is essential in our case because inside the unit tests, PostgreSQL notifications don’t work. We’re using the Ecto Sandbox, which means that every unit test runs inside a transaction. Also, note that I’m using make_ref to create a unique reference that we will use as the name for the Oban supervisor; you cannot have more than one supervisor with the same id.

  setup do
    oban_name = start_supervised_oban!(queues: [default: 3])
    Logger.put_module_level(MyBatchWorker, :all)

    on_exit(fn ->
      Logger.delete_module_level(MyBatchWorker)
    end)

    %{oban_name: oban_name}
  end

Then, in our setup function, we use the previous helper start_supervised_oban!/1 to start a new Oban supervisor that will only handle jobs for the default queue. Next, we enable all the logging levels for the MyApp.MyBatchWorker module; the on_exit/1 callback cleans up this setting at the end of our test. Finally, we put the Oban supervisor’s name in the test context.
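The module-level setting matters when the test environment filters out low-priority messages. The standalone sketch below assumes a primary Logger level of :warning, which is a common choice for tests but not something the setup above states; `QuietWorker` is a hypothetical stand-in for the batch worker.

```elixir
# Starts ExUnit's processes so capture_log/1 works outside a test case.
{:ok, _} = Application.ensure_all_started(:ex_unit)

defmodule QuietWorker do
  require Logger

  # Stand-in for a worker callback that only logs at the :info level.
  def run, do: Logger.info("exhausted callback")
end

import ExUnit.CaptureLog

# Assumption: the test environment only logs warnings and above.
previous_level = Logger.level()
Logger.configure(level: :warning)

# With the primary level at :warning, the :info message is dropped.
without_override = capture_log(fn -> QuietWorker.run() end)

# A module-level setting takes priority over the primary level.
Logger.put_module_level(QuietWorker, :all)
with_override = capture_log(fn -> QuietWorker.run() end)

# Clean up, as the on_exit/1 callback does in the setup block.
Logger.delete_module_level(QuietWorker)
Logger.configure(level: previous_level)

IO.inspect(without_override =~ "exhausted callback", label: "captured without override")
IO.inspect(with_override =~ "exhausted callback", label: "captured with override")
```

Scoping the override to a single module keeps the rest of the test output quiet while still making the callback's log line available to assert on.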

  test "testing our batch worker", %{oban_name: oban_name} do
    batch =
      ["[email protected]", "[email protected]"]
      |> Enum.map(&%{email: &1})
      |> MyBatchWorker.new_batch()

    assert capture_log(fn ->
             Oban.insert_all(oban_name, batch)
             Oban.drain_queue(oban_name, queue: :default)
           end) =~ "exhausted callback"
  end

And here is where we test our batch worker. First, we create a new batch with MyApp.MyBatchWorker.new_batch/1 and immediately insert it with Oban.insert_all/2; it is essential to pass oban_name as the first argument to that function. Right after, we drain the default queue using Oban.drain_queue/2. I test that handle_exhausted/1 is called by capturing the log produced by that callback, but as I said before, you can do whatever you want here as long as you check the side effects produced by your callback.

But wait, something is missing in our previous start_supervised_oban!/1 helper. Do you remember that I mentioned before that the plugins implement the GenServer behaviour? Also, in the test environment, we’re using Ecto.Adapters.SQL.Sandbox to allow concurrent transactional tests. So, for these tests to run successfully, the Oban processes need to collaborate with the test process: all of them should use the same connection so that they belong to the same transaction.

From the Ecto.Adapters.SQL.Sandbox documentation, we have the following:

The test above will fail with an error similar to:

** (DBConnection.OwnershipError) cannot find ownership process for #PID<0.35.0>

That’s because the setup block is checking out the connection only for the test process. Once the worker attempts to perform a query, there is no connection assigned to it and it will fail.

The sandbox module provides two ways of doing so, via allowances or by running in shared mode.

And also, we have:

The idea behind allowances is that you can explicitly tell a process which checked out connection it should use, allowing multiple processes to collaborate over the same connection.

So, we need to include this allowance in our start_supervised_oban!/1 helper. Thankfully, Parker recently shared a gist in the #oban channel that demonstrates how to integration test batch callbacks. There you can see this clever piece, which allows the Oban Producers, Plugins, and other modules to use the checked out connection:


  # For Oban < v2.11 you can remove the `{:_, Oban.Peer}` part
  for key <- [{:_, Oban.Peer}, {:_, {:producer, :_}}, {:_, {:plugin, :_}}],
      pid <- Registry.select(Oban.Registry, [{{key, :"$2", :_}, [], [:"$2"]}]) do
    Ecto.Adapters.SQL.Sandbox.allow(MyApp.Repo, self(), pid)
  end

But again, this piece of code assumes you know some internals, so I will try to explain the previous code a bit.

In this excellent PR, Saša Jurić introduced Registry into Oban to replace all the locally registered names, and also to hold the configuration for those processes as part of the Registry metadata.

So, if you get everything that’s in the Oban Registry you see something like:


iex(1)> Registry.select(Oban.Registry, [{{:"$1", :"$2", :"$3"}, [], [{{:"$1", :"$2", :"$3"}}]}])
[
  {{Oban, {:watchman, "default"}}, #PID<0.576.0>, nil},
  {{Oban, Oban.Notifier}, #PID<0.568.0>, nil},
  {{Oban, {:plugin, Oban.Plugins.Stager}}, #PID<0.571.0>, nil},
  {{Oban, {:foreman, "default"}}, #PID<0.574.0>, nil},
  {{Oban, {:plugin, Oban.Plugins.Pruner}}, #PID<0.570.0>, nil},
  {{Oban, {:supervisor, "default"}}, #PID<0.573.0>, nil},
  {{Oban, {:producer, "default"}}, #PID<0.575.0>, nil},
  {Oban, #PID<0.567.0>, %Oban.Config{...}},
  {{Oban, Oban.Midwife}, #PID<0.569.0>, nil}
]

Then, based on the previous Registry output, the for comprehension filters the entries by key pattern and returns the pid (a.k.a. :"$2") of each match. Once we have the pids, we explicitly allow each of these Oban processes to use the test process’s connection.
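To see the match specification at work without Oban or a database, the selection can be reproduced against a throwaway Registry. In this sketch, `DemoRegistry` and the `register` helper are made up for the example, the keys only imitate the shape of the ones in the output above, and the Sandbox.allow/3 call is left out.

```elixir
{:ok, _} = Registry.start_link(keys: :unique, name: DemoRegistry)

# Spawns a process that registers itself under `key` and stays alive.
register = fn key ->
  parent = self()

  pid =
    spawn(fn ->
      {:ok, _} = Registry.register(DemoRegistry, key, nil)
      send(parent, {:registered, self()})
      Process.sleep(:infinity)
    end)

  # Wait until the registration has happened before returning the pid.
  receive do
    {:registered, ^pid} -> pid
  end
end

# Keys shaped like the ones in the Oban Registry output above.
stager = register.({Oban, {:plugin, Oban.Plugins.Stager}})
producer = register.({Oban, {:producer, "default"}})
notifier = register.({Oban, Oban.Notifier})

# Same comprehension as the gist, minus the Sandbox.allow/3 call.
selected =
  for key <- [{:_, {:producer, :_}}, {:_, {:plugin, :_}}],
      pid <- Registry.select(DemoRegistry, [{{key, :"$2", :_}, [], [:"$2"]}]) do
    pid
  end

IO.inspect(Enum.sort(selected) == Enum.sort([stager, producer]), label: "producer and plugin selected")
IO.inspect(notifier in selected, label: "notifier selected")
```

Only the processes whose keys match the producer or plugin patterns are returned; the notifier is skipped because its key has a different shape.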

And with the previous pattern, you can execute e2e tests, integration testing, and so forth.

Finally, if you want to know more about how to test Oban jobs, I highly recommend watching the talk Testing Oban Jobs From the Inside Out from Parker Selbert.

Conclusion

Oban offers some good helpers to run your Unit and Integration tests; you should pay special attention to the Oban.Testing documentation, and also keep in mind that you also have helpers to test workers included in Oban Pro such as Batch, Chunk, or Workflow.

But, indeed, there is always room for improvement. Especially when you want to test your Oban configuration as soon as possible, it would be awesome to have helpers validate all the settings, including plugins, at once. Also, in my opinion, the plugins should have a uniform way to test their configuration. As I mentioned in this issue, a possible solution could be:

A public test helper to validate the Oban configuration, including plugins. I think it’s also worth normalizing the plugins under a contract or behavior; Oban now enforces defining an init/1 function for plugins, but I think the behaviour should also include a validate!/1 function. At least the validate!/1 should help with the testing experience.

There are also plugins in the Pro package that could include complex configurations, like the Oban.Pro.Plugins.DynamicPruner. So, I think it’s worth it to offer a unified way to validate these configs.

I hope you find this article helpful, and you have a better idea of how to test your Oban Workers and your Oban Configuration.

If you want to reach me, you can do it at @milmazz on Twitter, or you can find me in the #oban channel on the Elixir Slack.

That’s all, folks! Thanks for reading.

Acknowledgments

Thank you, Parker Selbert, and Andrew Ek for reviewing drafts of this post.