Skip to content

Schedule Creation

Before anything, to let a service register schedules into the scheduler, it is required to register the MassTransitModule in the module definition. This is required since the module injects the IMessageScheduler required to register schedules in the quartz queue created by the infrastructure scheduler.

A question that needs to be answered before the creation of a schedule is: "Is it needed to be executed just once or several times following a preestablished recurrence?". This means, that we would like to know whether it is periodic or not. This is important to know since the way to register an schedule is different for each one. In this section both will be covered in detail.

Deferred Schedules

For non-periodic schedules, or in other words deferred, their registration could be done inside both a Saga or a Consumer. However, notice that the syntax is different on each case.

Consumers

At consumers, it is required to inject the IMessageScheduler from MassTransit.

C#
public class ScheduleNotificationConsumer : IConsumer<ScheduleNotification>
{
    private readonly messageScheduler;

    public ScheduleNotificationConsumer(IMessageScheduler messageScheduler)
    {
        this.messageScheduler = messageScheduler;
    }

    public async Task Consume(ConsumeContext<ScheduleNotification> context)
    {
        // Your logic goes here.
    }
}

Notice that is possible to schedule both Events and Commands. The events should be done using the SchedulePublish providing it the DateTime when it should be executed and the message.

Example:

C#
    public async Task Consume(ConsumeContext<ScheduleNotification> context)
    {
        // This will schedule the publish of TestMessage in the next 10 seconds.
        this.messageScheduler.SchedulePublish<TestMessage>(
            DateTime.Now.AddSeconds(10),  // It could be any DateTime.
            new
            {
                CorrelationId = NewId.NextGuid(),
            });
    }

To schedule a command, which is discouraged, the ScheduleSend has to be used. Its parameters are: URI of the endpoint to be executed, date of execution and the message.

Example:

C#
public async Task Consume(ConsumeContext<ScheduleNotification> context)
{
    // This will schedule the publish of TestMessage at the test-service queue in the next 10 seconds.
    Uri testServiceUri = new Uri("queue:test-service");

    await context.ScheduleSend<TestMessage>(
        testServiceUri,
        DateTime.Now.AddSeconds(10), // It could be any DateTime.
        new
        {
            CorrelationId = NewId.NextGuid(),
        });
}

Sagas

At Sagas, the scheduling of deferred schedules is a bit different. It is recommended to take a look at the MassTransit official documentation since it is covered in detail. As for more examples, it is also advisable to take a look at the MassTransit Sample-Twitch repo since there are some good examples there as well.

Recurring schedules

To request a recurring message, you need to use the ScheduleRecurringSend extension method, which is available for both PublishEndpoint and SendEndpointProvider.

It is important to remark that recurring schedules behave as commands, meaning that they require the URI of the endpoint they are targeting.

This type of schedules require the following:

  • URI of the Endpoint triggered.
  • Recurrence object.
  • Object that contains the payload to be sent.

Recurrence Definition

The recurrence object needed to determine the periodicity of the schedule should inherit from DefaultRecurringSchedule or it should implement RecurringSchedule (if there is the need of customizing something). To define the recurrence properly, it is required to know how to manage Quartz's CRON expressions. They are slightly different from the ones from UNIX, for reference read this.

C#
public class SuiteRecurrence : DefaultRecurringSchedule
{
    public SuiteRecurrence()
    {
        // This means every minute.
        this.CronExpression = "0 0/1 * 1/1 * ? *";
    }
}

public class SuiteScheduleRecurrence : RecurringSchedule
{
    public SuiteScheduleRecurrence(
            string cronExpression,
            string scheduleId,
            string scheduleGroup,
            DateTime? endDate = null,
            MissedEventPolicy missedEventPolicy = MissedEventPolicy.Default)
    {
        this.ScheduleId = scheduleId.IsNullOrEmpty() ?
                                        TypeMetadataCache.GetShortName(this.GetType()) :
                                        scheduleId;

        this.ScheduleGroup =
                scheduleGroup.IsNullOrEmpty() ?
                        GetType().GetTypeInfo().Assembly.FullName.Split(",".ToCharArray(), StringSplitOptions.RemoveEmptyEntries)[0] :
                        scheduleGroup;

        this.CronExpression = cronExpression;

        this.TimeZoneId = TimeZoneInfo.Local.Id;
        this.MisfirePolicy = missedEventPolicy;
        this.StartTime = DateTime.Now;
        this.EndTime = endDate;
    }

    public MissedEventPolicy MisfirePolicy { get; protected set; }

    public string TimeZoneId { get; protected set; }

    public DateTimeOffset StartTime { get; protected set; }

    public DateTimeOffset? EndTime { get; protected set; }

    public string ScheduleId { get; private set; }

    public string ScheduleGroup { get; private set; }

    public string CronExpression { get; protected set; }

    public string? Description { get; protected set; }
}

Creation inside a consumer

At a consumer it is possible to access it through the ConsumeContext as shown below:

C#
public async Task Consume(ConsumeContext<UpdateComponentUnit> context)
{
    var schedulerEndpoint =
        await context.GetSendEndpoint(new Uri($"exchange:{MassTransitConstants.QuartzScheduler}"));

    var recurrence = new SuiteRecurrence();

    var scheduleDefinition = new
    schedulerEndpoint.ScheduleRecurringSend<TestMessage>(
                        new Uri("queue:test-service"),
                        recurrence,
                        new
                        {
                            Payload = requestPayload,
                        });
}

Creation inside a service

At a service it is possible to access it through the ISendEndpointProvider as it follows:

C#
public class ServiceOrderService
{
    private readonly ISendEndpointProvider sendEndpointProvider;

    public ServiceOrderService(ISendEndpointProvider sendEndpointProvider)
    {
        this.sendEndpointProvider = sendEndpointProvider;
    }

    public async Task TriggerRecurringSchedule()
    {
        var schedulerEndpoint =
            await this.sendEndpointProvider.GetSendEndpoint(new Uri($"exchange:{MassTransitConstants.QuartzScheduler}"));

        var recurrence = new SuiteRecurrence();

        var scheduleDefinition = new
        schedulerEndpoint.ScheduleRecurringSend<TestMessage>(
                            new Uri("queue:test-service"),
                            recurrence,
                            new
                            {
                                Payload = requestPayload,
                            });
    }
}