Integration Events in Momentum
Integration events enable communication between different services or bounded contexts in your system. They represent significant business events that other services might need to react to.
What are Integration Events?
Integration events are:
- Cross-service communication: Messages sent between different services
- Business significant: Represent important business events like "CashierCreated" or "InvoicePaid"
- Asynchronous: Processed in the background without blocking the caller
- Decoupling mechanism: Allow services to communicate without direct dependencies
Event Definition
Integration events are defined as records with specific attributes:
[EventTopic<Cashier>]
public record CashierCreated(
[PartitionKey] Guid TenantId,
Cashier Cashier
);
Key Components
- EventTopic Attribute: Defines the topic and routing configuration
- PartitionKey Attribute: Ensures message ordering and proper distribution
- XML Documentation: Required for auto-generated documentation
- Immutable Record: Events should never change once created
Event Attributes
EventTopic Attribute
The EventTopic
attribute configures how the event is routed and published:
// Basic usage
[EventTopic<Cashier>]
public record CashierCreated(/* parameters */);
// With custom configuration
[EventTopic<Invoice>(
Topic = "invoice-updates",
Domain = "invoicing",
Version = "v2",
Internal = false,
ShouldPluralizeTopicName = true
)]
public record InvoiceUpdated(/* parameters */);
Parameters:
Topic
: Custom topic name (defaults to class name)Domain
: Business domain (defaults to assembly domain)Version
: Event schema versionInternal
: Whether the event is internal to the serviceShouldPluralizeTopicName
: Whether to pluralize the topic name
PartitionKey Attribute
The PartitionKey
attribute ensures proper message ordering and distribution:
public record CashierCreated(
[PartitionKey] Guid TenantId, // Single partition key
Cashier Cashier
);
public record ComplexEvent(
[PartitionKey(Order = 0)] Guid TenantId, // Primary partition key
[PartitionKey(Order = 1)] int RegionId, // Secondary partition key
string EventData
);
Benefits of Partition Keys:
- Message ordering: Messages with the same partition key are processed in order
- Load balancing: Events are distributed across Kafka partitions
- Tenant isolation: Multi-tenant applications can isolate by tenant
Real-World Examples
Simple Event
[EventTopic<Guid>]
public record CashierDeleted(
[PartitionKey] Guid TenantId,
Guid CashierId
);
Complex Event with Multiple Partition Keys
[EventTopic<Cashier>]
public record CashierCreated(
[PartitionKey(Order = 0)] Guid TenantId,
[PartitionKey(Order = 1)] int PartitionKeyTest,
Cashier Cashier
);
Invoice Events
[EventTopic<Invoice>]
public record InvoiceCreated(
[PartitionKey] Guid TenantId,
Invoice Invoice
);
[EventTopic<Guid>]
public record InvoicePaid(
[PartitionKey] Guid TenantId,
Guid InvoiceId,
decimal AmountPaid,
DateTime PaidDate
);
Publishing Events
Integration events are automatically published when returned from command handlers:
public static class CreateCashierCommandHandler
{
public static async Task<(Result<Cashier>, CashierCreated?)> Handle(
CreateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// Execute business logic
var dbCommand = CreateInsertCommand(command);
var insertedCashier = await messaging.InvokeCommandAsync(dbCommand, cancellationToken);
var result = insertedCashier.ToModel();
// Create integration event
var createdEvent = new CashierCreated(
result.TenantId,
PartitionKeyTest: 0,
result
);
// Return result and event - framework will publish the event automatically
return (result, createdEvent);
}
}
Manual Event Publishing
You can also publish events manually using the message bus:
public static class SomeService
{
public static async Task DoSomethingAsync(
IMessageBus messageBus,
CancellationToken cancellationToken)
{
// Your business logic here
// Publish event manually
var event = new CashierCreated(tenantId, 0, cashier);
await messageBus.PublishAsync(event, cancellationToken);
}
}
Event Handlers
Other services can subscribe to integration events by creating handlers:
// In another service (e.g., Notification Service)
public static class CashierCreatedHandler
{
public static async Task Handle(
CashierCreated cashierCreated,
IEmailService emailService,
ILogger<CashierCreatedHandler> logger,
CancellationToken cancellationToken)
{
logger.LogInformation("Processing CashierCreated event for tenant {TenantId}",
cashierCreated.TenantId);
try
{
await emailService.SendWelcomeEmailAsync(
cashierCreated.Cashier.Email,
cashierCreated.Cashier.Name,
cancellationToken);
logger.LogInformation("Welcome email sent to {Email}",
cashierCreated.Cashier.Email);
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to send welcome email to {Email}",
cashierCreated.Cashier.Email);
throw; // Re-throw to trigger retry logic
}
}
}
Topic Naming Convention
Momentum automatically generates Kafka topic names based on the event configuration:
Format: {environment}.{domain}.{scope}.{topic}.{version}
Examples:
dev.appdomain.public.cashiers.v1
prod.invoicing.internal.payments
test.notifications.public.emails.v2
Topic Name Components
- Environment:
dev
,test
,prod
(based on hosting environment) - Domain: Business domain (from EventTopic attribute or assembly default)
- Scope:
public
(cross-service) orinternal
(service-specific) - Topic: Event name (pluralized by default)
- Version: Schema version (optional)
Event Versioning
Handle event schema evolution with versioning:
// Version 1
[EventTopic<User>(Version = "v1")]
public record UserCreated(
[PartitionKey] Guid TenantId,
Guid UserId,
string Name,
string Email
);
// Version 2 - added new field
[EventTopic<User>(Version = "v2")]
public record UserCreated(
[PartitionKey] Guid TenantId,
Guid UserId,
string Name,
string Email,
DateTime CreatedDate // New field
);
Handling Multiple Versions
// Handler for V1 events
public static class UserCreatedV1Handler
{
public static async Task Handle(UserCreatedV1 userCreated, CancellationToken cancellationToken)
{
// Handle V1 event
}
}
// Handler for V2 events
public static class UserCreatedV2Handler
{
public static async Task Handle(UserCreatedV2 userCreated, CancellationToken cancellationToken)
{
// Handle V2 event
}
}
Error Handling and Retry
Integration event handlers support automatic retry and error handling:
public static class OrderCreatedHandler
{
public static async Task Handle(
OrderCreated orderCreated,
IInventoryService inventoryService,
ILogger<OrderCreatedHandler> logger,
CancellationToken cancellationToken)
{
try
{
await inventoryService.ReserveItemsAsync(
orderCreated.OrderItems,
cancellationToken);
}
catch (InventoryNotAvailableException ex)
{
logger.LogWarning("Inventory not available for order {OrderId}: {Message}",
orderCreated.OrderId, ex.Message);
// Don't retry for business exceptions
return;
}
catch (Exception ex)
{
logger.LogError(ex, "Failed to process order created event for order {OrderId}",
orderCreated.OrderId);
// Re-throw to trigger retry
throw;
}
}
}
Testing Integration Events
Testing Event Publishing
[Test]
public async Task Handle_ValidCommand_PublishesIntegrationEvent()
{
// Arrange
var command = new CreateCashierCommand(
Guid.NewGuid(),
"John Doe",
"john@example.com"
);
var mockMessaging = new Mock<IMessageBus>();
// ... setup mocks
// Act
var (result, integrationEvent) = await CreateCashierCommandHandler.Handle(
command, mockMessaging.Object, CancellationToken.None);
// Assert
result.IsSuccess.Should().BeTrue();
integrationEvent.Should().NotBeNull();
integrationEvent!.TenantId.Should().Be(command.TenantId);
integrationEvent.Cashier.Name.Should().Be(command.Name);
}
Testing Event Handlers
[Test]
public async Task Handle_CashierCreated_SendsWelcomeEmail()
{
// Arrange
var cashierCreated = new CashierCreated(
Guid.NewGuid(),
0,
new Cashier
{
Id = Guid.NewGuid(),
Name = "John Doe",
Email = "john@example.com"
}
);
var mockEmailService = new Mock<IEmailService>();
var logger = new Mock<ILogger<CashierCreatedHandler>>();
// Act
await CashierCreatedHandler.Handle(
cashierCreated,
mockEmailService.Object,
logger.Object,
CancellationToken.None);
// Assert
mockEmailService.Verify(
x => x.SendWelcomeEmailAsync(
cashierCreated.Cashier.Email,
cashierCreated.Cashier.Name,
It.IsAny<CancellationToken>()),
Times.Once);
}
Best Practices
Event Design
- Make events immutable: Use records with readonly properties
- Include necessary data: Events should contain all data consumers need
- Use meaningful names: Event names should clearly describe what happened
- Version your events: Plan for schema evolution from the beginning
Partition Keys
- Choose wisely: Partition keys affect message ordering and distribution
- Tenant isolation: Use tenant ID as partition key for multi-tenant systems
- Avoid hotspots: Don't use partition keys that create uneven distribution
- Keep it stable: Partition keys should not change for the same logical entity
Error Handling
- Handle business exceptions: Don't retry for expected business failures
- Log appropriately: Log enough information for debugging
- Use dead letter queues: Configure DLQ for failed messages
- Implement circuit breakers: Protect downstream services
Performance
- Keep events small: Large events can impact performance
- Batch when possible: Consider batching related events
- Monitor throughput: Watch for processing bottlenecks
- Use appropriate timeouts: Set reasonable timeouts for external calls
Documentation
- Use XML documentation: Document when events are published
- Include examples: Show example event payloads
- Document handlers: Explain what each handler does
- Keep it current: Update documentation when events change
Configuration
Integration events are configured automatically through:
- Service discovery: Events are discovered from domain assemblies
- Topic configuration: Kafka topics are auto-provisioned
- Consumer groups: Each service gets its own consumer group
- Serialization: CloudEvents format with System.Text.Json
See Kafka Configuration for detailed Kafka setup instructions.
Next Steps
- Learn about Domain Events for internal service events
- Understand Kafka Configuration for message broker setup
- Explore Wolverine messaging framework details
- See Testing for comprehensive testing strategies