Sending Messages

Messages are sent over message bus channels in a fire-and-forget fashion: the microservice relies on the underlying AEP Engine to deliver each message according to the quality of service configured for its channel. A microservice views a message channel as a logical, ordered conduit over which messages flow.

Overview

The diagram below depicts the path of a message sent through an engine. The basic flow of sending is as follows:

  1. The application creates a message

  2. The application calls send, passing the message and channel name

  3. The engine looks up the channel and uses its configured channel key to resolve the physical destination on which the message will be sent

  4. The engine queues the message for sending until the associated state changes for the message handler are stabilized to the microservice's store (e.g., replication and transaction log write)

  5. Once the microservice's store changes have been stabilized, the enqueued message is then sent. This involves the following sub-steps:

    1. The message is serialized

    2. The serialized contents and message metadata are then packaged in a message bus provider specific message

    3. The message is sent through the underlying transport
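
The queue-then-send behavior in steps 4 and 5 can be modeled with a small, self-contained sketch. This is a toy model and not the AEP Engine: ToySendPipeline, its send and stabilize methods, the "topic/" prefix, and the in-memory wire list are all hypothetical names chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the send flow. NOT the AEP Engine; every name here is hypothetical.
final class ToySendPipeline {
    // {topic, payload} pairs waiting for the store to stabilize
    private final Queue<String[]> pending = new ArrayDeque<>();
    // Stands in for the underlying transport
    final List<String> wire = new ArrayList<>();

    // Steps 2-4: resolve the destination, then queue instead of sending immediately.
    void send(String channel, String payload) {
        String topic = "topic/" + channel; // stand-in for channel key resolution
        pending.add(new String[] {topic, payload});
    }

    // Step 5: called once store changes are stable; packages and sends in queue order.
    void stabilize() {
        String[] entry;
        while ((entry = pending.poll()) != null) {
            wire.add(entry[0] + ":" + entry[1]); // stand-in for serialization and packaging
        }
    }
}
```

Nothing reaches the wire list until stabilize() is called, and queued messages are released in the order in which they were sent.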

Basic Example

To send a message, the microservice needs an AepMessageSender, which can be injected via the @AppInjectionPoint annotation:

public class OrderProcessor {
    private AepMessageSender messageSender;

    @AppInjectionPoint
    public void setMessageSender(AepMessageSender messageSender) {
        this.messageSender = messageSender;
    }

    @EventHandler
    public void onNewOrder(NewOrderMessage message) {
        // Create acknowledgment message
        OrderAckMessage ack = OrderAckMessage.create();
        ack.setOrderId(message.getOrderId());
        ack.setStatus("ACCEPTED");

        // Send on the 'order-acks' channel
        messageSender.sendMessage("order-acks", ack);
    }
}

The AepMessageSender provides several send methods:

| Method | Description |
| --- | --- |
| sendMessage(String channel, MessageView message) | Sends a message on the named channel |
| sendMessage(String bus, String channel, MessageView message) | Sends a message on a specific bus and channel |
| sendMessage(String bus, String channel, MessageView message, String key) | Sends with a caller-provided key (advanced) |

Creating Messages For Send

Message types generated by ADM have no public constructors. Instances are created via the static create() factory method on the message class:

By routing all message creation through create() factory methods, Talon can transparently switch between pooled and non-pooled allocation of messages without requiring changes to business logic. This enables zero-garbage operation for latency-sensitive applications.

Populating Messages

Once created, messages can be populated using setter methods just like any other POJO:

For advanced encoding-specific population mechanisms (e.g., Xbuf), see Coding for Zero Garbage.

Disposing Unsent Messages

If a message is created but not sent, the application should call dispose() on the message to return it to the pool:

Failing to dispose of unsent messages will not cause memory leaks, but it prevents subsequent create() calls from reusing those pooled objects, which can increase allocation rates.
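
The create, populate, and dispose lifecycle can be illustrated with a minimal pooled factory. PooledAck is a hypothetical stand-in written for this sketch; ADM-generated classes and Talon's pooling are more involved, and this version is not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for an ADM-generated message; real generated classes differ.
final class PooledAck {
    private static final Deque<PooledAck> POOL = new ArrayDeque<>();
    private long orderId;
    private String status;

    // No public constructor, mirroring the factory-only creation described above.
    private PooledAck() {}

    // Reuses a pooled instance when one is available, otherwise allocates a new one.
    static PooledAck create() {
        PooledAck m = POOL.poll();
        return m != null ? m : new PooledAck();
    }

    void setOrderId(long orderId) { this.orderId = orderId; }
    void setStatus(String status) { this.status = status; }
    long getOrderId() { return orderId; }
    String getStatus() { return status; }

    // Resets the fields and returns the instance to the pool for reuse.
    void dispose() {
        orderId = 0;
        status = null;
        POOL.push(this);
    }
}
```

In this sketch, a message that is created, found to be unnecessary, and disposed is handed back by the next create() call instead of triggering a new allocation.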

Message Keys and Topic Resolution

When a message is sent, the engine resolves the physical topic or destination using the channel's configured message key. The message key can contain static values and dynamic variables that are resolved from the message being sent.

Static Keys

A static key does not contain variable substitutions:

Dynamic Keys with Variable Substitution

Variable portions of a key are denoted using ${variableName} syntax:

When a message is sent, the engine resolves variables by:

  1. Looking up the value on the message (e.g., message.getRegion())

  2. Using values from a configured key resolution table

  3. Using the default value if specified in the key

Message-Based Key Resolution

The most common approach is to resolve key variables from fields on the message:

When sending an OrderEventMessage with:

  • region = "US"

  • orderType = "MARKET"

The resolved topic would be: ORDERS/US/MARKET

Key Resolution Tables

For variables not present on the message, applications can configure a key resolution table:

Configuration-Based Resolution Table:

Programmatic Resolution Table:

Raw (Zero Garbage) Resolution Table:

For performance-sensitive applications using Xbuf encoding:

When using a RawKeyResolutionTable:

  • The table should not be modified after being set on a channel

  • XString values must not be modified after insertion

  • Raw and Properties-based tables cannot both be used on the same channel

Default Values

Variables can specify default values using the syntax ${variableName:defaultValue}:

If region cannot be resolved from the message or key resolution table, "US" will be used.
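
The substitution rules can be sketched as a small standalone resolver. This is not Talon's implementation: KeyResolverSketch is a hypothetical name, plain maps stand in for the message and the key resolution table, the lookup order is assumed to follow the list above (message, then table, then default), and throwing on an unresolved variable is an assumption.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative resolver for ${name} and ${name:default}; not Talon's implementation.
final class KeyResolverSketch {
    // Group 1 is the variable name, group 2 the optional default value.
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}:]+)(?::([^}]*))?\\}");

    static String resolve(String key, Map<String, String> messageFields, Map<String, String> table) {
        Matcher m = VAR.matcher(key);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String value = messageFields.get(name);       // 1. field on the message
            if (value == null) value = table.get(name);   // 2. key resolution table
            if (value == null) value = m.group(2);        // 3. default from the key, if any
            if (value == null) {
                // Assumed failure mode for this sketch only
                throw new IllegalArgumentException("Unresolved key variable: " + name);
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Assuming a key of ORDERS/${region}/${orderType}, fields region=US and orderType=MARKET resolve to ORDERS/US/MARKET. A key of ORDERS/${region:US} resolves to ORDERS/US when neither the message nor the table supplies region.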

Channel Key Functions

Channel key functions enable dynamic computation of key portions at runtime. Functions are declared using #[variableName = functionName(arg1, argN)] syntax:

Built-In Functions

| Function | Description | Arguments |
| --- | --- | --- |
| hash | Hashes value into N partitions | Arg1: value to hash; Arg2: number of partitions; Returns: 1 through N |
| env | Looks up runtime property | Arg1: property name; Arg2 (optional): default value |
| concat | Concatenates two strings | Arg1: string1; Arg2: string2 |

Hash Function Example:

In this example:

  • The sender sends on topics: NEWORDERS/1, NEWORDERS/2, or NEWORDERS/3

  • The receiver subscribes to: NEWORDERS/1 and NEWORDERS/2 (but not 3)
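
The partitioning idea behind hash can be sketched in plain Java. The hash algorithm Talon actually uses is not stated here, so this sketch substitutes String.hashCode(); HashPartitionSketch and its methods are hypothetical names, and the partition a given value lands in will generally differ from Talon's.

```java
// Sketch of hash-style partitioning into 1..N using String.hashCode() as a stand-in hash.
final class HashPartitionSketch {
    // Maps a value onto a partition number in the range 1 through partitions.
    static int partition(String value, int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(value.hashCode(), partitions) + 1;
    }

    // Builds a topic such as NEWORDERS/2 from a prefix and the hashed value.
    static String topic(String prefix, String value, int partitions) {
        return prefix + "/" + partition(value, partitions);
    }
}
```

With three partitions, every value maps deterministically to one of NEWORDERS/1, NEWORDERS/2, or NEWORDERS/3, so a receiver can take a share of the traffic by subscribing to a subset of those topics.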

Custom Channel Key Functions

Applications can define custom channel key functions by setting nv.sma.channelkeyfunctioncontainers to a comma-separated list of classes containing channel key functions.

Requirements for Custom Functions:

  • Must be declared public static

  • First argument must be the key being resolved (to which the function appends)

  • May accept additional XString arguments representing function arguments from the channel key

Example:

Configuration:

Implementation:
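
A minimal sketch of the shape such a function takes, following the requirements listed above. StringBuilder and CharSequence are stand-ins so the example compiles on its own; per the requirements, real functions receive XString arguments, and the type of the first parameter is not stated here. MyKeyFunctions and upper are hypothetical names.

```java
// Container class sketch; StringBuilder and CharSequence are stand-ins for Talon's own types.
// A real container class would be listed in nv.sma.channelkeyfunctioncontainers.
final class MyKeyFunctions {
    private MyKeyFunctions() {}

    // First argument: the key being resolved, which the function appends to.
    // Second argument: a function argument taken from the channel key.
    public static void upper(StringBuilder key, CharSequence arg) {
        for (int i = 0; i < arg.length(); i++) {
            key.append(Character.toUpperCase(arg.charAt(i)));
        }
    }
}
```

The function writes its result directly into the key rather than returning a new string, which fits the append-to-key requirement and avoids allocating an intermediate result.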

Message Key Validation

When resolving topics dynamically from message fields, it's important to validate that values don't introduce illegal characters or empty topic levels. Configure validation using these properties:

| Property | Default | Description |
| --- | --- | --- |
| nv.sma.maxresolvedkeylength | 0 | Maximum resolved key length. When > 0, validates key length. Set to lowest max length across all bindings for portability. |
| nv.sma.cleanmessagekey | false | When true, replaces non-alphanumeric characters in variable values with underscore. For example, "Asia/Pac" becomes "Asia_Pac". Does not apply to channel key functions. |
| nv.sma.allowemptykeyfield | false | When false, empty string values in key variables cause resolution to fail. Does not apply to channel key functions. |
| nv.sma.treatemptykeyfieldasnull | false | When true, treats empty strings as null, allowing fallback to other sources or default values. Takes precedence over allowemptykeyfield. |
| nv.sma.validatemessagekey | false | When true, enables message key validation before sending. |
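
The cleaning and empty-field rules can be expressed as a short standalone sketch. It is not Talon's code: KeyFieldValidationSketch is a hypothetical name, Character.isLetterOrDigit is an assumed definition of "alphanumeric", and throwing an exception is an assumed way of failing resolution.

```java
// Sketch of the cleaning and empty-field rules described above; not Talon's code.
final class KeyFieldValidationSketch {
    // Mirrors nv.sma.cleanmessagekey=true: non-alphanumeric characters become '_'.
    static String clean(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            sb.append(Character.isLetterOrDigit(c) ? c : '_');
        }
        return sb.toString();
    }

    // Returns the value to use, or null to fall back to another source; throws on failure.
    static String checkEmpty(String value, boolean allowEmpty, boolean treatEmptyAsNull) {
        if (value != null && value.isEmpty()) {
            if (treatEmptyAsNull) {
                return null; // takes precedence over allowEmpty
            }
            if (!allowEmpty) {
                throw new IllegalArgumentException("empty key field");
            }
        }
        return value;
    }
}
```

Here clean("Asia/Pac") yields "Asia_Pac", which keeps a field value from introducing an extra topic level.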

Caller-Provided Keys

Applications can provide the resolved key directly in the send call:

When using caller-provided keys:

  • Variable substitution is not performed on the provided key

  • Key validation is performed

  • The key will be prefixed with the channel name if topic_starts_with_channel=true

Message Sequencing

The AEP Engine assigns monotonically increasing sequence numbers to outbound messages to enable receivers to detect message loss or out-of-order delivery. Sequence numbers are managed per bus+channel+qos combination.
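
Per-stream sequencing and receiver-side gap detection can be sketched as follows. SequenceSketch is a hypothetical class and not the engine's implementation; starting each stream at 1 and the three classification labels are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-stream sequencing; the starting value of 1 is an assumption.
final class SequenceSketch {
    private final Map<String, Long> next = new HashMap<>();

    // Sender side: one independent counter per bus+channel+qos combination.
    long assign(String bus, String channel, String qos) {
        String stream = bus + "|" + channel + "|" + qos;
        long seq = next.getOrDefault(stream, 1L);
        next.put(stream, seq + 1);
        return seq;
    }

    // Receiver side: compares a received number with the last one seen on the same stream.
    static String classify(long lastSeen, long received) {
        if (received == lastSeen + 1) {
            return "IN_ORDER";
        }
        return received > lastSeen + 1 ? "GAP" : "DUPLICATE_OR_OUT_OF_ORDER";
    }
}
```

Because each stream counts independently, a receiver has to track the last sequence number it saw separately for each bus, channel, and quality of service it consumes.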

Enabling Sequence Numbers

Sequence numbers are enabled per channel using the <sequenceMessages> element:

Solicited vs Unsolicited Sends

By default, sequence numbering behavior differs between solicited and unsolicited sends:

Solicited sends (in-transaction sends):

  • Sequence numbers are set by default

  • Controlled by setOutboundSequenceNumbers (default: true)

Unsolicited sends (out-of-transaction sends):

  • Sequence numbers are not set by default

  • Controlled by both setOutboundSequenceNumbers (default: true) and sequenceUnsolicitedSends (default: false)

  • To enable sequence numbers on unsolicited sends, set sequenceUnsolicitedSends=true
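
The rules above reduce to a small predicate. This sketch reads "controlled by both" as meaning that both settings must be true for an unsolicited send to be numbered, and it assumes sequencing is already enabled on the channel. SequencingRuleSketch is a hypothetical name; the parameter names mirror the settings named in the text.

```java
// Encodes the rules listed above; parameter names follow the settings named in the text.
final class SequencingRuleSketch {
    // Returns true when a send would carry a sequence number under these settings.
    static boolean isSequenced(boolean solicited,
                               boolean setOutboundSequenceNumbers,
                               boolean sequenceUnsolicitedSends) {
        if (!setOutboundSequenceNumbers) {
            return false; // disables numbering for both kinds of send
        }
        // Solicited sends need nothing further; unsolicited sends need the extra opt-in.
        return solicited || sequenceUnsolicitedSends;
    }
}
```

With the defaults (true and false respectively), solicited sends are numbered and unsolicited sends are not.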

Configuration example:

When using sequenceUnsolicitedWithSolicitedSends=true for concurrent send safety, unsolicited sends are converted to solicited sends and thus follow solicited send sequencing behavior (controlled only by setOutboundSequenceNumbers). See Concurrent Sends for details.

Accessing Sequence Numbers

Applications can access sequence numbers from received messages:

Unsolicited Sends

An unsolicited send is a message sent from outside of a message handler, that is, not in response to a received message. Unsolicited sends are common in gateway applications that ingest messages from external sources.

Overview

When messages are sent from within a message handler (solicited sends), the AEP Engine automatically manages send stability and ensures zero-loss delivery across failovers. For unsolicited sends, additional configuration and application participation may be required to ensure zero-loss delivery.

Send Stability Tracking

The AEP Engine provides AepSendStabilityEvent to notify applications when an unsolicited send has been stabilized and is guaranteed to be delivered. This allows gateway applications to track which messages have been successfully delivered.

Enable Send Stability Events:

Send Stability Semantics

An unsolicited send is considered stabilized according to the microservice's InboundEventAcknowledgementPolicy:

Sent in transaction stream (sequenceUnsolicitedWithSolicitedSends=true):

  • OnSendStability: Stabilized when all messages in the transaction (and prior transactions) are acknowledged

  • OnStoreStability: Stabilized when the transaction has been replicated to backup and/or transaction log

Sent outside transaction stream (sequenceUnsolicitedWithSolicitedSends=false and replicateUnsolicitedSends=false):

  • Stabilized on receipt of acknowledgment from the message bus

Correlating Send Stability

Applications need to correlate stability events with the source data that triggered the send. Use one of these approaches:

1. Use Message Fields:

2. Use Message Attachments:

When correlation data shouldn't be in the message payload:

Example: File Gateway with Send Stability

The following example shows a file gateway that reads lines from a file and publishes each line. It tracks send stability to maintain a cursor file:
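
A minimal sketch of the cursor bookkeeping such a gateway needs, independent of any Talon API. CursorTrackerSketch and its methods are hypothetical names. It assumes each sent message carries its line number (as a message field or attachment) and that the application's stability handler passes that number to onStable.

```java
import java.util.TreeSet;

// Cursor bookkeeping sketch for a file gateway; names are hypothetical.
final class CursorTrackerSketch {
    // Stabilized line numbers that lie beyond the current cursor
    private final TreeSet<Long> stableAhead = new TreeSet<>();
    // Every line numbered <= cursor has been stabilized
    private long cursor = 0;

    // Call from the send stability handler with the line number carried by the message.
    void onStable(long lineNumber) {
        if (lineNumber <= cursor) {
            return; // already covered by the cursor
        }
        stableAhead.add(lineNumber);
        // Advance over any contiguous run so the cursor never skips an unstable line.
        while (stableAhead.remove(cursor + 1)) {
            cursor++;
        }
    }

    // Value the application would persist to its cursor file.
    long cursor() {
        return cursor;
    }
}
```

On restart, the gateway would resume from line cursor() + 1. Lines sent but not yet recorded as stable are then re-sent, which is consistent with at-least-once delivery.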

Configuration:

Send Exception Handling

Exceptions can occur during send calls or asynchronously via negative acknowledgments (nacks) from the messaging provider. Handling depends on configuration:

Sent outside transaction stream:

  • Send Exception: Thrown to caller, no AepSendStabilityEvent dispatched

  • Nack: Reported in AepSendStabilityEvent via non-null getStatus()

Sent in transaction stream:

Exception handling depends on InboundEventAcknowledgementPolicy and MessageSendExceptionHandlingPolicy:

OnSendStability:

  • Send failure with LogExceptionAndContinue: Exception logged and reported in AepSendStabilityEvent

  • Send failure with TreatAsFatal: Engine shuts down, exception in AepEngineStoppedEvent

  • Nack: Engine shuts down, exception in AepEngineStoppedEvent

OnStoreStability:

  • AepSendStabilityEvent dispatched after replication

  • Send failure with LogExceptionAndContinue: Exception logged (not in stability event as it was already dispatched)

  • Send failure with TreatAsFatal: Engine shuts down

  • Nack: Engine shuts down
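
The outcomes listed above can be summarized as a lookup. The enum and method names are hypothetical labels for this sketch and are not Talon types; the logic restates the bullets and nothing more.

```java
// Encodes the outcomes listed above as a lookup; enum and method names are hypothetical.
final class SendFailureOutcomeSketch {
    enum Ack { ON_SEND_STABILITY, ON_STORE_STABILITY }
    enum Handling { LOG_EXCEPTION_AND_CONTINUE, TREAT_AS_FATAL }
    enum Failure { SEND_EXCEPTION, NACK }
    enum Outcome {
        THROWN_TO_CALLER,
        REPORTED_IN_STABILITY_EVENT,
        LOGGED_AND_REPORTED_IN_STABILITY_EVENT,
        LOGGED_ONLY,
        ENGINE_SHUTDOWN
    }

    static Outcome outcome(boolean inTransactionStream, Ack ack, Handling handling, Failure failure) {
        if (!inTransactionStream) {
            // Outside the transaction stream the policies do not apply.
            return failure == Failure.SEND_EXCEPTION
                    ? Outcome.THROWN_TO_CALLER
                    : Outcome.REPORTED_IN_STABILITY_EVENT;
        }
        // In the transaction stream, nacks and fatal handling stop the engine under either policy.
        if (failure == Failure.NACK || handling == Handling.TREAT_AS_FATAL) {
            return Outcome.ENGINE_SHUTDOWN;
        }
        // Remaining case: send failure with log-and-continue handling.
        return ack == Ack.ON_SEND_STABILITY
                ? Outcome.LOGGED_AND_REPORTED_IN_STABILITY_EVENT
                : Outcome.LOGGED_ONLY;
    }
}
```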

HA Considerations for Unsolicited Sends

  • EventSourcing: Unsolicited sends are not replicated or persisted in Event Sourcing mode. Use State Replication for unsolicited sends requiring durability.

  • Failover: Engine does not dispatch AepSendStabilityEvent for recovered sends after failover

  • At Least Once Delivery: Unsolicited sends provide at-least-once delivery semantics. Downstream applications must handle potential duplicates.

Additional Considerations

  • Threading: AepSendStabilityEvent is dispatched on the engine's multiplexer thread. Applications performing expensive or blocking operations should use a separate thread.

  • Event Ordering:

    • With sequenceUnsolicitedWithSolicitedSends=true: Stability events issued in send order

    • With sequenceUnsolicitedWithSolicitedSends=false: Stability events issued in acknowledgment order (usually ordered per bus+channel+qos)
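
One way to keep expensive work off the dispatching thread is to hand it to a dedicated single-threaded executor. The sketch below uses only java.util.concurrent; StabilityOffloadSketch is a hypothetical name, and the correlation id stands in for whatever the application extracts from the stability event before handing off.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hands slow work to a dedicated thread so the dispatching thread returns quickly.
final class StabilityOffloadSketch {
    // A single worker thread runs tasks in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Call from the stability handler after copying out the data that is needed.
    void onStable(final long correlationId, final LongConsumer slowWork) {
        worker.execute(() -> slowWork.accept(correlationId));
    }

    // Stops accepting work and waits for queued tasks; returns false on timeout or interrupt.
    boolean shutdown(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A single worker thread preserves the order in which events were handed off, so work that depends on the ordering of stability events sees them in the same order the handler did.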

Concurrent Sends

The Talon runtime ensures single-threaded operation for solicited sends (in-transaction sends). However, unsolicited sends may introduce concurrency that requires additional configuration when:

  • Multiple unsolicited sends are performed concurrently using multiple threads, or

  • Unsolicited sends are performed concurrently with solicited sends

Thread Safety Configuration

Talon provides two alternative mechanisms for ensuring concurrent send safety:

Option 1: Message Bus Binding Configuration (Preferred)

Configure the message bus binding for concurrent sends using the enable_concurrent_sends connection descriptor property:

This is the preferred, modern approach applicable to all message bus bindings supported by Talon. It ensures correct operation for both concurrent send scenarios:

  • Multiple unsolicited sends performed concurrently using multiple threads

  • Unsolicited sends performed concurrently with solicited sends

Option 2: Transaction Pipeline Integration (Legacy)

Use the sequenceUnsolicitedWithSolicitedSends configuration parameter:

This legacy approach injects unsolicited sends into the engine's transaction pipeline, effectively converting them to solicited sends. This achieves concurrent send safety as a side effect of transaction pipeline integration.

Key difference between the two approaches:

The approaches differ in how sequence numbers are applied to unsolicited sends. See Message Sequencing for details on sequence number configuration.

With enable_concurrent_sends:

  • Unsolicited sends remain out-of-transaction

  • Follow normal unsolicited send sequencing behavior (no sequence numbers by default)

With sequenceUnsolicitedWithSolicitedSends:

  • Unsolicited sends are converted to solicited sends (injected into transaction pipeline)

  • Follow solicited send sequencing behavior (sequence numbers set by default)

enable_concurrent_sends and sequenceUnsolicitedWithSolicitedSends are alternative mechanisms for achieving concurrent send safety. They are equivalent except for the default sequence numbering of unsolicited sends (see Message Sequencing). Setting both is safe but unnecessary. The enable_concurrent_sends approach is preferred for new applications, while sequenceUnsolicitedWithSolicitedSends remains available for applications that prefer the transaction injection approach.

Configuration Summary

The following table summarizes how to configure Talon for concurrent sends in various scenarios:

| Solicited Sends | Concurrent Unsolicited Sends | Sequence Number in Solicited Sends | Sequence Number in Unsolicited Sends | Configuration |
| --- | --- | --- | --- | --- |
| No | Yes | N/A | Yes or No | enable_concurrent_sends=true |
| Yes | Yes | Yes | No | enable_concurrent_sends=true |
| Yes | Yes | No | Yes | enable_concurrent_sends=true |
| Yes | Yes | No | No | enable_concurrent_sends=true |
| Yes | Yes | Yes | Yes | sequenceUnsolicitedWithSolicitedSends=true |

When using sequence numbers with concurrent sends, Talon ensures that messages are transmitted on the wire in the same sequence as the sequence numbers assigned to them.
