Golang CQRS, Metrics and AMQP - Watermill v0.3.0 released

54 days of work, 12,909 lines of code, 47 Monsters and 42 KFC Twisters later finally it is Watermill v0.3.0! To keep it short, let’s go through the changes.

One important thing: at the end of this post there is a 3 question survey. Please take a moment to fill it out, it will help us make Watermill even better.

CQRS component

One of the most important parts of the v0.3.0 release is built-in CQRS support. The implementation itself is pretty simple and based on the existing message.Router and message.PubSub examples.

We built CQRS in a way that allows you to use this component with your pre-existing (even not Golang-based) infrastructure. All you need to do is implement your own Marshaler.

If you are interested in how to use CQRS, please see watermill.io documentation.

CQRS Event Storming

Thanks to @adymitruk for a bit of consulting and inspirations!

Prometheus metrics component

Second, not less important feature is the metrics component. Our goal was to allow you to add metrics to your Watermill application in one minute.

All you need to do, is add a few lines:

prometheusRegistry, closeMetricsServer := metrics.CreateRegistryAndServeHTTP(":8081")
defer closeMetricsServer()
metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "").AddPrometheusRouterMetrics(router)

and load our dashboard in your Grafana instance.

Watermill Grafana Dashboard

If you are not using Prometheus or Grafana in your system, you may use this component as the reference implementation. It is not using any internals, but only publicly available handler middlewares and pub/sub decorators.

For the detailed documentation, please check watermill.io website.

All kudos for the implementation goes to @maclav3!

AMQP (RabbitMQ) Pub/Sub implementation

We are happy to announce that AMQP Pub/Sub has joined the list of supported Pub/Sub implementations. You can now use it in addition to Kafka, Google Cloud Pub/Sub, and NATS.

The title may be a bit confusing, because RabbitMQ can work both as a Pub/Sub (fanout) and a queue - and we support both ways. There are available pre-created configs which allow you to create a Pub/Sub or a queue.

The implementation is based on github.com/streadway/amqp, but offers higher-level API and reconnect support. You can use it as a standalone Pub/Sub, but I kindly recommend to use message.Router.

As always, I encourage you to check getting started with Watermill and watermill.io documentation. Please keep in mind, that there are some glossary incompatibilities between Watermill and AMQP (for example, topic has a different meaning). They are all explained in the documentation.

HTTP Publisher

Can HTTP work as a Pub/Sub? Of course! At least when it implements Watermill’s message.PubSub interface. :)

But why? For example, you can create deadly easy Kafka-based webhooks.

See more details on this publisher in the documentation.

Thanks @maclav3 for the implementation!

Added context.Context to Subscribe

Subscribe now accepts a context as the first parameter. When the context is canceled, the subscription will be closed. This context is also added to the received message.

GoChannel Pub/Sub update

  • added Persistent option, which stores all produced message in memory and sends them to all new subscribers.
  • added BlockPublishUntilSubscriberAck, which will block Publish until the message is Acked by all consumers.

UUID and ULID functions

We were using satori/go.uuid in many places. Unfortunately, this library is no longer maintained. We replaced all UUID functions in the project with just added watermill.NewUUID(), watermill.NewULID() and watermill.NewShortUUID().

You may use our functions or your own implementation - UUID is still a string.


  • Travis CI replaced with GitLab CI. Thanks @m1_10sz!
  • Rewritten and optimized pub/sub tests.
  • Updated and added new examples and docs.
  • Made some breaking changes.
  • Added SubscribeInitialize method for some subscribers, which require initialization before publish (for example: create AMQP exchange, queue, etc.). If you are using Subscribe before Publish, it is not required!
  • Better stack traces for middleware.Recoverer.
  • Added subscriber.BulkReadWithDeduplication.
  • Added Publishers and Subscribers decorators support to the router (used in the metrics component, for example).
  • Added watermill.LogFields.Copy method.
  • Added watermill.LoggerAdapter.With method to the interface.


In the end, we would like to ask for your feedback. We created a short 1-minute survey.

We will be grateful for your answers, as it will help us to keep improving Watermill.


If you didn’t find an answer to your question in the documentation, you can hit me on Twitter: @roblaszczak.

Also, check out the #watermill channel on gophers.slack.com (invite here).

comments powered by Disqus