Archive

Category Archives for "Tutorials"

Tutorials / How-Tos

Using Java Flight Recorder with OpenJDK 11

Java Flight Recorder (JFR) used to be a commercial add-on of the Oracle JDK. As it’s been open sourced recently along with Java Mission Control, everyone using OpenJDK 11 can now troubleshoot their Java apps with this excellent tool for free of charge. JFR, being a proprietary solution formerly, might be lesser known for those relying on previous versions of OpenJDK. Therefore, I thought it was worth writing a fresh post on using JFR with OpenJDK 11.

Continue reading

Consuming Twitter Streaming API with Spring Integration

1. Overview

Spring Integration has been known to have a myriad of connectors for interacting with external systems. Twitter was no exception and for a long time, as Spring Social had been an out-of-the-box solution which Spring Integration leveraged in order to connect to social networks.

1.1 Spring Social EOL

Unfortunately, Spring Social has reached its end of life, the project is now in maintenance mode. The reason why the Spring Team decided that they wouldn’t develop Spring Social further was that it became tedious to keep the API bindings in sync with the APIs of social networks.

Other than that, after Spring Framework 5 had been released, developers wished to leverage its reactive programming model and that would have required the team to re-implement a reactive Spring Social bindings next to the existing one.

Developers are now advised to either implement their own binding or use one of the purpose built libraries to connect to social networks.

1.2 Spring Integration’s Twitter module moved to extensions

The fact the Spring Social is now in maintenance mode forced the Spring Integration team to move the Twitter support module from the main project to the extensions. As Spring Social isn’t going to receive updates, it’s going to be built upon an earlier Spring Framework version. That would lead to class path conflict and would also hamper the development of Spring Integration.

Therefore, as of Spring Integration 5.1, the Twitter module is available as an extension.

1.3 What are the alternatives?

Twitter4J is an unofficial Java libary for Twitter’s API developed and maintained by Yusuke Yamamoto. The official HBC library (built by Twitter) is a Java HTTP Client for consuming Twitter’s Streaming API. The latter hasn’t seen major updates since 2016, while Twitter4J is receiving regular updates.

Implementing your own API binding is also an option. In Spring based projects using RestTemplate and is definitely an option and it’s an easy way to make REST calls.

This guide uses Twitter4J in streaming mode in a way that can be integrated into a Spring Integration message flow.

1.4 How does Twitter Streaming work?

In a nutshell, your app opens a single connection to to Twitter’s API and new results are sent through that connection whenever new matches occur. In contrast, the other way around is delivering data in batches through repeated requests to a REST API.

Streaming provides a low-latency delivery mechanism that can support very high throughput without having to deal with rate limiting.

2. Example project

The example project, which demonstrates the integration of Twitter’s Streaming API into a Spring Integration message flow, is available on GitHubhttps://github.com/springuni/springuni-examples/tree/master/spring-integration/twitter-streaming.

2.1. Maven Dependencies

As Spring Social is EOL now, we won’t build upon it. All we pull in are spring-integration-core and twitter4j-stream.

  <dependencies>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

    <dependency>
      <groupId>org.twitter4j</groupId>
      <artifactId>twitter4j-stream</artifactId>
      <version>4.0.1</version>
    </dependency>
  </dependencies>

This project also uses Lombok and Spring Boot testing support, but these are optional.

2.3. Listenable message source with Spring Integration

Spring Integration provides support for implementing inbound message components. They’re divided into polling and listening behaviors.

The original Inbound Twitter Channel Adapter, that one which relies builds upon Spring Social and is now moved to the extensions, is a polling consumer. That is, you have to provide a poller configuration to use it. On the other hand, Twitter enforces Rate Limits in order to manage how often application can fetch updates. You should have taken Rate Limiting into consideration when the old Twitter Channel adapter was used, so that your configured poller intervals have been in compliance with the Twitter policies.

On the other hand, the listening inbound components are simpler and typically require only MessageProducerSupport to be implemented. Such a listening component looks like this.

public class MyMessageProducer extends MessageProducerSupport {

  public MyMessageProducer(MessageChannel outputChannel) {
    // Defining an output channel is required
    setOutputChannel(outputChannel);
  }

  @Override
  protected void onInit() {
    super.onInit();
    // Custom initialization - if applicable - comes here
  }

  @Override
  public void doStart() {
    // Lifecycle method for starting receiving messages
  }

  @Override
  public void doStop() {
    // Lifecycle method for stopping receiving messages
  }

  private void receiveMessage() {
    // Receive data from upstream service
    SomeData data = ...;

    // Convert it to a message as appropriate and send it out
    this.sendMessage(MessageBuilder.withPayload(data).build());
  }

}

There are only two required elements:

  • Output message channel has to be defined
  • sendMessage has to be called whenever the component receives a message

Optionally you might want to take control over the component’s initialization and manage its lifecycle.

As Twitter’s Streaming API is inherently message-driven, the listening behavior is a natural fit. Let’s see how Twitter4J can be incorporated in such a context.

2.4. Connect to Twitter Streaming API with Twitter4J

Twitter4J manages the nuances of connection handing and receiving updates from Twitter’s Streaming API. All we need to do is acquire a TwitterStream instance, attach a listener and define filtering.

2.4.1. Instantiate TwitterStream

Streaming examples on Twitter4J’s website suggest that a TwitterStream instance should be created through TwitterStreamFactory. That makes perfectly sense, however in a Spring application context we want it to be a managed bean.

Spring’s FactoryBean facility is clean and easy way to contain the details of making a singleton TwitterStream instance.

public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {

  @Override
  public Class<?> getObjectType() {
    return TwitterStream.class;
  }

  @Override
  protected TwitterStream createInstance() {
    return new twitter4j.TwitterStreamFactory().getInstance();
  }

  @Override
  protected void destroyInstance(TwitterStream twitterStream) {
    twitterStream.shutdown();
  }

}

Although we could also expose it as a regular bean without being created by a FactoryBean, that wouldn’t take care of properly shutting it down.

2.4.2. Attaching a listener and defining filtering

That’s going to be the responsibility of our custom MessageProducer implementation.

@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {

  private final TwitterStream twitterStream;

  private List<Long> follows;
  private List<String> terms;

  private StatusListener statusListener;
  private FilterQuery filterQuery;

  public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {
    this.twitterStream = twitterStream;
    setOutputChannel(outputChannel);
  }

  @Override
  protected void onInit() {
    super.onInit();

    statusListener = new StatusListener();

    long[] followsArray = null;

    if (!CollectionUtils.isEmpty(follows)) {
      followsArray = new long[follows.size()];
      for (int i = 0; i < follows.size(); i++) {
        followsArray[i] = follows.get(i);
      }
    }

    String[] termsArray = null;
    if (!CollectionUtils.isEmpty(terms)) {
      termsArray = terms.toArray(new String[0]);
    }

    filterQuery = new FilterQuery(0, followsArray, termsArray);
  }

  @Override
  public void doStart() {
    twitterStream.addListener(statusListener);
    twitterStream.filter(filterQuery);
  }

  @Override
  public void doStop() {
    twitterStream.cleanUp();
    twitterStream.clearListeners();
  }

  public void setFollows(List<Long> follows) {
    this.follows = follows;
  }

  public void setTerms(List<String> terms) {
    this.terms = terms;
  }

  StatusListener getStatusListener() {
    return statusListener;
  }

  FilterQuery getFilterQuery() {
    return filterQuery;
  }

  class StatusListener extends StatusAdapter {

    @Override
    public void onStatus(Status status) {
      sendMessage(MessageBuilder.withPayload(status).build());
    }

    @Override
    public void onException(Exception ex) {
      log.error(ex.getMessage(), ex);
    }

    @Override
    public void onStallWarning(StallWarning warning) {
      log.warn(warning.toString());
    }

  }
}

Lifecycle methods provided by MessageProducerSupport and TwitterStream‘s management interface play nicely together. That’s also going to enable us to stop and start the component at runtime when needed.

2.4.3. Java Configuration

Although Spring could auto-wire components, I still prefer controlling dependencies with manual configuration.

@Slf4j
@Configuration
public class TwitterConfig {

  @Bean
  TwitterStreamFactory twitterStreamFactory() {
    return new TwitterStreamFactory();
  }

  @Bean
  TwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) {
    return twitterStreamFactory.getInstance();
  }

  @Bean
  MessageChannel outputChannel() {
    return MessageChannels.direct().get();
  }

  @Bean
  TwitterMessageProducer twitterMessageProducer(
      TwitterStream twitterStream, MessageChannel outputChannel) {

    TwitterMessageProducer twitterMessageProducer =
        new TwitterMessageProducer(twitterStream, outputChannel);

    twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));

    return twitterMessageProducer;
  }

  @Bean
  IntegrationFlow twitterFlow(MessageChannel outputChannel) {
    return IntegrationFlows.from(outputChannel)
        .transform(Status::getText)
        .handle(m -> log.info(m.getPayload().toString()))
        .get();
  }

}

Important part here is that how our custom message producer integrates with a message flow. Basically, we don’t need to do anything, other than listing to messages at the producer’s output channel.

2.5. Testing

Only Chuck Norris tests code in production. However, ordinary mortal folks like you and me, we do write test cases.

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TestConfig.class)
public class TwitterMessageProducerTest {

  @MockBean
  private TwitterStream twitterStream;

  @Autowired
  private PollableChannel outputChannel;

  @Autowired
  private TwitterMessageProducer twitterMessageProducer;

  @Test
  public void shouldBeInitialized() {
    StatusListener statusListener = twitterMessageProducer.getStatusListener();
    verify(twitterStream).addListener(statusListener);

    FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();
    verify(twitterStream).filter(filterQuery);
  }

  @Test
  public void shouldReceiveStatus() {
    StatusListener statusListener = twitterMessageProducer.getStatusListener();

    Status status = mock(Status.class);
    statusListener.onStatus(status);

    Message<?> statusMessage = outputChannel.receive();
    assertSame(status, statusMessage.getPayload());
  }

  @Import(TwitterConfig.class)
  static class TestConfig {

    @Bean
    MessageChannel outputChannel() {
      return MessageChannels.queue(1).get();
    }

  }

}

I like Twitter4J’s design, because it leverages interfaces. Most of the important parts of the library are exposed as ordinary interfaces. TwitterStream is no exception to that. That is, it can be mocked out easily in test cases.

 6. Conclusion

  • Spring Social is EoL now – it’s not going to receive new features
  • Spring Integration’s Twitter module is available as an extension – it’s been moved out from the main project.
  • Twitter Inbound Channel adapter is a polling consumer – you have to deal with rate limiting when choosing your poll interval
  • Twitter’s Streaming API fits with the listening behavior of an inbound channel adapter

Bootiful GCP: Spring Cloud Stream with Google Cloud Pub/Sub

I’ve recently read Josh Long‘s Bootiful GCP series on Sprint Central’s engineering blog and especially liked the 4th part about using Google Cloud’s Pub/Sub. I felt inspired by the series and as I’m also evaluating Spring Cloud Stream for a new project of mine. I thought, I would expand on that article where Josh left off. This article describes how to use Spring Cloud Stream with Google Cloud Pub/Sub for implementing a simple producer and a consumer application.

Continue reading

1

How to consume large SQS messages with JMS and Spring Boot

Spring Boot became ubiquitous in recent years and provided an opinionated way of integrating various pieces of technology. Working with JMS is no exception to that. Altought Amazon has got it own Java API for interacting with SQS, using it through JMS ensures that we’ll be able to use the same piece of code with another messaging infrastructure. After taking a look at a basic message consumer and producer setup, we dive into a more advanced use case, which is consuming large messages.

Get the working example

If you’re like most people, perhaps you’d like to see the big picture first and go into the details after that. I prepared a fully working example, which is available on GitHub and here you go.

% git clone git@github.com:springuni/springuni-examples.git
% cd spring-boot/spring-jms-sqs

Create a local configuration file .env with the following contents in the root of the project (spring-jms-sqs).

AMAZON_SQS_ENDPOINT=https://sqs.us-east-1.amazonaws.com/XXXXXXX/queue-name
AMAZON_SQS_ACCESS_KEY=XXXXXXXX
AMAZON_SQS_SECRET_KEY=XXXXXXXX
AMAZON_SQS_EXT_S3_ACCESS_KEY=XXXXXXXX
AMAZON_SQS_EXT_S3_SECRET_KEY=XXXXXXXX
AMAZON_SQS_EXT_S3_BUCKET_NAME=queue-name-large-payloads

After starting the demo app, it will listen on port 5000. I would have liked to keep it simple and it just handles bare text messages. Spring’s automated message converter infrastructure doesn’t play a role here, as the article’s focus is how to deal with large SQS messages.

% mvn spring-boot:run
% curl -H 'Content-Type: text/plain' http://localhost:5000/message -XPOST -d 'test'

If everything went well, you should see the following messages.

2017-08-09 19:13:01.443 INFO 29525 --- [nio-5000-exec-1] c.s.examples.jms.MessageProducer : Sending message test.
2017-08-09 19:13:02.069 INFO 29525 --- [nio-5000-exec-1] c.a.s.javamessaging.SQSMessageProducer : Message sent to SQS with SQS-assigned messageId: 1001f7ba-55fc-4bdb-8732-8f4d40343068
2017-08-09 19:13:02.069 INFO 29525 --- [nio-5000-exec-1] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
2017-08-09 19:13:02.243 INFO 29525 --- [enerContainer-1] c.s.examples.jms.MessageConsumer : Received message test

I’m hoping you’re still with me and interested in seeing the details.

Nuances of working with SQS

SQS is an odd-one out from the point of view how message brokers operate in general.

SQS maximizes the size of messages at 256K

For exchanging data in larger chunks than that can be achieved in various ways. When I first faced this limitation, I applied gzip compression and encoded the compressed binary data with base64. That solution worked just fine for textual (JSON) data, however it required customization on both the producer’s and the consumer’s side. Furthermore compression itself doesn’t guarantee that the size of messages to be sent will never exceed the 256K limit. Amazon SDK provides an extended SQS client, which enables end users to exchange messages larger than 256K transparently without having to apply customization themselves. The extened SQS client leverages S3 for storing messages and only a reference to the stored object is being sent to queues.

SQS isn’t transactional

However, Spring Boot  tries to set up JmsListenerContainerFactory as transactional. When JMS autoconfiguration is enabled JmsAnnotationDrivenConfiguration delegates configuring a DefaultJmsListenerContainerFactory to DefaultJmsListenerContainerFactoryConfigurer and that expects either a JtaTransactionManager be present or is set the container factory’s sessionTransacted property to true.

Annotation driven message listener configuration requires the queue name to be defined upfront

Arbitrary methods of a managed bean can be annotated with @JmsListener(destination = "queueName") or alternatively javax.jms.MessageListener can be implemented instead. Nevertheless, going for the first option is much more convenient as Spring intelligently converts the received message to various user defined data type through its MessageConverter infrastructure.

In SQS endpoint URLs identify queues and they also contain the queue’s name.
Such an URL looks like this: https://sqs.<region>.amazonaws.com/<acctount- id>/<queue-name>.

Obviously, we can extract the queue name from an URL like this, however the way JMS can be setup with Spring Boot requires you to define the queue’s name directly.

In order to do be able to leverage Spring’s messaging infrastructure without having to hard code a JMS destination in the message consumer or having to repeat the queue’s name in the application’s configuration, we need to implement a custom DestinationResolver. That DestinationResolver will eventually parse the endpoints URL of a SQS queue and we’ll have to fiddle with only a single application property.

Basic setup with SQS

We continue from that point where Messaging with JMS left off. That guide doesn’t configure ConnectionFactory explicitely in which case Spring Boot configures and embedded ActiveMQ broker.

Maven dependencies

For the showcasing how Amazon’s JMS messaging library plays with Spring, we need to setup the following dependencies.

<dependencies>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-extended-client-lib</artifactId>
    <version>${amazon-sqs-java-extended-client-lib.version}</version>
  </dependency>

  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>${amazon-sqs-java-messaging-lib.version}</version>
  </dependency>

  <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
  </dependency>

  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
</dependencies>

Basically amazon-sqs-java-extended-client-lib is only required if you would like to send large messages and 256K might not be enough. For basic use cases however, you can omit that.

The example is using spring-boot-starter-web, because we produce and consume messages in the same application. Real world solutions however have these functionalities separated.

Producing JMS messages

For the sake of simplicity, we’ll be exchanging simple text messages. The aforementioned official Spring tutorial covers sending structured messages and it also explains how JSON messages are getting converted to/from simple Java POJOs.

We’re however focusing on the details of integrating SQS as our message broker instead.

For producing messages a simple REST controller (MessageProducer) is used which in turn puts the HTTP request’s body to an SQS queue. It’s fairly trivial to do just that, eventually there’s nothing specific to SQS in that piece of code.

@RestController
@Slf4j
public class MessageProducer {

  private final JmsOperations jmsTemplate;

  @Autowired
  public MessageProducer(JmsOperations jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }

  @PostMapping("/message")
  public ResponseEntity sendMessage(@RequestBody String message) {
    log.info("Sending message {}.", message);
    jmsTemplate.convertAndSend(message);
    return ResponseEntity.ok().build();
  }

}

Receiving JMS messages

Altought receiving messages (MessageConsumer) looks equally trivial at a first sight, the challenge here was to eliminate the requirement of having to define a hard-coded message destination.

@Component
@Slf4j
public class MessageConsumer {

  @JmsListener(destination = "")
  public void receive(@Payload String message) {
    log.info("Received message {}.", message);
  }

}

Spring gives support for resolving destinations based on that destination name which is supplied by the destination attribute of the @JmsListener annotation (DynamicDestinationResolver). As the application is listening on messages coming from a single queue, we don’t want to do that. Instead the queue’s name is to be determined at that time when the application boots.

StaticDestinationResolver gets initialized with a fixed queue name and it resolves destinations against that same queue name every time.

public class StaticDestinationResolver implements DestinationResolver {

  private final String queueName;

  public StaticDestinationResolver(String queueName) {
    this.queueName = queueName;
  }

  @Override
  public Destination resolveDestinationName(
      Session session, String destinationName, boolean pubSubDomain) throws JMSException {

    return session.createQueue(queueName);
  }

}

SQS configuration

SqsProperties encapsulate all of the required properties required for creating an SQS client. Basically we need to define an AWS region, queue endpoint URL and AWS access/secret keys, thought the two latter can be omitted if the underlying EC2 container has got the necessary IAM roles.

@Data
@ConfigurationProperties(prefix = "amazon.sqs")
public class SqsProperties {

  private String region;
  private String endpoint;
  private String accessKey;
  private String secretKey;

  private Integer numberOfMessagesToPrefetch;

  private Extended extended = new Extended();

  public Optional<Integer> getNumberOfMessagesToPrefetch() {
    return Optional.ofNullable(numberOfMessagesToPrefetch);
  }

  public String getQueueName() {
    URI endpointUri = URI.create(endpoint);
    String path = endpointUri.getPath();
    int pos = path.lastIndexOf('/');
    return path.substring(pos + 1);
  }

  @Data
  public static class Extended {

    private String s3Region;
    private String s3BucketName;
    private String s3AccessKey;
    private String s3SecretKey;

  }

}

In application.yml these properties are mapped to individual, upper cased application configuration options. Most of the time these kind of applications are deployed on EB (Elasticbeantalk) or ECS (Elastic Container Services) which supply configuration data as environment variables.

All of the configurations steps below are taken from AbstractSqsConfiguration. They were implemented as reusable building blocks and we would be inspecting them one-by-one in what follows.

Creating an AWS credentials provider

Connecting to AWS usually starts with authenticating to one of its services. As I mentioned above, the steps of having to supply access and secret keys can be omitted in case of IAM roles, yet creating an AWSCredentialsProvider (one way or another) is necessary.

private AWSCredentialsProvider createAwsCredentialsProvider(
    String localAccessKey, String localSecretKey) {

  AWSCredentialsProvider ec2ContainerCredentialsProvider =
      new EC2ContainerCredentialsProviderWrapper();

  if (StringUtils.isEmpty(localAccessKey) || StringUtils.isEmpty(localSecretKey)) {
    return ec2ContainerCredentialsProvider;
  }

  AWSCredentialsProvider localAwsCredentialsProvider =
      new AWSStaticCredentialsProvider(
          new BasicAWSCredentials(localAccessKey, localSecretKey));

  return new AWSCredentialsProviderChain(
      localAwsCredentialsProvider, ec2ContainerCredentialsProvider);
}

When access and secret keys are supplied we try to authenticate with those static credentials first with a fallback to fetching credentials from ECS directly.

Creating a SQS client

Once we have an AWSCredentialsProvider at hand, using the AmazonSQSClientBuilder for making a client instance is straight forward.

private AmazonSQS createAmazonSQSClient(SqsProperties sqsProperties) {
  Regions region = Regions.fromName(sqsProperties.getRegion());

  EndpointConfiguration endpointConfiguration = new EndpointConfiguration(
      sqsProperties.getEndpoint(), region.getName());

  AWSCredentialsProvider awsCredentialsProvider = createAwsCredentialsProvider(
      sqsProperties.getAccessKey(),
      sqsProperties.getSecretKey()
  );

  return AmazonSQSClientBuilder
      .standard()
      .withCredentials(awsCredentialsProvider)
      .withEndpointConfiguration(endpointConfiguration)
      .build();
}

Creating a SQS connection factory

JMS programming model

JMS programming model (figure created by javatpoint.com)

SQSConnectionFactory is the concrete implementation of javax.jms.ConnectionFactory and as such it’s the gateway between the standard JMS API and native access to SQS through its Java SDK.

protected SQSConnectionFactory createStandardSQSConnectionFactory(SqsProperties sqsProperties) {
  AmazonSQS sqsClient = createAmazonSQSClient(sqsProperties);

  ProviderConfiguration providerConfiguration = new ProviderConfiguration();
  sqsProperties.getNumberOfMessagesToPrefetch()
      .ifPresent(providerConfiguration::setNumberOfMessagesToPrefetch);

  return new SQSConnectionFactory(providerConfiguration, sqsClient);
}

After we’ve had an AmazonSQSClient instance created in the former step, we need a ProviderConfiguration object as well in order to set the number of message to be pre-fetched.

JMS configuration

Spring Boot makes working with JMS very easy. Under normal circumstances it’s perfectly enough to create register a single javax.jms.ConnectionFactory bean and then it takes care of creating a message listener container and a JmsTemplate.

@Bean
@Override
public ConnectionFactory connectionFactory(SqsProperties sqsProperties) {
  return createStandardSQSConnectionFactory(sqsProperties);
}

In case of SQS however, there some nuances we should take care of ourselves. I mentioned that SQS wasn’t transactional, but Spring Boot’s autoconfiguration mechanism tried to create a listener container factory that way. JmsTemplate needs a destination and and that can be extracted from the given SQS endpoint URL and also it’s more convenient to have the queue name resolved automatically in contrast to having to hard-code it with @JmsListner.

@Bean
public DestinationResolver destinationResolver(SqsProperties sqsProperties) {
  return new StaticDestinationResolver(sqsProperties.getQueueName());
}

@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
    ConnectionFactory connectionFactory, DestinationResolver destinationResolver) {

  DefaultJmsListenerContainerFactory jmsListenerContainerFactory =
      new DefaultJmsListenerContainerFactory();
  jmsListenerContainerFactory.setConnectionFactory(connectionFactory);
  jmsListenerContainerFactory.setDestinationResolver(destinationResolver);
  jmsListenerContainerFactory.setSessionTransacted(false);

  return jmsListenerContainerFactory;
}

@Bean
public JmsTemplate jmsTemplate(
    SqsProperties sqsProperties, ConnectionFactory connectionFactory) {

  JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
  jmsTemplate.setDefaultDestinationName(sqsProperties.getQueueName());
  return jmsTemplate;
}

Using the SQS extended client

We’ve convered how to setup a basic JMS config, if you want to produce and consume message larger than 256K, keep reading.

Creating an S3 client

Amazon SQS client relies on S3 as a means of persisting large messages and only a reference is sent over SQS. In order to be able to leverage S3, a bucket and credetials for accessing S3 are required. The following piece of code demonstrates how an AmazonS3Client can be built.

private AmazonS3 createAmazonS3Client(SqsProperties sqsProperties) {
  AWSCredentialsProvider awsCredentialsProvider = createAwsCredentialsProvider(
      sqsProperties.getAccessKey(),
      sqsProperties.getSecretKey()
  );

  Regions region = Regions.fromName(sqsProperties.getRegion());

  AmazonS3 amazonS3Client = AmazonS3ClientBuilder
      .standard()
      .withCredentials(awsCredentialsProvider)
      .withRegion(region)
      .build();

  String s3BucketName = sqsProperties.getExtended().getS3BucketName();

  if (!amazonS3Client.doesBucketExist(s3BucketName)) {
    amazonS3Client.createBucket(s3BucketName);

    BucketLifecycleConfiguration.Rule expirationRule =
        new BucketLifecycleConfiguration.Rule()
            .withExpirationInDays(14).withStatus("Enabled");

    BucketLifecycleConfiguration lifecycleConfig =
        new BucketLifecycleConfiguration().withRules(expirationRule);

    amazonS3Client.setBucketLifecycleConfiguration(s3BucketName, lifecycleConfig);
  }

  return amazonS3Client;
}

Creating the bucket automatically upon the first initialization of the app is also taken care of, although that’s optional.

Creating an extended SQS client

This is the key step for enable large message handling. Basically we need an AmazonS3Client and an AmazonSQSClient and we covered that already how to create them.

protected SQSConnectionFactory createExtendedSQSConnectionFactory(SqsProperties sqsProperties) {
  AmazonS3 s3Client = createAmazonS3Client(sqsProperties);

  ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
      .withLargePayloadSupportEnabled(s3Client, sqsProperties.getExtended().getS3BucketName());

  ProviderConfiguration providerConfiguration = new ProviderConfiguration();
  sqsProperties.getNumberOfMessagesToPrefetch()
      .ifPresent(providerConfiguration::setNumberOfMessagesToPrefetch);

  AmazonSQS sqsClient = createAmazonSQSClient(sqsProperties);

  return new SQSConnectionFactory(
      providerConfiguration,
      new AmazonSQSExtendedClient(sqsClient, extendedClientConfig)
  );
}

Thereafter these two are linked through AmazonSQSExtendedClient and ExtendedClientConfiguration provides a way to customize how large messages should be handled. By default, only messages larger than 256K will be sent over S3, but a user-defined message size threshold can also be specified. It’s also possible to configure it in a way that all of the messages go through S3 regardless of their size.

Conclusion

I read somewhere that Spring makes simple things easy and complex things possible. This assertion proved to be true on many occasions (including this one), when I was dealing with configuring a sophisticated application infrastructure with it.

It took me approximately a half day of tweaking to get every aspect SQS large message handling with JMS and Spring Boot right. Eventually difficulties stemmed from that fact that SQS is a bit different from other message queuing solutions. For example, as I mentioned earlier, it isn’t transactional, but Spring Boot – being an opinionated framework – tries to configure it that way, as most messages brokers are transactional.

So in conclusion I can say that this setup haven’t been working just fine for a couple of month in three applications using SQS with large messages.

Don’t want to miss a thing?

If you like Java and Spring as much as I do, you might consider signing up for my newsletters.

Introduction to HTTP/2 support in Java 9

The IETF streaming group approved the HTTP/2 protocol in 2015, sixteen years after HTTP/1.1 had been released. HTTP/2 comes with the promise of lowering latency and makes many of those workarounds obsolete which were necessary for HTTP/1.1 in order to be able to keep up with today’s response time requirements. In this article, I introduce HTTP/2 briefly and how it renews the text-based HTTP/1.1 and then we look into the upcoming HTTP/2 support in Java 9.

Continue reading

2

This is how you can setup a Maven project for Java 9

One of the most anticipated events of the Java world is going to be when version 9 gets released on the 21st of September*.  It’ll be a game changer, no doubt about that, primarily due to its module system. It can put an end to the jar-hell we might have been facing for a long time. In this short tutorial, I’d like to show you how a new Java 9 enabled Maven project can be configured.

Continue reading

How to post JSON-serialized form data with RestTemplate

RestTemplate is often used for consuming RESTful web services from Spring applications. It’s little known however, how to use it for more advanced use cases, that is, when you need to combine POSTing form data along with a JSON-serialized data. In this guide we’ll explore the key points of being able to do that, I’ll also show you how to unit test your REST client.

Continue reading

How to deploy a Spring Boot Application to Google App Engine

Let me show you how you what possibilities there are to deploy Spring Boot application to Google’s Cloud Platform. Google has recently introduced their free tier which gives you $300 worth of credit for a year and also under certain usage limits their services remain free. This is similar what Amazon has been offering for a wile, but goes even beyond that.

Continue reading