Using a Spring Cloud App Starter

1. Introduction

In this article, we’ll demonstrate how to use Spring Cloud App Starters, which provide bootstrapped and ready-to-go applications that can serve as starting points for future development.

Simply put, Task App Starters are dedicated to use cases like database migration and distributed testing, while Stream App Starters provide integrations with external systems.

Overall, there are over 55 starters; check out the official documentation for Stream App Starters and Task App Starters for more information about these two types.

Next, we’ll build a small distributed Twitter application that will stream Twitter posts into a Hadoop Distributed File System.

2. Getting Set Up

First, we’ll create a simple Twitter app and obtain its consumer key and access token.

Then, we’ll set up Hadoop so we can persist our Twitter Stream for future Big Data purposes.

Lastly, we have the option to either use the supplied Spring GitHub repositories to compile and assemble standalone components of the sources-processors-sinks architecture pattern using Maven, or combine sources, processors, and sinks through their Spring Stream binding interfaces.

We’ll take a look at both ways to do this.

It’s worth noting that, formerly, all Stream App Starters were collated into one large repo at github.com/spring-cloud/spring-cloud-stream-app-starters. Since then, each Starter has been simplified and isolated into its own repository.

3. Twitter Credentials

First, let’s set up our Twitter Developer credentials by following the steps to set up an app and create an access token in the official Twitter developer documentation.

Specifically, we’ll need:

  1. Consumer Key
  2. Consumer Key Secret
  3. Access Token Secret
  4. Access Token

Make sure to keep that window open or jot these down, since we’ll be using them below!

4. Installing Hadoop

Next, let’s install Hadoop! We can either follow the official documentation or simply leverage Docker:

$ sudo docker run -p 50070:50070 sequenceiq/hadoop-docker:2.4.1
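Once the container is up, we can optionally verify that HDFS is reachable with a quick check against the Hadoop client API. This is a minimal sketch, assuming the hadoop-client dependency is on the classpath and using the same fs URI we’ll configure for the sink later:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsSmokeTest {
    public static void main(String[] args) throws Exception {
        // Assumption: the same URI we pass to the HDFS sink in section 5.2
        FileSystem fs = FileSystem.get(URI.create("hdfs://127.0.0.1:50010/"), new Configuration());
        // Listing the root directory confirms the NameNode is answering
        System.out.println("Entries under /: " + fs.listStatus(new Path("/")).length);
        fs.close();
    }
}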

5. Compiling Our App Starters

To use freestanding, fully individual components, we can download and compile desired Spring Cloud Stream App Starters individually from their GitHub repositories.

5.1. Twitter Spring Cloud Stream App Starter

Let’s add the Twitter Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.twitterstream.source) to our project:

git clone https://github.com/spring-cloud-stream-app-starters/twitter.git

Then, we run Maven:

./mvnw clean install -PgenerateApps

The resulting compiled Starter App will be available in ‘/target’ of the local project root.

Then we can run that compiled .jar and pass in the relevant application properties like so:

java -jar twitter_stream_source.jar --consumerKey=<CONSUMER_KEY> --consumerSecret=<CONSUMER_SECRET> \
    --accessToken=<ACCESS_TOKEN> --accessTokenSecret=<ACCESS_TOKEN_SECRET>

We can also pass our credentials using the familiar Spring application.properties:

twitter.credentials.access-token=...
twitter.credentials.access-token-secret=...
twitter.credentials.consumer-key=...
twitter.credentials.consumer-secret=...
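These keys are picked up as ordinary Spring Boot configuration properties. As an illustrative sketch only (not the starter’s actual source), a binding class for the twitter.credentials prefix could look like this:

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("twitter.credentials")
public class TwitterCredentials {
    // Spring Boot's relaxed binding maps consumer-key to consumerKey, and so on
    private String consumerKey;
    private String consumerSecret;
    private String accessToken;
    private String accessTokenSecret;
    // standard getters and setters omitted for brevity
}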

5.2. HDFS Spring Cloud Stream App Starter

Now (with Hadoop already set up), let’s add the HDFS Spring Cloud Stream App Starter (org.springframework.cloud.stream.app.hdfs.sink) dependency to our project.

First, clone the relevant repo:

git clone https://github.com/spring-cloud-stream-app-starters/hdfs.git

Then, run the Maven job:

./mvnw clean install -PgenerateApps

The resulting compiled Starter App will be available in ‘/target’ of the local project root. We can then run that compiled .jar and pass in relevant application properties:

java -jar hdfs-sink.jar --fsUri=hdfs://127.0.0.1:50010/

‘hdfs://127.0.0.1:50010/‘ is the default for Hadoop, but your default HDFS port may vary depending on how you configured your instance.

We can see the list of data nodes (and their current ports) at ‘http://0.0.0.0:50070‘, given the configuration we passed in previously.

We can also set this configuration in the familiar Spring application.properties before compilation, so we don’t always have to pass it in via the CLI.

Let’s configure our application.properties to use the default Hadoop port:

hdfs.fs-uri=hdfs://127.0.0.1:50010/
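Before streaming real tweets, it can be worth confirming that the sink’s target is actually writable. Here’s a minimal sketch, again assuming the hadoop-client dependency and the URI configured above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteCheck {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("hdfs://127.0.0.1:50010/"), new Configuration());
        // Write a throwaway file where the sink will later persist tweet data
        try (FSDataOutputStream out = fs.create(new Path("/tmp/write-check.txt"))) {
            out.writeUTF("HDFS sink target is writable");
        }
        fs.close();
    }
}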

6. Using AggregateApplicationBuilder

Alternatively, we can combine our Spring Stream Source and Sink through the org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder into a simple Spring Boot application!

First, we’ll add the two Stream App Starters to our pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud.stream.app</groupId>
        <artifactId>spring-cloud-starter-stream-sink-hdfs</artifactId>
        <version>2.1.2.RELEASE</version>
    </dependency>
</dependencies>

Then we’ll begin combining our two Stream App Starter dependencies by wrapping them into their respective sub-applications.

6.1. Building Our App Components

Our SourceApp specifies the Source to be transformed or consumed:

import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.InboundChannelAdapter;

@SpringBootApplication
@EnableBinding(Source.class)
@Import(TwitterstreamSourceConfiguration.class)
public class SourceApp {
    @InboundChannelAdapter(Source.OUTPUT)
    public String timerMessageSource() {
        return new SimpleDateFormat().format(new Date());
    }
}

Note that we bind our SourceApp to org.springframework.cloud.stream.messaging.Source and inject the appropriate configuration class to pick up the needed settings from our environment properties.
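For reference, the Source contract we’re binding to is just a one-channel interface, essentially as it ships in Spring Cloud Stream:

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();
}

Processor and Sink are the matching contracts for the middle and the end of the pipeline; in fact, Processor simply extends both Source and Sink.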

Next, we set up a simple org.springframework.cloud.stream.messaging.Processor binding:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApp {
    private static final Logger log = LoggerFactory.getLogger(ProcessorApp.class);

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        log.info("Payload received!");
        return payload;
    }
}

Then, we create our consumer (Sink):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.hdfs.sink.HdfsSinkConfiguration;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.ServiceActivator;

@SpringBootApplication
@EnableBinding(Sink.class)
@Import(HdfsSinkConfiguration.class)
public class SinkApp {
    private static final Logger log = LoggerFactory.getLogger(SinkApp.class);
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) {
        log.info("Received: " + payload);
    }
}

Here, we bind our SinkApp to org.springframework.cloud.stream.messaging.Sink and again inject the correct configuration class to use our specified Hadoop settings.

Lastly, we combine our SourceApp, ProcessorApp, and our SinkApp using the AggregateApplicationBuilder in our AggregateApp main method:

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.aggregate.AggregateApplicationBuilder;

@SpringBootApplication
public class AggregateApp {
    public static void main(String[] args) {
        new AggregateApplicationBuilder()
          .from(SourceApp.class).args("--fixedDelay=5000")
          .via(ProcessorApp.class)
          .to(SinkApp.class).args("--debug=true")
          .run(args);
    }
}

As with any Spring Boot application, we can inject specified settings as environment properties through application.properties or programmatically.

Since we’re using the Spring Stream framework, we can also pass our arguments into the AggregateApplicationBuilder constructor, as shown below.
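For instance, a property meant for the whole aggregate, rather than for a single sub-application, can go into the builder itself. Here’s a minimal sketch (the --server.port value is only an illustrative placeholder):

new AggregateApplicationBuilder("--server.port=8081")
  .from(SourceApp.class).args("--fixedDelay=5000")
  .via(ProcessorApp.class)
  .to(SinkApp.class)
  .run(args);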

6.2. Running the Completed App

We can then compile and run our application using the following command line instructions:

    $ mvn install
    $ java -jar twitterhdfs.jar

Remember to keep each @SpringBootApplication class in a separate package (otherwise, several different binding exceptions will be thrown)! For example, a layout along these lines keeps the three sub-applications from colliding (the package names here are illustrative):
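com.example.twitterhdfs.aggregate.AggregateApp
com.example.twitterhdfs.source.SourceApp
com.example.twitterhdfs.processor.ProcessorApp
com.example.twitterhdfs.sink.SinkApp

For more information about how to use the AggregateApplicationBuilder, have a look at the official docs.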

After we compile and run our app we should see something like the following in our console (naturally the contents will vary by Tweet):

2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.processor.ProcessorApp   : Payload received!
2018-01-15 04:38:32.255  INFO 28778 --- [itterSource-1-1] 
c.b.twitterhdfs.sink.SinkApp             : Received: {"created_at":
"Mon Jan 15 04:38:32 +0000 2018","id":952761898239385601,"id_str":
"952761898239385601","text":"RT @mighty_jimin: 180114 ...

This demonstrates the correct operation of our Processor and Sink on receiving data from the Source! In this example, we haven’t configured them to do much: the Processor simply logs “Payload received!” and the Sink logs each payload it receives.

7. Conclusion

In this tutorial, we’ve learned how to combine two awesome Spring Stream App Starters into one sweet Spring Boot example!

For more on this topic, check out the official articles on Spring Boot Starters and how to create a customized starter!

As always, the code used in the article can be found over on GitHub.
