Wolverine Integration in Momentum
Wolverine is a powerful messaging framework that serves as the backbone of Momentum's CQRS and messaging infrastructure. It provides in-process messaging, external transport integration, and advanced middleware capabilities.
What is Wolverine?
Wolverine (formerly Jasper) is a .NET messaging framework that provides:
- In-process messaging: CQRS command/query handling
- External transports: Kafka, RabbitMQ, Azure Service Bus integration
- Middleware pipeline: Run cross-cutting logic before and after each handler
- Message persistence: Reliable message delivery with PostgreSQL
- Code generation: High-performance handler adapters generated at runtime or ahead of time (see the codegen command)
Architecture Overview
Basic Configuration
Wolverine is automatically configured through AddServiceDefaults():
var builder = WebApplication.CreateBuilder(args);
// Automatically configures Wolverine with sensible defaults
builder.AddServiceDefaults();
var app = builder.Build();
// Passing args to RunAsync also enables the Wolverine command-line tools
await app.RunAsync(args);
Manual Configuration
For more control, configure Wolverine manually:
builder.Services.AddWolverine(opts =>
{
opts.ServiceName = "MyService";
// Configure PostgreSQL persistence
opts.PersistMessagesWithPostgresql(connectionString);
// Add middleware
opts.Policies.AddMiddleware<CustomMiddleware>();
// Configure discovery
opts.Discovery.IncludeAssembly(typeof(MyDomainAssembly).Assembly);
});
Handler Discovery
Automatic Discovery
Handlers are automatically discovered from domain assemblies:
// Mark assemblies for discovery
[assembly: DomainAssembly(typeof(IAppDomainAssembly))]
// Handlers are automatically found
public static class CreateCashierCommandHandler
{
public static async Task<(Result<Cashier>, CashierCreated?)> Handle(
CreateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// Handler logic
}
}
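To make the idea of convention-based discovery concrete, the following is a minimal sketch that uses plain reflection to find static Handle methods and dispatch a message to them. It is not how Wolverine implements discovery (Wolverine generates adapter code rather than invoking handlers reflectively); PingCommand, PingCommandHandler, and HandlerScanner are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical message and handler types used only for this illustration.
public record PingCommand(string Text);

public static class PingCommandHandler
{
    public static string Handle(PingCommand command) => $"pong: {command.Text}";
}

public static class HandlerScanner
{
    // Maps each message type to the public static Handle method whose first parameter accepts it.
    // Only classes whose name ends in "Handler" are considered (an assumed naming convention).
    public static Dictionary<Type, MethodInfo> Scan(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && t.Name.EndsWith("Handler", StringComparison.Ordinal))
            .SelectMany(t => t.GetMethods(BindingFlags.Public | BindingFlags.Static))
            .Where(m => m.Name == "Handle" && m.GetParameters().Length > 0)
            .ToDictionary(m => m.GetParameters()[0].ParameterType);

    // Looks up the handler by the message's runtime type and invokes it.
    public static object? Dispatch(Dictionary<Type, MethodInfo> handlers, object message) =>
        handlers[message.GetType()].Invoke(null, new[] { message });
}
```

Calling HandlerScanner.Scan(typeof(PingCommandHandler).Assembly) and then Dispatch with a PingCommand returns the handler's reply. The sketch assumes one handler per message type; a second Handle method for the same type would make ToDictionary throw.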
Handler Patterns
Wolverine supports various handler patterns:
// Static method handlers (recommended)
public static class MyCommandHandler
{
public static async Task<Result<T>> Handle(MyCommand command, CancellationToken token)
{
// Implementation
}
}
// Instance method handlers
public class MyQueryHandler
{
private readonly IDbContext _db;
public MyQueryHandler(IDbContext db)
{
_db = db;
}
public async Task<Result<T>> Handle(MyQuery query, CancellationToken token)
{
// Implementation using _db
}
}
// Multiple handlers in one class
public static class CashierHandlers
{
public static Task<Result<Cashier>> Handle(GetCashierQuery query, AppDomainDb db, CancellationToken token)
{
// Query handler
}
public static Task<(Result<Cashier>, CashierCreated?)> Handle(CreateCashierCommand command, IMessageBus bus, CancellationToken token)
{
// Command handler
}
}
Message Bus Usage
IMessageBus Interface
The IMessageBus interface provides the primary API for messaging:
public interface IMessageBus
{
// Execute commands/queries
Task<TResponse> InvokeAsync<TResponse>(IQuery<TResponse> query, CancellationToken cancellationToken = default);
Task<TResponse> InvokeAsync<TResponse>(ICommand<TResponse> command, CancellationToken cancellationToken = default);
// Execute database commands
Task<TResponse> InvokeCommandAsync<TResponse>(ICommand<TResponse> command, CancellationToken cancellationToken = default);
// Publish events
Task PublishAsync<T>(T message, CancellationToken cancellationToken = default);
Task PublishAsync<T>(T message, DeliveryOptions? options, CancellationToken cancellationToken = default);
}
Usage Examples
public class CashierController : ControllerBase
{
private readonly IMessageBus _messageBus;
public CashierController(IMessageBus messageBus)
{
_messageBus = messageBus;
}
[HttpPost]
public async Task<ActionResult<Cashier>> CreateCashier(CreateCashierCommand command)
{
// InvokeAsync returns the handler's response; the event in the handler's tuple is cascaded by Wolverine
var result = await _messageBus.InvokeAsync(command);
if (result.IsSuccess)
{
// Integration event is automatically published if returned from handler
return CreatedAtAction(nameof(GetCashier), new { id = result.Value.Id }, result.Value);
}
return BadRequest(result.Errors);
}
[HttpGet("{id}")]
public async Task<ActionResult<Cashier>> GetCashier(Guid id)
{
var query = new GetCashierQuery(GetTenantId(), id);
var result = await _messageBus.InvokeAsync(query);
if (result.IsSuccess)
{
return Ok(result.Value);
}
return NotFound();
}
}
Middleware Pipeline
Built-in Middleware
Momentum configures several middleware components automatically:
// Automatically configured middleware
opts.Policies.AddMiddleware<RequestPerformanceMiddleware>(); // Performance tracking
opts.Policies.AddMiddleware(typeof(OpenTelemetryInstrumentationMiddleware)); // Tracing
opts.Policies.Add<ExceptionHandlingPolicy>(); // Error handling
opts.Policies.Add<FluentValidationPolicy>(); // Validation
Custom Middleware
Create custom middleware for cross-cutting concerns:
public class AuditMiddleware
{
public async Task InvokeAsync<T>(MessageContext context, T message, Func<Task> next)
{
var auditService = context.Services.GetRequiredService<IAuditService>();
// Before handler execution
var auditId = await auditService.StartAuditAsync(typeof(T).Name, message);
try
{
await next();
// After successful execution
await auditService.CompleteAuditAsync(auditId, success: true);
}
catch (Exception ex)
{
// After failed execution
await auditService.CompleteAuditAsync(auditId, success: false, ex.Message);
throw;
}
}
}
// Register middleware
builder.Services.AddWolverine(opts =>
{
opts.Policies.AddMiddleware<AuditMiddleware>();
});
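The wrapper-style middleware above boils down to composing delegates around the handler. The sketch below shows that composition with plain delegates so the execution order is easy to see. It is an illustration only, not Wolverine's pipeline; the Middleware delegate, Pipeline, and PipelineDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// A middleware receives the message and a delegate that runs the rest of the pipeline.
public delegate Task Middleware(object message, Func<Task> next);

public static class Pipeline
{
    // Wraps the handler so that the first middleware in the list runs outermost.
    public static Func<object, Task> Build(IReadOnlyList<Middleware> middleware, Func<object, Task> handler)
    {
        var pipeline = handler;
        for (var i = middleware.Count - 1; i >= 0; i--)
        {
            var current = middleware[i];
            var next = pipeline;
            pipeline = message => current(message, () => next(message));
        }
        return pipeline;
    }
}

public static class PipelineDemo
{
    // Runs one message through two middleware and returns the order in which each step executed.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        Middleware audit = async (message, next) =>
        {
            log.Add("audit:start");
            try { await next(); log.Add("audit:success"); }
            catch { log.Add("audit:failure"); throw; }
        };
        Middleware timing = async (message, next) =>
        {
            log.Add("timing:before");
            await next();
            log.Add("timing:after");
        };

        var pipeline = Pipeline.Build(new[] { audit, timing }, message =>
        {
            log.Add($"handler:{message}");
            return Task.CompletedTask;
        });

        await pipeline("CreateCashier");
        return log;
    }
}
```

Because the audit middleware is registered first, it wraps everything else: it sees the start before the timing middleware and the outcome after it.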
Conditional Middleware
Apply middleware conditionally:
// Only apply to commands
opts.Policies.AddMiddleware<AuditMiddleware>(
x => x.HandledMessageType.Implements<ICommand>()
);
// Only apply to handlers of a specific message type
opts.Policies.AddMiddleware<CachingMiddleware>(
x => x.HandledMessageType == typeof(GetCashierQuery)
);
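The two filters above differ in what they match: a marker-interface check accepts every message type that implements the interface, while a type comparison accepts exactly one type. A minimal sketch of both predicates using only System.Type follows; ICommand, the two message records, and MiddlewareFilters are hypothetical stand-ins, not the Momentum or Wolverine definitions.

```csharp
using System;

// Hypothetical stand-ins for the marker interface and messages used above.
public interface ICommand { }
public record CreateCashierCommand(string Name) : ICommand;
public record GetCashierQuery(Guid Id);

public static class MiddlewareFilters
{
    // True for any message type that implements the ICommand marker interface.
    public static bool IsCommand(Type messageType) =>
        typeof(ICommand).IsAssignableFrom(messageType);

    // True only for the exact type T; derived types do not match.
    public static bool IsExactly<T>(Type messageType) =>
        messageType == typeof(T);
}
```

With these definitions, IsCommand accepts CreateCashierCommand and rejects GetCashierQuery, while IsExactly<GetCashierQuery> accepts only the query type itself.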
Message Routing
Local Message Routing
Local messages (commands/queries) are routed to handlers within the same process:
// This stays within the process
var result = await messageBus.InvokeAsync(new GetCashierQuery(tenantId, id));
External Message Routing
Integration events are routed to external transports:
// This is published to Kafka automatically when returned from handler
public static async Task<(Result<Cashier>, CashierCreated?)> Handle(
CreateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// ... business logic
var integrationEvent = new CashierCreated(tenantId, cashier);
return (result, integrationEvent); // Automatically published
}
// Manual publishing
await messageBus.PublishAsync(new CashierCreated(tenantId, cashier));
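The tuple-return convention can be pictured as a dispatcher that hands the first element back to the caller and publishes the second element when it is not null. The sketch below models that behavior in memory. It is a simplified illustration under that assumption, not Wolverine's implementation; InMemoryBus, CascadeDemo, and this CashierCreated record are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the integration event used in this document.
public record CashierCreated(Guid CashierId);

public sealed class InMemoryBus
{
    public List<object> Published { get; } = new();

    // Runs the handler, returns its response, and publishes the cascaded event if there is one.
    public TResponse Invoke<TCommand, TResponse, TEvent>(
        TCommand command,
        Func<TCommand, (TResponse Response, TEvent? Event)> handler)
        where TEvent : class
    {
        var (response, cascaded) = handler(command);
        if (cascaded is not null)
        {
            Published.Add(cascaded);
        }
        return response;
    }
}

public static class CascadeDemo
{
    // Returns the handler response and how many events ended up published.
    public static (string Response, int PublishedCount) Run(bool succeed)
    {
        var bus = new InMemoryBus();
        var response = bus.Invoke<string, string, CashierCreated>("Jane", name =>
        {
            // A null event means nothing is published, for example after a validation failure.
            if (!succeed) return ("validation failed", null);
            return ($"created {name}", new CashierCreated(Guid.NewGuid()));
        });
        return (response, bus.Published.Count);
    }
}
```

This is why the event type is nullable in the handler signatures above: returning null for the event lets a handler report a failure without publishing anything.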
Routing Rules
Configure custom routing rules:
builder.Services.AddWolverine(opts =>
{
// Route all events to Kafka
opts.PublishAllMessages().ToKafka();
// Route specific messages to specific topics
opts.PublishMessage<OrderCreated>().ToKafkaTopic("orders");
// Route to local queues
opts.PublishMessage<InternalNotification>().ToLocalQueue("notifications");
});
Persistence and Reliability
PostgreSQL Persistence
Wolverine uses PostgreSQL for reliable message delivery:
// Configured automatically with connection string
var connectionString = configuration.GetConnectionString("ServiceBus");
opts.PersistMessagesWithPostgresql(connectionString, schemaName: "messaging");
Outbox Pattern
The outbox pattern ensures reliable event publishing:
// Automatically configured
opts.Policies.UseDurableOutboxOnAllSendingEndpoints();
// This ensures that if the database transaction succeeds,
// the integration event will be published, even if the service crashes
public static async Task<(Result<Cashier>, CashierCreated?)> Handle(
CreateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// Database operation and event publishing are transactionally consistent
var dbCommand = CreateInsertCommand(command);
var insertedCashier = await messaging.InvokeCommandAsync(dbCommand, cancellationToken);
var result = insertedCashier.ToModel();
var createdEvent = new CashierCreated(result.TenantId, 0, result);
return (result, createdEvent); // Published reliably via outbox
}
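The guarantee described above rests on two steps: the event is stored in the same commit as the business data, and a separate relay publishes stored events until each one succeeds. The following in-memory sketch shows both steps. It only illustrates the pattern and does not reflect Wolverine's storage schema or relay; InMemoryStore, OutboxMessage, OutboxRelay, and OutboxDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class OutboxMessage
{
    public OutboxMessage(object payload) => Payload = payload;
    public object Payload { get; }
    public bool Sent { get; set; }
}

// In-memory stand-in for a database whose commit is all-or-nothing.
public sealed class InMemoryStore
{
    public List<string> Cashiers { get; } = new();
    public List<OutboxMessage> Outbox { get; } = new();

    // Stages rows and events together; if the work delegate throws, neither is applied.
    public void Commit(Action<List<string>, List<OutboxMessage>> work)
    {
        var rows = new List<string>();
        var messages = new List<OutboxMessage>();
        work(rows, messages);
        Cashiers.AddRange(rows);
        Outbox.AddRange(messages);
    }
}

public static class OutboxRelay
{
    // Publishes pending messages; a message is marked as sent only after publish returns.
    public static int Flush(InMemoryStore store, Action<object> publish)
    {
        var delivered = 0;
        foreach (var message in store.Outbox.Where(m => !m.Sent).ToList())
        {
            try
            {
                publish(message.Payload);
                message.Sent = true;
                delivered++;
            }
            catch (Exception)
            {
                // Leave the message pending so the next pass retries it.
            }
        }
        return delivered;
    }
}

public static class OutboxDemo
{
    // Returns how many messages were delivered while the broker was down and after it recovered.
    public static (int FirstPass, int SecondPass) Run()
    {
        var store = new InMemoryStore();
        store.Commit((rows, outbox) =>
        {
            rows.Add("Jane Doe");
            outbox.Add(new OutboxMessage("CashierCreated: Jane Doe"));
        });

        var first = OutboxRelay.Flush(store, _ => throw new InvalidOperationException("broker unavailable"));
        var second = OutboxRelay.Flush(store, _ => { });
        return (first, second);
    }
}
```

The event survives the failed first pass because it lives in the store, not in memory of the publishing code. Note that this design delivers at least once, so consumers should tolerate duplicates.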
Message Durability
Configure durability options:
builder.Services.AddWolverine(opts =>
{
// Use durable local queues for internal messages
opts.Policies.UseDurableLocalQueues();
// Apply transactions automatically
opts.Policies.AutoApplyTransactions();
// Configure retry policies
opts.Policies.OnException<SqlException>().RetryTimes(3);
opts.Policies.OnException<HttpRequestException>().RetryWithCooldown(1.Seconds(), 5.Seconds(), 10.Seconds());
});
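A retry-with-cooldown policy such as the one configured above can be understood as a loop that waits for the next delay in the list after each transient failure and gives up once the list is exhausted. The helper below is a standalone sketch of that behavior, not Wolverine's implementation; RetryPolicy, RetryDemo, and the choice of TimeoutException as the transient error are assumptions made for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RetryPolicy
{
    // Runs the action once, then retries after each cooldown in turn.
    // When the cooldowns are used up, or the error is not transient, the exception propagates.
    public static async Task<T> RetryWithCooldownAsync<T>(
        Func<Task<T>> action,
        TimeSpan[] cooldowns,
        Func<Exception, bool> isTransient,
        CancellationToken cancellationToken = default)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return await action();
            }
            catch (Exception ex) when (isTransient(ex) && attempt < cooldowns.Length)
            {
                await Task.Delay(cooldowns[attempt], cancellationToken);
            }
        }
    }
}

public static class RetryDemo
{
    // Fails twice with a transient error, then succeeds; returns the number of calls made.
    public static async Task<int> RunAsync()
    {
        var calls = 0;
        return await RetryPolicy.RetryWithCooldownAsync(
            () =>
            {
                calls++;
                if (calls < 3) throw new TimeoutException("transient failure");
                return Task.FromResult(calls);
            },
            new[] { TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20) },
            ex => ex is TimeoutException);
    }
}
```

Two cooldowns allow three attempts in total, which is why the demo succeeds on its third call. Growing delays give a struggling dependency time to recover instead of hammering it immediately.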
Error Handling
Exception Policies
Configure how different exceptions are handled:
builder.Services.AddWolverine(opts =>
{
// Retry transient exceptions
opts.Policies.OnException<SqlException>().RetryTimes(3);
opts.Policies.OnException<HttpRequestException>().RetryWithCooldown(1.Seconds(), 5.Seconds());
// Move poison messages to error queue
opts.Policies.OnException<BusinessRuleException>().MoveToErrorQueue();
// Discard messages that fail validation; retrying cannot fix them
opts.Policies.OnException<ValidationException>().Discard();
});
Dead Letter Queues
Failed messages are moved to dead letter queues:
// Dead letter queues are automatically configured
// Messages that fail after all retries are moved to:
// - {original-queue-name}.dead-letter (for local queues)
// - {original-topic-name}.dead-letter (for Kafka topics)
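The dead-letter behavior described above amounts to a bounded retry loop followed by parking the message together with its last error. The sketch below models that flow in memory. It makes no claim about how Wolverine stores or names dead-letter queues; QueueProcessor and DeadLetterDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public sealed class QueueProcessor
{
    private readonly int _maxAttempts;
    public QueueProcessor(int maxAttempts) => _maxAttempts = maxAttempts;

    public List<string> Processed { get; } = new();
    public List<(string Message, string Error)> DeadLetters { get; } = new();

    // Tries the handler up to _maxAttempts times, then parks the message with the last error.
    public void Process(string message, Action<string> handler)
    {
        Exception? last = null;
        for (var attempt = 1; attempt <= _maxAttempts; attempt++)
        {
            try
            {
                handler(message);
                Processed.Add(message);
                return;
            }
            catch (Exception ex)
            {
                last = ex;
            }
        }
        DeadLetters.Add((message, last?.Message ?? "unknown error"));
    }
}

public static class DeadLetterDemo
{
    // Processes one good and one poison message; returns the processor and the handler call count.
    public static (QueueProcessor Processor, int HandlerCalls) Run()
    {
        var processor = new QueueProcessor(maxAttempts: 3);
        var calls = 0;
        Action<string> handler = message =>
        {
            calls++;
            if (message == "bad") throw new InvalidOperationException("cannot process");
        };

        processor.Process("ok", handler);
        processor.Process("bad", handler);
        return (processor, calls);
    }
}
```

Keeping the error text next to the parked message makes later inspection and replay much easier than keeping the payload alone.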
Custom Error Handling
Implement custom error handling in middleware:
public class ErrorHandlingMiddleware
{
private readonly ILogger<ErrorHandlingMiddleware> _logger;
public ErrorHandlingMiddleware(ILogger<ErrorHandlingMiddleware> logger)
{
_logger = logger;
}
public async Task InvokeAsync<T>(MessageContext context, T message, Func<Task> next)
{
try
{
await next();
}
catch (BusinessRuleException ex)
{
_logger.LogWarning("Business rule violation for {MessageType}: {Message}",
typeof(T).Name, ex.Message);
// Don't retry business rule violations
return;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing {MessageType}", typeof(T).Name);
throw; // Re-throw to trigger retry logic
}
}
}
Testing with Wolverine
Unit Testing Handlers
Test handlers independently:
[Test]
public async Task Handle_ValidCommand_ReturnsSuccess()
{
// Arrange
var command = new CreateCashierCommand(Guid.NewGuid(), "John Doe", "john@example.com");
var mockMessaging = new Mock<IMessageBus>();
mockMessaging
.Setup(m => m.InvokeCommandAsync(It.IsAny<CreateCashierCommandHandler.DbCommand>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Data.Entities.Cashier { /* ... */ });
// Act
var (result, integrationEvent) = await CreateCashierCommandHandler.Handle(
command, mockMessaging.Object, CancellationToken.None);
// Assert
result.IsSuccess.Should().BeTrue();
integrationEvent.Should().NotBeNull();
}
Integration Testing
Test the full message pipeline:
[Test]
public async Task Should_Process_Command_End_To_End()
{
// Arrange
using var testContext = new IntegrationTestContext();
var messageBus = testContext.GetService<IMessageBus>();
var command = new CreateCashierCommand(Guid.NewGuid(), "Jane Doe", "jane@example.com");
// Act
// InvokeAsync returns the handler's response; the cascaded event is published by Wolverine
var result = await messageBus.InvokeAsync(command);
// Assert
result.IsSuccess.Should().BeTrue();
// Verify database state
var db = testContext.GetService<AppDomainDb>();
var cashier = await db.Cashiers.FirstOrDefaultAsync(c => c.CashierId == result.Value.Id);
cashier.Should().NotBeNull();
// Verifying the published event requires consuming it from Kafka
}
Testing with TestContainers
Use TestContainers for full integration testing:
public class IntegrationTestContext : IDisposable
{
private readonly IHost _host;
private readonly PostgreSqlContainer _database;
private readonly KafkaContainer _kafka;
public IntegrationTestContext()
{
_database = new PostgreSqlBuilder().Build();
_kafka = new KafkaBuilder().Build();
// Start containers
Task.WaitAll(
_database.StartAsync(),
_kafka.StartAsync()
);
// Configure test host
var builder = Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(_database.GetConnectionString());
opts.UseKafka(_kafka.GetBootstrapAddress());
});
});
_host = builder.Build();
_host.Start();
}
public T GetService<T>() where T : notnull
{
return _host.Services.GetRequiredService<T>();
}
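public void Dispose()
{
_host.Dispose();
// DisposeAsync returns a ValueTask; block until the containers have actually stopped
_database.DisposeAsync().AsTask().GetAwaiter().GetResult();
_kafka.DisposeAsync().AsTask().GetAwaiter().GetResult();
}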
}
Performance Optimization
Handler Performance
Optimize handler performance:
// Use static handlers to avoid allocation overhead
public static class OptimizedHandler
{
public static async ValueTask<Result<T>> Handle(MyQuery query, AppDomainDb db, CancellationToken token)
{
// ValueTask avoids allocating a Task when the result is already available
// Frequently executed queries can additionally be precompiled
var result = await db.Entities
.Where(e => e.Id == query.Id)
.FirstOrDefaultAsync(token);
return result?.ToModel() ?? Result<T>.NotFound();
}
}
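The benefit of ValueTask shows up when a hot path often completes synchronously, for example on a cache hit. The sketch below illustrates that case with an in-memory cache in front of an asynchronous loader; CashierNameLookup and its loader delegate are hypothetical and stand in for whatever data access a real handler would use.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed class CashierNameLookup
{
    private readonly ConcurrentDictionary<Guid, string> _cache = new();
    private readonly Func<Guid, Task<string>> _load;

    public CashierNameLookup(Func<Guid, Task<string>> load) => _load = load;

    // Number of times the underlying loader was called.
    public int Loads { get; private set; }

    // Cache hits complete synchronously without allocating a Task; misses take the async path.
    public ValueTask<string> GetNameAsync(Guid id) =>
        _cache.TryGetValue(id, out var name)
            ? new ValueTask<string>(name)
            : new ValueTask<string>(LoadAndCacheAsync(id));

    private async Task<string> LoadAndCacheAsync(Guid id)
    {
        Loads++;
        var name = await _load(id);
        _cache[id] = name;
        return name;
    }
}
```

A ValueTask should be awaited only once; callers that need to await the result several times should convert it with AsTask() first.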
Message Throughput
Configure for high throughput:
builder.Services.AddWolverine(opts =>
{
// Increase parallelism
opts.Policies.MaximumParallelization(Environment.ProcessorCount * 2);
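// Optimize local queues
opts.LocalQueue("high-priority")
.MaximumParallelMessages(10); // Up to 10 messages handled concurrently
// Use a sequential queue when ordering matters;
// it handles one message at a time, so it cannot be combined with parallel processing
opts.LocalQueue("ordered")
.Sequential();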
// Configure batch processing
opts.LocalQueue("batch-processing")
.ProcessInBatches(50, TimeSpan.FromSeconds(5));
});
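A parallelism limit like the ones configured above caps how many handlers run at the same time. The sketch below shows one common way to enforce such a cap with SemaphoreSlim and reports the highest concurrency actually observed. It is a standalone illustration, not Wolverine's queue implementation; BoundedProcessor is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedProcessor
{
    // Processes all messages while never running more than maxParallel handlers at once.
    // Returns the highest number of handlers that were running at the same time.
    public static async Task<int> ProcessAsync<T>(
        IEnumerable<T> messages, int maxParallel, Func<T, Task> handler)
    {
        using var gate = new SemaphoreSlim(maxParallel);
        var sync = new object();
        var running = 0;
        var peak = 0;

        var tasks = messages.Select(async message =>
        {
            // Wait for a free slot before starting the handler.
            await gate.WaitAsync();
            lock (sync)
            {
                running++;
                peak = Math.Max(peak, running);
            }
            try
            {
                await handler(message);
            }
            finally
            {
                lock (sync) { running--; }
                gate.Release();
            }
        }).ToList();

        await Task.WhenAll(tasks);
        return peak;
    }
}
```

Raising the limit helps only while handlers spend most of their time waiting on I/O; for CPU-bound handlers, values far above the processor count mostly add contention.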
Memory Management
Optimize memory usage:
// Use object pools for frequently created objects
public static class PooledHandler
{
private static readonly ObjectPool<StringBuilder> StringBuilderPool =
new DefaultObjectPoolProvider().CreateStringBuilderPool();
public static async Task<Result<string>> Handle(ProcessDataCommand command, CancellationToken token)
{
var sb = StringBuilderPool.Get();
try
{
// Use pooled StringBuilder
sb.AppendLine(command.Data);
return sb.ToString();
}
finally
{
StringBuilderPool.Return(sb);
}
}
}
Command-Line Tools
Wolverine provides command-line tools for development and operations:
# Check environment and configuration
dotnet run -- check-env
# Generate code artifacts
dotnet run -- codegen
# Database operations
dotnet run -- db-apply # Apply database migrations
dotnet run -- db-assert # Verify database state
dotnet run -- db-patch # Generate migration patch
# Inspect configuration
dotnet run -- describe # Show current configuration
dotnet run -- resources # List all resources
# Storage operations
dotnet run -- storage # Storage-related commands
Usage in Development
These commands are automatically available when using app.RunAsync(args):
var app = builder.Build();
// This enables Wolverine CLI commands
await app.RunAsync(args);
Best Practices
Handler Design
- Use static methods: Better performance, easier testing
- Keep handlers focused: One handler per command/query
- Avoid heavy dependencies: Inject only what you need
- Use cancellation tokens: Support request cancellation
Error Handling
- Distinguish error types: Separate business errors from technical errors
- Use appropriate retry policies: Don't retry business rule violations
- Log meaningfully: Provide context for debugging
- Handle poison messages: Configure dead letter queues
Performance
- Profile handlers: Monitor performance of critical handlers
- Use appropriate data types: ValueTask for hot paths
- Minimize allocations: Use object pooling where appropriate
- Optimize database queries: Use compiled queries for frequently executed queries
Testing
- Test handlers independently: Unit test business logic
- Integration test pipelines: Test the full message flow
- Mock external dependencies: Use mocks for external services
- Test error scenarios: Verify error handling works correctly
Next Steps
- Learn about Integration Events publishing
- Understand Kafka Configuration for external messaging
- Explore Domain Events for internal messaging
- See Testing for comprehensive testing strategies