Caching pipeline behavior with MediatR

C# Apr 01, 2022

What is MediatR?

MediatR is a popular library for C# introduced by Jimmy Boggard, which provide for us a Simple implementation of Mediator pattern in .Net.

What is pipeline behavior?

The pipeline behavior is a wrapper in the MediatR and you can write some code before or after command or query handlers execute, without modify the original request. Pipeline behaviors are a good fit for cross-cutting concerns in your application. For example in logging, caching, validation, transaction and etc.

Pipeline behavior very similar with Decorator pattern in .NET middleware concept and gives us the easiest way to implement this approach.

Implementation of Caching Pipeline Behavior

For Implementing cache in CQRS, we will use Caching Pipeline Behavior on top of MediatR.

First, We need an interface for caching our request.

public interface ICacheRequest
{
    string CacheKey { get; }
    DateTime? AbsoluteExpirationRelativeToNow { get; }
}

After that, we use this interface in our command or query for caching request.

public record GetFlightsQuery : IRequest<IEnumerable<FlightResponseDto>>, ICacheRequest
{
    public string CacheKey => "GetFlightsQuery";
    public DateTime? AbsoluteExpirationRelativeToNow => DateTime.Now.AddHours(1);
}

Here we Implement ICacheRequest and fill default value for unique cache key and expiration cache, that it's here 1 hours as default.

Let’s implement the pipeline behavior.

public class CachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
        where TRequest : notnull, IRequest<TResponse>
        where TResponse : notnull
    {
        private readonly ILogger<CachingBehavior<TRequest, TResponse>> _logger;
        private readonly IEasyCachingProvider _cachingProvider;
        private readonly ICacheRequest _cacheRequest;
        private readonly int defaultCacheExpirationInHours = 1;

        public CachingBehavior(IEasyCachingProviderFactory cachingFactory,
            ILogger<CachingBehavior<TRequest, TResponse>> logger,
            ICacheRequest cacheRequest)
        {
            _logger = logger;
            _cachingProvider = cachingFactory.GetCachingProvider("mem");
            _cacheRequest = cacheRequest;
        }


        public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken,
            RequestHandlerDelegate<TResponse> next)
        {
            if (request is not ICacheRequest || _cacheRequest == null)
            {
                // Continue to next middleware
                return await next();
            }

            var cacheKey = _cacheRequest.CacheKey;
            var cachedResponse = await _cachingProvider.GetAsync<TResponse>(cacheKey);
            if (cachedResponse.Value != null)
            {
                _logger.LogDebug("Fetch data from cache with cachKey: {CacheKey}", cacheKey);
                return cachedResponse.Value;
            }

            var response = await next();

            var expirationTime = _cacheRequest.AbsoluteExpirationRelativeToNow ??
                                 DateTime.Now.AddHours(defaultCacheExpirationInHours);

            await _cachingProvider.SetAsync(cacheKey, response, expirationTime.TimeOfDay);

            _logger.LogDebug("Set data to cahche with  cachKey: {CacheKey}", cacheKey);

            return response;
        }
    }

Let’s go ahead step by step.

The new addition is the interface IPipelineBehavior<TRequest, TResponse> This allows you to create implementation(s) that will invoke in the order they are registered with your container (returned from the MultiInstanceFactory delegate). The simplest implementation, that does nothing but call the next possible behavior. here, TRequest is GetFlightByIdQuery and TResponse is FlightResponseDto.

Here, I use EasyCaching to handle caching more easily. And I used InMemory cache as default, and you can use Redis as well with EasyCaching.

In this behavior in the first line we can check if our command or query implement ICacheRequest and if it doesn't implement that return await next() and go to the next step in the pipeline.

And in the next line we check our cache with specific key and if we have value for this key, return value and otherwise keep continue and call await next() and go to next pipeline and after get response from handler, we add the values in the cache with our key and expiration time and the end exit this behavior. And we have cache for this request.

We need to register this cache behavior in service registrations as below:

   services.AddScoped(typeof(IPipelineBehavior<,>), typeof(CachingBehavior<,>));

Also we need register all caching request that I prefer register all of that with get some help from magic [structor] library for registration our caching request.


 services.Scan(scan => scan
            .FromExecutingAssembly()
            .AddClasses(classes => classes.AssignableTo(typeof(ICacheRequest)),
                false)
            .AsImplementedInterfaces()
            .WithTransientLifetime());

Invalidate Cache

When a method updates an entity, it must also remove any cache item that depends on this entity. One way to achieve this goal is to require the update methods to know precisely which cached methods are dependent on the entity that has been modified, and to remove these methods (with proper arguments) from the cache. We call this scenario direct cache invalidation.

Implimentation of Invalidate Cache Pipeline Behavior

First, we need an interface for invalidating cache from request.

    public interface IInvalidateCacheRequest
    {
        string CacheKey { get; }
    }

After that, we use this interface in our command for invalidating cache request.And here our invalidation key is GetFlightsQuery base on our query. And we remove our cache with this specific key.

public record UpdateFlightCommand : IRequest<FlightResponseDto>, IInvalidateCacheRequest
{
    public long Id { get; init; }
    public string FlightNumber { get; init; }
    public long AircraftId { get; init; }
    public long DepartureAirportId { get; init; }
    public DateTime DepartureDate { get; init; }
    public DateTime ArriveDate { get; init; }
    public long ArriveAirportId { get; init; }
    public decimal DurationMinutes { get; init; }
    public DateTime FlightDate { get; init; }
    public FlightStatus Status { get; init; }
    public decimal Price { get; init; }
    public string CacheKey => "GetFlightsQuery";
}

Let’s implement invalidate pipeline behavior.

public class InvalidateCachingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
        where TRequest : notnull, IRequest<TResponse>
        where TResponse : notnull
    {
        private readonly ILogger<InvalidateCachingBehavior<TRequest, TResponse>> _logger;
        private readonly IEasyCachingProvider _cachingProvider;
        private readonly IInvalidateCacheRequest _invalidateCacheRequest;


        public InvalidateCachingBehavior(IEasyCachingProviderFactory cachingFactory,
            ILogger<InvalidateCachingBehavior<TRequest, TResponse>> logger,
            IInvalidateCacheRequest invalidateCacheRequest)
        {
            _logger = logger;
            _cachingProvider = cachingFactory.GetCachingProvider("mem");
            _invalidateCacheRequest = invalidateCacheRequest;
        }

        public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken,
            RequestHandlerDelegate<TResponse> next)
        {
            if (request is not IInvalidateCacheRequest || _invalidateCacheRequest == null)
            {
                // Continue to next middleware
                return await next();
            }

            var cacheKey = _invalidateCacheRequest.CacheKey;
            var response = await next();

            await _cachingProvider.RemoveAsync(cacheKey);

            _logger.LogDebug("Cache data with cacheKey: {CacheKey} removed.", cacheKey);

            return response;
        }
    }

Let’s go ahead step by step.

In this behavior in the first line we can check if our command or query implement IInvalidateCacheRequest and if it doesn't implement that return await next() and go to the next step in the pipeline.

And i the next line call await next() and go to next pipeline and after get response back to invalidate cache behavior and remove item with specific key from cache and exit from pipeline.

We need register this cache behavior in service registerations as below:

   services.AddScoped(typeof(IPipelineBehavior<,>), typeof(InvalidateCachingBehavior<,>));

Also we need register all invalidate cache request that I prefer register all of that with get some help from magic structor library for registration our inavalidate cache request.


 services.Scan(scan => scan
            .FromExecutingAssembly()
            .AddClasses(classes => classes.AssignableTo(typeof(IInvalidateCacheRequest)),
                false)
            .AsImplementedInterfaces()
            .WithTransientLifetime());

Reference:

https://github.com/jbogard/MediatR

Hope this helps 😀

Tags

Meysam Hadeli

I’m a software engineer with +7 years of experience in developing and designing distributed applications built on top of cutting-edge technologies with interest in Microservices, DDD.

Great! You've successfully subscribed.
Great! Next, complete checkout for full access.
Welcome back! You've successfully signed in.
Success! Your account is fully activated, you now have access to all content.