Consuming Twitter Streaming API with Spring Integration

October 04, 2018 · 8 min read – Last updated on October 26, 2019

1. Overview

Spring Integration has been known to have a myriad of connectors for interacting with external systems. Twitter was no exception: for a long time, Spring Social had been the out-of-the-box solution that Spring Integration leveraged to connect to social networks.

1.1 Spring Social EOL

Unfortunately, Spring Social has reached its end of life, and the project is now in maintenance mode. The Spring Team decided not to develop Spring Social further because keeping the API bindings in sync with the APIs of the social networks had become tedious.

Other than that, after Spring Framework 5 had been released, developers wished to leverage its reactive programming model, which would have required the team to re-implement the Spring Social bindings reactively, next to the existing ones.

Developers are now advised to either implement their own bindings or use one of the purpose-built libraries to connect to social networks.

1.2 Spring Integration’s Twitter module moved to extensions

The fact that Spring Social is now in maintenance mode forced the Spring Integration team to move the Twitter support module from the main project to the extensions. As Spring Social isn’t going to receive updates, it will remain built upon an older Spring Framework version. That would lead to classpath conflicts and would also hamper the development of Spring Integration.

Therefore, as of Spring Integration 5.1, the Twitter module is available as an extension.

1.3 What are the alternatives?

Twitter4J is an unofficial Java library for Twitter’s API, developed and maintained by Yusuke Yamamoto. The official HBC library (built by Twitter) is a Java HTTP client for consuming Twitter’s Streaming API. The latter hasn’t seen major updates since 2016, while Twitter4J is receiving regular updates.

Implementing your own API binding is also an option. In Spring-based projects, RestTemplate is an easy way to make the necessary REST calls.

This guide uses Twitter4J in streaming mode in a way that can be integrated into a Spring Integration message flow.

1.4 How does Twitter Streaming work?

In a nutshell, your app opens a single connection to Twitter’s API, and new results are sent through that connection whenever new matches occur. The alternative is to fetch data in batches through repeated requests to a REST API.

Streaming provides a low-latency delivery mechanism that can support very high throughput, without repeatedly polling an endpoint and running into its request rate limits.
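To make the push model more concrete, the following plain-Java sketch shows roughly what a streaming client does once its single connection is open: it blocks on the response body and hands each message to a callback as soon as it arrives. It assumes a newline-delimited format in which blank lines serve as keep-alives; the class and method names are hypothetical, and a real client such as Twitter4J also handles authentication, JSON parsing and reconnects.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: consume newline-delimited messages from one open connection.
class StreamingConnectionSketch {

    // Pushes each non-blank line to the listener as soon as it is read.
    // Blank lines are assumed to be keep-alives carrying no payload.
    // Returns the number of messages delivered before the stream ended.
    static int consume(Reader connection, Consumer<String> listener) {
        int delivered = 0;
        try (BufferedReader reader = new BufferedReader(connection)) {
            String line;
            // readLine() blocks until the next line arrives; it accepts \n, \r or \r\n
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // keep-alive, nothing to deliver
                }
                listener.accept(line);
                delivered++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Simulated response body: two messages separated by a keep-alive line
        String body = "{\"text\":\"hello\"}\r\n\r\n{\"text\":\"spring\"}\r\n";
        List<String> received = new ArrayList<>();
        int count = consume(new StringReader(body), received::add);
        System.out.println(count + " messages: " + received);
    }
}
```

Against a live connection the loop only ends when the connection closes, and the caller never has to ask whether new data is available.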

2. Example project

The example project, which demonstrates the integration of Twitter’s Streaming API into a Spring Integration message flow, is available on GitHub: https://github.com/craftingjava/craftingjava-examples/tree/master/spring-integration/twitter-streaming.

2.1. Maven Dependencies

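As Spring Social is EOL now, we won’t build upon it. All we pull in are spring-integration-core and twitter4j-stream. The former is declared without a version, so its version has to come from dependency management, such as Spring Boot’s parent POM.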

<dependencies>
  <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
  </dependency>

  <dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.1</version>
  </dependency>
</dependencies>

This project also uses Lombok and Spring Boot testing support, but these are optional.

2.3. Listenable message source with Spring Integration

Spring Integration provides support for implementing inbound message components. They’re divided into polling and listening behaviors.

The original inbound Twitter channel adapter, the one built upon Spring Social that has now moved to the extensions, is a polling consumer. That is, you have to provide a poller configuration to use it. Twitter, however, enforces rate limits to control how often an application can fetch updates, so when using the old adapter you had to choose poller intervals that complied with Twitter’s policies.
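For a polling adapter, the rate limit translates directly into a lower bound on the poll interval: the window length divided by the number of requests allowed in it, rounded up. The sketch below computes that bound; the helper name is hypothetical, and the quota used in main (180 requests per 15-minute window) is an illustrative assumption.

```java
import java.time.Duration;

// Hypothetical helper: derive a poll interval that respects a request quota.
class PollIntervals {

    // Smallest fixed interval that keeps a single poller within
    // maxRequests per window. Rounds up so the quota is never exceeded.
    static Duration minPollInterval(int maxRequests, Duration window) {
        if (maxRequests <= 0) {
            throw new IllegalArgumentException("maxRequests must be positive");
        }
        long windowMillis = window.toMillis();
        long intervalMillis = (windowMillis + maxRequests - 1) / maxRequests; // ceiling division
        return Duration.ofMillis(intervalMillis);
    }

    public static void main(String[] args) {
        // Illustrative (assumed) quota: 180 requests per 15-minute window
        System.out.println(minPollInterval(180, Duration.ofMinutes(15)));
    }
}
```

With that assumed quota, a single poller could issue a request at most every 5 seconds; if several pollers share the same quota, the interval has to grow accordingly.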

In contrast, listening inbound components are simpler and typically only require extending MessageProducerSupport. Such a listening component looks like this:

import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

public class MyMessageProducer extends MessageProducerSupport {

  public MyMessageProducer(MessageChannel outputChannel) {
    // Defining an output channel is required
    setOutputChannel(outputChannel);
  }

  @Override
  protected void onInit() {
    super.onInit();
    // Custom initialization - if applicable - comes here
  }

  @Override
  public void doStart() {
    // Lifecycle method for starting receiving messages
  }

  @Override
  public void doStop() {
    // Lifecycle method for stopping receiving messages
  }

  // Invoked whenever data is received from the upstream service
  private void receiveMessage(Object data) {
    // Convert it to a message as appropriate and send it out
    this.sendMessage(MessageBuilder.withPayload(data).build());
  }

}

There are only two required elements:

  • Output message channel has to be defined
  • sendMessage has to be called whenever the component receives a message

Optionally you might want to take control over the component’s initialization and manage its lifecycle.
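The same contract can be illustrated without Spring. The following sketch is a plain-JDK analogue, not Spring Integration’s API: a BlockingQueue stands in for the output channel (required), onData plays the role of the callback that calls sendMessage (required), and start/stop model the optional lifecycle. Dropping data that arrives while the component is stopped is a choice made for this hypothetical class.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical plain-JDK analogue of a listening producer (not Spring's API).
class ListeningProducerSketch<T> {

    private final BlockingQueue<T> outputChannel;           // required: output channel
    private final AtomicBoolean running = new AtomicBoolean();

    ListeningProducerSketch(BlockingQueue<T> outputChannel) {
        this.outputChannel = Objects.requireNonNull(outputChannel, "output channel is required");
    }

    // Optional lifecycle: subscribe to / unsubscribe from the upstream source here
    void start() { running.set(true); }
    void stop() { running.set(false); }
    boolean isRunning() { return running.get(); }

    // Required: called by the upstream source for every piece of data;
    // forwards it to the output channel while running, drops it otherwise.
    boolean onData(T data) {
        return running.get() && outputChannel.offer(data);
    }

    public static void main(String[] args) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        ListeningProducerSketch<String> producer = new ListeningProducerSketch<>(channel);
        producer.onData("ignored"); // not started yet, so this is dropped
        producer.start();
        producer.onData("hello");
        producer.stop();
        System.out.println(channel);
    }
}
```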

As Twitter’s Streaming API is inherently message-driven, the listening behavior is a natural fit. Let’s see how Twitter4J can be incorporated in such a context.

2.4. Connect to Twitter Streaming API with Twitter4J

Twitter4J manages the nuances of connection handling and receiving updates from Twitter’s Streaming API. All we need to do is acquire a TwitterStream instance, attach a listener and define filtering.

2.4.1. Instantiate TwitterStream

Streaming examples on Twitter4J’s website suggest that a TwitterStream instance should be created through TwitterStreamFactory. That makes perfect sense; however, in a Spring application context we want it to be a managed bean.

Spring’s FactoryBean facility is a clean and easy way to encapsulate the details of creating a singleton TwitterStream instance.

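import org.springframework.beans.factory.config.AbstractFactoryBean;
import twitter4j.TwitterStream;

// twitter4j.TwitterStreamFactory is referenced by its fully qualified name below,
// as it shares its simple name with this class
public class TwitterStreamFactory extends AbstractFactoryBean<TwitterStream> {

  @Override
  public Class<?> getObjectType() {
    return TwitterStream.class;
  }

  // Called once, as AbstractFactoryBean creates a singleton by default.
  // Twitter4J reads its configuration (e.g. OAuth credentials) from twitter4j.properties by default.
  @Override
  protected TwitterStream createInstance() {
    return new twitter4j.TwitterStreamFactory().getInstance();
  }

  // Called when the factory bean is destroyed, i.e. when the application context closes
  @Override
  protected void destroyInstance(TwitterStream twitterStream) {
    twitterStream.shutdown();
  }

}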

Although we could also expose it as a regular bean without a FactoryBean, we would then have to make sure ourselves that it gets shut down properly. The FactoryBean keeps both its creation and its shutdown in one place.

2.4.2. Attaching a listener and defining filtering

That’s going to be the responsibility of our custom MessageProducer implementation.

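import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.util.CollectionUtils;
import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;

@Slf4j
public class TwitterMessageProducer extends MessageProducerSupport {

  private final TwitterStream twitterStream;

  // Optional filters: user IDs to follow and terms to track
  private List<Long> follows;
  private List<String> terms;

  // Refers to the inner StatusListener class declared below
  private StatusListener statusListener;
  private FilterQuery filterQuery;

  public TwitterMessageProducer(TwitterStream twitterStream, MessageChannel outputChannel) {
    this.twitterStream = twitterStream;
    setOutputChannel(outputChannel);
  }

  @Override
  protected void onInit() {
    super.onInit();

    statusListener = new StatusListener();

    // Filters that haven't been configured are left null
    long[] followsArray = null;
    if (!CollectionUtils.isEmpty(follows)) {
      followsArray = follows.stream().mapToLong(Long::longValue).toArray();
    }

    String[] termsArray = null;
    if (!CollectionUtils.isEmpty(terms)) {
      termsArray = terms.toArray(new String[0]);
    }

    // count = 0: no backfill of earlier statuses, only new ones
    filterQuery = new FilterQuery(0, followsArray, termsArray);
  }

  @Override
  public void doStart() {
    // Register the listener, then open the filtered stream
    twitterStream.addListener(statusListener);
    twitterStream.filter(filterQuery);
  }

  @Override
  public void doStop() {
    // Stop consuming the stream and detach the listener,
    // so that a subsequent start doesn't register it twice
    twitterStream.cleanUp();
    twitterStream.clearListeners();
  }

  public void setFollows(List<Long> follows) {
    this.follows = follows;
  }

  public void setTerms(List<String> terms) {
    this.terms = terms;
  }

  // Package-private accessors used by the tests
  StatusListener getStatusListener() {
    return statusListener;
  }

  FilterQuery getFilterQuery() {
    return filterQuery;
  }

  // Forwards every incoming status to the output channel as a message payload
  class StatusListener extends StatusAdapter {

    @Override
    public void onStatus(Status status) {
      sendMessage(MessageBuilder.withPayload(status).build());
    }

    @Override
    public void onException(Exception ex) {
      log.error(ex.getMessage(), ex);
    }

    @Override
    public void onStallWarning(StallWarning warning) {
      log.warn(warning.toString());
    }

  }
}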

The lifecycle methods provided by MessageProducerSupport and TwitterStream’s management interface play nicely together. This also enables us to stop and start the component at runtime when needed.

2.4.3. Java Configuration

Although Spring could auto-wire components, I still prefer controlling dependencies with manual configuration.

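import java.util.Arrays;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import twitter4j.Status;
import twitter4j.TwitterStream;

@Slf4j
@Configuration
public class TwitterConfig {

  @Bean
  TwitterStreamFactory twitterStreamFactory() {
    return new TwitterStreamFactory();
  }

  @Bean
  TwitterStream twitterStream(TwitterStreamFactory twitterStreamFactory) throws Exception {
    // AbstractFactoryBean exposes its (singleton) product through getObject()
    return twitterStreamFactory.getObject();
  }

  @Bean
  MessageChannel outputChannel() {
    return MessageChannels.direct().get();
  }

  @Bean
  TwitterMessageProducer twitterMessageProducer(
      TwitterStream twitterStream, MessageChannel outputChannel) {

    TwitterMessageProducer twitterMessageProducer =
        new TwitterMessageProducer(twitterStream, outputChannel);

    twitterMessageProducer.setTerms(Arrays.asList("java", "microservices", "spring"));

    return twitterMessageProducer;
  }

  // Consumes the producer's output channel: extracts the tweet text and logs it
  @Bean
  IntegrationFlow twitterFlow(MessageChannel outputChannel) {
    return IntegrationFlows.from(outputChannel)
        .transform(Status::getText)
        .handle(m -> log.info(m.getPayload().toString()))
        .get();
  }

}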

The important part here is how our custom message producer integrates with a message flow. Basically, we don’t need to do anything other than listen to messages on the producer’s output channel.
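The producer above is configured to track java, microservices and spring, and the actual matching is done by Twitter on the server side. As a rough illustration of what such a filter selects, here is a simplified approximation of the track semantics Twitter documented for its filter endpoint: every list entry is a phrase, phrases are combined with OR, and space-separated terms within a phrase are combined with AND, ignoring case and order. The class and its naive tokenizer are hypothetical; Twitter’s real matching has additional rules, for example around URLs and punctuation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical, simplified approximation of "track" filtering.
class TrackMatcherSketch {

    // Lower-cases the text and splits it into word tokens, ignoring punctuation
    static Set<String> tokens(String text) {
        Set<String> result = new HashSet<>();
        for (String token : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}_]+")) {
            if (!token.isEmpty()) {
                result.add(token);
            }
        }
        return result;
    }

    // True if any phrase matches (OR); a phrase matches when all of its
    // space-separated terms occur in the text, in any order (AND)
    static boolean matches(String text, List<String> track) {
        Set<String> words = tokens(text);
        for (String phrase : track) {
            String trimmed = phrase.trim().toLowerCase(Locale.ROOT);
            if (trimmed.isEmpty()) {
                continue;
            }
            if (words.containsAll(Arrays.asList(trimmed.split("\\s+")))) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> track = Arrays.asList("java", "microservices", "spring");
        System.out.println(matches("Reactive streams in #Java", track));
        System.out.println(matches("I love JavaScript", track));
    }
}
```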

2.5. Testing

Only Chuck Norris tests code in production. Ordinary mortals like you and me, however, write test cases.

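import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import twitter4j.FilterQuery;
import twitter4j.Status;
import twitter4j.TwitterStream;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TwitterMessageProducerTest.TestConfig.class)
public class TwitterMessageProducerTest {

  // Replaces the real TwitterStream, so no connection to Twitter is made
  @MockBean
  private TwitterStream twitterStream;

  @Autowired
  private PollableChannel outputChannel;

  @Autowired
  private TwitterMessageProducer twitterMessageProducer;

  @Test
  public void shouldBeInitialized() {
    TwitterMessageProducer.StatusListener statusListener = twitterMessageProducer.getStatusListener();
    verify(twitterStream).addListener(statusListener);

    FilterQuery filterQuery = twitterMessageProducer.getFilterQuery();
    verify(twitterStream).filter(filterQuery);
  }

  @Test
  public void shouldReceiveStatus() {
    TwitterMessageProducer.StatusListener statusListener = twitterMessageProducer.getStatusListener();

    Status status = mock(Status.class);
    statusListener.onStatus(status);

    // Wait at most one second instead of blocking forever
    Message<?> statusMessage = outputChannel.receive(1000);
    assertNotNull(statusMessage);
    assertSame(status, statusMessage.getPayload());
  }

  @Import(TwitterConfig.class)
  static class TestConfig {

    // Overrides the direct channel with a queue, so the test can receive from it
    @Bean
    PollableChannel outputChannel() {
      return MessageChannels.queue(1).get();
    }

  }

}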

I like Twitter4J’s design, because it leverages interfaces. Most of the important parts of the library are exposed as ordinary interfaces, and TwitterStream is no exception, so it can easily be mocked out in test cases.
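Because the collaborator is an interface, a test double doesn’t even require a mocking library. The sketch below uses a hypothetical, heavily reduced EventStream interface (not Twitter4J’s TwitterStream) together with a hand-written recording fake to check the same kind of interactions the test above verifies with Mockito: the listener is attached and the filter is applied when the component starts, and events pushed through the stream reach the component.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, reduced stand-in for a streaming client interface
interface EventStream {
    void addListener(Consumer<String> listener);
    void filter(List<String> terms);
}

// Component under test: wires a listener and a filter into the stream on start
class StreamSubscriber {
    private final EventStream stream;
    private final List<String> terms;
    private final List<String> received = new ArrayList<>();

    StreamSubscriber(EventStream stream, List<String> terms) {
        this.stream = stream;
        this.terms = terms;
    }

    void start() {
        stream.addListener(received::add);
        stream.filter(terms);
    }

    List<String> received() {
        return received;
    }
}

// Hand-written fake: records the interactions and lets the test push events
class RecordingEventStream implements EventStream {
    final List<Consumer<String>> listeners = new ArrayList<>();
    List<String> appliedFilter;

    @Override
    public void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    @Override
    public void filter(List<String> terms) {
        appliedFilter = terms;
    }

    void emit(String event) {
        listeners.forEach(listener -> listener.accept(event));
    }
}

class FakeStreamDemo {
    public static void main(String[] args) {
        RecordingEventStream fake = new RecordingEventStream();
        StreamSubscriber subscriber = new StreamSubscriber(fake, Arrays.asList("java"));
        subscriber.start();
        fake.emit("hello");
        System.out.println(fake.appliedFilter + " -> " + subscriber.received());
    }
}
```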

3. Conclusion

  • Spring Social is EOL now – it’s not going to receive new features
  • Spring Integration’s Twitter module is available as an extension – it’s been moved out from the main project.
  • The original Twitter inbound channel adapter is a polling consumer – you have to deal with rate limiting when choosing your poll interval
  • Twitter’s Streaming API fits with the listening behavior of an inbound channel adapter