SQLite Pub/Sub, Quickstart, and more — Watermill 1.5 Released

It’s been almost a year since the last Watermill release post, and we have a bunch of news to share!

In case you’re new here, Watermill is a Go library for building event-driven applications the easy way. It’s like an HTTP router but for messages. Watermill is a library, not a framework, so there’s no vendor lock-in.

Watermill is now close to 9k GitHub stars and 79 contributors, just on the main repository. It supports 13 Pub/Subs including Kafka, Redis Streams, NATS, Google Cloud Pub/Sub, Amazon SQS/SNS, SQL, and RabbitMQ.

See the documentation for more details.

Let’s look at what’s new.

SQLite Pub/Sub

Watermill now officially supports the SQLite Pub/Sub. The messages are moved between publishers and subscribers using tables in an SQLite database.

A huge thank you to Dima Kotik who contributed the new package: github.com/ThreeDotsLabs/watermill-sqlite. See the documentation.

SQLite Pub/Sub is simple to set up, as you don’t need to run any external services: everything is just a file on disk. It supports persistence, so you don’t lose messages when your application restarts.

So far, we recommended PostgreSQL or MySQL Pub/Sub for when you don’t want to set up a separate message broker. Now it’s even easier to start with SQLite.

It can also be a great alternative to using the GoChannel Pub/Sub in smaller projects.

We’ll soon publish Dima’s article about when it shines. Stay tuned!

The implementation has two packages: modernc.org/sqlite and zombiezen.com/go/sqlite. You can choose either one when creating the Pub/Sub.

Here’s an example of creating a Subscriber with the modernc driver:

import "github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc"

// ...

subscriber, err := wmsqlitemodernc.NewSubscriber(
    db,
    wmsqlitemodernc.SubscriberOptions{
        InitializeSchema: true,
        Logger:           logger,
    },
)

messages, err := subscriber.Subscribe(ctx, "messages")
for msg := range messages {
    fmt.Printf("Received message: %s, payload: %s\n", msg.UUID, string(msg.Payload))

    msg.Ack()
}

And then publishing messages:

import "github.com/ThreeDotsLabs/watermill-sqlite/wmsqlitemodernc"

// ...

publisher, err := wmsqlitemodernc.NewPublisher(
    db,
    wmsqlitemodernc.PublisherOptions{
        InitializeSchema: true,
        Logger:           logger,
    },
)

msg := message.NewMessage(watermill.NewUUID(), []byte(`{"message": "Hello, world!"}`))

err := publisher.Publish("messages", msg)

We consider this a beta release: the full test suite passes (and it’s pretty extensive!), but we want to gather more feedback from real-world usage. If you try it, please share your experience. You can use the main repository’s issues or join our Discord.

Watermill Quickstart

We recently launched Watermill Quickstart, a free hands-on training that teaches you how to use Watermill in your project.

We keep looking for the most effective way to learn programming topics. It’s clear that reading docs and books is not enough: you learn best when you actually practice by writing code, failing, and fixing it.

The common advice for newcomers in any technology is to “create a pet project.” But coming up with an idea and sticking with it long enough to learn something isn’t trivial.

The Quickstart guides you to build this kind of project. You code in your own IDE, so the next time you want to use what you learned, you already know the setup and have the experience.

If you try it, let us know what you think!

Watermill Quickstart

Watermill v1.5

While there are no major new features in the main Watermill package, we reviewed a ton of PRs and merged many improvements and fixes. You can see the releases page and the full changeset on GitHub. Thank you to everyone who contributed, and please keep the PRs coming!

Let’s look at some API changes and deprecations.

CQRS Marshaler updates

cqrs.ProtobufMarshaler is now deprecated and replaced with cqrs.ProtoMarshaler. Ideally, we’d remove it completely to get rid of the github.com/gogo/protobuf dependency, but that would be a breaking change. We recommend using ProtoMarshaler, which uses google.golang.org/protobuf instead.

There is now an easier way to decorate marshallers. If you want to add some extra logic when marshaling or unmarshaling commands or events, you usually wrap the marshaler in a custom struct.

Now you can use cqrs.CommandEventMarshalerDecorator to do that with less boilerplate.

cqrsMarshaler := cqrs.CommandEventMarshalerDecorator{
	CommandEventMarshaler: cqrs.JSONMarshaler{},
	DecorateFunc: func(v any, msg *message.Message) error {
		event, ok := v.(Event)
		if !ok {
			return fmt.Errorf("%T does not implement Event and can't be marshaled", v)
		}

		partitionKey := event.PartitionKey()
		if partitionKey == "" {
			return fmt.Errorf("PartitionKey is empty")
		}

		msg.Metadata.Set("partition_key", partitionKey)
		return nil
	},
}

Small API changes

You can now create messages with context using message.NewMessageWithContext.

msg := message.NewMessageWithContext(ctx, watermill.NewUUID(), payload)

This is a trivial addition, but we hope it’ll help make it clear that it’s the message that carries the context. For newcomers, it can be confusing that Publish() takes no context argument. This API is similar to net/http with NewRequestWithContext. A tiny change that should improve the developer experience.

Similarly, we deprecated the Router’s AddNoPublisherHandler in favor of AddConsumerHandler. When we initially designed the Router, we assumed the default behavior of handlers would be to always publish messages. In practice, it’s the opposite: most handlers consume messages and don’t publish anything, or publish the message in other ways. AddNoPublisherHandler sounds a bit awkward, so we decided to rename it.

Context in GoChannel Pub/Sub

There is now an option to propagate the message context to the handler when using the GoChannel Pub/Sub.

This has been something many people have asked for, especially for passing some arbitrary values through the context. You can enable it by setting the PreserveContext field to true.

pubSub := gochannel.NewGoChannel(
    gochannel.Config{
        PreserveContext: true,
    },
    logger,
)

Since GoChannel works in-memory, you can access the context you set when creating the message in the handler.

However, keep in mind this is not how most Pub/Subs work. When using Kafka, Redis, or any other “proper” Pub/Sub, the context is lost when the message is sent over the network.

Use it only if you know what you’re doing! If you switch to another Pub/Sub later, this will break.

Watermill SQL v4

We finally released watermill-sql v4.0.0 after a long time as a release candidate.

You can now use it with packages other than database/sql, like pgx (Postgres driver) or ORMs. This includes support for transactions, which is key for the Outbox pattern. Shoutout to @julesjcraske for contributing these changes!

Subscriber and Publisher constructors now take a Beginner interface argument instead of *sql.DB.

subscriber, err := sql.NewSubscriber(
    sql.BeginnerFromStdSQL(db),
    sql.SubscriberConfig{
        SchemaAdapter:    sql.DefaultMySQLSchema{},
        OffsetsAdapter:   sql.DefaultMySQLOffsetsAdapter{},
        InitializeSchema: true,
    },
    logger,
)

Migration: Wrap your *sql.DB in sql.BeginnerFromStdSQL(db) when passing it to the watermill-sql API.

Pgx has its own BeginnerFromPgx(db) constructor. When working with transactions, use TxFromStdSQL(tx) or TxFromPgx(tx) instead. If you use another driver or ORM, you can implement your own adapter that implements the Beginner or Tx interface. See the implementation.

The other major change is that we decided to make SchemaAdapter and OffsetsAdapter interfaces more flexible. Instead of raw arguments, they now take a params struct and always return an error. We will stick to this approach for most APIs going forward. While more verbose, it’s easier to extend without breaking changes.

type SchemaAdapter interface {
	InsertQuery(params InsertQueryParams) (Query, error)
	SelectQuery(params SelectQueryParams) (Query, error)
	UnmarshalMessage(params UnmarshalMessageParams) (Row, error)
	SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)
	SubscribeIsolationLevel() sql.IsolationLevel
}

type OffsetsAdapter interface {
	AckMessageQuery(params AckMessageQueryParams) (Query, error)
	ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)
	NextOffsetQuery(params NextOffsetQueryParams) (Query, error)
	SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)
	BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
}

Migration: If you use the default schema or offsets adapters, you don’t need to change anything. If you have custom implementations, update the method signatures to match the new ones.

Watermill GoogleCloud v2

The Go Pub/Sub client library v1 will lose support next year. We updated watermill-googlecloud to use the new v2 library.

There’s only one breaking change: SubscriberConfig no longer exposes the SubscriptionConfig field. Instead, it now has the GenerateSubscription function.

The function returns *pubsubpb.Subscription, which mostly maps 1:1 with the old SubscriptionConfig.

Migration: Replace SubscriptionConfig with GenerateSubscription.

    googlecloud.SubscriberConfig{
        ProjectID:                "tests",
        GenerateSubscriptionName: subscriptionName,
-       SubscriptionConfig: pubsub.SubscriptionConfig{
-           RetainAckedMessages:   false,
-           EnableMessageOrdering: enableMessageOrdering,
-       },
+       GenerateSubscription: func(params googlecloud.GenerateSubscriptionParams) *pubsubpb.Subscription {
+           return &pubsubpb.Subscription{
+               RetainAckedMessages:   false,
+               EnableMessageOrdering: enableMessageOrdering,
+           }
+       },
        Unmarshaler: unmarshaler,
    },

We considered mapping the config to make this change non-breaking. While it’s possible, we wouldn’t get rid of the old dependency, and we’ve seen reports that the old and new clients don’t work well together. So this means a major version bump, but the migration should be straightforward.

Hall of Fame 🏆

We had 43 unique contributors to all Watermill repositories since the last release post! 💪 Thank you all for your help!

Docs refresh

We updated watermill.io a bit, so it’s easier to tell what the library is about. It should also be easier to choose the proper learning path.

Once again, give the Quickstart a try and let us know how it went.

We consider making free training like this for other open-source libraries as well. If you have any suggestions, please share!

Last update:
  • September 24, 2025
comments powered by Disqus