Class MessagingSetupExtensions
Namespace: Momentum.ServiceDefaults.Messaging
Assembly: Momentum.ServiceDefaults.dll
public static class MessagingSetupExtensions
Inheritance
object ← MessagingSetupExtensions
Inherited Members
object.GetType(), object.MemberwiseClone(), object.ToString(), object.Equals(object?), object.Equals(object?, object?), object.ReferenceEquals(object?, object?), object.GetHashCode()
Methods
AddServiceBus(IHostApplicationBuilder, Action<WolverineOptions>?)
Adds the service bus, built on the Wolverine messaging framework, with a production-ready configuration for reliable message processing.
public static IHostApplicationBuilder AddServiceBus(this IHostApplicationBuilder builder, Action<WolverineOptions>? configure = null)
Parameters
builder
IHostApplicationBuilder
The host application builder to configure.
configure
Action<WolverineOptions>?
Optional action to configure Wolverine options for specific business requirements.
Returns
The configured host application builder for method chaining.
Examples
Basic E-commerce Order Processing Setup
// Minimal host setup: register the messaging infrastructure, then build and run the app.
var builder = WebApplication.CreateBuilder(args);

// Configure core messaging infrastructure through the AddServiceBus extension
// documented on this page; the lambda is its optional `configure` callback.
builder.AddServiceBus(opts =>
{
    // Configure custom routing for order events
    opts.PublishMessage<OrderCreated>()
        .ToKafkaTopic("ecommerce.orders.order-created")
        .UseDurableOutbox();

    // Configure local queues for background processing
    opts.LocalQueue("order-processing")
        .UseDurableInbox()
        .ProcessInline();
});

var app = builder.Build();
await app.RunAsync(args);
Command Handler with Automatic Validation
// Command definition with validation
public record CreateOrder(Guid CustomerId, List<OrderItem> Items);

/// <summary>
/// Validation rules for <see cref="CreateOrder"/>: the customer id must be set and
/// the order must contain at least one and at most 50 items.
/// </summary>
public class CreateOrderValidator : AbstractValidator<CreateOrder>
{
    public CreateOrderValidator()
    {
        RuleFor(x => x.CustomerId).NotEmpty();

        // Validators in a chain keep running after an earlier one fails (default
        // cascade mode), so the predicate must tolerate a null list. NotEmpty()
        // already reports the missing/empty case; without the null check a null
        // Items would throw NullReferenceException instead of failing validation.
        RuleFor(x => x.Items)
            .NotEmpty()
            .Must(items => items is null || items.Count <= 50);
    }
}
// Handler - validation happens automatically before execution
public static class OrderHandlers
{
    /// <summary>
    /// Handles <see cref="CreateOrder"/>: saves a new order, publishes an
    /// <see cref="OrderCreated"/> event, logs the creation, and returns an
    /// equivalent <see cref="OrderCreated"/> to the caller.
    /// </summary>
    /// <param name="command">The command carrying the customer id and order items.</param>
    /// <param name="orders">Repository used to persist the new order.</param>
    /// <param name="bus">Message bus used to publish the integration event.</param>
    /// <param name="logger">Logger used to record the created order.</param>
    /// <returns>An <see cref="OrderCreated"/> built from the saved order's ids.</returns>
    public static async Task<OrderCreated> Handle(
        CreateOrder command,
        IOrderRepository orders,
        IMessageBus bus,
        // OrderHandlers is a static class, and static types cannot be used as
        // generic type arguments (CS0718), so the logger is categorized by the
        // command type instead of ILogger<OrderHandlers>.
        ILogger<CreateOrder> logger)
    {
        // Command is guaranteed to be valid
        var order = new Order(command.CustomerId, command.Items);
        await orders.SaveAsync(order);

        // Publish integration event for other services
        await bus.PublishAsync(new OrderCreated(order.Id, order.CustomerId));

        logger.LogInformation("Order {OrderId} created for customer {CustomerId}",
            order.Id, order.CustomerId);

        // NOTE(review): depending on how the messaging framework treats handler
        // return values, returning the event may publish it a second time -
        // confirm this is not a duplicate of the explicit PublishAsync above.
        return new OrderCreated(order.Id, order.CustomerId);
    }
}
Event Handler for Cross-Service Integration
// Integration event from external service
[EventTopic("ecommerce.payments.payment-completed")]
public record PaymentCompleted(Guid OrderId, decimal Amount);

// Handler automatically discovered and registered
public static class PaymentHandlers
{
    /// <summary>
    /// Reacts to a completed payment: loads the order by id, marks it as paid with
    /// the paid amount, saves it, then sends a <c>FulfillOrder</c> message to the
    /// "fulfillment" queue.
    /// </summary>
    /// <param name="event">The payment-completed event carrying the order id and amount.</param>
    /// <param name="orders">Repository used to load and save the order.</param>
    /// <param name="bus">Message bus used to send the fulfillment message.</param>
    [KafkaListener("ecommerce.payments")]
    public static async Task Handle(
        PaymentCompleted @event,
        IOrderRepository orders,
        IMessageBus bus)
    {
        var paidOrder = await orders.GetByIdAsync(@event.OrderId);
        paidOrder.MarkAsPaid(@event.Amount);
        await orders.SaveAsync(paidOrder);

        // Trigger fulfillment process
        var fulfillment = new FulfillOrder(paidOrder.Id, paidOrder.Items);
        await bus.SendToQueueAsync("fulfillment", fulfillment);
    }
}
Configuration for Multi-Service Architecture
// appsettings.json
{
"ConnectionStrings": {
"ServiceBus": "Host=postgres;Database=order_service_messaging;Username=app;Password=secret"
},
"ServiceBus": {
"Domain": "ECommerce",
"PublicServiceName": "order-service",
"CloudEvents": {
"Source": "https://api.mystore.com/orders",
"DefaultType": "com.mystore.orders"
}
},
"Kafka": {
"BootstrapServers": "kafka:9092",
"GroupId": "order-service-v1"
}
}
Manual Service Registration for Testing
// In test setup
public class OrderServiceIntegrationTests : IClassFixture<TestFixture>
{
    private readonly IServiceProvider _services;

    /// <summary>
    /// Builds a dedicated service provider for these tests, registering Wolverine
    /// through AddWolverineWithDefaults with the fixture's environment and
    /// configuration plus test-only overrides.
    /// </summary>
    /// <param name="fixture">Shared fixture supplying configuration and environment.</param>
    public OrderServiceIntegrationTests(TestFixture fixture)
    {
        var registrations = new ServiceCollection();
        var configuration = fixture.Configuration;
        var environment = fixture.Environment;

        // Add Wolverine with test-specific configuration
        registrations.AddWolverineWithDefaults(environment, configuration, options =>
        {
            // Use in-memory transport for testing
            options.UseInMemoryTransport();

            // Disable external dependencies
            options.DisableKafka();

            // Turn off conventional local routing.
            // NOTE(review): the original sample described this as enabling
            // immediate processing for synchronous testing - confirm intent.
            options.Policies.DisableConventionalLocalRouting();
        });

        _services = registrations.BuildServiceProvider();
    }
}
Environment-Specific Configuration
// appsettings.Development.json
{
"ConnectionStrings": {
"ServiceBus": "Host=localhost;Database=dev_messaging;Username=dev;Password=dev"
},
"ServiceBus": {
"PublicServiceName": "order-service-dev"
},
"Kafka": {
"BootstrapServers": "localhost:9092"
}
}
// appsettings.Production.json
{
"ConnectionStrings": {
"ServiceBus": "${MESSAGING_CONNECTION_STRING}"
},
"ServiceBus": {
"PublicServiceName": "order-service",
"CloudEvents": {
"Source": "https://api.production.mystore.com/orders"
}
},
"Kafka": {
"BootstrapServers": "${KAFKA_BOOTSTRAP_SERVERS}",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "Plain"
}
}
Remarks
Wolverine Setup Detailed Configuration Guide
Overview
This method configures Wolverine with enterprise-grade messaging capabilities:
- Registers a keyed PostgreSQL data source for durable message persistence
- Configures Wolverine with CQRS patterns and reliable messaging defaults
- Integrates with OpenTelemetry for distributed tracing and performance monitoring
- Sets up automatic validation, exception handling, and transaction management
- Enables CloudEvent support for standardized cross-service communication
- Provides health checks for messaging infrastructure monitoring
- Skips registration if SkipServiceRegistration is true (for testing scenarios)
The messaging system supports both synchronous request/response patterns and asynchronous event-driven architectures, making it suitable for complex business workflows.
Key Features
Message Persistence
Wolverine is configured with PostgreSQL for durable message storage, ensuring message delivery guarantees and supporting inbox/outbox patterns for transactional consistency.
CQRS Support
Built-in support for Command Query Responsibility Segregation (CQRS) patterns with automatic command and query handler discovery and registration.
Observability Integration
Full integration with OpenTelemetry for:
- Distributed tracing across message flows
- Performance metrics for message processing
- Health monitoring of messaging infrastructure
Validation and Error Handling
Automatic integration with FluentValidation for command validation, with configurable error handling policies and dead letter queue support.
CloudEvents Support
Standardized message formatting using CloudEvents specification for interoperability with external systems and services.
Health Monitoring
Comprehensive health checks for messaging infrastructure components including database connectivity and message queue health.
Configuration Patterns
Basic Configuration
The AddServiceBus() method provides sensible defaults for most messaging scenarios while allowing customization through the configure action parameter.
Advanced Routing
Support for complex message routing patterns including:
- Topic-based routing for event publishing
- Queue-based routing for command processing
- Conditional routing based on message content
- Dead letter handling for failed messages
Transaction Management
Automatic transaction management with support for:
- Transactional inbox patterns
- Transactional outbox patterns
- Saga pattern implementation
- Compensating actions for rollbacks