Injecting Messages

Overview

At the core of the AEP Engine is its event multiplexer - a prioritized dispatch loop that pumps messages into your microservice's handlers. Whether a message originates from the prestart event, arrives via subscriptions on the underlying bus, or is explicitly injected by the user, it is enqueued by priority in the multiplexer. The multiplexer then dequeues each message in turn and routes it to the appropriate handler. All messages and events are dispatched by the engine multiplexer's dispatcher thread.

This section covers how to inject messages directly into the multiplexer - a technique you can use to schedule deferred work from within an event handler or to have an external thread enqueue tasks into your microservice.

Injection Thread

Any thread can inject messages into the engine's multiplexer.
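As a rough illustration of this threading model, the following self-contained Java sketch uses a plain `LinkedBlockingQueue` as a stand-in for the multiplexer. It is a toy model, not the AEP implementation: the class name `InjectionThreadSketch` and everything in it are hypothetical. Several threads enqueue, but only the single dispatcher thread ever runs the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model only: a shared queue stands in for the engine's multiplexer.
class InjectionThreadSketch {
    // Returns the names of all threads that ran the (toy) handler.
    static Set<String> demo() {
        BlockingQueue<String> multiplexer = new LinkedBlockingQueue<>();
        Set<String> handlerThreads = ConcurrentHashMap.newKeySet();
        final int injectorCount = 3;

        // Single dispatcher thread: the only thread that dequeues and handles messages.
        Thread dispatcher = new Thread(() -> {
            try {
                for (int i = 0; i < injectorCount; i++) {
                    multiplexer.take();
                    handlerThreads.add(Thread.currentThread().getName());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "dispatcher");
        dispatcher.start();

        // Several unrelated threads inject concurrently.
        List<Thread> injectors = new ArrayList<>();
        for (int i = 0; i < injectorCount; i++) {
            Thread t = new Thread(() -> multiplexer.add("message"), "injector-" + i);
            injectors.add(t);
            t.start();
        }
        try {
            for (Thread t : injectors) {
                t.join();
            }
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handlerThreads;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Regardless of how many injector threads are used, the returned set contains only the dispatcher thread's name, which is the property handlers can rely on.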

Message Injection

There are two variants of message injection:

  • Prioritized Injection with immediate dispatch

  • Injection with delayed dispatch

All injection is done using one of the overloaded variants of the AepEngine's injectMessage() method.

Injection with Immediate Dispatch

Normal Injection

Normal injection is injection with priority 0. The following code illustrates how to perform injection with normal (0) priority for immediate dispatch.

engine.injectMessage(MessageB.create(), true);

The above is equivalent to calling injectMessage() with the message priority explicitly specified as 0 in the last parameter.
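Assuming a three-argument overload whose last parameter carries the delay/priority value (as the later sections describe), the explicit form would presumably read `engine.injectMessage(MessageB.create(), true, 0);`. The self-contained sketch below models that equivalence with a hypothetical `SketchEngine` stand-in. It is not the real AepEngine, and the parameter names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AepEngine: it only records the priority each call enqueues with.
class SketchEngine {
    final List<Integer> enqueuedPriorities = new ArrayList<>();

    // Two-argument form: assumed to default the last parameter to 0 (normal priority).
    void injectMessage(Object message, boolean flag) {
        injectMessage(message, flag, 0);
    }

    // Three-argument form: the last parameter carries the priority (0 = normal).
    void injectMessage(Object message, boolean flag, int delay) {
        enqueuedPriorities.add(delay);
    }
}

class NormalInjectionSketch {
    public static void main(String[] args) {
        SketchEngine engine = new SketchEngine();
        engine.injectMessage("MessageB", true);     // implicit normal priority
        engine.injectMessage("MessageB", true, 0);  // explicit normal priority
        System.out.println(engine.enqueuedPriorities); // both entries are 0
    }
}
```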

Prioritized Injection

Prioritized injection is performed in the same manner as above but using a negative value for the last parameter - the delay parameter. The lower the value supplied, the higher the priority.

For example, if one message is injected with priority -1 and another with priority -2, the message injected with priority -2 will be dispatched before the message injected with priority -1.
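The ordering rule can be sketched with a small priority queue. This is a toy model under stated assumptions, not the engine's multiplexer: `PriorityMuxSketch` is a hypothetical class, both messages are enqueued before anything is dispatched, and first-in-first-out ordering among equal priorities is an assumption of the sketch. Against the real engine, the calls would presumably look like `engine.injectMessage(MessageB.create(), true, -1);` followed by `engine.injectMessage(MessageB.create(), true, -2);`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of prioritized dispatch; NOT the real AEP multiplexer.
class PriorityMuxSketch {
    private static final class Entry {
        final String message;
        final int priority;
        final long seq;

        Entry(String message, int priority, long seq) {
            this.message = message;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Lower value = higher priority; ties fall back to injection order (assumption).
    private final PriorityQueue<Entry> queue = new PriorityQueue<>((a, b) ->
            a.priority != b.priority
                    ? Integer.compare(a.priority, b.priority)
                    : Long.compare(a.seq, b.seq));
    private long nextSeq = 0;

    // Mirrors the shape of injectMessage(message, flag, priority); the flag is ignored here.
    void injectMessage(String message, boolean flag, int priority) {
        queue.add(new Entry(message, priority, nextSeq++));
    }

    // Dequeues everything in dispatch order.
    List<String> drain() {
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().message);
        }
        return order;
    }

    public static void main(String[] args) {
        PriorityMuxSketch mux = new PriorityMuxSketch();
        mux.injectMessage("priority -1", true, -1);
        mux.injectMessage("priority -2", true, -2);
        mux.injectMessage("normal", true, 0);
        System.out.println(mux.drain()); // "priority -2" comes out first
    }
}
```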

Injection with Delayed Dispatch

Injection with delayed dispatch is performed in the same manner as above but using a positive value for the last parameter - the delay parameter. If the delay parameter is greater than 0, the value is interpreted as the dispatch delay, in milliseconds.

For example, a delay value of 1000 causes the injected message to be dispatched to its handler 1 second after injection.
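The delay semantics can be sketched with a virtual clock, so the behavior is visible without real waiting. This is a toy model: `DelayedMuxSketch` and its explicit `nowMillis` arguments are hypothetical and not part of the AEP API. With the real engine, a one-second delay would presumably be requested as `engine.injectMessage(MessageB.create(), true, 1000);`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Toy model of delayed dispatch driven by an explicit clock value; NOT the AEP engine.
class DelayedMuxSketch {
    private static final class Scheduled {
        final String message;
        final long dueAtMillis;

        Scheduled(String message, long dueAtMillis) {
            this.message = message;
            this.dueAtMillis = dueAtMillis;
        }
    }

    // Earliest due time first.
    private final PriorityQueue<Scheduled> pending =
            new PriorityQueue<>(Comparator.comparingLong((Scheduled s) -> s.dueAtMillis));

    // A positive delay is interpreted as milliseconds from "now".
    void injectMessage(String message, boolean flag, int delayMillis, long nowMillis) {
        pending.add(new Scheduled(message, nowMillis + delayMillis));
    }

    // Returns the messages whose delay has elapsed at the given time.
    List<String> dispatchDue(long nowMillis) {
        List<String> dispatched = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().dueAtMillis <= nowMillis) {
            dispatched.add(pending.poll().message);
        }
        return dispatched;
    }

    public static void main(String[] args) {
        DelayedMuxSketch mux = new DelayedMuxSketch();
        mux.injectMessage("MessageB", true, 1000, 0L);
        System.out.println(mux.dispatchDue(999L));  // nothing is due yet
        System.out.println(mux.dispatchDue(1000L)); // the message is now due
    }
}
```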

Injection, Transactions and Processing Notification

An injected message either starts a new transaction or is slotted into an ongoing transaction. The injector has no control over which transaction the injected message ends up in. However, the injector can request to be notified when processing of the injected message is complete and the transaction in which it was processed has completed. It does so by supplying an instance of IEventAcknowledger to injectMessage(). The supplied event acknowledger is invoked when the transaction containing the injected message is committed.

To request notification on completion of the transaction that absorbs the injected message, pass the acknowledger to injectMessage() along with the message.
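The notification flow can be sketched as follows. Everything here is a hypothetical stand-in: `AckSketch` takes the place of IEventAcknowledger (whose actual method name and signature are not shown in this section and may differ), and `TxnEngineSketch` models only the rule that the callback fires at commit, not before.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for IEventAcknowledger; the real interface may look different.
interface AckSketch {
    void acknowledged(String message);
}

// Toy engine: injected messages join the currently open transaction.
class TxnEngineSketch {
    private final List<Runnable> pendingAcks = new ArrayList<>();

    // The injector cannot choose the transaction; it can only ask to be told when it commits.
    void injectMessage(String message, boolean flag, int delay, AckSketch acknowledger) {
        pendingAcks.add(() -> acknowledger.acknowledged(message));
    }

    // Committing the transaction invokes the acknowledgers of every message it absorbed.
    void commit() {
        List<Runnable> toRun = new ArrayList<>(pendingAcks);
        pendingAcks.clear();
        for (Runnable ack : toRun) {
            ack.run();
        }
    }
}

class AckDemoSketch {
    public static void main(String[] args) {
        List<String> acked = new ArrayList<>();
        TxnEngineSketch engine = new TxnEngineSketch();
        engine.injectMessage("MessageB", true, 0, acked::add);
        System.out.println(acked.isEmpty()); // true: not yet committed
        engine.commit();
        System.out.println(acked);           // now contains the message
    }
}
```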

Blocking vs. Non-Blocking Injection

The boolean parameter (true in the above examples) indicates whether the injection should be done in a blocking or non-blocking manner. With blocking injection, the injector thread blocks if its queue in the event multiplexer is full. With non-blocking injection, the injector thread never blocks; the queue simply continues to grow. Injections done by the event multiplexer's dispatch thread, e.g. injections done from within an event or message handler, never block.
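The difference between the two modes can be sketched with a small bounded queue. This is a toy model under assumptions: `FeederQueueSketch` is hypothetical, the capacity of 1 is arbitrary, and naming the flag `nonBlocking` (true meaning "never block") is an assumption of the sketch rather than something this section states about the real parameter.

```java
import java.util.ArrayDeque;

// Toy model of an injector's queue in the multiplexer; NOT the AEP implementation.
class FeederQueueSketch {
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    private final int capacity;

    FeederQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    synchronized void inject(String message, boolean nonBlocking) {
        if (!nonBlocking) {
            // Blocking mode: wait until the dispatcher has made room.
            try {
                while (queue.size() >= capacity) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for room", e);
            }
        }
        // Non-blocking mode falls straight through, so the queue can exceed its capacity.
        queue.addLast(message);
    }

    // Called by the (toy) dispatcher; wakes any blocked injector.
    synchronized String dispatchNext() {
        String next = queue.pollFirst();
        notifyAll();
        return next;
    }

    synchronized int size() {
        return queue.size();
    }

    // Fills the queue past capacity, then shows a blocking injector getting in only after it drains.
    static int demo() {
        FeederQueueSketch q = new FeederQueueSketch(1);
        q.inject("a", true);
        q.inject("b", true); // queue has grown to 2 despite capacity 1
        Thread injector = new Thread(() -> q.inject("c", false));
        injector.start();
        q.dispatchNext();
        q.dispatchNext();    // only now is there room for "c"
        try {
            injector.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return q.size();
    }
}
```

The trade-off the sketch makes visible: blocking mode applies back-pressure to the injector, while non-blocking mode trades that for unbounded queue growth.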

Injected Message Ownership

Message injection transfers ownership of the message to the engine until the processing of the message is complete, i.e. until the event acknowledger supplied by the injector is invoked. An injected message must not be written to or read from during this period. Once the processing is complete, the engine will dispose() its reference to the message (the reference passed to it by the inject call). Therefore, if the user wishes to hold onto the message to perform read/write operations after the processing of the injected message is complete, the user needs to acquire a reference to the message via the message's acquire() method.
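The ownership rule can be sketched as simple reference counting. This is a toy model: `RefCountedMessageSketch` is hypothetical and only imitates the acquire()/dispose() pairing described above, and acquiring before the inject call is an assumption of the sketch.

```java
// Toy reference-counted message; NOT the platform's message implementation.
class RefCountedMessageSketch {
    private int refCount = 1; // the creator holds the initial reference
    private String payload;

    RefCountedMessageSketch(String payload) {
        this.payload = payload;
    }

    void acquire() {
        if (refCount == 0) {
            throw new IllegalStateException("already disposed");
        }
        refCount++;
    }

    void dispose() {
        if (refCount == 0) {
            throw new IllegalStateException("already disposed");
        }
        if (--refCount == 0) {
            payload = null; // stands in for the message being reclaimed
        }
    }

    boolean isUsable() {
        return refCount > 0;
    }

    String read() {
        if (!isUsable()) {
            throw new IllegalStateException("message was disposed");
        }
        return payload;
    }
}

class OwnershipSketch {
    // Returns whether the injector can still use the message after the engine is done with it.
    static boolean demo(boolean injectorAcquires) {
        RefCountedMessageSketch message = new RefCountedMessageSketch("hello");
        if (injectorAcquires) {
            message.acquire(); // injector keeps its own reference
        }
        // Injection hands one reference to the engine; after processing, the engine disposes it.
        message.dispose();
        return message.isUsable();
    }
}
```

Without the extra acquire, the engine's dispose drops the last reference and the injector is left holding a message it can no longer safely touch.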

HA Considerations

In Event Sourced microservices, messages will only be injected into the event multiplexer of engines of Primary cluster instances in the Started state. Calls to inject messages on backup instances are ignored since, for Event Sourced engines, the injected message will be injected into and replicated from the primary and, for State Replication engines, all message processing is done only on the primary. Calls made while an engine is replaying from a transaction log (i.e., while the engine state is Starting) are similarly ignored, as those calls would interfere with the stream being replayed. An application that injects messages from an external source may call AepEngine.waitForMessagingToStart() to avoid an injected message being discarded while an engine is transitioning to a started, primary role.
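The gating behavior can be sketched as follows. This is a toy model: `HaEngineSketch`, its `markStarted()` method and its state enum are hypothetical, and the latch-based `waitForMessagingToStart()` here only imitates the purpose of the real AepEngine method, not its implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Toy engine that silently ignores injections unless it is a started primary.
class HaEngineSketch {
    enum State { STARTING, STARTED }

    private final boolean primary;
    private volatile State state = State.STARTING;
    private final CountDownLatch messagingStarted = new CountDownLatch(1);
    private final List<String> multiplexer = new ArrayList<>();

    HaEngineSketch(boolean primary) {
        this.primary = primary;
    }

    // Hypothetical hook marking the end of startup (e.g. after log replay).
    void markStarted() {
        state = State.STARTED;
        messagingStarted.countDown();
    }

    // Blocks the caller until the engine has started.
    void waitForMessagingToStart() {
        try {
            messagingStarted.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for start", e);
        }
    }

    synchronized void injectMessage(String message) {
        if (!primary || state != State.STARTED) {
            return; // ignored: backup instance, or still starting
        }
        multiplexer.add(message);
    }

    synchronized int queued() {
        return multiplexer.size();
    }
}
```

An external injector would call `waitForMessagingToStart()` once before its first injection, so that early messages are not silently dropped.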
