Customizing the Consumer

As covered earlier in this tutorial, the Suite uses the Messaging Pattern for asynchronous communication between services. To do so, it relies on a third-party framework called MassTransit.

Equipment Consumer

To create a new equipment, our microservice needs to listen for a message requesting the creation. By design, there is no HTTP endpoint like /equipments/create: all Create and Update operations should be requested using the Messaging Pattern, that is, by publishing a message to the bus, which routes it to the components listening for that message. Such components are called Consumers because they consume the message and act accordingly.
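The routing described above can be pictured with a toy, in-memory bus. This is only a sketch of the idea and not MassTransit's API: ToyBus, Subscribe, and ToyBusDemo are hypothetical names, and the record is a simplified copy of the message used in this tutorial.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Simplified copy of the tutorial's message (assumption: two fields only).
public record CreateOrUpdateEquipment(Guid CorrelationId, string DisplayName);

// Toy stand-in for a message bus. NOT MassTransit: it only shows that a
// published message is routed to whoever subscribed to its type.
public class ToyBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> handlers = new();

    // Registers a consumer callback for messages of type T.
    public void Subscribe<T>(Func<T, Task> consumer)
    {
        if (!this.handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Func<object, Task>>();
            this.handlers[typeof(T)] = list;
        }

        list.Add(message => consumer((T)message));
    }

    // Delivers the message to every consumer subscribed to its type.
    public async Task Publish<T>(T message) where T : notnull
    {
        if (this.handlers.TryGetValue(typeof(T), out var list))
        {
            foreach (var handler in list)
            {
                await handler(message);
            }
        }
    }
}

public static class ToyBusDemo
{
    // Publishes one message and returns the display names the consumer saw.
    public static async Task<List<string>> RunAsync()
    {
        var bus = new ToyBus();
        var seen = new List<string>();

        bus.Subscribe<CreateOrUpdateEquipment>(message =>
        {
            seen.Add(message.DisplayName);
            return Task.CompletedTask;
        });

        await bus.Publish(new CreateOrUpdateEquipment(Guid.NewGuid(), "Pump"));
        return seen;
    }
}
```

The publisher never calls the consumer directly; it only knows the message type. The real framework adds transport, serialization, and retries on top of this basic idea.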

Luckily the microservice template has already created all the basic elements we need:

  1. The CreateOrUpdateEquipment record: A record with the information required to create or update an Equipment. (Somewhat analogous to a DTO).
  2. CreateOrUpdateEquipmentConsumer: an implementation of IConsumer in charge of processing the CreateOrUpdateEquipment message.
  3. EquipmentIntegrationTests: Testing our code is very important, which is why the template includes tests for the code it generates. It is our job to keep them updated as the project evolves and, of course, to add new ones when required. We recommend writing the test first, and then writing the code to make it pass.

Adding the Equipment.Code Property to the Test

In the EquipmentIntegrationTests file there is a test called CreateOrUpdateMessage_WithValidData_ShouldCreateEntity that checks that our consumer properly creates the Equipment when a message is received:

C#
[Fact]
public async Task CreateOrUpdateMessage_WithValidData_ShouldCreateEntity()
{
    var correlationId = NewId.NextGuid();
    var displayName = "The name";

    await this.PublishEndpoint.Publish(new CreateOrUpdateEquipment(
        correlationId,
        displayName
    ));

    await this.Harness.ConsumedAnyOrThrowAsync<CreateOrUpdateEquipment>(x => x.CorrelationId == correlationId);

    using var uow = this.ServiceProvider.GetRequiredService<IUnitOfWorkManager>().BeginUnitOfWork();

    var entity = await this.repository.FindByIdOrThrowAsync(correlationId);

    Assert.Equal(displayName, entity.DisplayName);

    await this.Harness.PublishedAnyOrThrowAsync<EquipmentUpdated>(
        x => x.CorrelationId == correlationId
        && x.DisplayName == displayName);
}

To read more about tests, visit this article. For now, let's focus on the assertions. On the line:

C#
Assert.Equal(displayName, entity.DisplayName);

The test is checking that the DisplayName was properly assigned to the entity. This is correct, but we should also validate the Code property we added in the previous section of this tutorial. Let's add a variable to hold a value for the code, and then assert it at the end of the test.

C#
public async Task CreateOrUpdateMessage_WithValidData_ShouldCreateEntity()
{
    var correlationId = NewId.NextGuid();
    var displayName = "The name";
    var code = "The Code";

    await this.PublishEndpoint.Publish(new CreateOrUpdateEquipment(
        correlationId,
        displayName
    ));

    await this.Harness.ConsumedAnyOrThrowAsync<CreateOrUpdateEquipment>(x => x.CorrelationId == correlationId);

    using var uow = this.ServiceProvider.GetRequiredService<IUnitOfWorkManager>().BeginUnitOfWork();

    var entity = await this.repository.FindByIdOrThrowAsync(correlationId);

    Assert.Equal(displayName, entity.DisplayName);
    Assert.Equal(code, entity.Code);

    await this.Harness.PublishedAnyOrThrowAsync<EquipmentUpdated>(
        x => x.CorrelationId == correlationId
        && x.DisplayName == displayName);
}

You can try running the test now, and you'll see that it fails. That makes sense: we are asserting that the Code was set, but we are not passing a code in the creation message. We need to find the CreateOrUpdateEquipment record and add the Code property to it:

C#
public record CreateOrUpdateEquipment(Guid CorrelationId, string DisplayName, string Code);

Important

Remember to add the corresponding documentation when adding a new property to the record.
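As an illustration, the documented record might look like the sketch below. It assumes the template documents messages with XML documentation comments; the descriptions themselves are placeholder wording, not the template's actual text.

```csharp
using System;

/// <summary>
/// Requests the creation of an Equipment, or the update of an existing one.
/// </summary>
/// <param name="CorrelationId">Identifier of the Equipment to create or update.</param>
/// <param name="DisplayName">Human-readable name of the Equipment.</param>
/// <param name="Code">Code assigned to the Equipment (placeholder description).</param>
public record CreateOrUpdateEquipment(Guid CorrelationId, string DisplayName, string Code);
```

For positional records, each `<param>` tag documents both the constructor parameter and the property generated from it.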

Now we can go back to the CreateOrUpdateMessage_WithValidData_ShouldCreateEntity test and add the Code when publishing the message:

C#
await this.PublishEndpoint.Publish(new CreateOrUpdateEquipment(
    correlationId,
    displayName,
    code
));

Now we can run the test again and... it fails again. That makes sense: even though we are sending the Code in the message, we haven't yet modified the Consumer to tell it what to do with it. Let's do that. Find the CreateOrUpdateEquipmentConsumer class. In it, the Consume method is in charge of processing the messages, and it looks like this:

C#
public async Task Consume(ConsumeContext<CreateOrUpdateEquipment> context)
{
    var entityData = context.Message;
    var entity = await this.repository.FindByIdAsync(entityData.CorrelationId);

    // Try creating the entity when it's not found.
    if (entity == null)
    {
        entity = new Equipment(entityData.CorrelationId);

        await this.repository.CreateAsync(entity);

        this.logger.LogDebug(
            "Creating Equipment: {correlationId} {displayName}",
            entityData.CorrelationId,
            entityData.DisplayName);
    }
    else
    {
        this.logger.LogDebug(
            "Updating Equipment: {correlationId} {displayName}",
            entityData.CorrelationId,
            entityData.DisplayName);
    }

    // Update all entity fields to what the incoming message says.
    entity.SetDisplayName(entityData.DisplayName);

    var equipmentUpdated = new EquipmentUpdated(
        entity.CorrelationId,
        entity.DisplayName);

    await context.RespondAsyncIfRequired(equipmentUpdated);

    await context.Publish(equipmentUpdated);
}

The context parameter carries all the information required to operate, including the content of the message being consumed. We can use it to create the Equipment entity and assign the correct values to it. The consumer checks whether an entity with the given CorrelationId already exists: if not, it creates the entity; otherwise, it updates the existing one.

On the line:

C#
entity.SetDisplayName(entityData.DisplayName);

We are setting the DisplayName. Let's add a similar line to set the Code:

C#
entity.SetCode(entityData.Code);

Now that we have fixed the consumer, the CreateOrUpdateMessage_WithValidData_ShouldCreateEntity test should pass.

With this in place, any other component in the ecosystem can publish a CreateOrUpdateEquipment message and our microservice will process it correctly.
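To see the create-or-update behavior in isolation, here is a minimal in-memory sketch. It is not the template's code: the Equipment class, its setters, and InMemoryEquipmentHandler are hypothetical stand-ins for the real entity, repository, and consumer, and there is no bus or database involved.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message and entity; the real ones come from the template.
public record CreateOrUpdateEquipment(Guid CorrelationId, string DisplayName, string Code);

public class Equipment
{
    public Equipment(Guid correlationId) => this.CorrelationId = correlationId;

    public Guid CorrelationId { get; }
    public string DisplayName { get; private set; } = string.Empty;
    public string Code { get; private set; } = string.Empty;

    public void SetDisplayName(string displayName) => this.DisplayName = displayName;
    public void SetCode(string code) => this.Code = code;
}

// In-memory stand-in for the repository plus the consumer's logic.
public class InMemoryEquipmentHandler
{
    private readonly Dictionary<Guid, Equipment> store = new();

    // Number of entities currently stored.
    public int Count => this.store.Count;

    public Equipment Handle(CreateOrUpdateEquipment message)
    {
        // Create the entity only when none exists with this CorrelationId.
        if (!this.store.TryGetValue(message.CorrelationId, out var entity))
        {
            entity = new Equipment(message.CorrelationId);
            this.store[message.CorrelationId] = entity;
        }

        // Apply every field from the message, for both create and update.
        entity.SetDisplayName(message.DisplayName);
        entity.SetCode(message.Code);
        return entity;
    }
}
```

Handling two messages with the same CorrelationId leaves a single entity holding the values from the second message, which is the behavior the integration test relies on for the create case.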