by Uwe B. Meding

Normally, Hadoop processes data that is stored in its local storage environment, usually HDFS and systems that derive from it. The data in these systems can be passed directly into the mappers. This usually works very well, but what if the data is not stored in an HDFS filesystem? Staging the data in HDFS is not always feasible or sensible, for example when the data lives in a large RDBMS or behind a web service.

Loading data directly from an external data source in the mapping phase has a few caveats. Kicking off even a moderate number of mappers can exhaust system and network resources very quickly. One of the key mechanisms of the map/reduce algorithm is to read data in parallel. From a scalability point of view this makes perfect sense. However, it also means that the data must be structured such that there is very little friction with respect to the data chunks we are loading. In particular, we need:

  • clear data chunk boundaries
  • no (or very little) database scanning

Hadoop offers the functionality to implement your own data input source in this case. There are three steps to this:

  1. Implement an input format that creates a number of input splits
  2. Implement an input split that describes a chunk of data
  3. Implement a record reader that reads a chunk of data

Custom Input Format

Class diagram for the external source input. I have only shown the most important method references; a few management methods are omitted.

The implementation of the Writable interface is not strictly necessary but makes a lot of sense, since it makes the input data chunk directly usable.
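
The value type TimeSeriesReferenceWritable itself is not shown in this article, so the following is only a minimal sketch of what such a class could look like, with the fields inferred from how it is used further down:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Minimal sketch of the value class: it only carries a reference to a chunk
// of external data (a source name plus a time interval), not the data itself.
public class TimeSeriesReferenceWritable implements Writable {

	private String name;
	private long from;
	private long to;

	public TimeSeriesReferenceWritable() {
		// required no-arg constructor for Hadoop serialization
	}

	public TimeSeriesReferenceWritable(String name, long from, long to) {
		this.name = name;
		this.from = from;
		this.to = to;
	}

	public String getName() {
		return name;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(name);
		out.writeLong(from);
		out.writeLong(to);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.name = in.readUTF();
		this.from = in.readLong();
		this.to = in.readLong();
	}
}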

Input Format
The input format class is the “top-most” class that manages the access references to the source data and the data splitting algorithms. The getSplits(...) method determines how the input data should be split into chunks. This step depends very much on the shape of the input data and the kinds of things you want to process with the resulting chunks. For example, in my case I have large time series data sets in millisecond resolution, with values roughly every 30 seconds, that can span several months' worth of data.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class TimeSeriesInputFormat
   extends InputFormat<Text, TimeSeriesReferenceWritable> {

 @Override
 public List<InputSplit> getSplits(JobContext jc)
    throws IOException, InterruptedException {

  // getTimestamp(...), getSourceNames() and getIntervals(...) are
  // helper methods of this class (not shown here)
  long from = getTimestamp(jc, TIMESERIES_FROM);
  long to = getTimestamp(jc, TIMESERIES_TO);

  List<InputSplit> splits = new ArrayList<>();
  for (String dataSourceName : getSourceNames()) {
   for (long[] interval : getIntervals(from, to)) {
    // one split per data source and time interval
    splits.add(new SplitBySourceReferences(
        Collections.singletonList(dataSourceName),
        interval[0], interval[1]));
   }
  }
  return splits;
 }

 @Override
 public RecordReader<Text, TimeSeriesReferenceWritable>
    createRecordReader(InputSplit is,
                       TaskAttemptContext tac)
        throws IOException, InterruptedException {
    return new SourceReferenceReader();
 }
}

The nested loops in getSplits(...) show how we create the splits: we iterate over the input data source references and chunk the desired time interval.
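
The getIntervals(...) helper is not shown above. A minimal sketch that would slot into TimeSeriesInputFormat, assuming millisecond timestamps and the daily chunks described below:

// Sketch of a helper that chunks the requested time range [from, to)
// into daily intervals; each interval is a {start, end} pair in milliseconds.
private static final long ONE_DAY_MS = 24L * 60 * 60 * 1000;

private List<long[]> getIntervals(long from, long to) {
	List<long[]> intervals = new ArrayList<>();
	for (long start = from; start < to; start += ONE_DAY_MS) {
		long end = Math.min(start + ONE_DAY_MS, to);
		intervals.add(new long[]{start, end});
	}
	return intervals;
}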

Input Split
The input split describes an individual chunk of data. It neither reads nor processes the data; rather, it describes a chunk of data that we need for processing. For example, in the case of my time series data, it makes sense to have the data in daily chunks.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class SplitBySourceReferences
   extends InputSplit implements Writable {

	private List<String> sourceNames;
	private long from;
	private long to;

	public SplitBySourceReferences() {
		// required no-arg constructor for deserialization
	}

	public SplitBySourceReferences(List<String> sourceNames, long from, long to) {
		this.sourceNames = sourceNames;
		this.from = from;
		this.to = to;
	}

	public List<String> getSourceNames() {
		return sourceNames;
	}

	public long getFrom() {
		return from;
	}

	public long getTo() {
		return to;
	}

	@Override
	public long getLength() throws IOException, InterruptedException {
		// the size of an external chunk is not known up front
		return 0;
	}

	@Override
	public String[] getLocations() throws IOException, InterruptedException {
		// external data has no HDFS locality, so any node will do
		return new String[]{"127.0.0.1"};
	}

	@Override
	public void write(DataOutput d) throws IOException {
		d.writeInt(sourceNames.size());
		for (String sourceName : sourceNames) {
			d.writeUTF(sourceName);
		}
		d.writeLong(from);
		d.writeLong(to);
	}

	@Override
	public void readFields(DataInput di) throws IOException {
		this.sourceNames = new ArrayList<>();
		int sourceCount = di.readInt();
		for (int i = 0; i < sourceCount; i++) {
			this.sourceNames.add(di.readUTF());
		}
		this.from = di.readLong();
		this.to = di.readLong();
	}
}

The code simply facilitates the split reference exchange (a time series reference in my case). The implementation of the Writable interface wraps the class neatly.

Record Reader
The record reader actually accesses the data and creates the key/value pairs we need for processing the data through the map/reduce steps. This is also the point where an external source reference ties to the map/reduce key-value pairs:

The key and value type parameters of the record reader must be the same as the input key and value types of the mapper.
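
For illustration, a mapper consuming these records could be declared as follows before we look at the reader itself; the mapper name, its output types, and the loading logic are assumptions here:

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of a mapper whose input key/value types match the record reader:
// Text keys and TimeSeriesReferenceWritable values.
public class TimeSeriesMapper
   extends Mapper<Text, TimeSeriesReferenceWritable, Text, DoubleWritable> {

	@Override
	protected void map(Text key, TimeSeriesReferenceWritable value, Context context)
			throws IOException, InterruptedException {
		// The value is only a reference; the mapper loads the actual chunk
		// from the external source and emits whatever it computes from it.
	}
}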

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class SourceReferenceReader
   extends RecordReader<Text, TimeSeriesReferenceWritable> {

	private Text key;
	private long from;
	private long to;
	private TimeSeriesReferenceWritable current;
	private Iterator<String> sourceNamesIterator;
	private float ncurrent;
	private float ntotal;

	public SourceReferenceReader() {
	}

	@Override
	public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
		SplitBySourceReferences split = (SplitBySourceReferences) is;

		from = split.getFrom();
		to = split.getTo();

		sourceNamesIterator = split.getSourceNames().iterator();
		ntotal = split.getSourceNames().size();
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if (sourceNamesIterator.hasNext()) {
			ncurrent += 1.0;
			String name = sourceNamesIterator.next();
			this.key = new Text(name);
			this.current = new TimeSeriesReferenceWritable(name, from, to);
			return true;
		} else {
			return false;
		}
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		return key;
	}

	@Override
	public TimeSeriesReferenceWritable getCurrentValue() throws IOException, InterruptedException {
		return current;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		return ntotal > 0 ? ncurrent / ntotal : 0;
	}

	@Override
	public void close() throws IOException {
		// nothing to do here
	}
}
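
Finally, to tie the pieces together, the job driver only needs to point the job at the custom input format and pass the time range through the configuration. A minimal sketch, assuming TIMESERIES_FROM and TIMESERIES_TO are configuration key constants on the input format holding long millisecond timestamps, and using the hypothetical TimeSeriesMapper from above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Driver sketch: wires the custom input format into a job. The mapper, the
// output types, and the long-valued configuration keys are assumptions.
public class TimeSeriesJobDriver {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.setLong(TimeSeriesInputFormat.TIMESERIES_FROM, Long.parseLong(args[0]));
		conf.setLong(TimeSeriesInputFormat.TIMESERIES_TO, Long.parseLong(args[1]));

		Job job = Job.getInstance(conf, "time series processing");
		job.setJarByClass(TimeSeriesJobDriver.class);

		// hand input handling over to the custom format; no HDFS input path needed
		job.setInputFormatClass(TimeSeriesInputFormat.class);

		job.setMapperClass(TimeSeriesMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DoubleWritable.class);

		// reducer and output configuration omitted
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}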
