Apache Flink WordCount Java Example

posted on Aug 02nd, 2017

Apache Flink

Apache Flink is an open source stream processing framework developed by the Apache Software Foundation. The core of Apache Flink is a distributed streaming dataflow engine written in Java and Scala.

Prerequisites

1) A machine running the Ubuntu 14.04 LTS operating system

2) Apache Hadoop 2.6 installed (How to install Hadoop on Ubuntu 14.04)

3) Apache Flink 1.2.1 installed (How to install Flink on Ubuntu 14.04)

NOTE

If you want to interact with Hadoop (e.g. HDFS or HBase), make sure to pick the Flink package matching your Hadoop version.

Flink Word Count Java Example

The following code shows the WordCount implementation from the Flink Quickstart. It processes text lines with two operators (a FlatMap and a grouped sum aggregation) and prints the resulting words and counts to stdout.

Step 1 - Add JARs (Libraries)

Add the following JARs to your Java project's build path. You can find these JAR files in the lib directory of your Flink installation.

flink-dist_2.10-1.2.1.jar
flink-python_2.10-1.2.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.7.jar

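Alternatively, if your project is managed with Maven, you can declare the equivalent dependencies instead of adding the JARs manually. The coordinates below are a sketch matching the 1.2.1 / Scala 2.10 build listed above; adjust the versions if yours differ.

```xml
<dependencies>
  <!-- Flink DataSet (batch) Java API -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.1</version>
  </dependency>
  <!-- needed to execute programs locally or submit them to a cluster -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.2.1</version>
  </dependency>
</dependencies>
```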
WordCount.java

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCount {
  
  public static void main(String[] args) throws Exception {
    
    // set up the execution environment
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
    // read the input file from the local file system
    DataSet<String> text = env.readTextFile("/home/hduser/Desktop/input.txt");
    
    DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new LineSplitter())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1);

    // print the result to stdout (note: print() triggers an immediate execution)
    counts.print();

    // also write the result to a text file, using a single writer task
    counts.writeAsText("/home/hduser/Desktop/output.txt").setParallelism(1);

    // execute the program (runs the plan again for the file sink)
    env.execute("WordCount Example");
  }
}

LineSplitter.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

  private static final long serialVersionUID = 1L;

  @Override
  public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    // normalize and split the line into words
    String[] tokens = value.toLowerCase().split("\\W+");
    
    // emit the pairs
    for (String token : tokens) {
      if (token.length() > 0) {
        out.collect(new Tuple2<String, Integer>(token, 1));
      }
    }
  }
}
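The normalization done inside LineSplitter can be seen in isolation with plain Java. This standalone sketch (not part of the Flink job) shows how toLowerCase() and split("\\W+") tokenize a line, with the empty-token check filtering out blanks:

```java
public class TokenizeDemo {
  public static void main(String[] args) {
    // same normalization as LineSplitter: lowercase, split on runs of non-word characters
    String[] tokens = "Hi, Hello!  hi".toLowerCase().split("\\W+");
    for (String token : tokens) {
      if (token.length() > 0) {
        System.out.println(token);  // prints: hi, hello, hi (one per line)
      }
    }
  }
}
```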

input.txt, saved as /home/hduser/Desktop/input.txt:

hi hello hi hello praveen deshmane

Step 2 - Run WordCount.java

output.txt, written to the directory /home/hduser/Desktop/:

deshmane 1
hello 2
hi 2
praveen 1
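For intuition, the groupBy(0).aggregate(Aggregations.SUM, 1) step sums the 1s per word, which is equivalent to the following plain-Java counting. This is a standalone sketch for illustration, not Flink API code; a TreeMap is used so the output is sorted like the result above:

```java
import java.util.Map;
import java.util.TreeMap;

public class SumDemo {
  public static void main(String[] args) {
    String input = "hi hello hi hello praveen deshmane";

    // equivalent of groupBy(0).aggregate(SUM, 1): add up the 1 for each occurrence of a word
    Map<String, Integer> counts = new TreeMap<>();
    for (String token : input.toLowerCase().split("\\W+")) {
      if (token.length() > 0) {
        counts.merge(token, 1, Integer::sum);
      }
    }

    // prints the same counts as the Flink job: deshmane 1, hello 2, hi 2, praveen 1
    counts.forEach((word, count) -> System.out.println(word + " " + count));
  }
}
```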

You can also read input files from HDFS (Hadoop Distributed File System) and store the result there:

DataSet<String> text = env.readTextFile("hdfs://localhost:9000/user/hduser/input.txt");

counts.writeAsText("hdfs://localhost:9000/user/hduser/output").setParallelism(1);
