Proletarian

Download Proletarian from Clojars. Read the documentation at Cljdoc.

A durable job queuing and worker system for Clojure backed by PostgreSQL 9.5+/MySQL 8.0.1+

Overview

Use Proletarian for asynchronously executing tasks in the background. It's useful for offloading long-running tasks from the request thread in user-facing web applications. What kind of tasks? Anything that uses external services, and anything that takes more than a few milliseconds:

  • sending emails
  • making HTTP calls to external systems
  • updating search indexes
  • batch imports and exports

If you're already using PostgreSQL/MySQL as your main database, there is one very nice advantage to having your job queue in PostgreSQL/MySQL as well:

Using a transaction, you can atomically commit changes to the database along with the queueing of the job. A common use-case for this is sending an email after some action in the web application, e.g., when a user creates an account you want to send them a confirmation email. You want the creation of the account, and the enqueuing of the email job to either succeed or fail together. This is sometimes called the Outbox Pattern in distributed computing literature.

Usage

Here is a basic example, showing the creation of a queue worker in one namespace, and the enqueuing of a job in another namespace:

(ns your-app.workers
  "You'll probably want to use a component state library (Component, Integrant,
   Mount, or some such) for managing the worker state. For this example we're
   just def-ing the worker. The queue worker constructor function takes
   a javax.sql.DataSource as its first argument. You probably have a
   data source at hand in your application already. Here we'll use next.jdbc to
   get one from a JDBC connection URL.

   The second argument is the job handler function. Proletarian will invoke
   this whenever a job is ready for processing. It's an arity-2 function, with
   the job type (a keyword) as the first argument, and the job's payload as
   the second argument."
  (:require [next.jdbc :as jdbc]
            [proletarian.worker :as worker]
            [your-app.handlers :as handlers]))

(def email-worker
  (let [ds (jdbc/get-datasource "jdbc:postgresql://...")]
    (worker/create-queue-worker ds handlers/handle-job!)))

(worker/start! email-worker)

(ns your-app.handlers
  "Let's say this is a namespace where you handle web requests. We're going to
   handle the request, write something to the database, and enqueue a job.
   We'll do this in a transaction with a little bit of help from next.jdbc."
  (:require [next.jdbc :as jdbc]
            [proletarian.job :as job]))

(defn some-route-handler [system request]
  (jdbc/with-transaction [tx (:db system)]
    ;; Do some business logic here
    ;; Write some result to the database
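    ;; Enqueue the job. The email address is read from the request here,
    ;; assuming a Ring-style :params map; adapt this to your application.
    (job/enqueue! tx ::confirmation-email
      {:email (get-in request [:params :email]), :other-data-1 :foo, :other-data-2 :bar})
    ;; Return a response (placeholder)
    {:status 200}))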

(defmulti handle-job!
  "Since we passed this multimethod as the second argument to
  worker/create-queue-worker, it is called by the Proletarian Queue Worker when
  a job is ready for execution. Implement this multimethod for your job types."
  (fn [job-type _payload] job-type))

;; Implement the handle-job! multimethod for the job type.
(defmethod handle-job! ::confirmation-email
  ;; The payload keys must match the map passed to job/enqueue! (:email, not :email-address).
  [_job-type {:keys [email other-data-1 other-data-2]}]
  ;; Send the mail and do other time-consuming work here.
  )

Logging

Proletarian does not depend on a logging framework, and has no opinions on how you should log in your application. The :proletarian/log option to create-queue-worker specifies a function that is going to be called by the Queue Worker when anything interesting and log-worthy happens during operation. It takes two arguments: The first is a keyword identifying the event being logged. The second is a map with data describing the event.

If you do not specify a logging function, the default is simply a println-logger that will print every event using println.

There is no "severity" or "level" information included with the log events. Every application will have different requirements here. A sensible default might be something like this (using clojure.tools.logging):

(ns your-app.workers
  (:require [clojure.tools.logging :as log]
            [next.jdbc :as jdbc]
            [proletarian.worker :as worker]
            [your-app.handlers :as handlers]))

(defn log-level
  [x]
  (case x
    ::worker/queue-worker-shutdown-error :error
    ::worker/handle-job-exception-with-interrupt :error
    ::worker/handle-job-exception :error
    ::worker/job-worker-error :error
    ::worker/polling-for-jobs :debug
    :proletarian.retry/not-retrying :error
    :info))

(defn logger
  [x data]
  (log/logp (log-level x) x data))

(def worker
  (let [ds (jdbc/get-datasource "jdbc:postgresql://...")]
    (worker/create-queue-worker ds handlers/handle-job! {:proletarian/log logger})))

Installation

Add Proletarian to your deps.edn file:

msolli/proletarian {:mvn/version "1.0.115"}

Or to your project.clj for Leiningen:

[msolli/proletarian "1.0.115"]

Proletarian works with your existing PostgreSQL/MySQL database. It uses the SKIP LOCKED feature, which was introduced in PostgreSQL 9.5 and in MySQL 8.0.1, so there is a hard requirement of at least PostgreSQL 9.5 or MySQL 8.0.1.
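To illustrate why SKIP LOCKED matters for a job queue, here is a minimal sketch of how a worker could claim one job without blocking on rows that other workers have already locked. This is not Proletarian's actual query: the table `example_job` and its columns are hypothetical, and the function assumes it is called with a next.jdbc transaction.

```clojure
(ns your-app.skip-locked-sketch
  (:require [next.jdbc :as jdbc]))

(def claim-job-sql
  "Hypothetical query: selects the oldest job on a queue and locks its row.
   Rows locked by other transactions are skipped instead of waited on."
  (str "SELECT job_id, job_type, payload "
       "FROM example_job "
       "WHERE queue = ? "
       "ORDER BY enqueued_at "
       "LIMIT 1 "
       "FOR UPDATE SKIP LOCKED"))

(defn claim-next-job!
  "Returns the next unlocked job row for queue-name, or nil if there is none.
   Call this inside a transaction: the row lock is held until the
   transaction commits or rolls back."
  [tx queue-name]
  (jdbc/execute-one! tx [claim-job-sql queue-name]))
```

Because locked rows are skipped, several worker threads can run this query concurrently and each will receive a different job, or nothing if every remaining job is already claimed.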

Proletarian works with any Clojure database library (next.jdbc, clojure.java.jdbc) you might be using, and does not itself depend on any such library.

You'll have to create two database tables, one for queueing jobs, and one for keeping a record of finished jobs.

For PostgreSQL:

These are defined in database/postgresql/tables.sql in this repository, along with a PostgreSQL schema to contain them, and an index.

For MySQL:

These are defined in database/mysql/tables.sql in this repository, along with a MySQL schema to contain them, and an index.

Before using the library, you must install these tables in your database. There are many ways you can do this. You are probably already using a migration library like Flyway or Migratus.

Copy the contents of the database/postgresql/tables.sql or database/mysql/tables.sql file into a migration file. You can change the PostgreSQL/MySQL schema and table names, but then you'll need to provide the :proletarian/job-table and :proletarian/archived-job-table options to create-queue-worker and enqueue!.
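As a sketch of what passing these options might look like, the following keeps the table names in one map and hands it to both functions. The table names are made up, and two things are assumptions rather than confirmed API details: that the option values are schema-qualified table names given as strings, and that enqueue! accepts an options map as its fourth argument. Check the API documentation before relying on either.

```clojure
(ns your-app.job-tables
  (:require [proletarian.job :as job]
            [proletarian.worker :as worker]))

;; Hypothetical table names; use whatever your migration created.
(def table-options
  {:proletarian/job-table "background.job"
   :proletarian/archived-job-table "background.archived_job"})

(defn create-worker
  "Creates a queue worker that reads from the renamed tables."
  [ds handler-fn]
  (worker/create-queue-worker ds handler-fn table-options))

(defn enqueue-with-custom-tables!
  "Enqueues a job into the renamed job table.
   Assumes enqueue! takes an options map as its last argument."
  [tx job-type payload]
  (job/enqueue! tx job-type payload table-options))
```

Keeping the names in a single map makes it harder for the worker and the enqueuing code to drift apart and point at different tables.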

Examples

This repository contains a few examples that demonstrate features and usage patterns. You can run these by cloning this repo, executing a script to set up an example Proletarian database, and then running the examples from your terminal. All the details are in the example docs.

Terminology

Queue Worker

A queue worker is a process that works off a given named queue. It can have one or more worker threads, working in parallel. The worker threads pick jobs off a queue and run them. While there are jobs to be processed, the workers will work on them continuously until the queue is empty. Then they will poll the queue at a configurable interval.

There is a default queue, :proletarian/default, which is the one used by job/enqueue! and worker/create-queue-worker if you don't specify a queue in the options.

You can create as many queue workers as you like, consuming jobs from different queues. The jobs will all live in the same table, but are differentiated by the queue name. The parameters you provide when setting up the queue workers, like the polling interval, and the number of worker threads (i.e., the number of parallel worker instances that are polling the queue and working on jobs), will in effect control the priority of the jobs on the different queues.
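As a sketch of how these parameters might look in practice, the following sets up two queue workers with different thread counts and polling intervals, so that jobs on the `:email` queue are picked up sooner than jobs on `:reports`. The queue names and numbers are invented for illustration. The option keys `:proletarian/queue`, `:proletarian/worker-threads`, and `:proletarian/polling-interval-ms` are assumed here, as is enqueue! accepting an options map as its fourth argument; check the API documentation for the exact names and defaults.

```clojure
(ns your-app.queues
  (:require [proletarian.job :as job]
            [proletarian.worker :as worker]))

;; Latency-sensitive queue: more threads, short polling interval.
(def email-queue-options
  {:proletarian/queue :email
   :proletarian/worker-threads 4
   :proletarian/polling-interval-ms 100})

;; Low-priority queue: one thread, polled every five seconds.
(def report-queue-options
  {:proletarian/queue :reports
   :proletarian/worker-threads 1
   :proletarian/polling-interval-ms 5000})

(defn create-workers
  "Returns a map of queue workers, one per queue. They still need to be
   started with worker/start!."
  [ds handler-fn]
  {:email   (worker/create-queue-worker ds handler-fn email-queue-options)
   :reports (worker/create-queue-worker ds handler-fn report-queue-options)})

(defn enqueue-report!
  "Puts a job on the :reports queue instead of the default queue."
  [tx payload]
  (job/enqueue! tx ::generate-report payload {:proletarian/queue :reports}))
```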

A queue worker is local to one machine only. If you have several machines acting as job processing instances, they will each have a queue worker process running. The parallelization factor for a given queue will be the number of queue worker processes (on different machines) multiplied by the number of threads in each queue worker.
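The arithmetic above can be written out as a small helper. This is only an illustration of the calculation; the function name is hypothetical and not part of Proletarian.

```clojure
(defn queue-parallelism
  "Upper bound on the number of jobs from one queue that can run at the
   same time, given the number of queue worker processes (one per machine)
   and the number of threads in each queue worker."
  [queue-workers threads-per-worker]
  (* queue-workers threads-per-worker))

;; Three machines, each running a queue worker with four threads:
(queue-parallelism 3 4) ;; => 12
```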

Job Handler

The job handler is the function that the Proletarian queue worker invokes when it pulls a job off the queue. You implement this function and pass it to worker/create-queue-worker when setting up the Queue Worker.

The function is invoked with two arguments:

  • job-type – the job type as a Clojure keyword (as provided to job/enqueue!).
  • payload – the job's paylo