Doobie
Doobie is a pure-functional JDBC layer for Scala. pgmq4s provides DoobiePgmqClient and DoobiePgmqAdmin backed by a Doobie Transactor.
Note
Doobie is JVM-only. For cross-platform support (JS, Native), use the Skunk backend.
Dependency
libraryDependencies ++= Seq(
"io.github.matejcerny" %% "pgmq4s-doobie" % "0.8.0",
"io.github.matejcerny" %% "pgmq4s-circe" % "0.8.0", // or any JSON codec
"org.tpolecat" %% "doobie-hikari" % "1.0.0-RC12",
"org.postgresql" % "postgresql" % "42.7.5"
)
Setup
Create a HikariTransactor as a Resource and pass it to DoobiePgmqClient and DoobiePgmqAdmin:
import cats.effect.{IO, Resource}
import doobie.ExecutionContexts
import doobie.hikari.HikariTransactor
val hikariTransactor: Resource[IO, HikariTransactor[IO]] =
for
ce <- ExecutionContexts.fixedThreadPool[IO](32)
xa <- HikariTransactor.newHikariTransactor[IO](
driverClassName = "org.postgresql.Driver",
url = "jdbc:postgresql://localhost:5432/pgmq",
user = "pgmq",
pass = "pgmq",
connectEC = ce
)
yield xa
Then instantiate the client and admin:
import pgmq4s.*
import pgmq4s.doobie.{DoobiePgmqAdmin, DoobiePgmqClient}
val client: PgmqClient[IO] = DoobiePgmqClient[IO](xa)
val admin: PgmqAdmin[IO] = DoobiePgmqAdmin[IO](xa)
Full Example
import cats.effect.{IO, IOApp, Resource}
import doobie.ExecutionContexts
import doobie.hikari.HikariTransactor
import io.circe.{Decoder, Encoder}
import pgmq4s.*
import pgmq4s.circe.given
import pgmq4s.doobie.{DoobiePgmqAdmin, DoobiePgmqClient}
import scala.concurrent.duration.*
case class OrderCreated(orderId: Long, email: String) derives Encoder.AsObject, Decoder
object DoobieExample extends IOApp.Simple:
private val queue = QueueName("orders")
private val transactor: Resource[IO, HikariTransactor[IO]] =
for
ce <- ExecutionContexts.fixedThreadPool[IO](32)
xa <- HikariTransactor.newHikariTransactor[IO](
driverClassName = "org.postgresql.Driver",
url = "jdbc:postgresql://localhost:5432/pgmq",
user = "pgmq",
pass = "pgmq",
connectEC = ce
)
yield xa
val run: IO[Unit] = transactor.use: xa =>
val client = DoobiePgmqClient[IO](xa)
val admin = DoobiePgmqAdmin[IO](xa)
for
_ <- admin.createQueue(queue)
msgId <- client.send(queue, OrderCreated(1L, "dev@example.com"))
messages <- client.read[OrderCreated](queue, VisibilityTimeout(30.seconds), 10.messages)
_ <- IO.println(s"read: ${messages.map(_.payload)}")
yield ()
In this article
