Watermill 1.4 Released (Event-Driven Go Library)

It’s Autumn over here, and it usually means another release of Watermill! 🍂 It’s hard to believe it’s already been five years since the v1.0 release.

In case you’re new to Watermill, here’s the TL;DR.

Watermill is a Go library for building message-driven or event-driven applications the easy way. Think of it like an HTTP router but for messages. It’s a library, not a framework, so you don’t need to change your architecture to use it.

Over the last year, Watermill grew to 7.5k GitHub stars and 55 contributors, just on the main repository. It supports 12 Pub/Subs including Kafka, Redis Streams, NATS, Google Cloud Pub/Sub, Amazon SQS/SNS, SQL, and RabbitMQ.

To learn more, see the documentation and examples.

What’s new?

Let’s see what happened in Watermill over the last year.

For the complete list of changes, see the GitHub releases page.

Hall of Fame 🏆

Big thank you to everyone who contributed to all Watermill repositories since the last release! 💪

Note

Side Note on Design

Watermill 1.0 was released in 2019, and the public API has been stable ever since. I especially like the modular design, which allows you to combine low-level components like building blocks to create more complex features. You can see this in the rather simple logic of components like FanIn, FanOut, or the Forwarder.

You can also notice this approach in the new features described below. We added a few simple components that work well separately, but you can combine them for more interesting effects.

Delayed PostgreSQL Requeuer

One common issue developers new to messaging have is dealing with errors. People often share an output like the one below, asking what’s happening.

Infinite loop of errors.

What you see above is a single message failing to process and being retried indefinitely. It blocks other messages, so it seems like the application is stuck in an infinite loop.

This is how most Pub/Subs work, and it’s crucial if you care about message ordering. But sometimes, you don’t want a broken message blocking the entire topic.

We used to work with Google Cloud Pub/Sub a lot, and one helpful feature it has is retrying messages. Once a message fails, you can make it reappear in the queue after a delay. Other messages are not blocked, and the failed message can be retried later.

It’s a helpful feature I missed in other Pub/Subs. There are ways to recreate this, but it’s not as straightforward. For example, you can use delayed messages with RabbitMQ, but you need a plugin, so it’s not compatible with all providers.

I thought it would be cool to have this feature in Watermill working out of the box, and here we are!

Watermill now supports a universal requeuer component. It works with all Pub/Subs, but for now, it also requires PostgreSQL to be around (this can be extended in the future with more implementations). PostgreSQL is used as an intermediate poison queue where the failed messages wait until they are retried. It also supports delaying the messages, so you can avoid this infinite loop of errors.

There are a few layers of how this works, and I’ll dive deeper into them below. Let’s first see the high-level API you need to get started. You can see the full example in the Watermill repository.

First, create the SQL requeuer. You need to pass it a publisher for your regular Pub/Sub, so it knows where to send the requeued messages.

delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{
    DB:        db,
    Publisher: redisPublisher,
    Logger:    logger,
})
if err != nil {
    panic(err)
}

In your router, add middleware provided by the requeuer.

router := message.NewDefaultRouter(logger)
router.AddMiddleware(delayedRequeuer.Middleware()...)

Finally, run the requeuer:

go func() {
    // Use a local err so the goroutine doesn't write to the outer variable.
    err := delayedRequeuer.Run(context.Background())
    if err != nil {
        panic(err)
    }
}()

That’s it if you’re happy with the default settings. The messages will be moved to the SQL poison queue if an error happens. After a delay, they will be moved back to the original topic.

Note

The PostgreSQLDelayedRequeuer is available in watermill-sql v1.4.0-rc.1: it’s still a release candidate. Read more below in the section on watermill-sql.

Poison Queue CLI

When working with poison queues, the main challenge is visibility into what’s happening. You need to know what messages are failing and why.

Watermill 1.4 ships with a pq CLI tool to help you check this.

go install github.com/ThreeDotsLabs/watermill/tools/pq@latest
export DATABASE_URL=postgres://user:password@localhost:5432/db
pq -backend postgres -topic requeue

You can preview the poison queue, move messages back to the original topic, or delete them.

Delayed Messages

Delayed messages can be useful for more than requeuing. Many scenarios involve delays: “Ask the customer for feedback in two days”, “Send an invoice reminder in a week”, etc. It’s not complex logic to implement, but if you already use events, you can leverage them and have delays working out of the box.

Watermill’s new delay package allows you to add delay metadata to messages. The delay metadata does nothing by itself. You need to use a Pub/Sub implementation that supports it to make it work. For now, it’s supported by the PostgreSQL Pub/Sub with NewDelayedPostgreSQLPublisher and NewDelayedPostgreSQLSubscriber.

There are two APIs you can use. If you work with raw messages, use delay.Message:

msg := message.NewMessage(watermill.NewUUID(), []byte("hello"))
delay.Message(msg, delay.For(time.Second * 10))

If you use the CQRS component, use delay.WithContext instead (since you can’t access the message directly). You can also use delay.Until instead of delay.For to specify time.Time instead of time.Duration.

ctx = delay.WithContext(ctx, delay.Until(invoiceDueDate))

err := commandBus.Send(ctx, cmd)
if err != nil {
    return err
}

See the full example for more details.

PostgreSQL Queue

The Delayed PostgreSQL Pub/Sub is based on the new queue schema adapter.

It’s a simpler schema that doesn’t support consumer groups. In return, it allows you to specify a custom WHERE clause to filter messages by a custom condition, like metadata or payload fields. You can also enable delete on ack, so the table doesn’t grow with time: the messages are deleted after they are acknowledged.

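sub, err := sql.NewSubscriber(
    db,
    sql.SubscriberConfig{
        SchemaAdapter: sql.PostgreSQLQueueSchema{
            // Receive only the messages marked as urgent in the metadata.
            GenerateWhereClause: func(params sql.GenerateWhereClauseParams) (string, []any) {
                // The ->> operator returns text, so compare against the string 'true'.
                return "metadata->>'urgent' = 'true'", []any{}
            },
        },
        OffsetsAdapter: sql.PostgreSQLQueueOffsetsAdapter{
            // Delete messages once they are acknowledged.
            DeleteOnAck: true,
        },
    },
    logger,
)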

Requeuer

The Requeuer component is a wrapper on the Router, similar to the Forwarder, but simpler. It listens to messages on one topic and routes them to another, dynamically generated one.

You can combine it with the Poison middleware that moves failed messages to a separate topic. You can use the Requeuer to move them back to the original topic based on the Poison metadata. This is exactly what the PostgreSQLDelayedRequeuer does. It combines the Requeuer, Poison middleware, DelayOnError middleware, and PostgreSQL queue adapter and hides all this behind a simple API.
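The "dynamically generated" topic is the key part. As a rough sketch, assuming the failed message carries its original topic in metadata, the routing decision could look like the function below. The `topicPoisonedKey` name is an assumption made for this example, so check the Poison middleware for the actual metadata keys; `requeueTopic` is hypothetical as well.

```go
package main

import (
	"errors"
	"fmt"
)

// topicPoisonedKey is assumed here; the real Poison middleware defines its own keys.
const topicPoisonedKey = "topic_poisoned"

// requeueTopic picks the topic a failed message should be published back to.
func requeueTopic(metadata map[string]string) (string, error) {
	topic := metadata[topicPoisonedKey]
	if topic == "" {
		return "", errors.New("missing original topic in metadata")
	}
	return topic, nil
}

func main() {
	topic, err := requeueTopic(map[string]string{topicPoisonedKey: "orders"})
	fmt.Println(topic, err)

	// Without the metadata there is nowhere to send the message back to.
	_, err = requeueTopic(map[string]string{})
	fmt.Println(err)
}
```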

AWS SNS/SQS Pub/Sub v1.0

Using SNS or SQS makes a lot of sense when your services are hosted on AWS. Support for Pub/Subs managed by AWS was one of the most requested features.

In the current release, we have added support for both SNS and SQS. Thanks to that, you can choose the tool that fits your needs better.

While adding support for SQS is not challenging, implementing a proper Pub/Sub on top of SNS is not trivial. SNS differs from other Pub/Subs: it’s not just a matter of subscribing to a topic and receiving messages. You need to create a queue, subscribe to the topic, create a valid access policy, and then receive messages from the queue. It also doesn’t help that AWS error messages are sometimes a bit cryptic.

The good news is that we already made this work for you, and you can use it out of the box.

newSubscriber := func(subName string) (message.Subscriber, error) {
   subscriberConfig := sns.SubscriberConfig{
      AWSConfig: aws.Config{
         Credentials: aws.AnonymousCredentials{},
      },
      OptFns:        snsOpts,
      TopicResolver: topicResolver,
      GenerateSqsQueueName: func(ctx context.Context, snsTopic sns.TopicArn) (string, error) {
         topic, err := sns.ExtractTopicNameFromTopicArn(snsTopic)
         if err != nil {
            return "", err
         }

         return fmt.Sprintf("%v-%v", topic, subName), nil
      },
   }

   sqsSubscriberConfig := sqs.SubscriberConfig{
      AWSConfig: aws.Config{
         Credentials: aws.AnonymousCredentials{},
      },
      OptFns: sqsOpts,
   }

   return sns.NewSubscriber(subscriberConfig, sqsSubscriberConfig, logger)
}
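The GenerateSqsQueueName callback above derives a queue name from the topic and the subscriber name. Here is a standalone sketch of that naming scheme, using only the standard library. `topicNameFromArn` is a simplified stand-in written for this example, not the library's ExtractTopicNameFromTopicArn, and the ARN and subscriber name are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// topicNameFromArn returns the last segment of an SNS topic ARN,
// which has the form arn:aws:sns:<region>:<account-id>:<topic-name>.
func topicNameFromArn(arn string) (string, error) {
	parts := strings.Split(arn, ":")
	if len(parts) != 6 || parts[0] != "arn" || parts[2] != "sns" || parts[5] == "" {
		return "", fmt.Errorf("invalid SNS topic ARN: %q", arn)
	}
	return parts[5], nil
}

// queueName builds a per-subscriber queue name for a topic.
func queueName(topicArn, subscriberName string) (string, error) {
	topic, err := topicNameFromArn(topicArn)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%v-%v", topic, subscriberName), nil
}

func main() {
	name, err := queueName("arn:aws:sns:us-west-2:123456789012:orders", "billing")
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```

With a scheme like this, subscribers with different names end up with separate queues for the same topic, while subscribers sharing a name share one queue.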

As always, you can find a fully working example (including a local emulator so that you can run it in Docker locally):

Big thanks to @Mariscal6 for creating a big part of the implementation!

watermill-amqp: v3 released

We released a new major version of the AMQP Pub/Sub. The breaking change was introduced in PR #27, originally contributed by @tobiasjaster.

The BuildTopology method of TopologyBuilder was missing the routing key. We decided to introduce a params struct to make it more flexible in the future.

Upgrading

You don’t need to do anything if you use the default TopologyBuilder.

If you implement a custom TopologyBuilder, update the BuildTopology method. It now takes params amqp.BuildTopologyParams instead of queueName string, exchangeName string.

-	BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error
+	BuildTopology(channel *amqp.Channel, params BuildTopologyParams, config Config, logger watermill.LoggerAdapter) error

Inside the method, replace queueName with params.QueueName and exchangeName with params.ExchangeName.
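For illustration, here is the params-struct pattern in isolation, with a local stand-in type instead of the real amqp one. The `RoutingKey` field is assumed from the description above, and `bindingDescription` is a hypothetical helper; a real BuildTopology works on an AMQP channel.

```go
package main

import "fmt"

// BuildTopologyParams mirrors the idea of the new params struct.
// QueueName and ExchangeName come from the upgrade notes; RoutingKey is assumed.
type BuildTopologyParams struct {
	QueueName    string
	ExchangeName string
	RoutingKey   string
}

// bindingDescription stands in for the body of a custom BuildTopology.
// New fields can be added to the struct later without changing this signature.
func bindingDescription(params BuildTopologyParams) string {
	return fmt.Sprintf(
		"bind queue %q to exchange %q with routing key %q",
		params.QueueName, params.ExchangeName, params.RoutingKey,
	)
}

func main() {
	fmt.Println(bindingDescription(BuildTopologyParams{
		QueueName:    "orders-queue",
		ExchangeName: "orders",
		RoutingKey:   "orders.created",
	}))
}
```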

watermill-sql v3 and v4

We introduced the v3 major version of watermill-sql a while ago. This made the PostgreSQL implementation more bulletproof.

Now, v4 is around the corner. Similar to watermill-amqp, we decided to make the SchemaAdapter and OffsetsAdapter interfaces more flexible. Instead of raw arguments, their methods now take a params struct and always return an error.

It’s a release candidate because we want to include one more breaking change so we don’t bump the major version again soon. There’s ongoing development in PR #29 (streamline the database interfaces and add pgx adapter), contributed by @julesjcraske. The aim is to allow watermill-sql to be used with pgx and ORMs not compatible with database/sql, a common request from the community. Stay tuned!

Mapping Watermill’s log levels to slog

Sometimes, the default INFO logs of Watermill may be too verbose for your application. To help with that, you can now map Watermill’s log levels to slog levels with NewSlogLoggerWithLevelMapping.

For example, this logs Watermill’s INFO messages at the DEBUG level:

logger := watermill.NewSlogLoggerWithLevelMapping(
    nil,
    map[slog.Level]slog.Level{
        slog.LevelInfo: slog.LevelDebug,
    },
)

New docs website

We refreshed Watermill’s documentation. It should now be easier to find your way around it.

See the new version at watermill.io.

Watermill’s Gopher got a new look! 🎨

New Examples

In case you missed it, there’s a new example featuring Server-Sent Events and htmx.

You may also like the examples on distributed transactions where the outbox pattern is used with Watermill.

There are also the examples mentioned above on delayed messages and delayed requeue.

Releases Dashboard

You can now track the releases of all Watermill repositories in one place: releases.threedots.tech

Terraform

We set up a Terraform repository to manage all Pub/Sub repositories. If you want to take a look, it’s here: watermill-terraform.

Celebrating the Launch

Just like the last two times, we’re hosting a live event to celebrate the release of Watermill v1.4!

We will share more details about the changes, show live demos, and answer your questions during Q&A.

We start on October 30 at:

  • 🇪🇺 6 PM CEST
  • 🇺🇸 9 AM PT / 12 PM EST
  • 🇮🇳 9:30 PM IST

Join us on YouTube (Add the event to Google Calendar)

We will also have news about our Go Event-Driven training.

We have a tradition of running a quiz during the live events. The prize is a unique crochet Gopher. 😎

Meet the Gophers at our live event!

See you there!

Last update:
  • October 29, 2024