NetEvolve.Pulse
0.67.36
NetEvolve.Pulse is a high-performance CQRS mediator for ASP.NET Core that wires commands, queries, and events through a scoped, interceptor-enabled pipeline.
Features
- Typed CQRS mediator with single-handler enforcement for commands and queries plus fan-out events
- Minimal DI integration via services.AddPulse(...) with scoped lifetimes for handlers and interceptors
- Configurable interceptor pipeline (logging, metrics, tracing, validation) via IMediatorBuilder
- Distributed query caching: register ICacheableQuery<TResponse> per query and enable transparent IDistributedCache caching with AddQueryCaching()
- Outbox pattern with background processor for reliable event delivery via AddOutbox()
- Parallel event dispatch for efficient domain event broadcasting
- TimeProvider-aware for deterministic testing and scheduling scenarios
- OpenTelemetry-friendly metrics and tracing through AddActivityAndMetrics()
Installation
NuGet Package Manager
Install-Package NetEvolve.Pulse
.NET CLI
dotnet add package NetEvolve.Pulse
PackageReference
<PackageReference Include="NetEvolve.Pulse" Version="x.x.x" />
Quick Start
using Microsoft.Extensions.DependencyInjection;
using NetEvolve.Pulse;
using NetEvolve.Pulse.Extensibility;
var services = new ServiceCollection();
// Register Pulse and handlers
services.AddPulse();
services.AddScoped<ICommandHandler<CreateOrderCommand, OrderCreated>, CreateOrderHandler>();
using var provider = services.BuildServiceProvider();
var mediator = provider.GetRequiredService<IMediator>();
var result = await mediator.SendAsync<CreateOrderCommand, OrderCreated>(
    new CreateOrderCommand("SKU-123"));
Console.WriteLine($"Created order {result.OrderId}");
public record CreateOrderCommand(string Sku) : ICommand<OrderCreated>;
public record OrderCreated(Guid OrderId);
public sealed class CreateOrderHandler
    : ICommandHandler<CreateOrderCommand, OrderCreated>
{
    public Task<OrderCreated> HandleAsync(
        CreateOrderCommand command,
        CancellationToken cancellationToken) =>
        Task.FromResult(new OrderCreated(Guid.NewGuid()));
}
Usage
Basic Example
services.AddPulse();
services.AddScoped<IQueryHandler<GetOrderQuery, Order>, GetOrderHandler>();
services.AddScoped<IEventHandler<OrderCreatedEvent>, OrderCreatedHandler>();
var order = await mediator.QueryAsync<GetOrderQuery, Order>(new GetOrderQuery(orderId));
await mediator.PublishAsync(new OrderCreatedEvent(order.Id));
public record GetOrderQuery(Guid Id) : IQuery<Order>;
public record Order(Guid Id, string Sku);
public record OrderCreatedEvent(Guid Id) : IEvent;
public sealed class GetOrderHandler : IQueryHandler<GetOrderQuery, Order>
{
    public Task<Order> HandleAsync(GetOrderQuery query, CancellationToken cancellationToken) =>
        Task.FromResult(new Order(query.Id, "SKU-123"));
}
public sealed class OrderCreatedHandler : IEventHandler<OrderCreatedEvent>
{
    public Task HandleAsync(OrderCreatedEvent @event, CancellationToken cancellationToken)
    {
        // React to the event (logging, projections, etc.)
        return Task.CompletedTask;
    }
}
Advanced Example
// Enable tracing and metrics; custom interceptors can be registered in the same callback
services.AddPulse(config =>
{
    config.AddActivityAndMetrics();
});
services.AddScoped<ICommandHandler<ShipOrderCommand, Void>, ShipOrderHandler>();
public record ShipOrderCommand(Guid Id) : ICommand;
public sealed class ShipOrderHandler : ICommandHandler<ShipOrderCommand, Void>
{
    public Task<Void> HandleAsync(ShipOrderCommand command, CancellationToken cancellationToken)
    {
        // Shipping workflow here
        return Task.FromResult(Void.Completed);
    }
}
Configuration
// Configure Pulse during startup
services.AddPulse(config =>
{
    // Built-in observability
    config.AddActivityAndMetrics();
    // Add your own configurator extensions for validation, caching, etc.
    // config.AddCustomValidation();
});
Distributed Query Caching
Enable transparent IDistributedCache caching for queries. Any query that implements ICacheableQuery<TResponse> (from NetEvolve.Pulse.Extensibility) is served from the cache on subsequent invocations; all other queries pass through unchanged.
// 1. Register an IDistributedCache implementation
services.AddDistributedMemoryCache(); // or Redis, SQL Server, etc.
// 2. Enable the caching interceptor (with optional options)
services.AddPulse(config => config.AddQueryCaching(options =>
{
    // Choose between absolute (default) and sliding expiration
    options.ExpirationMode = CacheExpirationMode.Sliding;
    // Supply custom JsonSerializerOptions for cache serialization
    options.JsonSerializerOptions = new JsonSerializerOptions
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    };
}));
// 3. Implement ICacheableQuery<TResponse> on the queries you want cached
public record GetProductQuery(Guid Id) : ICacheableQuery<ProductDto>
{
    public string? CorrelationId { get; set; }
    // Unique per query result: include all discriminating parameters
    public string CacheKey => $"product:{Id}";
    // null = no explicit expiry (relies on cache defaults); or provide a TimeSpan
    public TimeSpan? Expiry => TimeSpan.FromMinutes(5);
}
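The read path described at the start of this section follows the familiar cache-aside shape. The sketch below illustrates that shape only and is not Pulse's interceptor: CacheAsideSketch, GetOrInvokeAsync, and the in-memory dictionary standing in for IDistributedCache are all hypothetical names chosen to keep the example dependency-free.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical illustration of cache-aside; NOT the Pulse caching interceptor.
public static class CacheAsideSketch
{
    // A plain dictionary stands in for IDistributedCache (assumption, not thread-safe).
    private static readonly Dictionary<string, string> Cache = new();

    public static async Task<TResponse> GetOrInvokeAsync<TResponse>(
        string? cacheKey,               // null models a query that is not cacheable
        Func<Task<TResponse>> handler)  // stands in for the query handler
    {
        // Non-cacheable queries never touch the cache.
        if (cacheKey is null)
        {
            return await handler();
        }

        // Cache hit: deserialize the stored payload and skip the handler.
        if (Cache.TryGetValue(cacheKey, out var payload))
        {
            return JsonSerializer.Deserialize<TResponse>(payload)!;
        }

        // Cache miss: invoke the handler, then store the serialized response.
        var response = await handler();
        Cache[cacheKey] = JsonSerializer.Serialize(response);
        return response;
    }
}
```

Calling GetOrInvokeAsync twice with the same key invokes the handler delegate once; the second call is served from the dictionary.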
Behavior summary:
| Scenario | Result |
|---|---|
| Query implements ICacheableQuery<TResponse> and cache entry exists | Cached response returned; handler skipped |
| Query implements ICacheableQuery<TResponse> and no cache entry | Handler invoked; response stored in cache |
| Query does not implement ICacheableQuery<TResponse> | Handler always invoked; cache never consulted |
| IDistributedCache not registered in DI | Interceptor falls through; handler invoked without error |
| Expiry = null and DefaultExpiry = null | Entry stored without explicit expiry; cache default eviction policy applies |
| Expiry = null and DefaultExpiry is set | DefaultExpiry value is applied using the configured ExpirationMode |
| ExpirationMode = Absolute (default) | Expiry (or DefaultExpiry) is applied as absolute expiry relative to now |
| ExpirationMode = Sliding | Expiry (or DefaultExpiry) window resets on each cache access |
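The expiry rows of the table can be read as a small resolution rule. The following is a sketch of one way that rule could map onto DistributedCacheEntryOptions from Microsoft.Extensions.Caching.Distributed. ExpirySketch, Resolve, and the local SketchExpirationMode enum are hypothetical; how Pulse builds its cache entry options internally is not shown in this README.

```csharp
using System;
using Microsoft.Extensions.Caching.Distributed;

// Local stand-in for the library's expiration mode (hypothetical name).
public enum SketchExpirationMode { Absolute, Sliding }

// Hypothetical helper mirroring the table above; NOT Pulse's implementation.
public static class ExpirySketch
{
    public static DistributedCacheEntryOptions Resolve(
        TimeSpan? queryExpiry,
        TimeSpan? defaultExpiry,
        SketchExpirationMode mode)
    {
        var options = new DistributedCacheEntryOptions();

        // The per-query value wins; otherwise fall back to the configured default.
        var effective = queryExpiry ?? defaultExpiry;
        if (effective is null)
        {
            // No explicit expiry: the cache's own eviction policy applies.
            return options;
        }

        if (mode == SketchExpirationMode.Sliding)
        {
            // Sliding: the window resets on each cache access.
            options.SlidingExpiration = effective;
        }
        else
        {
            // Absolute: the entry expires a fixed time after it is written.
            options.AbsoluteExpirationRelativeToNow = effective;
        }

        return options;
    }
}
```

For instance, Resolve(null, TimeSpan.FromMinutes(10), SketchExpirationMode.Sliding) yields options with only SlidingExpiration set, to ten minutes.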
Outbox Pattern Configuration
The outbox pattern ensures reliable event delivery by persisting events before dispatching:
services.AddPulse(config => config
    .AddOutbox(
        options => options.Schema = "pulse",
        processorOptions =>
        {
            processorOptions.BatchSize = 100; // Messages per batch (default: 100)
            processorOptions.PollingInterval = TimeSpan.FromSeconds(5); // Poll delay (default: 5s)
            processorOptions.MaxRetryCount = 3; // Max retries before dead letter (default: 3)
            processorOptions.ProcessingTimeout = TimeSpan.FromSeconds(30); // Per-message timeout (default: 30s)
            processorOptions.EnableBatchSending = false; // Use batch transport (default: false)
        })
    // Choose a persistence provider:
    // .AddEntityFrameworkOutbox<MyDbContext>()
    // .AddSqlServerOutbox(connectionString)
);
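To make the roles of BatchSize and MaxRetryCount concrete, here is a minimal in-memory sketch of a single polling pass. It is an illustration of the general outbox technique under stated assumptions, not the Pulse background processor: SketchOutboxMessage, OutboxSketch, the status strings, and the rule that a message is dead-lettered once its retry count reaches the maximum are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical message shape; the real outbox schema lives in the persistence providers.
public sealed class SketchOutboxMessage
{
    public required string EventType { get; init; }
    public required string Payload { get; init; }
    public int RetryCount { get; set; }
    public string Status { get; set; } = "Pending"; // Pending, Completed, DeadLetter
}

// Hypothetical single polling pass; NOT the Pulse outbox processor.
public static class OutboxSketch
{
    // Processes up to batchSize pending messages and returns how many were delivered.
    public static int ProcessBatch(
        List<SketchOutboxMessage> store,
        int batchSize,
        int maxRetryCount,
        Action<SketchOutboxMessage> send)
    {
        var delivered = 0;
        var batch = store.Where(m => m.Status == "Pending").Take(batchSize).ToList();

        foreach (var message in batch)
        {
            try
            {
                send(message);
                message.Status = "Completed";
                delivered++;
            }
            catch (Exception)
            {
                // Assumption: a failure counts as one retry; at the limit the
                // message is parked instead of being picked up again.
                message.RetryCount++;
                if (message.RetryCount >= maxRetryCount)
                {
                    message.Status = "DeadLetter";
                }
            }
        }

        return delivered;
    }
}
```

A real processor would run such a pass on a timer (the PollingInterval), persist status changes transactionally, and apply the ProcessingTimeout per message; those parts are left out here.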
Per-Event-Type Overrides
You can tune processing behaviour for individual event types using EventTypeOverrides. The dictionary key matches the EventType field of stored outbox messages. Any null property falls back to the global default:
processorOptions.EventTypeOverrides["MyNamespace.CriticalEvent"] = new OutboxEventTypeOptions
{
    MaxRetryCount = 10, // More retries for critical events
    ProcessingTimeout = TimeSpan.FromSeconds(10), // Tighter timeout
};
processorOptions.EventTypeOverrides["MyNamespace.BulkEvent"] = new OutboxEventTypeOptions
{
    MaxRetryCount = 1, // Fewer retries for low-priority bulk events
    ProcessingTimeout = TimeSpan.FromMinutes(2), // Longer timeout for large payloads
};
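The fallback rule (a null property uses the global default) can be sketched as a lookup followed by null-coalescing. The types below are hypothetical stand-ins written for this example; only the rule itself comes from the description above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a per-event-type override; null means "use the global value".
public sealed record SketchOverride(int? MaxRetryCount = null, TimeSpan? ProcessingTimeout = null);

// Hypothetical resolver; NOT the Pulse processor's code.
public static class OverrideSketch
{
    public static (int MaxRetryCount, TimeSpan ProcessingTimeout) Resolve(
        IReadOnlyDictionary<string, SketchOverride> overrides,
        string eventType,
        int globalMaxRetryCount,
        TimeSpan globalTimeout)
    {
        // match stays null when no override is registered for this event type.
        overrides.TryGetValue(eventType, out var match);

        // Each setting falls back independently to its global default.
        return (
            match?.MaxRetryCount ?? globalMaxRetryCount,
            match?.ProcessingTimeout ?? globalTimeout);
    }
}
```

With an override that sets only MaxRetryCount = 10 and globals of 3 retries and 30 seconds, the resolved pair is 10 retries and 30 seconds.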
See NetEvolve.Pulse.EntityFramework or NetEvolve.Pulse.SqlServer for persistence provider setup.
Payload Serialization
Pulse uses IPayloadSerializer (from NetEvolve.Pulse.Extensibility) for all internal serialization needs, including outbox message payloads, distributed cache entries, and audit trail data. A default implementation based on System.Text.Json is registered automatically when you call AddPulse().
Default Behavior
No configuration is required — the built-in SystemTextJsonPayloadSerializer uses JsonSerializerOptions.Default:
services.AddPulse();
// SystemTextJsonPayloadSerializer is automatically registered
Configure JSON Serialization Options
Use the standard .NET options pattern to customize JSON serialization settings:
services.Configure<JsonSerializerOptions>(options =>
{
    options.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
    options.WriteIndented = false;
    options.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});
services.AddPulse();
These options will be used for all payload serialization throughout Pulse, including:
- Outbox message payloads
- Distributed cache query results
- Any other internal serialization needs
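To see what these three settings do to a payload, the sketch below applies them with System.Text.Json directly, outside of Pulse. JsonOptionsSketch and its OrderCreated record are hypothetical names for this example; the serializer calls are standard System.Text.Json.

```csharp
using System.Text.Json;
using System.Text.Json.Serialization;

// Standalone illustration of the options above; does not involve Pulse itself.
public static class JsonOptionsSketch
{
    // Hypothetical payload type with one nullable property.
    public sealed record OrderCreated(int OrderId, string? Note);

    public static string Serialize()
    {
        var options = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,           // OrderId -> orderId
            WriteIndented = false,                                        // compact output
            DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, // omit null Note
        };

        // Produces {"orderId":42}: camel-cased name, null property omitted.
        return JsonSerializer.Serialize(new OrderCreated(42, null), options);
    }
}
```

Compact, null-free payloads keep stored outbox messages and cache entries smaller, which is the usual reason for choosing these settings.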
Custom Serializer Implementation
Replace the default serializer with your own implementation by registering it before calling AddPulse():
using System.Text;
using NetEvolve.Pulse.Extensibility;
using Newtonsoft.Json;
// Register custom serializer (e.g., using Newtonsoft.Json)
services.AddSingleton<IPayloadSerializer, NewtonsoftJsonPayloadSerializer>();
services.AddPulse();
public sealed class NewtonsoftJsonPayloadSerializer : IPayloadSerializer
{
    public string Serialize<T>(T value) =>
        JsonConvert.SerializeObject(value);
    public string Serialize(object value, Type type) =>
        JsonConvert.SerializeObject(value, type, null);
    public byte[] SerializeToBytes<T>(T value) =>
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(value));
    public T? Deserialize<T>(string payload) =>
        JsonConvert.DeserializeObject<T>(payload);
    public T? Deserialize<T>(byte[] payload) =>
        JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(payload));
}
The custom serializer will be used for all payload operations within Pulse. Ensure your implementation is thread-safe, as the same instance may be accessed concurrently from multiple pipeline stages.
Requirements
- .NET 8.0, .NET 9.0, or .NET 10.0
- ASP.NET Core environment with Microsoft.Extensions.DependencyInjection
- OpenTelemetry packages when using AddActivityAndMetrics()
Related Packages
- NetEvolve.Pulse.Dapr - Dapr pub/sub integration for event dispatch
- NetEvolve.Pulse.Extensibility - Core contracts and abstractions used by the mediator
- NetEvolve.Pulse.EntityFramework - Entity Framework Core persistence for the outbox pattern
- NetEvolve.Pulse.SqlServer - SQL Server ADO.NET persistence for the outbox pattern
- NetEvolve.Pulse.Polly - Polly v8 resilience policies integration
Documentation
For complete documentation, please visit the official documentation.
Contributing
Contributions are welcome! Please read the Contributing Guidelines before submitting a pull request.
Support
- Issues: Report bugs or request features on GitHub Issues
- Documentation: Read the full documentation at https://github.com/dailydevops/pulse
License
This project is licensed under the MIT License - see the LICENSE file for details.
Made with ❤️ by the NetEvolve Team
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net10.0
  - NetEvolve.Pulse.Extensibility (>= 0.67.36)
- net8.0
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Options (>= 10.0.7)
  - NetEvolve.Pulse.Extensibility (>= 0.67.36)
- net9.0
  - Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Hosting.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Logging.Abstractions (>= 10.0.7)
  - Microsoft.Extensions.Options (>= 10.0.7)
  - NetEvolve.Pulse.Extensibility (>= 0.67.36)
NuGet packages (8)
Showing the top 5 NuGet packages that depend on NetEvolve.Pulse:
| Package | Downloads |
|---|---|
| NetEvolve.Pulse.SqlServer - SQL Server persistence provider for the Pulse outbox pattern using plain ADO.NET. Provides SqlServerOutboxRepository implementing IOutboxRepository with optimized T-SQL queries, transaction support via SqlTransaction enlistment, and schema scripts for table creation. Supports configurable schema and table names for multi-tenant scenarios. Uses the canonical outbox schema ensuring interchangeability with Entity Framework provider. Designed for high-performance microservices requiring reliable event delivery with SQL Server as the backing store. | |
| NetEvolve.Pulse.EntityFramework - Provider-agnostic Entity Framework Core persistence for the Pulse outbox pattern. Provides IOutboxDbContext interface, OutboxMessageConfiguration for fluent mapping, EntityFrameworkOutboxRepository implementing IOutboxRepository, and EntityFrameworkEventOutbox participating in ambient DbContext transactions. Works with any EF Core database provider (SQL Server, PostgreSQL, SQLite, etc.) - users bring their own provider and generate migrations. Uses the canonical outbox schema ensuring interchangeability with plain ADO.NET providers. Designed for rapid development with full EF Core features including change tracking, LINQ queries, and migration support. | |
| NetEvolve.Pulse.MongoDB - MongoDB persistence provider for the Pulse outbox pattern using the official MongoDB C# driver. Provides MongoDbOutboxRepository implementing IOutboxRepository with FindOneAndUpdateAsync for atomic message claiming and safe concurrent polling. Supports configurable database name and collection name. Requires IMongoClient to be registered in the dependency injection container by the caller. Designed for document-oriented architectures where the outbox participates in the same MongoDB database as domain aggregates. | |
| NetEvolve.Pulse.PostgreSql - PostgreSQL persistence provider for the Pulse outbox and idempotency patterns using plain ADO.NET. Provides PostgreSqlOutboxRepository implementing IOutboxRepository with optimized PostgreSQL queries, transaction support via NpgsqlTransaction enlistment, and schema scripts for table creation. Also provides PostgreSqlIdempotencyKeyRepository implementing IIdempotencyKeyRepository for at-most-once command processing. Uses FOR UPDATE SKIP LOCKED for concurrent polling safety. Supports configurable schema and table names for multi-tenant scenarios. Uses the canonical schemas ensuring interchangeability with Entity Framework provider. Designed for high-performance microservices requiring reliable event delivery with PostgreSQL as the backing store. | |
| NetEvolve.Pulse.SQLite - SQLite persistence provider for the Pulse outbox pattern using plain ADO.NET. Provides SQLiteOutboxRepository implementing IOutboxRepository with optimized SQL queries, WAL mode support, and BEGIN IMMEDIATE transactions for safe concurrent polling. Supports configurable table names for embedded and edge deployments. Designed for lightweight applications, CLI tools, IoT, and edge services that require the outbox pattern without external infrastructure dependencies. | |
GitHub repositories
This package is not used by any popular GitHub repositories.