Anorm

Anorm is a simple SQL data access library. pgmq4s provides AnormPgmqClient and AnormPgmqAdmin, both backed by a JDBC DataSource, whose operations return Futures.

Warning

Anorm is JVM-only. The client returns Futures but performs blocking JDBC calls on the provided ExecutionContext, so size that ExecutionContext's thread pool appropriately for your connection pool.

Dependency

// sbt dependencies for the Anorm backend. `%%` appends the Scala binary version to the
// artifact name; the PostgreSQL driver is a plain Java artifact, hence the single `%`.
libraryDependencies ++= Seq(
  // Anorm-based client and admin (AnormPgmqClient / AnormPgmqAdmin)
  "io.github.matejcerny" %% "pgmq4s-anorm" % "0.8.0",
  "io.github.matejcerny" %% "pgmq4s-circe" % "0.8.0", // or any JSON codec
  // PostgreSQL JDBC driver; provides org.postgresql.ds.PGSimpleDataSource used in the setup below
  "org.postgresql"        % "postgresql"    % "42.7.5"
)

Setup

Create a DataSource and provide an ExecutionContext:

import pgmq4s.*
import pgmq4s.anorm.{AnormPgmqAdmin, AnormPgmqClient}

import scala.concurrent.ExecutionContext

// ExecutionContext on which the client runs its blocking JDBC calls.
// NOTE(review): ExecutionContext.global keeps the snippet short; for real workloads provide a
// dedicated pool sized for your connection pool, as the warning above advises.
given ExecutionContext = ExecutionContext.global

// JDBC DataSource pointing at a local PostgreSQL database named "pgmq";
// replace the URL and credentials with your own.
val ds = new org.postgresql.ds.PGSimpleDataSource()
ds.setURL("jdbc:postgresql://localhost:5432/pgmq")
ds.setUser("pgmq")
ds.setPassword("pgmq")

// Both entry points are built from the same DataSource; the given ExecutionContext above is
// presumably picked up as a context parameter — confirm against the pgmq4s API.
val client = AnormPgmqClient(ds)
val admin  = AnormPgmqAdmin(ds)

Full Example

import io.circe.{Decoder, Encoder}
import pgmq4s.*
import pgmq4s.anorm.{AnormPgmqAdmin, AnormPgmqClient}
import pgmq4s.circe.given

import scala.concurrent.duration.*
import scala.concurrent.{Await, ExecutionContext}

/** Example message payload sent to and read from the queue below.
  *
  * The `derives` clause asks circe to generate an object `Encoder` and a `Decoder` for it.
  */
case class OrderCreated(orderId: Long, email: String) derives Encoder.AsObject, Decoder

/** End-to-end example: creates the "orders" queue, sends one `OrderCreated` message, reads a
  * batch back (30-second visibility timeout, `10.messages`) and prints the payloads.
  *
  * Blocks the calling thread for up to 10 seconds while the pipeline completes; a failed or
  * timed-out Future surfaces as an exception from `Await.result`.
  */
@main def anormExample(): Unit =
  // The client performs blocking JDBC calls on the ExecutionContext it is given, so run them on
  // a dedicated fixed-size pool instead of ExecutionContext.global, whose default parallelism is
  // tied to the CPU count. The size 4 is only an example value: match it to your connection pool.
  val jdbcPool = ExecutionContext.fromExecutorService(
    java.util.concurrent.Executors.newFixedThreadPool(4)
  )
  given ExecutionContext = jdbcPool

  try
    val queue = QueueName("orders")

    val ds = new org.postgresql.ds.PGSimpleDataSource()
    ds.setURL("jdbc:postgresql://localhost:5432/pgmq")
    ds.setUser("pgmq")
    ds.setPassword("pgmq")

    val client = AnormPgmqClient(ds)
    val admin  = AnormPgmqAdmin(ds)

    // Steps are chained sequentially: each one starts only after the previous Future succeeds.
    val result =
      for
        _        <- admin.createQueue(queue)
        _        <- client.send(queue, OrderCreated(1L, "dev@example.com"))
        messages <- client.read[OrderCreated](queue, VisibilityTimeout(30.seconds), 10.messages)
      yield println(s"read: ${messages.map(_.payload)}")

    // Blocking is acceptable here because this is the very edge of the program.
    Await.result(result, 10.seconds)
  finally
    // The pool's threads are non-daemon; without shutdown the JVM would not exit.
    jdbcPool.shutdown()