Missing configuration to correctly publish events from a MassTransit consumer to a MassTransit saga (using MassTransit with Mediator)

Ask Time:2020-11-13T18:15:43         Author:Tarek

I have a series of events coordinated by a saga.

From the controller, I persist data to the database and then publish an event to my saga:

[ApiController]
public class MyController : ControllerBase
{    
    private readonly IProductService _productService ;
    private readonly IMediator _mediator;

    public MyController(IProductService productService, IMediator mediator)
    {
        _productService = productService;
        _mediator = mediator;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] ProductContract productContract)
    {            
        try
        {
            var result = await _productService.DoSomeThingAsync(productContract);            
            await _mediator.Publish<ProductSubmittedEvent>(new { CorrelationId = NewId.NextGuid(), result.Label });

            return Ok();
        }
        catch (Exception ex)
        {
            return BadRequest(ex.Message);
        }
    }
}

The saga consumes the first event and then sends a new command:


public class ProductSaga:
    ISaga,
    InitiatedBy<ProductSubmittedEvent>,
    Orchestrates<AcecptedFilterEvent>,
    Orchestrates<RefusedFilterEvent>
{
    public Guid CorrelationId { get; set; }
    public string State { get; private set; } = "Not Started";

    public readonly Uri filterEndpoint = new Uri(
        $"queue:{KebabCaseEndpointNameFormatter.Instance.SanitizeName(nameof(FilterCommand))}");

    public async Task Consume(ConsumeContext<ProductSubmittedEvent> context)
    {
        //Send new command to filter step
        var sendEndpoint = await context.GetSendEndpoint(filterEndpoint);
        await sendEndpoint.Send<FilterCommand>(new { CorrelationId, context.Message.Label});
    }

    // Two other events are consumed, depending on the command result
    public Task Consume(ConsumeContext<AcecptedFilterEvent> context)
    {
        // If accepted, send a new command to the next step
        //...
        return Task.CompletedTask;
    }

    public Task Consume(ConsumeContext<RefusedFilterEvent> context)
    {
        // If FilterCommand was refused, send it again
        //...
        return Task.CompletedTask;
    }
}
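
The message contracts are not shown in this post. A minimal sketch of what they might look like, assuming they are plain interfaces whose properties mirror the anonymous objects passed to Publish and Send above (the exact definitions are an assumption, not the original code). The events implement CorrelatedBy<Guid> because MassTransit's InitiatedBy<T> and Orchestrates<T> saga interfaces require it of their message types:

```csharp
using System;
using MassTransit;

// Hypothetical contracts, inferred from the anonymous objects used in this post.

// Published by the controller; initiates ProductSaga.
public interface ProductSubmittedEvent : CorrelatedBy<Guid>
{
    string Label { get; }
}

// Sent by the saga to the filter step.
public interface FilterCommand
{
    Guid CorrelationId { get; }
    string Label { get; }
}

// Published by FilterCommandConsumer when filtering succeeds.
public interface AcecptedFilterEvent : CorrelatedBy<Guid>
{
    string State { get; }
}

// Published by FilterCommandConsumer when filtering throws.
public interface RefusedFilterEvent : CorrelatedBy<Guid>
{
    string State { get; }
    string Error { get; }
}
```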

Here is my consumer code for FilterCommand:


public class FilterCommandConsumer : IConsumer<FilterCommand>
{
    private readonly ILogger<FilterCommand> _logger;

    public FilterCommandConsumer (ILogger<FilterCommand> logger)
    {
        _logger = logger;
    }
    public async Task Consume(ConsumeContext<FilterCommand> context)
    {
        _logger?.LogInformation($"Consuming FilterCommand- {context.Message.CorrelationId}");
       
        try
        {
            //call a service to handle the filtering 
            //...
            
            // Here is the problem: on Publish the thread exits and the app keeps running indefinitely
            await context.Publish<AcecptedFilterEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "AcecptedFilter"
            });
        }
        catch (Exception ex)
        {
            await context.Publish<RefusedFilterEvent>(new
            {
                CorrelationId = context.CorrelationId.Value,
                State = "RefusedFilter",
                Error = ex.Message
            });
            throw;
        }
    }
}

The problem:

When consuming my command, I want to publish one of two new events, AcecptedFilterEvent or RefusedFilterEvent, but the publish never completes:

  • the app keeps running without any result
  • the thread exits

I want to publish an event and consume it in the saga to start the next steps.

I tried injecting IMediator into my consumer class and publishing with _mediator.Publish(), but I got the same behavior.

Here is my startup.cs config:


        //configure MassTransit
        services.AddMediator(cfg =>
        {
            cfg.AddConsumersFromNamespaceContaining<FilterCommandConsumer>();                
            cfg.AddSaga<ProductSaga>().InMemoryRepository();
        });

If you have any recommendations or ideas, thanks for sharing and challenging me 😊

Author: Tarek. Reproduced under the CC 4.0 BY-SA copyright license with a link to the original source and this disclaimer.
Link to original article:https://stackoverflow.com/questions/64819118/missing-configuration-to-well-publish-event-from-masstransit-consumer-to-masstra