Skunk

Skunk is a pure-functional Postgres library for Scala built on Cats Effect and fs2. pgmq4s provides SkunkPgmqClient and SkunkPgmqAdmin backed by a Skunk Session pool.

Note

Skunk is the only cross-platform backend — it works on JVM, Scala.js, and Scala Native.

Dependency

libraryDependencies += "io.github.matejcerny" %% "pgmq4s-skunk" % "0.12.0"
libraryDependencies += "io.github.matejcerny" %% "pgmq4s-circe" % "0.12.0" // or any JSON codec

For Scala.js or Scala Native, use %%%:

libraryDependencies += "io.github.matejcerny" %%% "pgmq4s-skunk" % "0.12.0"

Setup

Create a Session pool as a Resource and pass it to SkunkPgmqClient and SkunkPgmqAdmin:

import _root_.skunk.Session
import cats.effect.{IO, Resource}
import org.typelevel.otel4s.metrics.Meter.Implicits.noop
import org.typelevel.otel4s.trace.Tracer.Implicits.noop

val pool: Resource[IO, Resource[IO, Session[IO]]] =
  Session
    .Builder[IO]
    .withHost("localhost")
    .withPort(5432)
    .withUserAndPassword("pgmq", "pgmq")
    .withDatabase("pgmq")
    .pooled(10)

Then instantiate the client and admin:

import pgmq4s.*
import pgmq4s.skunk.{SkunkPgmqAdmin, SkunkPgmqClient}

val client: PgmqClient[IO] = SkunkPgmqClient[IO](pool)
val admin: PgmqAdmin[IO]   = SkunkPgmqAdmin[IO](pool)

Full Example

import _root_.skunk.Session
import cats.effect.{IO, IOApp}
import io.circe.{Decoder, Encoder}
import org.typelevel.otel4s.metrics.Meter.Implicits.noop
import org.typelevel.otel4s.trace.Tracer.Implicits.noop
import pgmq4s.*
import pgmq4s.circe.given
import pgmq4s.skunk.{SkunkPgmqAdmin, SkunkPgmqClient}
import scala.concurrent.duration.*

case class OrderCreated(orderId: Long, email: String) derives Encoder.AsObject, Decoder

object SkunkExample extends IOApp.Simple:
  private val queue = q"orders"

  val run: IO[Unit] =
    Session
      .Builder[IO]
      .withHost("localhost")
      .withPort(5432)
      .withUserAndPassword("pgmq", "pgmq")
      .withDatabase("pgmq")
      .pooled(10)
      .use: pool =>
        val client = SkunkPgmqClient[IO](pool)
        val admin  = SkunkPgmqAdmin[IO](pool)

        for
          _        <- admin.createQueue(queue)
          msgId    <- client.send(queue, OrderCreated(1L, "dev@example.com"))
          messages <- client.read[OrderCreated](queue, 30.secondsVisibility, 10.messages)
          _        <- IO.println(s"read: ${messages.map(_.payload)}")
        yield ()