Skunk

Skunk is a purely functional Postgres library for Scala built on Cats Effect and fs2. pgmq4s provides SkunkPgmqClient and SkunkPgmqAdmin, both backed by a Skunk Session pool.

Note

Skunk is the only cross-platform backend: it works on JVM, Scala.js, and Scala Native.

Dependency

libraryDependencies += "io.github.matejcerny" %% "pgmq4s-skunk" % "0.8.0"
libraryDependencies += "io.github.matejcerny" %% "pgmq4s-circe" % "0.8.0" // or any JSON codec

For Scala.js or Scala Native, use %%% so that sbt resolves the platform-specific artifact:

libraryDependencies += "io.github.matejcerny" %%% "pgmq4s-skunk" % "0.8.0"
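
The two dependency lines above can also be declared together with a shared version value, so the modules cannot drift apart. This is only a sketch of one possible build.sbt layout; the name `pgmq4sVersion` is an illustrative choice, not something the library requires.

```scala
// build.sbt (sketch)
// Hypothetical helper value: keeps every pgmq4s module on the same version.
val pgmq4sVersion = "0.8.0"

libraryDependencies ++= Seq(
  "io.github.matejcerny" %% "pgmq4s-skunk" % pgmq4sVersion,
  "io.github.matejcerny" %% "pgmq4s-circe" % pgmq4sVersion // or any JSON codec
)
```

In a Scala.js or Scala Native build, replace %% with %%% in the same way. The %%% operator is typically supplied by the Scala.js or Scala Native sbt plugin, so it is only available once one of those plugins is enabled.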

Setup

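Create a Session pool as a Resource and pass it to SkunkPgmqClient and SkunkPgmqAdmin. Session.pooled returns a nested Resource: the outer one manages the pool's lifetime and the inner one borrows a session from the pool: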

import _root_.skunk.Session
import cats.effect.{IO, Resource}
import natchez.Trace.Implicits.noop

val pool: Resource[IO, Resource[IO, Session[IO]]] =
  Session.pooled[IO](
    host = "localhost",
    port = 5432,
    user = "pgmq",
    database = "pgmq",
    password = Some("pgmq"),
    max = 10
  )
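
The connection values above are hard-coded for local development. As a minimal sketch, assuming you would rather read them from the environment, the settings could be collected in a small case class first. `PgSettings`, `fromEnv`, and the `PGMQ_*` variable names are all hypothetical and not part of pgmq4s or Skunk.

```scala
// Hypothetical settings holder; not part of pgmq4s or Skunk.
final case class PgSettings(
    host: String,
    port: Int,
    user: String,
    database: String,
    password: Option[String],
    maxSessions: Int
)

object PgSettings:
  // Builds settings from a key/value map (for example sys.env), falling back
  // to the local-development values used in this section. The variable names
  // are assumptions chosen for illustration.
  def fromEnv(env: Map[String, String]): PgSettings =
    PgSettings(
      host = env.getOrElse("PGMQ_HOST", "localhost"),
      // A missing or non-numeric port falls back to the Postgres default.
      port = env.get("PGMQ_PORT").flatMap(_.toIntOption).getOrElse(5432),
      user = env.getOrElse("PGMQ_USER", "pgmq"),
      database = env.getOrElse("PGMQ_DATABASE", "pgmq"),
      password = env.get("PGMQ_PASSWORD").orElse(Some("pgmq")),
      maxSessions = env.get("PGMQ_MAX_SESSIONS").flatMap(_.toIntOption).getOrElse(10)
    )
```

With `val settings = PgSettings.fromEnv(sys.env)`, the fields map one-to-one onto the Session.pooled arguments shown above (`host = settings.host`, `max = settings.maxSessions`, and so on). Taking a Map rather than reading sys.env directly keeps the helper easy to test.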

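Then acquire the pool and instantiate the client and admin from the inner session Resource:

import pgmq4s.*
import pgmq4s.skunk.{SkunkPgmqAdmin, SkunkPgmqClient}

// `pool` is the nested Resource from above; mapping over it yields the inner
// Resource[IO, Session[IO]] that the constructors take in the full example.
val clients: Resource[IO, (PgmqClient[IO], PgmqAdmin[IO])] =
  pool.map: sessions =>
    (SkunkPgmqClient[IO](sessions), SkunkPgmqAdmin[IO](sessions))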

Full Example

import _root_.skunk.Session
import cats.effect.{IO, IOApp}
import io.circe.{Decoder, Encoder}
import natchez.Trace.Implicits.noop
import pgmq4s.*
import pgmq4s.circe.given
import pgmq4s.skunk.{SkunkPgmqAdmin, SkunkPgmqClient}
import scala.concurrent.duration.*

case class OrderCreated(orderId: Long, email: String) derives Encoder.AsObject, Decoder

object SkunkExample extends IOApp.Simple:
  private val queue = QueueName("orders")

  val run: IO[Unit] =
    Session
      .pooled[IO](
        host = "localhost",
        port = 5432,
        user = "pgmq",
        database = "pgmq",
        password = Some("pgmq"),
        max = 10
      )
      .use: pool =>
        // Inside `use`, `pool` is the inner Resource[IO, Session[IO]] that hands out sessions.
        val client = SkunkPgmqClient[IO](pool)
        val admin  = SkunkPgmqAdmin[IO](pool)

        for
          _        <- admin.createQueue(queue)
          msgId    <- client.send(queue, OrderCreated(1L, "dev@example.com"))
          // Read up to 10 messages; each stays hidden from other readers for 30 seconds.
          messages <- client.read[OrderCreated](queue, VisibilityTimeout(30.seconds), 10.messages)
          _        <- IO.println(s"read: ${messages.map(_.payload)}")
        yield ()
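
The example above reads the queue once. A long-running consumer would usually poll instead. The following is a minimal sketch of that idea using fs2, which Skunk already depends on. It relies only on the `client.read` call shown above; `pollOrders` and the one-second interval are illustrative assumptions, not part of pgmq4s.

```scala
import cats.effect.IO
import fs2.Stream
import io.circe.{Decoder, Encoder}
import pgmq4s.*
import pgmq4s.circe.given
import scala.concurrent.duration.*

case class OrderCreated(orderId: Long, email: String) derives Encoder.AsObject, Decoder

// Hypothetical helper, not part of pgmq4s: reads the "orders" queue once per
// second and prints whatever payloads come back.
def pollOrders(client: PgmqClient[IO]): Stream[IO, Unit] =
  val queue = QueueName("orders")
  Stream
    .awakeEvery[IO](1.second) // emit a tick every second
    .evalMap(_ => client.read[OrderCreated](queue, VisibilityTimeout(30.seconds), 10.messages))
    .evalMap(messages => IO.println(s"read: ${messages.map(_.payload)}"))
```

Inside `.use`, `pollOrders(client).compile.drain` would run the loop until the fiber is cancelled. This sketch never acknowledges a message, so anything it reads becomes visible again once the 30-second visibility timeout expires. A real consumer would delete or archive each message after handling it, using calls that this section does not cover.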