by Uwe B. Meding
Normally, Hadoop processes data that is stored in its local storage environment, usually HDFS or a system derived from it. The data in these systems can be passed directly into the mappers. This usually works very well, but what if the data is not stored in an HDFS filesystem? Staging the data in HDFS first is not always feasible or sensible; for example, the data may live in a large RDBMS or behind a web service.
Loading data directly from an external data source in the mapping phase has a few caveats. Kicking off even a moderate number of mappers can exhaust system and network resources very quickly. One of the key mechanisms of the map/reduce algorithm is to read the data in parallel. From a scalability point of view this makes perfect sense, but it also means that the data must be structured so that loading the individual chunks causes as little friction as possible:
- clear data chunk boundaries
- no (or very little) database scanning
For this case, Hadoop lets you implement your own data input source. There are three steps to this:
- Implement an input format that creates a number of input splits
- Implement an input split that holds all the information about a chunk of data
- Implement a record reader that reads a chunk of data.
Class diagram for the external source input. I have only shown the most important method references; a few management methods are omitted.
The implementation of the Writable interface is not strictly necessary, but it makes a lot of sense, since it makes the input data chunk directly usable.
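The value type TimeSeriesReferenceWritable is used throughout the listings below but its implementation is not shown in this post. As an illustration only, a minimal sketch of such a Writable, assuming it carries nothing more than a source name and the time interval of the chunk, could look like this:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Sketch only: a minimal value type holding a source name and a [from, to] interval.
// The real class may carry additional fields.
public class TimeSeriesReferenceWritable implements Writable {

    private String name;
    private long from;
    private long to;

    public TimeSeriesReferenceWritable() {
        // required by Hadoop for deserialization
    }

    public TimeSeriesReferenceWritable(String name, long from, long to) {
        this.name = name;
        this.from = from;
        this.to = to;
    }

    public Text getName() {
        return new Text(name);
    }

    public long getFrom() {
        return from;
    }

    public long getTo() {
        return to;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeLong(from);
        out.writeLong(to);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        from = in.readLong();
        to = in.readLong();
    }
}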
- Input Format
- The input format class is the “top-most” class that manages the access references to the source data and the data-splitting algorithms. The getSplits(...) method determines how the input data should be split into chunks. This step depends very much on the shape of the input data and the kind of processing you want to do with the resulting chunks. For example, in my case I have large time series data sets in millisecond resolution, with values roughly every 30 seconds, that can span several months' worth of data.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class TimeSeriesInputFormat extends InputFormat<Text, TimeSeriesReferenceWritable> {

    @Override
    public List<InputSplit> getSplits(JobContext jc) throws IOException, InterruptedException {
        // TIMESERIES_FROM and TIMESERIES_TO are configuration keys for the requested time range
        long from = getTimestamp(jc, TIMESERIES_FROM);
        long to = getTimestamp(jc, TIMESERIES_TO);

        List<InputSplit> splits = new ArrayList<>();
        for (String dataSourceName : getSourceNames()) {
            for (long[] interval : getIntervals(from, to)) {
                // one split per data source and time interval
                splits.add(new SplitBySourceReferences(
                        Collections.singletonList(dataSourceName),
                        interval[0], interval[1]));
            }
        }
        return splits;
    }

    @Override
    public RecordReader<Text, TimeSeriesReferenceWritable> createRecordReader(InputSplit is, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        return new SourceReferenceReader();
    }
}
The nested loops in getSplits(...) show how we create splits based on a number of input data source references while chunking the desired time interval.
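The helper methods getTimestamp(...), getSourceNames() and getIntervals(...) are not part of the listing; getSourceNames() would enumerate the external data sources to read. As an illustration only (not the original implementation), getIntervals(...) could chop the requested time range into fixed-size daily chunks and getTimestamp(...) could read the range from the job configuration:

// Sketch only: private helpers on TimeSeriesInputFormat, not shown in the original listing.
private static final long ONE_DAY_MS = 24L * 60L * 60L * 1000L;

// Chop the requested [from, to) range into daily intervals; the real chunking
// strategy depends on the shape of the data.
private List<long[]> getIntervals(long from, long to) {
    List<long[]> intervals = new ArrayList<>();
    for (long start = from; start < to; start += ONE_DAY_MS) {
        intervals.add(new long[]{start, Math.min(start + ONE_DAY_MS, to)});
    }
    return intervals;
}

// The time range is assumed to be passed in through the job configuration.
private long getTimestamp(JobContext jc, String key) {
    return jc.getConfiguration().getLong(key, 0L);
}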
- Input Split
- The input split describes an individual chunk of data. It neither reads nor processes the data; it only describes the chunk that we need for processing. For example, in the case of my time series data, it makes sense to have the data in daily chunks.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

public class SplitBySourceReferences extends InputSplit implements Writable {

    private List<String> sourceNames;
    private long from;
    private long to;

    public SplitBySourceReferences() {
        // required by Hadoop for deserialization
    }

    public SplitBySourceReferences(List<String> sourceNames, long from, long to) {
        this.sourceNames = sourceNames;
        this.from = from;
        this.to = to;
    }

    public List<String> getSourceNames() {
        return sourceNames;
    }

    public long getFrom() {
        return from;
    }

    public long getTo() {
        return to;
    }

    @Override
    public long getLength() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return new String[]{"127.0.0.1"};
    }

    @Override
    public void write(DataOutput d) throws IOException {
        d.writeInt(sourceNames.size());
        for (String sourceName : sourceNames) {
            d.writeUTF(sourceName);
        }
        d.writeLong(from);
        d.writeLong(to);
    }

    @Override
    public void readFields(DataInput di) throws IOException {
        this.sourceNames = new ArrayList<>();
        int sourceCount = di.readInt();
        for (int i = 0; i < sourceCount; i++) {
            this.sourceNames.add(di.readUTF());
        }
        this.from = di.readLong();
        this.to = di.readLong();
    }
}
The code simply facilitates the exchange of the split reference (a time series reference in my case). The implementation of the Writable interface wraps the class neatly.
- Record Reader
- The record reader actually accesses the data and creates the key/value pairs that flow through the map/reduce steps. This is also the point where an external source reference is tied to the map/reduce key/value pairs:
The key and value type parameters must be the same as the input types of the mapper.
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class SourceReferenceReader extends RecordReader<Text, TimeSeriesReferenceWritable> {

    private long from;
    private long to;
    private TimeSeriesReferenceWritable current;
    private Iterator<String> sourceNamesIterator;
    private float ncurrent;
    private float ntotal;

    public SourceReferenceReader() {
    }

    @Override
    public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
        SplitBySourceReferences split = (SplitBySourceReferences) is;
        from = split.getFrom();
        to = split.getTo();
        sourceNamesIterator = split.getSourceNames().iterator();
        ntotal = split.getSourceNames().size();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (sourceNamesIterator.hasNext()) {
            ncurrent += 1.0;
            String name = sourceNamesIterator.next();
            this.current = new TimeSeriesReferenceWritable(name, from, to);
            return true;
        } else {
            return false;
        }
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return current.getName();
    }

    @Override
    public TimeSeriesReferenceWritable getCurrentValue() throws IOException, InterruptedException {
        return current;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return ntotal > 0 ? ncurrent / ntotal : 0;
    }

    @Override
    public void close() throws IOException {
        // nothing to do here
    }
}
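To put the pieces together, here is a sketch (not from the original post) of a mapper whose input types match the record reader, and of a driver that wires in the custom input format. The TIMESERIES_FROM/TIMESERIES_TO constants, the class names and the mapper body are assumptions; output and reducer configuration are omitted.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch: the mapper's input key/value types must match the record reader
// (Text, TimeSeriesReferenceWritable).
public class TimeSeriesMapper extends Mapper<Text, TimeSeriesReferenceWritable, Text, Text> {

    @Override
    protected void map(Text key, TimeSeriesReferenceWritable value, Context context)
            throws IOException, InterruptedException {
        // The value only references a chunk (source name plus time interval);
        // this is where the mapper would actually load that chunk from the
        // external source (RDBMS, web service, ...) and process it.
        // context.write(key, new Text(...));
    }
}

// Job wiring sketch (separate file); TIMESERIES_FROM/TIMESERIES_TO are assumed
// to be public configuration-key constants on TimeSeriesInputFormat.
public class TimeSeriesDriver {

    public static void main(String[] args) throws Exception {
        long fromMillis = Long.parseLong(args[0]);
        long toMillis = Long.parseLong(args[1]);

        Configuration conf = new Configuration();
        conf.setLong(TimeSeriesInputFormat.TIMESERIES_FROM, fromMillis);
        conf.setLong(TimeSeriesInputFormat.TIMESERIES_TO, toMillis);

        Job job = Job.getInstance(conf, "timeseries");
        job.setJarByClass(TimeSeriesDriver.class);
        job.setInputFormatClass(TimeSeriesInputFormat.class);
        job.setMapperClass(TimeSeriesMapper.class);
        // output format, output path and reducer configuration omitted in this sketch
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}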