What are microservices and the event aggregator pattern?

How to generalize event flow in a microservices architecture

Many enterprise solutions built on a microservices architecture struggle to unify the event flow that comes from different sources. Many of them also rely on several messaging providers at once, for example:

  • Azure Service Bus
  • Apache Kafka
  • RabbitMQ

What we need is a single component that connects event publishers with event subscribers, regardless of the provider behind them.

Solution

Before diving into the implementation, let's look at Martin Fowler's Event Aggregator pattern.

Figure 1. Event Aggregator

“An Event Aggregator is a simple element of indirection. In its simplest form, you have it register with all the source objects you are interested in and register all target objects with the Event Aggregator. The Event Aggregator responds to any event from a source object by propagating that event to the target objects.” Source.
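To make the quoted definition concrete, here is a minimal in-memory sketch of the pattern. It is not Fowler's code and not part of the solution described in this shot; the names InMemoryEventAggregator, SourceObject, and AggregatorDemo are hypothetical and exist only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source object that raises named events.
public sealed class SourceObject
{
    public event Action<string> Raised;

    public void Raise(string eventName) => Raised?.Invoke(eventName);
}

// Hypothetical aggregator: it registers with every source it is given
// and propagates each event to every registered target.
public sealed class InMemoryEventAggregator
{
    private readonly List<Action<string>> targets = new List<Action<string>>();

    // Register a target that should receive every propagated event.
    public void RegisterTarget(Action<string> target) => targets.Add(target);

    // Register with a source so that its events flow through the aggregator.
    public void RegisterSource(SourceObject source) => source.Raised += Propagate;

    private void Propagate(string eventName)
    {
        foreach (var target in targets)
        {
            target(eventName);
        }
    }
}

public static class AggregatorDemo
{
    // Wires two sources and two targets together and returns what the targets saw.
    public static List<string> Run()
    {
        var received = new List<string>();
        var aggregator = new InMemoryEventAggregator();
        var orders = new SourceObject();
        var payments = new SourceObject();

        aggregator.RegisterSource(orders);
        aggregator.RegisterSource(payments);
        aggregator.RegisterTarget(name => received.Add($"audit:{name}"));
        aggregator.RegisterTarget(name => received.Add($"email:{name}"));

        orders.Raise("OrderCreated");
        payments.Raise("PaymentReceived");
        return received;
    }
}
```

The sources and targets never reference each other directly; both sides only know the aggregator, which is the indirection the quote describes.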

Another example that follows this principle is Azure Event Grid. With Event Grid, you can connect cloud resources that produce events (publishers) with resources that handle those events (subscribers). You can see this in the image below.

Figure 2. Azure Event Grid with event publishers and subscribers

Let’s see how we can reuse this pattern in a real example. I wrote the components and services of the solution in C#. As the primary event bus, I used Azure Service Bus.

I designed the solution to be easy to extend, so you can also create providers for Apache Kafka and RabbitMQ.
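One way such extensibility could look is a small provider interface that each broker implements. The sketch below is an assumption for illustration only: IEventBusProvider, InMemoryEventBusProvider, and ProviderDemo are hypothetical names that do not come from the original solution, and the in-memory provider stands in for a real Kafka, RabbitMQ, or Service Bus implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical abstraction: each message broker gets its own implementation.
public interface IEventBusProvider
{
    Task SendAsync(string label, string content);
    void Subscribe(string label, Func<string, bool> handler);
}

// In-memory stand-in used only to make the sketch runnable.
public sealed class InMemoryEventBusProvider : IEventBusProvider
{
    private readonly Dictionary<string, List<Func<string, bool>>> handlers =
        new Dictionary<string, List<Func<string, bool>>>();

    public void Subscribe(string label, Func<string, bool> handler)
    {
        if (!handlers.TryGetValue(label, out var list))
        {
            list = new List<Func<string, bool>>();
            handlers[label] = list;
        }
        list.Add(handler);
    }

    public Task SendAsync(string label, string content)
    {
        // Deliver the content to every handler subscribed to this label.
        if (handlers.TryGetValue(label, out var list))
        {
            foreach (var handler in list)
            {
                handler(content);
            }
        }
        return Task.CompletedTask;
    }
}

public static class ProviderDemo
{
    // Code written against the interface does not change when the broker does.
    public static async Task<string> RunAsync()
    {
        IEventBusProvider provider = new InMemoryEventBusProvider();
        string seen = null;
        provider.Subscribe("First", body => { seen = body; return true; });
        await provider.SendAsync("First", "hello");
        return seen;
    }
}
```

With an interface like this, the aggregator would depend only on IEventBusProvider, and adding a new broker would mean adding one new class.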

The first component is ServiceBusManager. It contains a method to subscribe to the messages. You can see the code below:

public async Task RegisterOnReceiveMessages(string subscription, Dictionary<string, Func<Message, bool>> subscriptionToLabelHandler, CancellationToken cancellationToken)
{
    var taskCompletionSource = new TaskCompletionSource<bool>();
    SubscriptionClient subscriptionClient = GetSubscriptionClient(subscription);

    RegisterCancellationToken(cancellationToken, subscriptionClient, taskCompletionSource);

    var messageHandlerOptions = MessageHandlerOptions;

    // Register the function that will process messages
    subscriptionClient.RegisterMessageHandler(async (message, token) =>
    {
        // Log the incoming message
        Console.WriteLine($"Received message: Label:{message.Label} | SequenceNumber:{message.SystemProperties.SequenceNumber} | Body:{Encoding.UTF8.GetString(message.Body)}");

        // Dispatch the message to the handler registered for its label
        subscriptionToLabelHandler[message.Label](message);

        // Complete the message so that it is not received again.
        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);

    }, messageHandlerOptions);

    // Keep the method alive until the task completion source is completed
    await taskCompletionSource.Task;
}
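The handler lookup above indexes the dictionary directly and ignores the handler's bool result. As a hedged sketch of one possible policy, the dispatch step could map each outcome to an explicit settlement decision. The Settlement enum, LabelDispatcher, and DispatchDemo below are hypothetical names, and plain strings stand in for the Service Bus Message type so the example runs without any broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical outcome of processing one message.
public enum Settlement { Completed, Abandoned, Unhandled }

public static class LabelDispatcher
{
    // Looks up the handler for the label and translates its result
    // into a settlement decision instead of throwing on unknown labels.
    public static Settlement Dispatch(
        string label,
        string body,
        IReadOnlyDictionary<string, Func<string, bool>> handlers)
    {
        if (label == null || !handlers.TryGetValue(label, out var handler))
        {
            return Settlement.Unhandled;
        }
        return handler(body) ? Settlement.Completed : Settlement.Abandoned;
    }
}

public static class DispatchDemo
{
    public static Settlement[] Run()
    {
        var handlers = new Dictionary<string, Func<string, bool>>
        {
            { "First", body => body.Length > 0 },
            { "Second", body => false }
        };

        return new[]
        {
            LabelDispatcher.Dispatch("First", "payload", handlers),
            LabelDispatcher.Dispatch("Second", "payload", handlers),
            LabelDispatcher.Dispatch("Third", "payload", handlers)
        };
    }
}
```

In a real subscriber, each outcome would then translate into a broker operation such as completing, abandoning, or dead-lettering the message; which one fits is a design decision for your system.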

The manager also exposes a method that lets other components send messages:

public async Task SendMessage(string label, string messageContent)
{
    try
    {
        var topicClient = new TopicClient(serviceBusSettings.ConnectionString, serviceBusSettings.TopicName);

        var messageData = GetMessageContent(label, messageContent);

        // The label lets subscribers route the message to the matching handler
        var message = new Message
        {
            Body = messageData,
            Label = label,
        };

        // Send the message to the topic
        await topicClient.SendAsync(message);

        await topicClient.CloseAsync();
    }
    catch (Exception exception)
    {
        Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
    }
}

Here you can find a complete implementation of the Service Bus Manager.

EventAggregator

The second component of our system is the EventAgregator class. It contains:

  • Configuration: maps each message label to its event handler function.
private Dictionary<string, Func<Message, bool>> SubscriptionToLabelFuncs => new Dictionary<string, Func<Message, bool>>
{
    { "First", DoFirstHandler },
    { "Second", DoSecondHandler }
};

....
public bool DoFirstHandler(Message message)
{
    // Get message body example
    var data = GetMessageBody(message);

    return true;
}

/// <summary>
/// Second message handler example.
/// </summary>
/// <param name="message">The message.</param>
/// <returns></returns>
public bool DoSecondHandler(Message message)
{
    // Get message body example
    var data = GetMessageBody(message);

    return true;
}


  • The method that runs the aggregation process in the background.
/// <summary>
/// Starts the aggregation.
/// </summary>
/// <returns></returns>
public async Task StartAgregating()
{
    this.cancellationToken = new CancellationTokenSource().Token;
    await serviceBusManager.RegisterOnReceiveMessages(Subscription, SubscriptionToLabelFuncs, cancellationToken);
}

Here, you can find a complete implementation of the Event Aggregator.
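Note that StartAgregating keeps only the Token of a freshly created CancellationTokenSource, so the snippet as shown has no way to request cancellation later. The sketch below shows one way to keep the source around and stop a background listener. BackgroundListener and ListenerDemo are hypothetical names, and completing a TaskCompletionSource from a cancellation callback is an assumption about how such a listener could be stopped, not a description of the original RegisterCancellationToken helper.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a long-running message listener.
public sealed class BackgroundListener
{
    private CancellationTokenSource cancellationSource;

    public bool IsRunning { get; private set; }

    // Runs until Stop() is called. The cancellation callback completes
    // the task completion source, which releases the awaiting method.
    public async Task StartAsync()
    {
        cancellationSource = new CancellationTokenSource();
        var completion = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        using (cancellationSource.Token.Register(() => completion.TrySetResult(true)))
        {
            IsRunning = true;
            await completion.Task;
            IsRunning = false;
        }
    }

    // Keeping the source (not just its token) is what makes stopping possible.
    public void Stop() => cancellationSource?.Cancel();
}

public static class ListenerDemo
{
    public static async Task<bool> RunAsync()
    {
        var listener = new BackgroundListener();
        Task running = listener.StartAsync();
        bool wasRunning = listener.IsRunning;

        listener.Stop();
        await running;

        return wasRunning && !listener.IsRunning;
    }
}
```

The same idea applies to the aggregator: storing the CancellationTokenSource in a field would let a Stop method cancel the subscription cleanly.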

Conclusion

In this shot, I’ve explained one way to organize event communication in your microservices solution. If you want to build more advanced skills in event-driven architecture, you can subscribe to the Architecture Digest and enroll in the course Building Event-Driven and Microservices Architecture in Azure.

RELATED TAGS

azure
dotnetcore