Producers
A producer is a service that sends commands or publishes events to the broker.
Make sure you've read the official MassTransit documentation on producers.
Publishing Events
Let's see an example of how to publish an event to the bus.
First, we define our message contract. Note that we use interfaces for
messages, and that their names don't start with an I 😖
```csharp
public interface OrderSubmitted
{
    Guid CorrelationId { get; }
    DateTime OrderDate { get; }
}
```
We can then publish that message from within any service by injecting
`IPublishEndpoint`:
```csharp
internal class MyPublisherAppService : IAppService {
    private readonly IPublishEndpoint publishEndpoint;

    public MyPublisherAppService(IPublishEndpoint publishEndpoint) {
        this.publishEndpoint = publishEndpoint;
    }

    public async Task DoSomething()
    {
        var correlationId = NewId.NextGuid();
        await this.publishEndpoint.Publish<OrderSubmitted>(new
        {
            CorrelationId = correlationId,
            OrderDate = DateTime.UtcNow,
        });
    }
}
```
To publish a message from within a consumer or a saga, use the context that is
passed to it instead. For example:
```csharp
public class SubmitOrderConsumer : IConsumer<SubmitOrder>
{
    // IConsumer<T>.Consume receives a ConsumeContext<T>
    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        await context.Publish<OrderSubmitted>(new
        {
            context.Message.CorrelationId,
            OrderDate = DateTime.UtcNow
        });
    }
}
```
Sending Commands
Sending commands is a bit different, since a command normally needs a
destination address to be sent to.
MassTransit uses conventions to declare queues and exchanges on the broker based
on our message and consumer types. Hence, when we send a message we usually do
not have to specify a destination address ourselves.
For example:
```csharp
public class SubmitOrder
{
    public Guid CorrelationId { get; set; }
}
```
We register a request client for the command when setting up our module:
```csharp
internal class SampleSuiteModule : SuiteModule
{
    public override void SetupModule(IModuleBuilder builder)
    {
        base.SetupModule(builder);
        builder.DependsOn<MassTransitModule, MassTransitModuleOptions>(ops =>
        {
            ops.ConfigureMassTransit = c =>
            {
                c.AddRequestClient<SubmitOrder>();
            };
        });
    }
}
```
We can then inject an `IRequestClient<TMessage>` and send the message:
```csharp
internal class MyPublisherAppService : IAppService {
    private readonly IRequestClient<SubmitOrder> orderSubmitter;

    public MyPublisherAppService(IRequestClient<SubmitOrder> orderSubmitter) {
        this.orderSubmitter = orderSubmitter;
    }

    public async Task DoSomething()
    {
        var correlationId = NewId.NextGuid();
        await this.orderSubmitter.Send(new {
            CorrelationId = correlationId
        });
    }
}
```
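When a command does have to go to one specific queue, one option is to resolve a send endpoint by its address. The following is only a sketch under a few assumptions: that MassTransit's `ISendEndpointProvider` can be injected in the same way as `IPublishEndpoint`, that the MassTransit version in use supports short addresses such as `queue:...`, and that a consumer listens on a queue called `submit-order`. The class name and the queue name are hypothetical and only serve as illustration; `SubmitOrder` is the command defined earlier.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical service, shown only to illustrate sending to an explicit address.
internal class AddressedOrderSender
{
    private readonly ISendEndpointProvider sendEndpointProvider;

    public AddressedOrderSender(ISendEndpointProvider sendEndpointProvider)
    {
        this.sendEndpointProvider = sendEndpointProvider;
    }

    public async Task SubmitAsync()
    {
        // Assumed queue name: replace it with the queue your consumer listens on.
        var address = new Uri("queue:submit-order");
        var endpoint = await this.sendEndpointProvider.GetSendEndpoint(address);

        // Send delivers the command to that one endpoint only,
        // whereas Publish fans an event out to every subscriber.
        await endpoint.Send(new SubmitOrder
        {
            CorrelationId = NewId.NextGuid()
        });
    }
}
```

The trade-off is that the sender now has to know where the consumer lives, which is exactly what the convention-based approach avoids.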