Watermill 1.3 released, an open-source event-driven Go library

Hey, it’s been a long time!

We’re happy to share that Watermill v1.3 is now out!

What is Watermill

Watermill is an open-source library for building message-driven or event-driven applications the easy way in Go. Our definition of “easy” is as easy as building an HTTP server in Go. Still, it’s a library, not a framework, so your application is not tied to Watermill forever.

Currently, Watermill has over 6k stars on GitHub, has over 50 contributors, and has been used by numerous projects in the last 4 years. This makes Watermill the most popular open-source event-driven library in Go.

Watermill provides support for 12 different Pub/Sub implementations, including Kafka, Redis Streams, NATS, Google Cloud Pub/Sub, Amazon SQS/SNS (alpha), SQL, RabbitMQ and more. It’s also possible to provide a custom Pub/Sub implementation if needed.

We believe that the foundation of every good open-source project is excellent documentation together with real-life examples showing all features.

Yesterday, we wrapped up the v1.3 release. We included multiple features that make Watermill even more powerful and easier to use.

What we’ve been up to over the last year

In March, we kicked off the work on our second training, Go Event-Driven. To check the interest, we organized a pre-sale with the goal of selling at least 50 copies. To our amazement, 323 copies sold within a week and generated over $54k of income, even though we didn’t have any content ready yet.

Working on the training limited our time for Watermill, but on the other hand, it created an excellent synergy. While coming up with examples for the exercises, we also found a few improvements to Watermill.

Finally, once we finished the training, we could wrap up the work on Watermill v1.3.

Celebrating the Launch

You can watch the release party recording on YouTube.

Let’s dive deeper into the changes!

What’s new?

For the complete list of changes, see the GitHub releases page.

Big thanks go to everybody who contributed to this release! 💪

New CQRS API

There are three APIs for handling messages in Watermill:

  • The raw Subscribe() calls on a Subscriber.
  • The Router and message handlers.
  • The cqrs component and event/command handlers.

Each is a more high-level abstraction than the previous one. There’s no best API — depending on your use case, you may want to use one or the other.

During the last release, we mentioned we wanted to introduce a generic API to eliminate conversions from the interface{} (now: any).

At the same time, we noticed some design issues with the CQRS component. The way to use it was via the Facade, which wired everything up using a big config. If you wanted to create a simple setup, it felt like too much effort compared to the Router.

Here’s what changed in v1.3.

The cqrs.Facade is now deprecated

We won’t remove it, so your current code will keep working as before. We recommend using the new API instead of the Facade for new use cases.

For consuming messages, use the CommandProcessor and EventProcessor.

For publishing messages, use the CommandBus and EventBus.

The bus and processor concepts are not new. The previous API also had them, so this change is backward-compatible. Their API is now more cohesive and easier to use.

Generic handlers

With the new API, you can use the same event or command handlers as before. There’s also the new generic API that eliminates the type assertion and is easier to write.

This is the original API (still supported):

type OrderPlacedHandler struct {}

func (h OrderPlacedHandler) HandlerName() string {
	return "OrderPlacedHandler"
}

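func (h OrderPlacedHandler) NewEvent() interface{} {
	return &OrderPlaced{}
}

func (h OrderPlacedHandler) Handle(ctx context.Context, e interface{}) error {
	// The type assertion is required because the event arrives as interface{}.
	event := e.(*OrderPlaced)
	_ = event // handle the event here

	return nil
}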

And this is the new generic API:

cqrs.NewEventHandler(
	"OrderPlacedHandler",
	func(ctx context.Context, event *OrderPlaced) error {
		// No more type assertions! 
		return nil
	},
)

Of course, you can still use structs for dependency injection:

type OrderPlacedHandler struct {
	repository OrderRepository
}

func (h OrderPlacedHandler) Handle(ctx context.Context, event *OrderPlaced) error {
	return h.repository.Save(event)
}

h := OrderPlacedHandler{
	repository: NewRepository(),
}

eventHandler := cqrs.NewEventHandler("OrderPlacedHandler", h.Handle)
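
To illustrate how a generic constructor can hide the type assertion, here is a minimal sketch that uses only the standard library. It is not Watermill's implementation: the Handler interface, NewHandler, and orderRecorder are hypothetical names made up for this example.

```go
package main

import (
	"context"
	"fmt"
)

// Handler is a hypothetical untyped handler interface (not Watermill's).
type Handler interface {
	HandlerName() string
	NewEvent() any
	Handle(ctx context.Context, event any) error
}

// genericHandler adapts a typed function to the untyped Handler interface.
type genericHandler[T any] struct {
	name string
	fn   func(ctx context.Context, event *T) error
}

func (h genericHandler[T]) HandlerName() string { return h.name }

// NewEvent returns an empty *T that an unmarshaler could fill in.
func (h genericHandler[T]) NewEvent() any { return new(T) }

// Handle performs the type assertion once, so user code never has to.
func (h genericHandler[T]) Handle(ctx context.Context, event any) error {
	typed, ok := event.(*T)
	if !ok {
		return fmt.Errorf("handler %q: unexpected event type %T", h.name, event)
	}
	return h.fn(ctx, typed)
}

// NewHandler infers T from the signature of fn.
func NewHandler[T any](name string, fn func(ctx context.Context, event *T) error) Handler {
	return genericHandler[T]{name: name, fn: fn}
}

type OrderPlaced struct{ OrderID string }

// orderRecorder stands in for a handler struct with injected dependencies.
type orderRecorder struct{ seen []string }

func (r *orderRecorder) Handle(ctx context.Context, event *OrderPlaced) error {
	r.seen = append(r.seen, event.OrderID)
	return nil
}

func main() {
	rec := &orderRecorder{}
	h := NewHandler("OrderPlacedHandler", rec.Handle)

	err := h.Handle(context.Background(), &OrderPlaced{OrderID: "42"})
	fmt.Println(h.HandlerName(), rec.seen, err)
}
```

The assertion still happens, but only once, inside the adapter. A mismatched event type becomes an ordinary error instead of a panic in user code.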

Event handler groups

The EventProcessor creates one subscriber for each event handler. This works great out of the box for the common use case of one event type per topic.

If you keep all your events on one topic, you can use the new AddHandlersGroup() method to create a single subscriber for all specified handlers. This is especially important if you care about the message ordering. Otherwise, your events may be processed in parallel by different handlers.

err := eventProcessor.AddHandlersGroup(
	"events", 
	UserSignedUpHandler{},
	OrderPlacedHandler{},
	OrderCancelledHandler{},
)

In this setup, set the AckOnUnknownEvent config option to true so the processor skips events that don’t have a handler assigned.
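
The idea behind a handler group can be sketched with the standard library alone. The example below is a simplified model, not Watermill's code: Event, HandlerGroup, Consume, and recordInto are hypothetical names, and a slice stands in for the topic. It shows one consumer dispatching events in arrival order and skipping events without a handler when AckOnUnknownEvent is set.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical envelope: an event name plus an opaque payload.
type Event struct {
	Name    string
	Payload string
}

var ErrUnknownEvent = errors.New("no handler for event")

// HandlerGroup models one subscriber shared by several handlers.
type HandlerGroup struct {
	Handlers          map[string]func(Event) error
	AckOnUnknownEvent bool // skip events without a handler instead of failing
}

// Consume processes events strictly in the order they arrive on the topic.
func (g HandlerGroup) Consume(events []Event) error {
	for _, e := range events {
		handle, ok := g.Handlers[e.Name]
		if !ok {
			if g.AckOnUnknownEvent {
				continue
			}
			return fmt.Errorf("%w: %s", ErrUnknownEvent, e.Name)
		}
		if err := handle(e); err != nil {
			return err
		}
	}
	return nil
}

// recordInto returns a handler that appends each event it sees to seen.
func recordInto(seen *[]string) func(Event) error {
	return func(e Event) error {
		*seen = append(*seen, e.Name+":"+e.Payload)
		return nil
	}
}

func main() {
	var seen []string
	group := HandlerGroup{
		Handlers: map[string]func(Event) error{
			"OrderPlaced":    recordInto(&seen),
			"OrderCancelled": recordInto(&seen),
		},
		AckOnUnknownEvent: true,
	}
	err := group.Consume([]Event{
		{Name: "OrderPlaced", Payload: "1"},
		{Name: "UserSignedUp", Payload: "7"}, // no handler: skipped
		{Name: "OrderCancelled", Payload: "1"},
	})
	fmt.Println(seen, err)
}
```

Because a single loop drives every handler, OrderCancelled can never be handled before the OrderPlaced that precedes it on the topic.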

Circuit Breaker Middleware

A new CircuitBreaker middleware implements the circuit breaker pattern. It uses the gobreaker library.

Like the Throttle middleware, it can help to avoid overloading your downstream services in case of a failure.

If processing the message fails several times, the middleware immediately returns an error without calling the handler for the following messages. After some time, it tries to rerun the handler. If it succeeds, it returns to normal. Otherwise, it keeps failing.

The number of failures, the time window they need to occur within, and the time to wait before retrying are configurable, among other things.
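
For readers new to the pattern, here is a deliberately simplified sketch using only the standard library. It is neither the gobreaker implementation nor Watermill's middleware: Breaker, NewBreaker, and Call are hypothetical names, the breaker counts consecutive failures rather than failures within a time window, and it is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	ErrCircuitOpen = errors.New("circuit breaker is open")
	errDownstream  = errors.New("downstream unavailable")
)

// Breaker opens after maxFailures consecutive failures and allows
// a trial call once retryAfter has elapsed.
type Breaker struct {
	maxFailures int
	retryAfter  time.Duration

	failures int
	open     bool
	openedAt time.Time
}

func NewBreaker(maxFailures int, retryAfter time.Duration) *Breaker {
	return &Breaker{maxFailures: maxFailures, retryAfter: retryAfter}
}

// Call runs handler unless the breaker is open and still waiting.
func (b *Breaker) Call(handler func() error) error {
	if b.open && time.Since(b.openedAt) < b.retryAfter {
		return ErrCircuitOpen // fail fast without calling the handler
	}

	err := handler()
	if err == nil {
		// Success (including a successful trial call) closes the breaker.
		b.failures = 0
		b.open = false
		return nil
	}

	b.failures++
	if b.open || b.failures >= b.maxFailures {
		// Too many failures, or a failed trial call: (re)open the breaker.
		b.open = true
		b.openedAt = time.Now()
	}
	return err
}

func main() {
	b := NewBreaker(2, time.Minute)
	failing := func() error { return errDownstream }

	for i := 1; i <= 3; i++ {
		fmt.Println(i, b.Call(failing))
	}
}
```

In this run, the first two calls reach the handler and fail. The third is rejected with ErrCircuitOpen without invoking the handler at all.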

Request-Reply support for commands

While communication over messages is asynchronous most of the time, it sometimes makes sense to receive a confirmation or result from a command. Many Pub/Subs support this use case with the request-reply pattern. We introduced a similar feature that works on top of any Pub/Sub.

The new requestreply package works on top of the cqrs.CommandBus. It allows you to send a command and wait for a reply. The Reply can contain an error or, optionally, a (generic) result. If you are returning just an error from the handler, the function signature is compatible with the generic handler signature.

err := commandProcessor.AddHandlers(
    requestreply.NewCommandHandler(
        "hotel_room_booking",
        ts.RequestReplyBackend,
        func(ctx context.Context, cmd *BookHotelRoom) error {
            return fmt.Errorf("some error")
        },
    ),
)
// ...

After running the command processor with the handler, you can send a command and wait for a reply:

reply, err := requestreply.SendWithReply[requestreply.NoResult](
    context.Background(),
    ts.CommandBus,
    ts.RequestReplyBackend,
    &BookHotelRoom{ID: "1"},
)

// ...

fmt.Println(reply.Error) // it's equal to `fmt.Errorf("some error")`

You can also send a result from the handler:

err := commandProcessor.AddHandlers(
    requestreply.NewCommandHandlerWithResult[PayForRoom, PayForRoomResult](
        "pay_for_room",
        ts.RequestReplyBackend,
        func(ctx context.Context, cmd *PayForRoom) (PayForRoomResult, error) {
            return PayForRoomResult{PaymentReference: "1234"}, nil
        },
    ),
)
// ...

After running the command processor with the handler, you can send a command and wait for a reply:

reply, err := requestreply.SendWithReply[PayForRoomResult](
    context.Background(),
    ts.CommandBus,
    ts.RequestReplyBackend,
    &PayForRoom{ID: "1"},
)

// ...

fmt.Println(reply.Result.PaymentReference) // it's equal to "1234"
fmt.Println(reply.Error) // it's nil

We have also used generic types here, so your code is type-safe.

By default, we don’t recommend using the request-reply pattern because it introduces coupling between services. Usually, using RPC is a better choice. But if you have a good argument for using request-reply (for example, you want to ensure that the request is processed, even if there was a transient error, and you wish to return a response to the customer), we’ve got you covered.
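
To show what request-reply means on top of asynchronous messaging, here is a minimal in-memory sketch using only the standard library. It does not use or mirror the requestreply package internals: Bus, NewBus, the method form of SendWithReply, and payForRoom are hypothetical, a Go channel stands in for the Pub/Sub, and the RoomID field is invented for the example. The sender registers a reply channel under a correlation ID, publishes the command, and blocks until the handler's outcome comes back.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"sync"
)

// Reply carries the handler's outcome back to the sender.
type Reply[R any] struct {
	Result R
	Error  error
}

// request pairs a command with the correlation ID of the awaited reply.
type request[C any] struct {
	id  string
	cmd C
}

// Bus is a hypothetical in-memory stand-in for a command bus plus reply backend.
type Bus[C, R any] struct {
	mu       sync.Mutex
	nextID   int
	pending  map[string]chan Reply[R]
	requests chan request[C]
}

// NewBus starts a goroutine that plays the role of the command processor.
func NewBus[C, R any](handler func(C) (R, error)) *Bus[C, R] {
	b := &Bus[C, R]{
		pending:  make(map[string]chan Reply[R]),
		requests: make(chan request[C]),
	}
	go func() {
		for req := range b.requests {
			result, err := handler(req.cmd)
			b.mu.Lock()
			replyTo := b.pending[req.id]
			delete(b.pending, req.id)
			b.mu.Unlock()
			replyTo <- Reply[R]{Result: result, Error: err}
		}
	}()
	return b
}

// SendWithReply publishes cmd and blocks until the matching reply arrives.
func (b *Bus[C, R]) SendWithReply(cmd C) Reply[R] {
	replyTo := make(chan Reply[R], 1)
	b.mu.Lock()
	b.nextID++
	id := strconv.Itoa(b.nextID)
	b.pending[id] = replyTo
	b.mu.Unlock()

	b.requests <- request[C]{id: id, cmd: cmd}
	return <-replyTo
}

type PayForRoom struct{ RoomID string }
type PayForRoomResult struct{ PaymentReference string }

var errMissingRoom = errors.New("room ID is required")

func payForRoom(cmd PayForRoom) (PayForRoomResult, error) {
	if cmd.RoomID == "" {
		return PayForRoomResult{}, errMissingRoom
	}
	return PayForRoomResult{PaymentReference: "ref-" + cmd.RoomID}, nil
}

func main() {
	bus := NewBus(payForRoom)
	reply := bus.SendWithReply(PayForRoom{RoomID: "1"})
	fmt.Println(reply.Result.PaymentReference, reply.Error)
}
```

A production version would also need timeouts and context cancellation so that a lost reply does not block the sender forever. The sketch leaves those out for brevity.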

Slog adapter

Watermill now supports a logger backed by the new slog API.
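
As an illustration of what such an adapter does, here is a small sketch built on the standard log/slog package (Go 1.21+). It is not Watermill's adapter: SlogAdapter, NewSlogAdapter, LogFields, and toArgs are hypothetical names for this example. The sketch translates a map of fields into slog's alternating key/value arguments.

```go
package main

import (
	"log/slog"
	"os"
	"sort"
)

// LogFields is a hypothetical map of structured fields attached to a log line.
type LogFields map[string]any

// SlogAdapter forwards log calls to a *slog.Logger.
type SlogAdapter struct{ logger *slog.Logger }

func NewSlogAdapter(logger *slog.Logger) SlogAdapter {
	return SlogAdapter{logger: logger}
}

func (a SlogAdapter) Info(msg string, fields LogFields) {
	a.logger.Info(msg, toArgs(fields)...)
}

func (a SlogAdapter) Error(msg string, err error, fields LogFields) {
	a.logger.Error(msg, append(toArgs(fields), "error", err)...)
}

// With returns an adapter that adds fields to every subsequent record.
func (a SlogAdapter) With(fields LogFields) SlogAdapter {
	return SlogAdapter{logger: a.logger.With(toArgs(fields)...)}
}

// toArgs flattens fields into slog's alternating key/value form,
// sorted by key so the output is deterministic.
func toArgs(fields LogFields) []any {
	keys := make([]string, 0, len(fields))
	for k := range fields {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	args := make([]any, 0, 2*len(keys))
	for _, k := range keys {
		args = append(args, k, fields[k])
	}
	return args
}

func main() {
	logger := NewSlogAdapter(slog.New(slog.NewTextHandler(os.Stdout, nil)))
	logger.With(LogFields{"topic": "events"}).Info("message received", LogFields{"uuid": "1234"})
}
```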

watermill-sql: v2 released

We discovered a low chance of losing messages in watermill-sql in some specific cases. This is due to how transactions work — if you’re interested in the details, see this blog post.

We had to change the default schema and the public API to fix this, so this is a major version bump. We recommend using watermill-sql v2 from now on.

Note that the minimum PostgreSQL version required is now 13.

Migrating the existing database schema would be pretty complicated. Instead, we recommend using new tables.

If you use watermill-sql just for the outbox pattern and there is one topic you care about, you can rename it. For example, if you’ve used the topic events-to-forward, you can rename it to events-to-forward-v2 while bumping watermill-sql to v2. This will create new tables with the new schema.

If you use more topics, you can use the GenerateMessagesTableName option to customize the table name. The default prefix is watermill_. You can change it to something else, like watermill_v2_, while bumping watermill-sql to v2.

GenerateMessagesTableName: func(topic string) string {
	return fmt.Sprintf("watermill_v2_%s", topic)
},

Remember to pass this option to both the schema and the offsets schema!

Usually, these tables are used just for delivering the messages and are not needed after it’s done. But you need to ensure all messages are processed before switching to new tables. In high-traffic environments, you may need to run both versions of watermill-sql for the migration period.

Amazon SQS/SNS — looking for testers!

There’s some work going on within PRs of watermill-amazonsqs (thank you, @Mariscal6!). If you’re familiar with this Pub/Sub or are running a project using it, we’d love to hear your feedback!

Last update:
  • September 25, 2023