Database Transactions in Momentum
Momentum provides robust transaction handling through LinqToDB integration and follows the outbox pattern for reliable event publishing. This ensures data consistency and reliable message delivery in distributed systems.
Overview
Database transactions in Momentum are handled automatically by the framework, but you can also control them explicitly when needed:
public static class CreateCashierCommandHandler
{
public static async Task<(Result<Cashier>, CashierCreated?)> Handle(
CreateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// Database operations are automatically wrapped in transactions
var dbCommand = CreateInsertCommand(command);
var insertedCashier = await messaging.InvokeCommandAsync(dbCommand, cancellationToken);
var result = insertedCashier.ToModel();
var createdEvent = new CashierCreated(result.TenantId, 0, result);
// Event will be published in the same transaction
return (result, createdEvent);
}
}
Automatic Transaction Management
Command Handler Transactions
All command handlers automatically run within transactions:
public static class UpdateCashierCommandHandler
{
public record DbCommand(Data.Entities.Cashier Cashier) : ICommand<Data.Entities.Cashier>;
public static async Task<(Result<Cashier>, CashierUpdated?)> Handle(
UpdateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// All operations in this handler run in a single transaction:
// 1. Query existing cashier
var getQuery = new GetCashierQuery(command.TenantId, command.Id);
var existingResult = await messaging.InvokeAsync(getQuery, cancellationToken);
if (!existingResult.IsSuccess)
{
return (existingResult, null);
}
// 2. Update database
var dbCommand = CreateUpdateCommand(command, existingResult.Value);
var updatedCashier = await messaging.InvokeCommandAsync(dbCommand, cancellationToken);
var result = updatedCashier.ToModel();
var updatedEvent = new CashierUpdated(result.TenantId, result);
// 3. Event publishing (handled by framework)
return (result, updatedEvent);
// If any step fails, the entire transaction is rolled back
}
public static async Task<Data.Entities.Cashier> Handle(
DbCommand command,
AppDomainDb db,
CancellationToken cancellationToken)
{
// This database operation participates in the parent transaction
return await db.Cashiers
.Where(c => c.CashierId == command.Cashier.CashierId)
.UpdateWithOutputAsync(
_ => new Data.Entities.Cashier
{
Name = command.Cashier.Name,
Email = command.Cashier.Email,
UpdatedDateUtc = DateTime.UtcNow
},
token: cancellationToken);
}
}
Transaction Scope
Each command handler execution creates a transaction scope that includes:
- Database Operations: All DbCommand executions
- Event Publishing: Integration events are stored for later publishing
- Side Effects: Any other transactional resources
Explicit Transaction Control
Manual Transaction Management
For complex scenarios, you can manage transactions explicitly:
public static class ComplexBusinessOperationHandler
{
public static async Task<Result<BusinessOperationResult>> Handle(
ComplexBusinessOperationCommand command,
AppDomainDb db,
IMessageBus messaging,
CancellationToken cancellationToken)
{
using var transaction = await db.BeginTransactionAsync(cancellationToken);
try
{
// Step 1: Create primary entity
var cashier = new Data.Entities.Cashier
{
TenantId = command.TenantId,
CashierId = Guid.CreateVersion7(),
Name = command.CashierName,
Email = command.CashierEmail,
CreatedDateUtc = DateTime.UtcNow,
UpdatedDateUtc = DateTime.UtcNow
};
var insertedCashier = await db.Cashiers.InsertWithOutputAsync(cashier, token: cancellationToken);
// Step 2: Create related entities
var permissions = command.Permissions.Select(p => new Data.Entities.CashierPermission
{
CashierId = insertedCashier.CashierId,
Permission = p,
GrantedDateUtc = DateTime.UtcNow
}).ToList();
await db.CashierPermissions.BulkCopyAsync(permissions, cancellationToken);
// Step 3: Update statistics
await db.TenantStats
.Where(ts => ts.TenantId == command.TenantId)
.UpdateAsync(ts => new Data.Entities.TenantStat
{
CashierCount = ts.CashierCount + 1,
UpdatedDateUtc = DateTime.UtcNow
}, cancellationToken);
// Step 4: Store events for publishing
var events = new List<object>
{
new CashierCreated(command.TenantId, 0, insertedCashier.ToModel()),
new CashierPermissionsGranted(command.TenantId, insertedCashier.CashierId, command.Permissions)
};
foreach (var evt in events)
{
await StoreEventForPublishing(evt, db, cancellationToken);
}
// Commit all changes
await transaction.CommitAsync(cancellationToken);
// Publish events after successful commit
foreach (var evt in events)
{
await messaging.PublishAsync(evt, cancellationToken);
}
return new BusinessOperationResult
{
CashierId = insertedCashier.CashierId,
PermissionsGranted = permissions.Count
};
}
catch (Exception ex)
{
// Rollback on any error
await transaction.RollbackAsync(cancellationToken);
return Result<BusinessOperationResult>.Failure($"Operation failed: {ex.Message}");
}
}
private static async Task StoreEventForPublishing(
object evt,
AppDomainDb db,
CancellationToken cancellationToken)
{
var outboxEvent = new Data.Entities.OutboxEvent
{
Id = Guid.CreateVersion7(),
EventType = evt.GetType().Name,
EventData = JsonSerializer.Serialize(evt),
CreatedDateUtc = DateTime.UtcNow
};
await db.OutboxEvents.InsertAsync(outboxEvent, token: cancellationToken);
}
}
Nested Transactions
Handle nested operations with savepoints:
public static class BatchProcessingHandler
{
public static async Task<Result<BatchResult>> Handle(
BatchProcessingCommand command,
AppDomainDb db,
CancellationToken cancellationToken)
{
using var mainTransaction = await db.BeginTransactionAsync(cancellationToken);
var results = new List<ProcessingResult>();
var errors = new List<string>();
try
{
foreach (var item in command.Items)
{
// Create savepoint for each item
var savepoint = $"item_{item.Id}";
await mainTransaction.SaveAsync(savepoint, cancellationToken);
try
{
var result = await ProcessSingleItem(item, db, cancellationToken);
results.Add(result);
}
catch (Exception ex)
{
// Rollback to savepoint on item failure
await mainTransaction.RollbackAsync(savepoint, cancellationToken);
errors.Add($"Item {item.Id}: {ex.Message}");
// Continue processing other items
}
}
// Decide whether to commit based on success criteria
if (errors.Count == 0 || errors.Count < command.Items.Count * 0.5) // Allow up to 50% failures
{
await mainTransaction.CommitAsync(cancellationToken);
return new BatchResult
{
Successful = results,
Errors = errors,
Success = true
};
}
else
{
await mainTransaction.RollbackAsync(cancellationToken);
return Result<BatchResult>.Failure($"Too many failures: {errors.Count}/{command.Items.Count}");
}
}
catch (Exception ex)
{
await mainTransaction.RollbackAsync(cancellationToken);
return Result<BatchResult>.Failure($"Batch processing failed: {ex.Message}");
}
}
}
Outbox Pattern Implementation
Event Storage
Momentum implements the outbox pattern to ensure reliable event publishing:
// Database entity for storing events
public class OutboxEvent
{
public Guid Id { get; set; }
public string EventType { get; set; } = string.Empty;
public string EventData { get; set; } = string.Empty;
public DateTime CreatedDateUtc { get; set; }
public DateTime? PublishedDateUtc { get; set; }
public bool IsPublished { get; set; }
public int RetryCount { get; set; }
public string? LastError { get; set; }
}
Automatic Event Storage
Events returned from command handlers are automatically stored:
// Framework automatically handles this pattern:
public static async Task<(Result<Cashier>, CashierCreated?)> Handle(
CreateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
// 1. Execute database operations
var dbCommand = CreateInsertCommand(command);
var insertedCashier = await messaging.InvokeCommandAsync(dbCommand, cancellationToken);
var result = insertedCashier.ToModel();
var createdEvent = new CashierCreated(result.TenantId, 0, result);
// 2. Framework stores event in outbox table (same transaction)
// 3. Framework publishes event after transaction commit
// 4. Framework marks event as published
return (result, createdEvent);
}
Outbox Event Publisher
Background service that publishes stored events:
public class OutboxEventPublisher : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<OutboxEventPublisher> _logger;
public OutboxEventPublisher(IServiceProvider serviceProvider, ILogger<OutboxEventPublisher> logger)
{
_serviceProvider = serviceProvider;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessOutboxEvents(stoppingToken);
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox events");
await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken);
}
}
}
private async Task ProcessOutboxEvents(CancellationToken cancellationToken)
{
using var scope = _serviceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDomainDb>();
var messageBus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
var unpublishedEvents = await db.OutboxEvents
.Where(e => !e.IsPublished && e.RetryCount < 5)
.OrderBy(e => e.CreatedDateUtc)
.Take(100)
.ToListAsync(cancellationToken);
foreach (var outboxEvent in unpublishedEvents)
{
try
{
await PublishEvent(outboxEvent, messageBus, cancellationToken);
// Mark as published
await db.OutboxEvents
.Where(e => e.Id == outboxEvent.Id)
.UpdateAsync(e => new OutboxEvent
{
IsPublished = true,
PublishedDateUtc = DateTime.UtcNow
}, cancellationToken);
_logger.LogDebug("Published outbox event {EventId} of type {EventType}",
outboxEvent.Id, outboxEvent.EventType);
}
catch (Exception ex)
{
// Increment retry count and log error
await db.OutboxEvents
.Where(e => e.Id == outboxEvent.Id)
.UpdateAsync(e => new OutboxEvent
{
RetryCount = e.RetryCount + 1,
LastError = ex.Message
}, cancellationToken);
_logger.LogError(ex, "Failed to publish outbox event {EventId} of type {EventType}",
outboxEvent.Id, outboxEvent.EventType);
}
}
}
private async Task PublishEvent(OutboxEvent outboxEvent, IMessageBus messageBus, CancellationToken cancellationToken)
{
// Deserialize event based on type
var eventType = Type.GetType(outboxEvent.EventType);
if (eventType == null)
{
throw new InvalidOperationException($"Unknown event type: {outboxEvent.EventType}");
}
var eventInstance = JsonSerializer.Deserialize(outboxEvent.EventData, eventType);
if (eventInstance == null)
{
throw new InvalidOperationException($"Failed to deserialize event: {outboxEvent.EventType}");
}
await messageBus.PublishAsync(eventInstance, cancellationToken);
}
}
Transaction Isolation Levels
Configure Isolation Levels
Set appropriate isolation levels for your operations:
public static class CriticalReadHandler
{
public static async Task<Result<CriticalData>> Handle(
CriticalReadQuery query,
AppDomainDb db,
CancellationToken cancellationToken)
{
using var transaction = await db.BeginTransactionAsync(IsolationLevel.Serializable, cancellationToken);
try
{
// Perform critical read operations that require serializable isolation
var data = await db.CriticalData
.Where(cd => cd.TenantId == query.TenantId)
.FirstOrDefaultAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
return data?.ToModel() ??
new List<ValidationFailure> { new("TenantId", "Critical data not found") };
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
}
public static class HighVolumeReadHandler
{
public static async Task<Result<List<HighVolumeData>>> Handle(
HighVolumeReadQuery query,
AppDomainDb db,
CancellationToken cancellationToken)
{
using var transaction = await db.BeginTransactionAsync(IsolationLevel.ReadCommitted, cancellationToken);
try
{
// Use read committed for high-volume, less critical reads
var data = await db.HighVolumeData
.Where(hvd => hvd.TenantId == query.TenantId)
.Skip(query.Skip)
.Take(query.Take)
.ToListAsync(cancellationToken);
await transaction.CommitAsync(cancellationToken);
return data.Select(d => d.ToModel()).ToList();
}
catch
{
await transaction.RollbackAsync(cancellationToken);
throw;
}
}
}
Deadlock Detection and Handling
Deadlock Retry Pattern
Handle deadlocks gracefully with retry logic:
public static class DeadlockRetryHandler
{
private const int MaxRetries = 3;
private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(100);
public static async Task<TResult> ExecuteWithRetry<TResult>(
Func<CancellationToken, Task<TResult>> operation,
CancellationToken cancellationToken)
{
var attempt = 0;
while (true)
{
try
{
return await operation(cancellationToken);
}
catch (Exception ex) when (IsDeadlockException(ex) && attempt < MaxRetries)
{
attempt++;
var delay = TimeSpan.FromMilliseconds(RetryDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));
await Task.Delay(delay, cancellationToken);
}
}
}
private static bool IsDeadlockException(Exception ex)
{
// Check for PostgreSQL deadlock error codes
return ex is Npgsql.PostgresException pgEx &&
(pgEx.SqlState == "40001" || pgEx.SqlState == "40P01");
}
}
// Usage
public static class UpdateWithRetryHandler
{
public static async Task<(Result<Cashier>, CashierUpdated?)> Handle(
UpdateCashierCommand command,
IMessageBus messaging,
CancellationToken cancellationToken)
{
return await DeadlockRetryHandler.ExecuteWithRetry(async ct =>
{
var dbCommand = CreateUpdateCommand(command);
var updatedCashier = await messaging.InvokeCommandAsync(dbCommand, ct);
var result = updatedCashier.ToModel();
var updatedEvent = new CashierUpdated(result.TenantId, result);
return (result, updatedEvent);
}, cancellationToken);
}
}
Bulk Operations and Performance
Bulk Insert with Transactions
Handle large data sets efficiently:
public static class BulkCashierImportHandler
{
public static async Task<Result<ImportResult>> Handle(
BulkCashierImportCommand command,
AppDomainDb db,
ILogger<BulkCashierImportHandler> logger,
CancellationToken cancellationToken)
{
using var transaction = await db.BeginTransactionAsync(cancellationToken);
try
{
var batchSize = 1000;
var totalProcessed = 0;
var errors = new List<string>();
// Process in batches to avoid transaction timeout
foreach (var batch in command.Cashiers.Chunk(batchSize))
{
var entities = batch.Select(c => new Data.Entities.Cashier
{
TenantId = c.TenantId,
CashierId = Guid.CreateVersion7(),
Name = c.Name,
Email = c.Email,
CreatedDateUtc = DateTime.UtcNow,
UpdatedDateUtc = DateTime.UtcNow
}).ToList();
try
{
// Use bulk copy for better performance
await db.BulkCopyAsync(entities, cancellationToken);
totalProcessed += entities.Count;
logger.LogInformation("Processed batch of {BatchSize} cashiers, total: {Total}",
entities.Count, totalProcessed);
}
catch (Exception ex)
{
errors.Add($"Batch starting at record {totalProcessed}: {ex.Message}");
// Decide whether to continue or fail
if (errors.Count > 10) // Too many errors
{
throw new InvalidOperationException($"Import failed with {errors.Count} batch errors");
}
}
}
await transaction.CommitAsync(cancellationToken);
return new ImportResult
{
TotalRecords = command.Cashiers.Count,
ProcessedRecords = totalProcessed,
ErrorCount = errors.Count,
Errors = errors
};
}
catch (Exception ex)
{
await transaction.RollbackAsync(cancellationToken);
logger.LogError(ex, "Bulk import failed, transaction rolled back");
return Result<ImportResult>.Failure($"Import failed: {ex.Message}");
}
}
}
Testing with Transactions
Transaction Testing Patterns
[Test]
public async Task Handle_CreateCashier_CommitsTransactionOnSuccess()
{
// Arrange
using var connection = new NpgsqlConnection(TestConnectionString);
await connection.OpenAsync();
using var transaction = await connection.BeginTransactionAsync();
var db = new AppDomainDb(connection);
db.BeginTransaction(transaction);
var command = new CreateCashierCommand(
Guid.NewGuid(),
"John Doe",
"john@example.com");
var mockMessaging = new Mock<IMessageBus>();
mockMessaging.Setup(m => m.InvokeCommandAsync(It.IsAny<object>(), It.IsAny<CancellationToken>()))
.ReturnsAsync((object cmd, CancellationToken ct) =>
{
// Simulate database operation
if (cmd is CreateCashierCommandHandler.DbCommand dbCmd)
{
return dbCmd.Cashier;
}
throw new InvalidOperationException("Unexpected command type");
});
// Act
var (result, integrationEvent) = await CreateCashierCommandHandler.Handle(
command, mockMessaging.Object, CancellationToken.None);
// Assert
result.IsSuccess.Should().BeTrue();
integrationEvent.Should().NotBeNull();
// Verify transaction can be committed
await transaction.CommitAsync();
}
[Test]
public async Task Handle_CreateCashier_RollsBackTransactionOnError()
{
// Arrange
using var connection = new NpgsqlConnection(TestConnectionString);
await connection.OpenAsync();
using var transaction = await connection.BeginTransactionAsync();
var db = new AppDomainDb(connection);
db.BeginTransaction(transaction);
var command = new CreateCashierCommand(
Guid.NewGuid(),
"John Doe",
"john@example.com");
var mockMessaging = new Mock<IMessageBus>();
mockMessaging.Setup(m => m.InvokeCommandAsync(It.IsAny<object>(), It.IsAny<CancellationToken>()))
.ThrowsAsync(new InvalidOperationException("Database error"));
// Act & Assert
var exception = await Assert.ThrowsAsync<InvalidOperationException>(async () =>
{
await CreateCashierCommandHandler.Handle(
command, mockMessaging.Object, CancellationToken.None);
});
exception.Message.Should().Be("Database error");
// Verify transaction was rolled back
await transaction.RollbackAsync();
}
Integration Tests with Transactions
public class TransactionIntegrationTests : IClassFixture<TestDatabaseFixture>
{
private readonly TestDatabaseFixture _fixture;
public TransactionIntegrationTests(TestDatabaseFixture fixture)
{
_fixture = fixture;
}
[Fact]
public async Task CompleteWorkflow_Success_CommitsAllChanges()
{
// Arrange
using var scope = _fixture.ServiceProvider.CreateScope();
var db = scope.ServiceProvider.GetRequiredService<AppDomainDb>();
var messageBus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
var tenantId = Guid.NewGuid();
// Act - Execute complete workflow
var createCommand = new CreateCashierCommand(tenantId, "John Doe", "john@example.com");
var (createResult, createEvent) = await messageBus.InvokeAsync(createCommand);
var updateCommand = new UpdateCashierCommand(tenantId, createResult.Value.Id, "John Smith", "john.smith@example.com");
var (updateResult, updateEvent) = await messageBus.InvokeAsync(updateCommand);
// Assert - Verify all changes are persisted
createResult.IsSuccess.Should().BeTrue();
updateResult.IsSuccess.Should().BeTrue();
var finalCashier = await db.Cashiers
.FirstOrDefaultAsync(c => c.CashierId == createResult.Value.Id);
finalCashier.Should().NotBeNull();
finalCashier!.Name.Should().Be("John Smith");
finalCashier.Email.Should().Be("john.smith@example.com");
}
[Fact]
public async Task CompleteWorkflow_FailureInMiddle_RollsBackAllChanges()
{
// This test would verify that if any step in a multi-step operation fails,
// the entire transaction is rolled back
}
}
Best Practices
Transaction Management
- Keep transactions short: Minimize transaction duration to reduce contention
- Use appropriate isolation levels: Choose the right balance between consistency and performance
- Handle deadlocks gracefully: Implement retry logic for deadlock scenarios
- Avoid nested transactions: Use savepoints instead of nested transactions
Event Publishing
- Use outbox pattern: Ensure reliable event publishing with database transactions
- Handle publishing failures: Implement retry logic for event publishing
- Monitor outbox events: Set up alerts for unpublished events
- Clean up old events: Regularly purge successfully published events
Performance
- Use bulk operations: For large data sets, use bulk inserts/updates
- Batch processing: Process large operations in smaller batches
- Connection pooling: Configure appropriate connection pool settings
- Monitor long-running transactions: Alert on transactions that run too long
Error Handling
- Always use try/catch: Wrap transaction operations in proper error handling
- Log transaction errors: Include sufficient context for debugging
- Provide meaningful errors: Return helpful error messages to callers
- Implement circuit breakers: Protect against cascading failures
Next Steps
- Learn about DbCommand Pattern for type-safe database operations
- Understand Entity Mapping for data transformation
- Explore CQRS patterns for commands and queries
- See Testing strategies for database operations