Skip to content

Latest commit

 

History

History
890 lines (690 loc) · 52.5 KB

spring-integration-support.md

File metadata and controls

890 lines (690 loc) · 52.5 KB
titledescriptionms.dateauthorms.authorms.reviewerms.topicms.customappliesto
Spring Cloud Azure support for Spring Integration
This article describes how Spring Cloud Azure and Spring Integration can be used together.
04/06/2023
KarlErickson
karler
seal
reference
devx-track-java, devx-track-extended-java
✅ Version 4.20.0
✅ Version 5.22.0

Spring Cloud Azure support for Spring Integration

Spring Integration Extension for Azure provides Spring Integration adapters for the various services provided by the Azure SDK for Java. We provide Spring Integration support for these Azure services: Event Hubs, Service Bus, Storage Queue. The following is a list of supported adapters:

Spring Integration with Azure Event Hubs

Key concepts

Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. Data sent to an event hub can be transformed and stored by using any real-time analytics provider or batching/storage adapters.

Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring's support for remoting, messaging, and scheduling. The Spring Integration for Event Hubs extension project provides inbound and outbound channel adapters and gateways for Azure Event Hubs.

Note

RxJava support APIs are dropped from version 4.0.0. See Javadoc for details.

Consumer group

Event Hubs provides similar support of consumer group as Apache Kafka, but with slight different logic. While Kafka stores all committed offsets in the broker, you have to store offsets of Event Hubs messages being processed manually. Event Hubs SDK provides the function to store such offsets inside Azure Storage.

Partitioning support

Event Hubs provides a similar concept of physical partition as Kafka. But unlike Kafka's auto re-balancing between consumers and partitions, Event Hubs provides a kind of preemptive mode. The storage account acts as a lease to determine which partition is owned by which consumer. When a new consumer starts, it will try to steal some partitions from most heavy-loaded consumers to achieve the workload balancing.

To specify the load balancing strategy, developers can use EventHubsContainerProperties for the configuration. See the following section for an example of how to configure EventHubsContainerProperties.

Batch consumer support

The EventHubsInboundChannelAdapter supports the batch-consuming mode. To enable it, users can specify the listener mode as ListenerMode.BATCH when constructing an EventHubsInboundChannelAdapter instance. When enabled, an Message of which the payload is a list of batched events will be received and passed to the downstream channel. Each message header is also converted as a list, of which the content is the associated header value parsed from each event. For the communal headers of partition ID, checkpointer and last enqueued properties, they are presented as a single value for the entire batch of events shares the same one. For more information, see the Event Hubs Message Headers section.

Note

The checkpoint header only exists when MANUAL checkpoint mode is used.

Checkpointing of batch consumer supports two modes: BATCH and MANUAL. BATCH mode is an auto checkpointing mode to checkpoint the entire batch of events together once they are received. MANUAL mode is to checkpoint the events by users. When used, the Checkpointer will be passed into the message header, and users could use it to do checkpointing.

The batch consuming policy can be specified by properties of max-size and max-wait-time, where max-size is a necessary property while max-wait-time is optional. To specify the batch consuming strategy, developers can use EventHubsContainerProperties for the configuration. See the following section for an example of how to configure EventHubsContainerProperties.

Dependency setup

<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId> </dependency>

Configuration

This starter provides the following 3 parts of configuration options:

Connection Configuration Properties

This section contains the configuration options used for connecting to Azure Event Hubs.

Note

If you choose to use a security principal to authenticate and authorize with Microsoft Entra ID for accessing an Azure resource, see Authorize access with Microsoft Entra ID to make sure the security principal has been granted the sufficient permission to access the Azure resource.

Connection configurable properties of spring-cloud-azure-starter-integration-eventhubs:

[!div class="mx-tdBreakAll"]

PropertyTypeDescription
spring.cloud.azure.eventhubs.enabledbooleanWhether an Azure Event Hubs is enabled.
spring.cloud.azure.eventhubs.connection-stringStringEvent Hubs Namespace connection string value.
spring.cloud.azure.eventhubs.namespaceStringEvent Hubs Namespace value, which is the prefix of the FQDN. A FQDN should be composed of NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-nameStringDomain name of an Azure Event Hubs Namespace value.
spring.cloud.azure.eventhubs.custom-endpoint-addressStringCustom Endpoint address.
spring.cloud.azure.eventhubs.shared-connectionBooleanWhether the underlying EventProcessorClient and EventHubProducerAsyncClient use the same connection. By default, a new connection is constructed and used created for each Event Hub client created.

Checkpoint Configuration Properties

This section contains the configuration options for the Storage Blobs service, which is used for persisting partition ownership and checkpoint information.

Note

From version 4.0.0, when the property of spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists isn't enabled manually, no Storage container will be created automatically.

Checkpointing configurable properties of spring-cloud-azure-starter-integration-eventhubs:

[!div class="mx-tdBreakAll"]

PropertyTypeDescription
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-existsBooleanWhether to allow creating containers if not exists.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-nameStringName for the storage account.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-keyStringStorage account access key.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-nameStringStorage container name.

Common Azure Service SDK configuration options are configurable for Storage Blob checkpoint store as well. The supported configuration options are introduced in Spring Cloud Azure configuration, and could be configured with either the unified prefix spring.cloud.azure. or the prefix of spring.cloud.azure.eventhubs.processor.checkpoint-store.

Event Hub processor configuration properties

The EventHubsInboundChannelAdapter uses the EventProcessorClient to consume messages from an event hub, to configure the overall properties of an EventProcessorClient, developers can use EventHubsContainerProperties for the configuration. See the following section about how to work with EventHubsInboundChannelAdapter.

Basic usage

Send messages to Azure Event Hubs

  1. Fill the credential configuration options.

    • For credentials as connection string, configure the following properties in your application.yml file:

      spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING}processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER}account-name: ${CHECKPOINT-STORAGE-ACCOUNT}account-key: ${CHECKPOINT-ACCESS-KEY}

      [!INCLUDE security-note]

    • For credentials as managed identities, configure the following properties in your application.yml file:

      spring: cloud: azure: credential: managed-identity-enabled: trueclient-id: ${AZURE_CLIENT_ID}eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE}processor: checkpoint-store: container-name: ${CONTAINER_NAME}account-name: ${ACCOUNT_NAME}
    • For credentials as service principal, configure the following properties in your application.yml file:

      spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID}client-secret: ${AZURE_CLIENT_SECRET}profile: tenant-id: <tenant>eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE}processor: checkpoint-store: container-name: ${CONTAINER_NAME}account-name: ${ACCOUNT_NAME}

Note

The values allowed for tenant-id are: common, organizations, consumers, or the tenant ID. For more information about these values, see the Used the wrong endpoint (personal and organization accounts) section of Error AADSTS50020 - User account from identity provider does not exist in tenant. For information on converting your single-tenant app, see Convert single-tenant app to multitenant on Microsoft Entra ID.

  1. Create DefaultMessageHandler with the EventHubsTemplate bean to send messages to Event Hubs.

    classDemo { privatestaticfinalStringOUTPUT_CHANNEL = "output"; privatestaticfinalStringEVENTHUB_NAME = "eh1"; @Bean@ServiceActivator(inputChannel = OUTPUT_CHANNEL) publicMessageHandlermessageSender(EventHubsTemplateeventHubsTemplate) { DefaultMessageHandlerhandler = newDefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate); handler.setSendCallback(newListenableFutureCallback<Void>() { @OverridepublicvoidonSuccess(Voidresult) { LOGGER.info("Message was sent successfully."); } @OverridepublicvoidonFailure(Throwableex) { LOGGER.error("There was an error sending the message.", ex); } }); returnhandler; } }
  2. Create a message gateway binding with the above message handler via a message channel.

    classDemo { @AutowiredEventHubOutboundGatewaymessagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) publicinterfaceEventHubOutboundGateway { voidsend(Stringtext); } }
  3. Send messages using the gateway.

    classDemo { publicvoiddemo() { this.messagingGateway.send(message); } }

Receive messages from Azure Event Hubs

  1. Fill the credential configuration options.

  2. Create a bean of message channel as the input channel.

    @ConfigurationclassDemo { @BeanpublicMessageChannelinput() { returnnewDirectChannel(); } }
  3. Create EventHubsInboundChannelAdapter with the EventHubsMessageListenerContainer bean to receive messages from Event Hubs.

    @ConfigurationclassDemo { privatestaticfinalStringINPUT_CHANNEL = "input"; privatestaticfinalStringEVENTHUB_NAME = "eh1"; privatestaticfinalStringCONSUMER_GROUP = "$Default"; @BeanpublicEventHubsInboundChannelAdaptermessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannelinputChannel, EventHubsMessageListenerContainerlistenerContainer) { EventHubsInboundChannelAdapteradapter = newEventHubsInboundChannelAdapter(processorContainer); adapter.setOutputChannel(inputChannel); returnadapter; } @BeanpublicEventHubsMessageListenerContainermessageListenerContainer(EventHubsProcessorFactoryprocessorFactory) { EventHubsContainerPropertiescontainerProperties = newEventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.setCheckpointConfig(newCheckpointConfig(CheckpointMode.MANUAL)); returnnewEventHubsMessageListenerContainer(processorFactory, containerProperties); } }
  4. Create a message receiver binding with EventHubsInboundChannelAdapter via the message channel created before.

    classDemo { @ServiceActivator(inputChannel = INPUT_CHANNEL) publicvoidmessageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointercheckpointer) { Stringmessage = newString(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }

Configure EventHubsMessageConverter to customize objectMapper

EventHubsMessageConverter is made as a configurable bean to allow users to customize ObjectMapper.

Batch consumer support

To consume messages from Event Hubs in batches is similar with the above sample, besides users should set the batch-consuming related configuration options for EventHubsInboundChannelAdapter.

When create EventHubsInboundChannelAdapter, the listener mode should be set as BATCH. When create bean of EventHubsMessageListenerContainer, set the checkpoint mode as either MANUAL or BATCH, and the batch options can be configured as needed.

@ConfigurationclassDemo { privatestaticfinalStringINPUT_CHANNEL = "input"; privatestaticfinalStringEVENTHUB_NAME = "eh1"; privatestaticfinalStringCONSUMER_GROUP = "$Default"; @BeanpublicEventHubsInboundChannelAdaptermessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannelinputChannel, EventHubsMessageListenerContainerlistenerContainer) { EventHubsInboundChannelAdapteradapter = newEventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH); adapter.setOutputChannel(inputChannel); returnadapter; } @BeanpublicEventHubsMessageListenerContainermessageListenerContainer(EventHubsProcessorFactoryprocessorFactory) { EventHubsContainerPropertiescontainerProperties = newEventHubsContainerProperties(); containerProperties.setEventHubName(EVENTHUB_NAME); containerProperties.setConsumerGroup(CONSUMER_GROUP); containerProperties.getBatch().setMaxSize(100); containerProperties.setCheckpointConfig(newCheckpointConfig(CheckpointMode.MANUAL)); returnnewEventHubsMessageListenerContainer(processorFactory, containerProperties); } }

Event Hubs message headers

The following table illustrates how Event Hubs message properties are mapped to Spring message headers. For Azure Event Hubs, message is called as event.

Mapping between Event Hubs Message / Event Properties and Spring Message Headers in Record Listener Mode:

[!div class="mx-tdBreakAll"]

Event Hubs Event PropertiesSpring Message Header ConstantsTypeDescription
Enqueued timeEventHubsHeaders#ENQUEUED_TIMEInstantThe instant, in UTC, of when the event was enqueued in the Event Hub partition.
OffsetEventHubsHeaders#OFFSETLongThe offset of the event when it was received from the associated Event Hub partition.
Partition keyAzureHeaders#PARTITION_KEYStringThe partition hashing key if it was set when originally publishing the event.
Partition IDAzureHeaders#RAW_PARTITION_IDStringThe partition ID of the Event Hub.
Sequence numberEventHubsHeaders#SEQUENCE_NUMBERLongThe sequence number assigned to the event when it was enqueued in the associated Event Hub partition.
Last enqueued event propertiesEventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIESLastEnqueuedEventPropertiesThe properties of the last enqueued event in this partition.
NAAzureHeaders#CHECKPOINTERCheckpointerThe header for checkpoint the specific message.

Users can parse the message headers for the related information of each event. To set a message header for the event, all customized headers will be put as an application property of an event, where the header is set as the property key. When events are received from Event Hubs, all application properties will be converted to the message header.

Note

Message headers of partition key, enqueued time, offset and sequence number isn't supported to be set manually.

When the batch-consumer mode is enabled, the specific headers of batched messages are listed the follows, which contains a list of values from each single Event Hubs event.

Mapping between Event Hubs Message / Event Properties and Spring Message Headers in Batch Listener Mode:

[!div class="mx-tdBreakAll"]

Event Hubs Event PropertiesSpring Batch Message Header ConstantsTypeDescription
Enqueued timeEventHubsHeaders#ENQUEUED_TIMEList of InstantList of the instant, in UTC, of when each event was enqueued in the Event Hub partition.
OffsetEventHubsHeaders#OFFSETList of LongList of the offset of each event when it was received from the associated Event Hub partition.
Partition keyAzureHeaders#PARTITION_KEYList of StringList of the partition hashing key if it was set when originally publishing each event.
Sequence numberEventHubsHeaders#SEQUENCE_NUMBERList of LongList of the sequence number assigned to each event when it was enqueued in the associated Event Hub partition.
System propertiesEventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIESList of MapList of the system properties of each event.
Application propertiesEventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIESList of MapList of the application properties of each event, where all customized message headers or event properties are placed.

Note

When publish messages, all the above batch headers will be removed from the messages if exist.

Samples

For more information, see the azure-spring-boot-samples repository on GitHub.

Spring Integration with Azure Service Bus

Key concepts

Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters.

The Spring Integration for Azure Service Bus extension project provides inbound and outbound channel adapters for Azure Service Bus.

Note

CompletableFuture support APIs have been deprecated from version 2.10.0, and is replaced by Reactor Core from version 4.0.0. See Javadoc for details.

Dependency setup

<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId> </dependency>

Configuration

This starter provides the following 2 parts of configuration options:

Connection configuration properties

This section contains the configuration options used for connecting to Azure Service Bus.

Note

If you choose to use a security principal to authenticate and authorize with Microsoft Entra ID for accessing an Azure resource, see Authorize access with Microsoft Entra ID to make sure the security principal has been granted the sufficient permission to access the Azure resource.

Connection configurable properties of spring-cloud-azure-starter-integration-servicebus:

[!div class="mx-tdBreakAll"]

PropertyTypeDescription
spring.cloud.azure.servicebus.enabledbooleanWhether an Azure Service Bus is enabled.
spring.cloud.azure.servicebus.connection-stringStringService Bus Namespace connection string value.
spring.cloud.azure.servicebus.custom-endpoint-addressStringThe custom endpoint address to use when connecting to Service Bus.
spring.cloud.azure.servicebus.namespaceStringService Bus Namespace value, which is the prefix of the FQDN. A FQDN should be composed of NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-nameStringDomain name of an Azure Service Bus Namespace value.

Service Bus processor configuration properties

The ServiceBusInboundChannelAdapter uses the ServiceBusProcessorClient to consume messages, to configure the overall properties of an ServiceBusProcessorClient, developers can use ServiceBusContainerProperties for the configuration. See the following section about how to work with ServiceBusInboundChannelAdapter.

Basic usage

Send messages to Azure Service Bus

  1. Fill the credential configuration options.

    • For credentials as connection string, configure the following properties in your application.yml file:

      spring: cloud: azure: servicebus: connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}

      [!INCLUDE security-note]

    • For credentials as managed identities, configure the following properties in your application.yml file:

      spring: cloud: azure: credential: managed-identity-enabled: trueclient-id: ${AZURE_CLIENT_ID}profile: tenant-id: <tenant>servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}

Note

The values allowed for tenant-id are: common, organizations, consumers, or the tenant ID. For more information about these values, see the Used the wrong endpoint (personal and organization accounts) section of Error AADSTS50020 - User account from identity provider does not exist in tenant. For information on converting your single-tenant app, see Convert single-tenant app to multitenant on Microsoft Entra ID.

  • For credentials as service principal, configure the following properties in your application.yml file:

    spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID}client-secret: ${AZURE_CLIENT_SECRET}profile: tenant-id: <tenant>servicebus: namespace: ${AZURE_SERVICE_BUS_NAMESPACE}

Note

The values allowed for tenant-id are: common, organizations, consumers, or the tenant ID. For more information about these values, see the Used the wrong endpoint (personal and organization accounts) section of Error AADSTS50020 - User account from identity provider does not exist in tenant. For information on converting your single-tenant app, see Convert single-tenant app to multitenant on Microsoft Entra ID.

  1. Create DefaultMessageHandler with the ServiceBusTemplate bean to send messages to Service Bus, set the entity type for the ServiceBusTemplate. This sample takes Service Bus Queue as example.

    classDemo { privatestaticfinalStringOUTPUT_CHANNEL = "queue.output"; @Bean@ServiceActivator(inputChannel = OUTPUT_CHANNEL) publicMessageHandlerqueueMessageSender(ServiceBusTemplateserviceBusTemplate) { serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE); DefaultMessageHandlerhandler = newDefaultMessageHandler(QUEUE_NAME, serviceBusTemplate); handler.setSendCallback(newListenableFutureCallback<Void>() { @OverridepublicvoidonSuccess(Voidresult) { LOGGER.info("Message was sent successfully."); } @OverridepublicvoidonFailure(Throwableex) { LOGGER.info("There was an error sending the message."); } }); returnhandler; } }
  2. Create a message gateway binding with the above message handler via a message channel.

    classDemo { @AutowiredQueueOutboundGatewaymessagingGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) publicinterfaceQueueOutboundGateway { voidsend(Stringtext); } }
  3. Send messages using the gateway.

    classDemo { publicvoiddemo() { this.messagingGateway.send(message); } }

Receive messages from Azure Service Bus

  1. Fill the credential configuration options.

  2. Create a bean of message channel as the input channel.

    @ConfigurationclassDemo { privatestaticfinalStringINPUT_CHANNEL = "input"; @BeanpublicMessageChannelinput() { returnnewDirectChannel(); } }
  3. Create ServiceBusInboundChannelAdapter with the ServiceBusMessageListenerContainer bean to receive messages to Service Bus. This sample takes Service Bus Queue as example.

    @ConfigurationclassDemo { privatestaticfinalStringQUEUE_NAME = "queue1"; @BeanpublicServiceBusMessageListenerContainermessageListenerContainer(ServiceBusProcessorFactoryprocessorFactory) { ServiceBusContainerPropertiescontainerProperties = newServiceBusContainerProperties(); containerProperties.setEntityName(QUEUE_NAME); containerProperties.setAutoComplete(false); returnnewServiceBusMessageListenerContainer(processorFactory, containerProperties); } @BeanpublicServiceBusInboundChannelAdapterqueueMessageChannelAdapter( @Qualifier(INPUT_CHANNEL) MessageChannelinputChannel, ServiceBusMessageListenerContainerlistenerContainer) { ServiceBusInboundChannelAdapteradapter = newServiceBusInboundChannelAdapter(listenerContainer); adapter.setOutputChannel(inputChannel); returnadapter; } }
  4. Create a message receiver binding with ServiceBusInboundChannelAdapter via the message channel we created before.

    classDemo { @ServiceActivator(inputChannel = INPUT_CHANNEL) publicvoidmessageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointercheckpointer) { Stringmessage = newString(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message)) .doOnError(e -> LOGGER.error("Error found", e)) .block(); } }

Configure ServiceBusMessageConverter to customize objectMapper

ServiceBusMessageConverter is made as a configurable bean to allow users to customize ObjectMapper.

Service Bus message headers

For some Service Bus headers that can be mapped to multiple Spring header constants, the priority of different Spring headers is listed.

Mapping between Service Bus Headers and Spring Headers:

[!div class="mx-tdBreakAll"]

Service Bus message headers and propertiesSpring message header constantsTypeConfigurableDescription
Content typeMessageHeaders#CONTENT_TYPEStringYesThe RFC2045 Content-Type descriptor of the message.
Correlation IDServiceBusMessageHeaders#CORRELATION_IDStringYesThe correlation ID of the message
Message IDServiceBusMessageHeaders#MESSAGE_IDStringYesThe message ID of the message, this header has higher priority than MessageHeaders#ID.
Message IDMessageHeaders#IDUUIDYesThe message ID of the message, this header has lower priority than ServiceBusMessageHeaders#MESSAGE_ID.
Partition keyServiceBusMessageHeaders#PARTITION_KEYStringYesThe partition key for sending the message to a partitioned entity.
Reply toMessageHeaders#REPLY_CHANNELStringYesThe address of an entity to send replies to.
Reply to session IDServiceBusMessageHeaders#REPLY_TO_SESSION_IDStringYesThe ReplyToGroupId property value of the message.
Scheduled enqueue time utcServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIMEOffsetDateTimeYesThe datetime at which the message should be enqueued in Service Bus, this header has higher priority than AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Scheduled enqueue time utcAzureHeaders#SCHEDULED_ENQUEUE_MESSAGEIntegerYesThe datetime at which the message should be enqueued in Service Bus, this header has lower priority than ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Session IDServiceBusMessageHeaders#SESSION_IDStringYesThe session IDentifier for a session-aware entity.
Time to liveServiceBusMessageHeaders#TIME_TO_LIVEDurationYesThe duration of time before this message expires.
ToServiceBusMessageHeaders#TOStringYesThe "to" address of the message, reserved for future use in routing scenarios and presently ignored by the broker itself.
SubjectServiceBusMessageHeaders#SUBJECTStringYesThe subject for the message.
Dead letter error descriptionServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTIONStringNoThe description for a message that has been dead-lettered.
Dead letter reasonServiceBusMessageHeaders#DEAD_LETTER_REASONStringNoThe reason a message was dead-lettered.
Dead letter sourceServiceBusMessageHeaders#DEAD_LETTER_SOURCEStringNoThe entity in which the message was dead-lettered.
Delivery countServiceBusMessageHeaders#DELIVERY_COUNTlongNoThe number of the times this message was delivered to clients.
Enqueued sequence numberServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBERlongNoThe enqueued sequence number assigned to a message by Service Bus.
Enqueued timeServiceBusMessageHeaders#ENQUEUED_TIMEOffsetDateTimeNoThe datetime at which this message was enqueued in Service Bus.
Expires atServiceBusMessageHeaders#EXPIRES_ATOffsetDateTimeNoThe datetime at which this message will expire.
Lock tokenServiceBusMessageHeaders#LOCK_TOKENStringNoThe lock token for the current message.
Locked untilServiceBusMessageHeaders#LOCKED_UNTILOffsetDateTimeNoThe datetime at which the lock of this message expires.
Sequence numberServiceBusMessageHeaders#SEQUENCE_NUMBERlongNoThe unique number assigned to a message by Service Bus.
StateServiceBusMessageHeaders#STATEServiceBusMessageStateNoThe state of the message, which can be Active, Deferred, or Scheduled.

Partition key support

This starter supports Service Bus partitioning by allowing setting partition key and session ID in the message header. This section introduces how to set partition key for messages.

Recommended: Use ServiceBusMessageHeaders.PARTITION_KEY as the key of the header.

publicclassSampleController { @PostMapping("/messages") publicResponseEntity<String> sendMessage(@RequestParamStringmessage) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message) .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key") .build(), Sinks.EmitFailureHandler.FAIL_FAST); returnResponseEntity.ok("Sent!"); } }

Not recommended but currently supported: AzureHeaders.PARTITION_KEY as the key of the header.

publicclassSampleController { @PostMapping("/messages") publicResponseEntity<String> sendMessage(@RequestParamStringmessage) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message) .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key") .build(), Sinks.EmitFailureHandler.FAIL_FAST); returnResponseEntity.ok("Sent!"); } }

Note

When both ServiceBusMessageHeaders.PARTITION_KEY and AzureHeaders.PARTITION_KEY are set in the message headers, ServiceBusMessageHeaders.PARTITION_KEY is preferred.

Session support

This example demonstrates how to manually set the session ID of a message in the application.

publicclassSampleController { @PostMapping("/messages") publicResponseEntity<String> sendMessage(@RequestParamStringmessage) { LOGGER.info("Going to add message {} to Sinks.Many.", message); many.emitNext(MessageBuilder.withPayload(message) .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID") .build(), Sinks.EmitFailureHandler.FAIL_FAST); returnResponseEntity.ok("Sent!"); } }

Note

When the ServiceBusMessageHeaders.SESSION_ID is set in the message headers, and a different ServiceBusMessageHeaders.PARTITION_KEY header is also set, the value of the session ID will eventually be used to overwrite the value of the partition key.

Customize Service Bus client properties

Developers can use AzureServiceClientBuilderCustomizer to customize Service Bus Client properties. The following example customizes the sessionIdleTimeout property in ServiceBusClientBuilder:

@BeanpublicAzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionProcessorClientBuilder> customizeBuilder() { returnbuilder -> builder.sessionIdleTimeout(Duration.ofSeconds(10)); }

Samples

For more information, see the azure-spring-boot-samples repository on GitHub.

Spring Integration with Azure Storage Queue

Key concepts

Azure Queue Storage is a service for storing large numbers of messages. You access messages from anywhere in the world via authenticated calls using HTTP or HTTPS. A queue message can be up to 64 KB in size. A queue may contain millions of messages, up to the total capacity limit of a storage account. Queues are commonly used to create a backlog of work to process asynchronously.

Dependency setup

<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId> </dependency>

Configuration

This starter provides the following configuration options:

Connection configuration properties

This section contains the configuration options used for connecting to Azure Storage Queue.

Note

If you choose to use a security principal to authenticate and authorize with Microsoft Entra ID for accessing an Azure resource, see Authorize access with Microsoft Entra ID to make sure the security principal has been granted the sufficient permission to access the Azure resource.

Connection configurable properties of spring-cloud-azure-starter-integration-storage-queue:

[!div class="mx-tdBreakAll"]

PropertyTypeDescription
spring.cloud.azure.storage.queue.enabledbooleanWhether an Azure Storage Queue is enabled.
spring.cloud.azure.storage.queue.connection-stringStringStorage Queue Namespace connection string value.
spring.cloud.azure.storage.queue.accountNameStringStorage Queue account name.
spring.cloud.azure.storage.queue.accountKeyStringStorage Queue account key.
spring.cloud.azure.storage.queue.endpointStringStorage Queue service endpoint.
spring.cloud.azure.storage.queue.sasTokenStringSas token credential
spring.cloud.azure.storage.queue.serviceVersionQueueServiceVersionQueueServiceVersion that is used when making API requests.
spring.cloud.azure.storage.queue.messageEncodingStringQueue message encoding.

Basic usage

Send messages to Azure Storage Queue

  1. Fill the credential configuration options.

    • For credentials as connection string, configure the following properties in your application.yml file:

      spring: cloud: azure: storage: queue: connection-string: ${AZURE_STORAGE_QUEUE_CONNECTION_STRING}

      [!INCLUDE security-note]

    • For credentials as managed identities, configure the following properties in your application.yml file:

      spring: cloud: azure: credential: managed-identity-enabled: trueclient-id: ${AZURE_CLIENT_ID}profile: tenant-id: <tenant>storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}

Note

The values allowed for tenant-id are: common, organizations, consumers, or the tenant ID. For more information about these values, see the Used the wrong endpoint (personal and organization accounts) section of Error AADSTS50020 - User account from identity provider does not exist in tenant. For information on converting your single-tenant app, see Convert single-tenant app to multitenant on Microsoft Entra ID.

  • For credentials as service principal, configure the following properties in your application.yml file:

    spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID}client-secret: ${AZURE_CLIENT_SECRET}profile: tenant-id: <tenant>storage: queue: account-name: ${AZURE_STORAGE_QUEUE_ACCOUNT_NAME}

Note

The values allowed for tenant-id are: common, organizations, consumers, or the tenant ID. For more information about these values, see the Used the wrong endpoint (personal and organization accounts) section of Error AADSTS50020 - User account from identity provider does not exist in tenant. For information on converting your single-tenant app, see Convert single-tenant app to multitenant on Microsoft Entra ID.

  1. Create DefaultMessageHandler with the StorageQueueTemplate bean to send messages to Storage Queue.

    classDemo { privatestaticfinalStringSTORAGE_QUEUE_NAME = "example"; privatestaticfinalStringOUTPUT_CHANNEL = "output"; @Bean@ServiceActivator(inputChannel = OUTPUT_CHANNEL) publicMessageHandlermessageSender(StorageQueueTemplatestorageQueueTemplate) { DefaultMessageHandlerhandler = newDefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate); handler.setSendCallback(newListenableFutureCallback<Void>() { @OverridepublicvoidonSuccess(Voidresult) { LOGGER.info("Message was sent successfully."); } @OverridepublicvoidonFailure(Throwableex) { LOGGER.info("There was an error sending the message."); } }); returnhandler; } }
  2. Create a Message gateway binding with the above message handler via a message channel.

    classDemo { @AutowiredStorageQueueOutboundGatewaystorageQueueOutboundGateway; @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL) publicinterfaceStorageQueueOutboundGateway { voidsend(Stringtext); } }
  3. Send messages using the gateway.

    classDemo { publicvoiddemo() { this.storageQueueOutboundGateway.send(message); } }

Receive messages from Azure Storage Queue

  1. Fill the credential configuration options.

  2. Create a bean of message channel as the input channel.

    classDemo { privatestaticfinalStringINPUT_CHANNEL = "input"; @BeanpublicMessageChannelinput() { returnnewDirectChannel(); } }
  3. Create StorageQueueMessageSource with the StorageQueueTemplate bean to receive messages to Storage Queue.

    classDemo { privatestaticfinalStringSTORAGE_QUEUE_NAME = "example"; @Bean@InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000")) publicStorageQueueMessageSourcestorageQueueMessageSource(StorageQueueTemplatestorageQueueTemplate) { returnnewStorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate); } }
  4. Create a message receiver binding with StorageQueueMessageSource created in the last step via the message channel we created before.

    classDemo { @ServiceActivator(inputChannel = INPUT_CHANNEL) publicvoidmessageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointercheckpointer) { Stringmessage = newString(payload); LOGGER.info("New message received: '{}'", message); checkpointer.success() .doOnError(Throwable::printStackTrace) .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message)) .block(); } }

Samples

For more information, see the azure-spring-boot-samples repository on GitHub.

close