Anorm
Anorm is a simple SQL data access library. pgmq4s provides AnormPgmqClient and AnormPgmqAdmin, both backed by a JDBC DataSource, whose operations return Futures.
Warning
Anorm is JVM-only. The client returns Future values and performs blocking JDBC calls on the provided ExecutionContext, so size that context to match your connection pool.
Dependency
libraryDependencies ++= Seq(
  "io.github.matejcerny" %% "pgmq4s-anorm" % "0.8.0",
  "io.github.matejcerny" %% "pgmq4s-circe" % "0.8.0", // or any JSON codec
  "org.postgresql" % "postgresql" % "42.7.5"
)
Setup
Create a DataSource and provide an ExecutionContext:
import pgmq4s.*
import pgmq4s.anorm.{AnormPgmqAdmin, AnormPgmqClient}
import scala.concurrent.ExecutionContext
given ExecutionContext = ExecutionContext.global
val ds = new org.postgresql.ds.PGSimpleDataSource()
ds.setURL("jdbc:postgresql://localhost:5432/pgmq")
ds.setUser("pgmq")
ds.setPassword("pgmq")
val client = AnormPgmqClient(ds)
val admin = AnormPgmqAdmin(ds)
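The setup above uses ExecutionContext.global. Because the client blocks on JDBC calls, a dedicated fixed-size pool may be a better fit outside of quick experiments. The following is a minimal sketch, assuming a connection pool capped at 8 connections; the names maxConnections and jdbcEc are illustrative and not part of pgmq4s:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

// Hypothetical figure: set this to your connection pool's maximum size.
val maxConnections = 8

// Fixed-size pool: at most `maxConnections` blocking JDBC calls run at once.
val jdbcEc: ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(maxConnections))

// Use this in place of `given ExecutionContext = ExecutionContext.global`.
given ExecutionContext = jdbcEc
```

The threads created by Executors.newFixedThreadPool are non-daemon, so call jdbcEc.shutdown() when the application stops; otherwise the JVM will not exit on its own.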
Full Example
import io.circe.{Decoder, Encoder}
import pgmq4s.*
import pgmq4s.anorm.{AnormPgmqAdmin, AnormPgmqClient}
import pgmq4s.circe.given
import scala.concurrent.duration.*
import scala.concurrent.{Await, ExecutionContext}
case class OrderCreated(orderId: Long, email: String) derives Encoder.AsObject, Decoder
@main def anormExample(): Unit =
  given ExecutionContext = ExecutionContext.global
  val queue = QueueName("orders")
  val ds = new org.postgresql.ds.PGSimpleDataSource()
  ds.setURL("jdbc:postgresql://localhost:5432/pgmq")
  ds.setUser("pgmq")
  ds.setPassword("pgmq")
  val client = AnormPgmqClient(ds)
  val admin = AnormPgmqAdmin(ds)
  val result =
    for
      _ <- admin.createQueue(queue)
      _ <- client.send(queue, OrderCreated(1L, "dev@example.com"))
      messages <- client.read[OrderCreated](queue, VisibilityTimeout(30.seconds), 10.messages)
    yield println(s"read: ${messages.map(_.payload)}")
  Await.result(result, 10.seconds)
