杰瑞科技汇

How Does the Java API Connect to Hadoop for Data Processing?

This guide covers the Java API for Hadoop: the core components, the key classes, and practical examples.

What is the Hadoop Java API?

The Hadoop Java API is the set of libraries, classes, and interfaces provided by the Hadoop framework that allows developers to write applications that can process vast amounts of data stored in the Hadoop Distributed File System (HDFS) and run computations on the Hadoop MapReduce framework.

Most of the building blocks used in this guide, such as Configuration and FileSystem, live in the Hadoop Common module (hadoop-common). The MapReduce classes come from the MapReduce client libraries.

Core Concepts of the Hadoop Java API

Before diving into the code, you need to understand the two main abstractions:

  1. The FileSystem Abstraction: This is your primary entry point for interacting with any file system, whether it's HDFS, the local file system, or cloud storage (like S3). Avoid hardcoding hdfs://... URIs in application logic where you can, and do not use raw java.io.File objects for HDFS data. The FileSystem class provides a unified, portable way to read from and write to files.
  2. Configuration (org.apache.hadoop.conf.Configuration): This is how you configure your Hadoop application. A plain new Configuration() loads core-default.xml and core-site.xml from the classpath; hdfs-site.xml, mapred-site.xml, and yarn-site.xml are added as default resources when the HDFS, MapReduce, and YARN classes are loaded. Properties you set in code override the values read from these files unless a file marks them as final. All Hadoop clients and services use this to find each other and determine their behavior.
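The layering described for Configuration can be modeled with plain JDK classes. The sketch below is not Hadoop code: it uses java.util.Properties chains as a stand-in, and the class name LayeredSettings and the "site" value hdfs://namenode:8020 are illustrative assumptions. It only shows the lookup order (built-in default, then site file, then value set in code).

```java
import java.util.Properties;

// Plain-JDK model of layered settings; NOT the Hadoop Configuration class.
class LayeredSettings {
    // Builds three layers: defaults <- site <- code. codeOverride may be null.
    static Properties build(String codeOverride) {
        Properties defaults = new Properties();
        defaults.setProperty("fs.defaultFS", "file:///");      // value shipped in core-default.xml
        defaults.setProperty("io.file.buffer.size", "4096");   // value shipped in core-default.xml

        Properties site = new Properties(defaults);            // plays the role of core-site.xml
        site.setProperty("fs.defaultFS", "hdfs://namenode:8020"); // assumed cluster address

        Properties code = new Properties(site);                // plays the role of conf.set(...)
        if (codeOverride != null) {
            code.setProperty("io.file.buffer.size", codeOverride);
        }
        return code;
    }

    public static void main(String[] args) {
        Properties conf = build("65536");
        System.out.println(conf.getProperty("fs.defaultFS"));        // site layer wins over default
        System.out.println(conf.getProperty("io.file.buffer.size")); // code layer wins over default
    }
}
```

The real class adds features this model leaves out, such as final properties and variable expansion.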

Part 1: Interacting with HDFS (The FileSystem API)

This is the most common task: reading and writing data in Hadoop.

Key Classes for HDFS Operations:

  • org.apache.hadoop.fs.FileSystem: The main class for file system operations.
  • org.apache.hadoop.fs.Path: Represents a file or directory path in a file system.
  • org.apache.hadoop.fs.FSDataInputStream: An input stream for reading data from a file.
  • org.apache.hadoop.fs.FSDataOutputStream: An output stream for writing data to a file.
  • org.apache.hadoop.fs.FileSystem methods:
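    • get(Configuration conf): Returns the FileSystem named by the fs.defaultFS property (HDFS on a configured cluster client, the local file system if the property is unset).
    • get(URI uri, Configuration conf): Returns the FileSystem that matches the scheme and authority of the given URI.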
    • open(Path path): Opens a file for reading.
    • create(Path path): Creates a file for writing.
    • exists(Path path): Checks if a path exists.
    • delete(Path path, boolean recursive): Deletes a file or directory.
    • mkdirs(Path path): Creates a directory, including any necessary parent directories.
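Hadoop selects a FileSystem implementation from the scheme of a URI, so it helps to see how such a URI breaks down. The sketch below uses only java.net.URI from the JDK; the class name HdfsUriParts is made up for illustration, and namenode:8020 is the placeholder address used throughout this article.

```java
import java.net.URI;

// Plain-JDK sketch: the parts of a Hadoop-style URI.
class HdfsUriParts {
    // Returns "scheme|host|port|path" for the given URI string.
    static String describe(String uri) {
        URI u = URI.create(uri);
        return u.getScheme() + "|" + u.getHost() + "|" + u.getPort() + "|" + u.getPath();
    }

    public static void main(String[] args) {
        // prints "hdfs|namenode|8020|/user/myuser/sample.txt"
        System.out.println(describe("hdfs://namenode:8020/user/myuser/sample.txt"));
        // A local path has the "file" scheme and no host
        System.out.println(URI.create("file:///tmp/sample.txt").getScheme());
    }
}
```

A Path without a scheme, such as /user/myuser/sample.txt, is resolved against fs.defaultFS, which is one reason to keep the cluster address in configuration rather than in code.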

Example 1: Writing a Text File to HDFS

This example connects to HDFS and writes a simple text file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class HdfsWriter {
    public static void main(String[] args) {
        // The HDFS path where the file will be created
        String hdfsPath = "hdfs://namenode:8020/user/myuser/sample.txt";
        // Loads core-site.xml from the classpath, if present
        Configuration configuration = new Configuration();
        // Alternatively, set the default file system explicitly:
        // configuration.set("fs.defaultFS", "hdfs://namenode:8020");
        // Resolve the file system from the URI itself. FileSystem.get(configuration)
        // returns the fs.defaultFS file system and fails with "Wrong FS" for an
        // hdfs:// path when fs.defaultFS does not point at this cluster.
        try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), configuration);
             FSDataOutputStream fsdos = fs.create(new Path(hdfsPath))) {
            String content = "Hello, Hadoop!\nThis is a test file written via the Java API.";
            // writeUTF writes a 2-byte length prefix followed by modified UTF-8,
            // so the file must be read back with readUTF
            fsdos.writeUTF(content);
            System.out.println("File " + hdfsPath + " written.");
        } catch (IOException e) {
            System.err.println("Error writing to HDFS: " + e.getMessage());
            e.printStackTrace();
        }
    }
}

Example 2: Reading a Text File from HDFS

This example reads the file we just created from HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
public class HdfsReader {
    public static void main(String[] args) {
        String hdfsPath = "hdfs://namenode:8020/user/myuser/sample.txt";
        Configuration configuration = new Configuration();
        // Resolve the file system from the URI so the hdfs:// path is accepted
        // even when fs.defaultFS is not set to this cluster
        try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), configuration);
             FSDataInputStream fsdis = fs.open(new Path(hdfsPath))) {
            // readUTF only works on data written with writeUTF.
            // For plain text files, wrap the stream in an InputStreamReader
            // and a BufferedReader instead.
            String content = fsdis.readUTF();
            System.out.println("Content read from HDFS:");
            System.out.println(content);
        } catch (IOException e) {
            System.err.println("Error reading from HDFS: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
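FSDataOutputStream extends java.io.DataOutputStream, so writeUTF behaves exactly as it does in the JDK: it emits a 2-byte length followed by modified UTF-8, and it rejects strings whose encoded form exceeds 65535 bytes. The sketch below shows the difference from plain UTF-8 bytes using an in-memory buffer in place of HDFS; the class name WriteUtfFraming is made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Plain-JDK sketch: what writeUTF puts on the wire compared with raw UTF-8.
class WriteUtfFraming {
    // Bytes produced by DataOutputStream.writeUTF (length prefix + modified UTF-8).
    static byte[] viaWriteUtf(String s) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Bytes a text tool expects: the encoded characters and nothing else.
    static byte[] viaPlainBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String s = "Hello, Hadoop!";
        System.out.println(viaWriteUtf(s).length);   // prints 16 (2-byte prefix + 14 bytes)
        System.out.println(viaPlainBytes(s).length); // prints 14
    }
}
```

For a file that other tools should read as text, writing the output of getBytes(StandardCharsets.UTF_8) to the stream and reading it back through a BufferedReader avoids the prefix.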

Part 2: The MapReduce API (Writing a Job)

This is the classic Hadoop programming model for distributed data processing.

Key Concepts of the MapReduce API:

  1. Mapper (<KEYIN, VALUEIN, KEYOUT, VALUEOUT>):

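    • The Mapper class processes the input one record at a time and emits a set of intermediate key-value pairs. Splitting the input into chunks (input splits) is the job of the InputFormat, not the Mapper.
    • You override the map(KEYIN key, VALUEIN value, Context context) method; the default implementation passes each record through unchanged.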
    • The Context object allows you to emit the output key-value pairs.
  2. Reducer (<KEYIN, VALUEIN, KEYOUT, VALUEOUT>):

    • The Reducer class takes the intermediate key-value pairs from all Mappers (which are grouped by key) and aggregates them into a smaller set of final key-value pairs.
    • You override the reduce(KEYIN key, Iterable<VALUEIN> values, Context context) method, which is called once per key.
    • The Iterable<VALUEIN> contains all values associated with a given key.
  3. Driver (or Job):

    • This is the main class that configures and runs the MapReduce job.
    • It sets the input and output paths.
    • It specifies the Mapper and Reducer classes and, optionally, a Combiner class.
    • It sets the input and output formats (e.g., TextInputFormat, TextOutputFormat).
    • It submits the job to the Hadoop cluster (or runs it locally).
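The three roles above can be traced in a single process before involving a cluster. The sketch below is a plain-JDK model of the data flow (map, group by key, reduce) for word counting. It does not use any Hadoop classes, and the name MiniMapReduce and its methods are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model of the MapReduce data flow; NOT Hadoop code.
class MiniMapReduce {
    // Map phase: one input line becomes a list of (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(new SimpleEntry<>(w, 1));
            }
        }
        return out;
    }

    // Shuffle phase: group all values by key, with keys in sorted order.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // Reduce phase: called once per key with every value for that key.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Driver: wires the phases together.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) {
            intermediate.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : shuffle(intermediate).entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(run(Arrays.asList("to be or not", "to be")));
    }
}
```

On a cluster the map calls run in parallel on different input splits and the shuffle moves data across the network, but the contract between the phases is the same.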

Example: Word Count

This is the "Hello, World!" of MapReduce. It counts the occurrences of each word in a text file.

The Mapper Class (WordCountMapper.java)

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Convert the line of text to a String
        String line = value.toString();
        // Split the line on runs of whitespace
        String[] words = line.split("\\s+");
        // For each non-empty word, emit (word, 1)
        for (String w : words) {
            // split() yields an empty token for blank lines and leading whitespace
            if (w.isEmpty()) {
                continue;
            }
            word.set(w);
            context.write(word, one);
        }
    }
}
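A mapper that tokenizes with String.split("\\s+") has to deal with empty tokens: a blank line yields one empty string, and a line with leading whitespace yields an empty first element. The plain-JDK check below makes that visible and compares it with java.util.StringTokenizer, which never returns empty tokens. The class name TokenizeCheck is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Plain-JDK sketch: how two tokenizing approaches treat blank input.
class TokenizeCheck {
    // Number of elements produced by split on runs of whitespace.
    static int splitCount(String line) {
        return line.split("\\s+").length;
    }

    // Tokens produced by StringTokenizer with its default whitespace delimiters.
    static List<String> tokens(String line) {
        List<String> out = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            out.add(st.nextToken());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(splitCount("  hello world")); // prints 3 (first element is "")
        System.out.println(splitCount(""));              // prints 1 (a single "")
        System.out.println(tokens("  hello world"));     // prints [hello, world]
        System.out.println(tokens(""));                  // prints []
    }
}
```

Without a guard, those empty strings would be emitted as a word and show up in the final counts.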

The Reducer Class (WordCountReducer.java)

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Sum up all the counts for this word
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        // Emit the final (word, count)
        context.write(key, result);
    }
}
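Because addition is associative and commutative, and because this reducer's input and output types are the same, it can also serve as a Combiner (registered in the driver with job.setCombinerClass(WordCountReducer.class)). Hadoop may apply a combiner zero, one, or several times, so the result must not depend on it. The plain-JDK sketch below checks that property for summation; the class name CombinerSketch and the sample counts are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;

// Plain-JDK sketch: summing partial sums equals summing the raw values.
class CombinerSketch {
    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical map output for one word: three 1s from mapper A, two from mapper B
        List<Integer> fromMapperA = Arrays.asList(1, 1, 1);
        List<Integer> fromMapperB = Arrays.asList(1, 1);

        // Without a combiner the reducer receives five values
        int direct = sum(Arrays.asList(1, 1, 1, 1, 1));
        // With a combiner it receives one partial sum per mapper
        int combined = sum(Arrays.asList(sum(fromMapperA), sum(fromMapperB)));

        System.out.println(direct + " " + combined); // prints "5 5"
    }
}
```

An operation such as averaging does not have this property, so a reducer that computes a mean cannot be reused as a combiner unchanged.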

The Driver Class (WordCountDriver.java)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // This allows you to pass arguments like input and output paths from the command line
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set the output key and value classes for the Mapper and Reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        // Submit the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

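Part 3: The Modern Driver Pattern (Tool and ToolRunner)

The Mapper, Reducer, and Driver above already use the newer org.apache.hadoop.mapreduce API (the older API lives in the org.apache.hadoop.mapred package), and the same code runs on YARN. What can still be improved is the Driver.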

Key Modern Concepts:

  • Job API: The org.apache.hadoop.mapreduce.Job class is the central object for configuring a job.
  • Configuration: Still used, but obtained through getConf() so that options parsed by ToolRunner are applied to the job.
  • Tool and ToolRunner: A common pattern is to make your Driver class implement the Tool interface. This allows you to use ToolRunner, which handles parsing standard Hadoop options (like -D for properties) and simplifies argument handling.

Modern Driver Example (ModernWordCountDriver.java)

This driver uses the Tool interface, which is the recommended practice.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
// The Driver class now implements the Tool interface
public class ModernWordCountDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
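        // ToolRunner has already removed the generic options; only <in> <out> remain
        if (args.length != 2) {
            System.err.println("Usage: ModernWordCountDriver [generic options] <in> <out>");
            return 2;
        }
        // Get the configuration from the Configured superclass
        Configuration conf = getConf();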
        Job job = Job.getInstance(conf, "Modern Word Count");
        job.setJarByClass(ModernWordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and return the status code
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args) throws Exception {
        // ToolRunner simplifies running the Tool
        int exitCode = ToolRunner.run(new ModernWordCountDriver(), args);
        System.exit(exitCode);
    }
}
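To make the effect of ToolRunner concrete, the sketch below models what happens to the argument list: leading -D property=value pairs are copied into the configuration, and the remaining arguments are handed to run(). This is a deliberately simplified plain-JDK model, not Hadoop's parser (which also handles options such as -files and -libjars). The class name GenericOptionsSketch is made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of generic-option handling; NOT Hadoop's GenericOptionsParser.
class GenericOptionsSketch {
    // Consumes leading "-D key=value" pairs into props and returns the remaining args.
    static String[] parse(String[] args, Map<String, String> props) {
        int i = 0;
        while (i + 1 < args.length && args[i].equals("-D")) {
            String kv = args[i + 1];
            int eq = kv.indexOf('=');
            if (eq > 0) {
                props.put(kv.substring(0, eq), kv.substring(eq + 1));
            }
            i += 2;
        }
        return Arrays.copyOfRange(args, i, args.length);
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        String[] rest = parse(
                new String[] {"-D", "mapreduce.job.reduces=2", "in", "out"}, props);
        System.out.println(props);                 // prints {mapreduce.job.reduces=2}
        System.out.println(Arrays.toString(rest)); // prints [in, out]
    }
}
```

With the real ToolRunner, the equivalent command line places the generic options before the input and output paths, for example: hadoop jar wordcount.jar ModernWordCountDriver -D mapreduce.job.reduces=2 <in> <out>.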

How to Compile and Run Your Code

You need the Hadoop JAR files in your classpath.

  1. Compile:

    # Make sure HADOOP_HOME is set
    export HADOOP_HOME=/path/to/your/hadoop-installation
    # Compile the Java files against the classpath reported by the hadoop launcher
    # (a *.jar glob inside a colon-separated -cp value is not expanded)
    javac -cp "$(${HADOOP_HOME}/bin/hadoop classpath)" *.java
  2. Package into a JAR (for the MapReduce Job):

    jar -cvf wordcount.jar *.class
  3. Run the Job on Hadoop:

    # Assume you have an input file 'input.txt' in HDFS
    # and an output directory 'output' (which should not exist)
    hadoop jar wordcount.jar ModernWordCountDriver hdfs://namenode:8020/user/myuser/input.txt hdfs://namenode:8020/user/myuser/output

Summary of Key APIs

| Component | Key Class/Interface | Purpose |
| --- | --- | --- |
| Configuration | org.apache.hadoop.conf.Configuration | Manages application settings. |
| FileSystem | org.apache.hadoop.fs.FileSystem | Abstract interface for file system operations (HDFS, Local, etc.). |
| Paths | org.apache.hadoop.fs.Path | Represents a file or directory path. |
| I/O Streams | FSDataInputStream, FSDataOutputStream | For reading/writing data from HDFS. |
| Mapper | org.apache.hadoop.mapreduce.Mapper | Processes input records and emits key-value pairs. |
| Reducer | org.apache.hadoop.mapreduce.Reducer | Aggregates intermediate results. |
| Job Driver | org.apache.hadoop.mapreduce.Job | Configures and submits the MapReduce job. |
| Tool Runner | org.apache.hadoop.util.ToolRunner | A utility to run Tool-based applications. |