Wednesday, August 11, 2010

Customized Splitters and Readers

A basic problem with Hadoop systems is acquiring input. The input to a Hadoop job is usually one large file, many small files or multiple larger files. The first question to ask in designing a Hadoop job is how the input is to be split.the mapper receives a split of the input file, which is to say the input is broken into smaller chunks for processing by the mapper. The size and meaning of these chunks is determined by the character of the problem that you are trying to solve. There are two common cases that are handled by splitters in the Hadoop distribution. The first case is a text file in which a record is a single line of text. This case is appropriate for parsing log files, much webpage content and other text based systems. The second case is a sequence file, a standard Hadoop form, which consists of a collection of key value pairs. In this case a single record is a key value pair.

The major issue in splitting input records is to determine an appropriate chunk of data for a mapper to handle. There are two nature concerns. A split must be parallelizable, that is a sufficiently large number of records must be generated to keep a number of mappers busy. Records are generated in parallel in two ways. First, when a number of files is passed to the job, typically this will involve passing a directory full of files, each mapper may pull records from a separate file and continue switching files until the job is complete. Another option is to split an input file.

When an input file is split a reader starts in the middle of a file and reads until it hits the beginning of a record. After that it reads data one record at a time until a specified segment of the file has been read. It is up to the FileInput class to specify whether an input file may be split and up to the associated reader to determine exactly how. There is some clever code in the Hadoop infrastructure to make sure that as a file is split all of the data will end up in one or another split.One very common way of splitting files is to split a text file into lines. In this case, a record starts immediately after a carriage return and all the reader has to do to determine the start of a record is to read until a carriage return is detected.

Splitting compressed files is more difficult and can be a major concern in the design of a job where the input is a single compressed file. Most common compression formats such as zip and gzip cannot be split. Hadoop has code for splitting a compression format called lzo, Unfortunately lzo codecs in Java do not exist. When your data consists of a small number of unsplittable files, in our case only one, it is necessary to change the data format to make it splittable in some way, one good solution is to break a single compressed file into a large number, say 1000 or so, compressed files. While each individual file cannot be split, the large number of files supplies sufficient parallelism.

It is possible to write your own splitter to solve a specific problem. I will present a simple case of a splitter and a reader to illustrate how this can work. The problem that I was trying to solve involved an input consisting of a large number of text files. Unlike the word count case, each text file was a complete record. This case is very common, a text file might represent an XML document or some other file describing an interesting object. What I wanted to do was to send all of the text from each file as a single record to a mapper which would process it appropriately.

 

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

         Job job = new Job(conf, "WholeFiles");

        conf = job.getConfiguration();

        // NOTE JOB Copies the configuraton

        job.setInputFormatClass(WholeFileInputFormat.class);

The InputFormat is a subclass of FileInputFormat<KeyType, ValueType>. In the standard line reader the key type is a long representing the position in the file plus some random information and the value is a line of text. For the WholeFileInputFormat reuse the key as the name of the file as Text and the value is the text in the file read as a series of laws. FileInputFormat may implement two methods. isSplitable simply returns true if the reader is capable of splitting the file and false otherwise. In this example we return false indicating that the value will be the entire text of the file.

 

   protected boolean isSplitable(JobContext context, Path file) {

        return false;

    }

 

The createRecordReader method can simply create a copy of the appropriate reader and return it. All of the interesting work is done in the reader class itself.

 

    public RecordReader<Text, Text>  createRecordReader(InputSplit split,

                       TaskAttemptContext context) {

        return new MyWholeFileReader();

    }

 

 

The reader is copied from LineRecordReader, the standard line reader for text files and modified at three points. The Method boolean nextKeyValue() reads the entire file and sets it as the current value. It also sets the current key to the filename which is assumed to be unique.

 

The entire code for my whole file input format is shown below.

 /**

* written by Steve Lewis

* lordjoe2000@gmail.com

* See http://lordjoesoftware.blogspot.com/

*/

package org.systemsbiology.hadoop;

 

 

import org.apache.hadoop.conf.*;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.io.*;

import org.apache.hadoop.io.compress.*;

import org.apache.hadoop.mapreduce.*;

import org.apache.hadoop.mapreduce.lib.input.*;

import org.apache.hadoop.util.*;

 

import java.io.*;

 

 

/**

* Splitter that reads a whole file as a single record

* This is useful when you have a large number of files

* each of which is a complete unit - for example XML Documents

*/

public class WholeFileInputFormat extends FileInputFormat<Text, Text> {

 

    @Override

    public RecordReader<Text, Text>  createRecordReader(InputSplit split,

                       TaskAttemptContext context) {

        return new MyWholeFileReader();

    }

 

    @Override

    protected boolean isSplitable(JobContext context, Path file) {

        return false;

    }

 

    /**

     * Custom RecordReader which returns the entire file as a

     * single value with the name as a key

     * Value is the entire file

     * Key is the file name

     */

    public static class MyWholeFileReader extends RecordReader<Text, Text> {

 

        private CompressionCodecFactory compressionCodecs = null;

        private long start;

        private LineReader in;

        private Text key = null;

        private Text value = null;

        private Text buffer = new Text();

 

        public void initialize(InputSplit genericSplit,

                               TaskAttemptContext context) throws IOException {

            FileSplit split = (FileSplit) genericSplit;

            Configuration job = context.getConfiguration();

            start = split.getStart();

            final Path file = split.getPath();

            compressionCodecs = new CompressionCodecFactory(job);

            final CompressionCodec codec = compressionCodecs.getCodec(file);

 

            // open the file and seek to the start of the split

            FileSystem fs = file.getFileSystem(job);

            FSDataInputStream fileIn = fs.open(split.getPath());

            if (codec != null) {

                in = new LineReader(codec.createInputStream(fileIn), job);

              }

            else {

                in = new LineReader(fileIn, job);

            }

            if (key == null) {

                key = new Text();

            }

            key.set(split.getPath().getName());

            if (value == null) {

                value = new Text();

            }

 

        }

 

        public boolean nextKeyValue() throws IOException {

            int newSize = 0;

            StringBuilder sb = new StringBuilder();

            newSize = in.readLine(buffer);

            while (newSize > 0) {

                String str = buffer.toString();

                sb.append(str);

                sb.append("\n");

                newSize = in.readLine(buffer);

            }

 

            String s = sb.toString();

            value.set(s);

 

            if (sb.length() == 0) {

                key = null;

                value = null;

                return false;

            }

            else {

                return true;

            }

        }

 

        @Override

        public Text getCurrentKey() {

            return key;

        }

 

        @Override

        public Text getCurrentValue() {

            return value;

        }

 

        /**

         * Get the progress within the split

         */

        public float getProgress() {

            return 0.0f;

        }

 

        public synchronized void close() throws IOException {

            if (in != null) {

                in.close();

            }

        }

    }

 

}

 

 


2 comments:

  1. Thanks a lot for a detailed and clear explanation

    ReplyDelete
  2. Thanks for the valuable information. Hope you will keep on post more like this..

    ReplyDelete