Schedule Creation
Before a service can register schedules with the scheduler, the
MassTransitModule must be registered in the service's module definition. The
module injects the IMessageScheduler, which is needed to register schedules in
the Quartz queue created by the infrastructure scheduler.
Before creating a schedule, decide whether it must run just once or several
times following a pre-established recurrence; in other words, whether it is
periodic or not. This matters because each kind of schedule is registered
differently. This section covers both in detail.
Deferred Schedules
Non-periodic (deferred) schedules can be registered inside either a Saga or a
Consumer. Note, however, that the syntax is different in each case.
Consumers
In a consumer, inject the IMessageScheduler from MassTransit.
C# |
---|
| public class ScheduleNotificationConsumer : IConsumer<ScheduleNotification>
{
    private readonly IMessageScheduler messageScheduler;
    public ScheduleNotificationConsumer(IMessageScheduler messageScheduler)
    {
        this.messageScheduler = messageScheduler;
    }
    public async Task Consume(ConsumeContext<ScheduleNotification> context)
    {
        // Your logic goes here.
    }
}
|
Both events and commands can be scheduled. Events are scheduled with
SchedulePublish, which takes the DateTime at which the message should be
published and the message itself.
Example:
C# |
---|
| public async Task Consume(ConsumeContext<ScheduleNotification> context)
{
    // Schedules the publish of TestMessage 10 seconds from now.
    await this.messageScheduler.SchedulePublish<TestMessage>(
        DateTime.Now.AddSeconds(10), // It could be any DateTime.
        new
        {
            CorrelationId = NewId.NextGuid(),
        });
}
|
Scheduling a command is discouraged, but when it is needed, use ScheduleSend.
Its parameters are the URI of the destination endpoint, the execution date, and
the message.
Example:
C# |
---|
| public async Task Consume(ConsumeContext<ScheduleNotification> context)
{
    // Schedules the send of TestMessage to the test-service queue 10 seconds from now.
    Uri testServiceUri = new Uri("queue:test-service");
    await context.ScheduleSend<TestMessage>(
        testServiceUri,
        DateTime.Now.AddSeconds(10), // It could be any DateTime.
        new
        {
            CorrelationId = NewId.NextGuid(),
        });
}
|
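The date passed to SchedulePublish or ScheduleSend can be any DateTime, not
only an offset from the current time. As a minimal sketch, the hypothetical
helper below computes the next occurrence of a given time of day in UTC. The
names ScheduleTimes and NextOccurrenceUtc are illustrative assumptions and are
not part of MassTransit.

```csharp
using System;

public static class ScheduleTimes
{
    // Hypothetical helper: returns the next instant, strictly after utcNow,
    // at which the clock reads timeOfDay (both interpreted as UTC).
    public static DateTime NextOccurrenceUtc(DateTime utcNow, TimeSpan timeOfDay)
    {
        DateTime candidate = utcNow.Date + timeOfDay;
        // If today's occurrence has already passed (or is right now), use tomorrow's.
        return candidate > utcNow ? candidate : candidate.AddDays(1);
    }
}
```

A value such as ScheduleTimes.NextOccurrenceUtc(DateTime.UtcNow, TimeSpan.FromHours(9))
could then take the place of DateTime.Now.AddSeconds(10) in the examples above.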
Sagas
In Sagas, deferred schedules are registered a bit differently. The
MassTransit official documentation
covers this in detail, and the
MassTransit Sample-Twitch repo
contains further good examples.
Recurring Schedules
To request a recurring message, use the ScheduleRecurringSend extension method,
which is available for both PublishEndpoint and SendEndpointProvider.
Note that recurring schedules behave as commands, meaning that they require the
URI of the endpoint they target.
This type of schedule requires the following:
- URI of the endpoint to trigger.
- Recurrence object.
- Object that contains the payload to be sent.
Recurrence Definition
The recurrence object that determines the periodicity of the schedule should
inherit from DefaultRecurringSchedule, or implement RecurringSchedule if
something needs to be customized. Defining the recurrence properly requires
knowing how to write Quartz CRON expressions. They are slightly different from
the UNIX ones (for example, they start with a seconds field); for reference,
see the Quartz CRON expression documentation.
C# |
---|
| public class SuiteRecurrence : DefaultRecurringSchedule
{
    public SuiteRecurrence()
    {
        // This means every minute.
        this.CronExpression = "0 0/1 * 1/1 * ? *";
    }
}
public class SuiteScheduleRecurrence : RecurringSchedule
{
    public SuiteScheduleRecurrence(
        string cronExpression,
        string scheduleId,
        string scheduleGroup,
        DateTime? endDate = null,
        MissedEventPolicy missedEventPolicy = MissedEventPolicy.Default)
    {
        // Fall back to the type's short name when no schedule id is given.
        this.ScheduleId = string.IsNullOrEmpty(scheduleId) ?
            TypeMetadataCache.GetShortName(this.GetType()) :
            scheduleId;
        // Fall back to the assembly's simple name when no group is given.
        this.ScheduleGroup =
            string.IsNullOrEmpty(scheduleGroup) ?
                GetType().GetTypeInfo().Assembly.FullName.Split(",".ToCharArray(), StringSplitOptions.RemoveEmptyEntries)[0] :
                scheduleGroup;
        this.CronExpression = cronExpression;
        this.TimeZoneId = TimeZoneInfo.Local.Id;
        this.MisfirePolicy = missedEventPolicy;
        this.StartTime = DateTime.Now;
        this.EndTime = endDate;
    }
    public MissedEventPolicy MisfirePolicy { get; protected set; }
    public string TimeZoneId { get; protected set; }
    public DateTimeOffset StartTime { get; protected set; }
    public DateTimeOffset? EndTime { get; protected set; }
    public string ScheduleId { get; private set; }
    public string ScheduleGroup { get; private set; }
    public string CronExpression { get; protected set; }
    public string? Description { get; protected set; }
}
|
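For orientation, the sketch below lists a few common Quartz CRON expressions.
Quartz fields are, in order: seconds, minutes, hours, day-of-month, month,
day-of-week and an optional year, and one of the two day fields is normally
written as "?". The class name QuartzCronExamples, its constant names and the
FieldCount helper are illustrative assumptions; any of the strings could be
assigned to CronExpression in a recurrence class like the ones above.

```csharp
using System;

public static class QuartzCronExamples
{
    // Field order: seconds minutes hours day-of-month month day-of-week [year]
    public const string EveryFiveMinutes = "0 0/5 * * * ?";
    public const string DailyAtNoon = "0 0 12 * * ?";
    public const string WeekdaysAt1015 = "0 15 10 ? * MON-FRI";
    public const string FirstOfMonthAtMidnight = "0 0 0 1 * ?";

    // Counts whitespace-separated fields; Quartz expects 6, or 7 with the year.
    // This only checks the shape of the expression, not whether each field is valid.
    public static int FieldCount(string cronExpression) =>
        cronExpression.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Length;
}
```

A quick shape check such as QuartzCronExamples.FieldCount(expression) can catch
a five-field UNIX expression before it is handed to the scheduler.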
Creation inside a consumer
In a consumer, the scheduler endpoint can be obtained through the
ConsumeContext, as shown below:
C# |
---|
| public async Task Consume(ConsumeContext<UpdateComponentUnit> context)
{
    var schedulerEndpoint =
        await context.GetSendEndpoint(new Uri($"exchange:{MassTransitConstants.QuartzScheduler}"));
    var recurrence = new SuiteRecurrence();
    // requestPayload is the data to send; it is assumed to be built earlier in this method.
    var scheduleDefinition = await schedulerEndpoint.ScheduleRecurringSend<TestMessage>(
        new Uri("queue:test-service"),
        recurrence,
        new
        {
            Payload = requestPayload,
        });
}
|
Creation inside a service
In a service, the scheduler endpoint can be obtained through the
ISendEndpointProvider, as follows:
C# |
---|
| public class ServiceOrderService
{
    private readonly ISendEndpointProvider sendEndpointProvider;
    public ServiceOrderService(ISendEndpointProvider sendEndpointProvider)
    {
        this.sendEndpointProvider = sendEndpointProvider;
    }
    public async Task TriggerRecurringSchedule()
    {
        var schedulerEndpoint =
            await this.sendEndpointProvider.GetSendEndpoint(new Uri($"exchange:{MassTransitConstants.QuartzScheduler}"));
        var recurrence = new SuiteRecurrence();
        // requestPayload is the data to send; it is assumed to be available to this method.
        var scheduleDefinition = await schedulerEndpoint.ScheduleRecurringSend<TestMessage>(
            new Uri("queue:test-service"),
            recurrence,
            new
            {
                Payload = requestPayload,
            });
    }
}
|