by Uwe B. Meding

The iterators in the Hadoop reducers are not as simple as they might look. The issue is that, for big-data applications, the total number of items you are iterating through might not fit into memory. Hadoop implements some nifty algorithms to help mitigate this, but the implications are far from subtle and not necessarily obvious.

For simplicity of implementation, Hadoop doesn’t support having more than one iterator over the reduce values. This makes a lot of sense: if you had two independent copies of the iterator, one of them could be far ahead of the other, which implies that none of the data between the two iterator positions could be dropped. For large amounts of data this is not feasible, since it would be easy to run out of memory for even a small calculation.

The practical impact of this is that you can go through an iterator only ONCE. There are no provisions to reset or restart an iteration. If you are sure that the number of items in the reducer will fit into memory, then you can copy all the items into a list.

@Override
protected void reduce(Text key, Iterable<TimeSeriesWritable> values, Context context) throws IOException, InterruptedException {
    // ... other processing, not involving the iterator ...
    // Save the values into a list; each element must be duplicated because
    // Hadoop reuses the same Writable instance on every call to next()
    List<TimeSeriesWritable> valueList = new ArrayList<>();
    for (TimeSeriesWritable tsw : values) {
        valueList.add(tsw.dup()); // clone/duplicate the time series
    }
    // ... other processing, using valueList ...
}

Use secondary storage, such as a database, if you are unsure about the number of elements in the iterator or the size of the elements.
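One way to do this with Hadoop’s own facilities is to spill the values into a SequenceFile and stream them back later. The sketch below is only an illustration under assumptions: TimeSeriesWritable is the type from the examples above, and the ValueSpill helper name and the spill path are made up.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

// Hypothetical helper: spill the reducer values to a SequenceFile so they can
// be re-read in a second pass instead of being held in memory.
public class ValueSpill {

    public static Path spill(Configuration conf, String name,
                             Iterable<TimeSeriesWritable> values) throws IOException {
        Path path = new Path("/tmp/spill-" + name + ".seq"); // illustrative location
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(NullWritable.class),
                SequenceFile.Writer.valueClass(TimeSeriesWritable.class))) {
            for (TimeSeriesWritable tsw : values) {
                // each value is serialized on append, so instance reuse is safe here
                writer.append(NullWritable.get(), tsw);
            }
        }
        return path;
    }

    public static void replay(Configuration conf, Path path) throws IOException {
        TimeSeriesWritable value = new TimeSeriesWritable();
        try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
                SequenceFile.Reader.file(path))) {
            while (reader.next(NullWritable.get(), value)) {
                // ... process one value at a time ...
            }
        }
    }
}

Writing through the iterator once and then replaying from disk keeps the memory footprint constant, at the cost of extra I/O.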

Another practical impact is that calling the iterator’s next() will always return the SAME EXACT instance of the Writable, with the contents of that instance replaced by the next value. This also makes sense, because any Writable must implement the serialization and deserialization of itself. Therefore, holding a reference to the Writable across calls to Iterator.next() is almost always a mistake. Obviously, the way around this is to clone or duplicate the instance you’re trying to preserve across calls to Iterator.next().
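If your Writable does not provide its own clone/duplicate method, Hadoop’s WritableUtils.clone() can make an independent copy via a serialize/deserialize round trip. The following is only a sketch; the reducer shape and the TimeSeriesWritable type follow the earlier example, and the “keep the first value” logic is made up for illustration.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;

public class FirstValueReducer extends Reducer<Text, TimeSeriesWritable, Text, TimeSeriesWritable> {

    @Override
    protected void reduce(Text key, Iterable<TimeSeriesWritable> values, Context context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();

        // Suppose we want to remember the first value and compare the rest to it.
        TimeSeriesWritable first = null;
        for (TimeSeriesWritable tsw : values) {
            if (first == null) {
                // WRONG: first = tsw;  -- that reference would silently change on
                // the next call to next(), because Hadoop reuses the same instance.
                first = WritableUtils.clone(tsw, conf); // independent deep copy
            }
            // ... compare tsw against first, write results as needed ...
        }
    }
}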

The better approach altogether is to design your program so that it doesn’t need unbounded storage in the reducer. Of course, this is easier said than done, and any implementation depends on your application needs and the shape of the data. For example,

  • add a data partitioner and/or grouping to the implementation that will organize the data for the reducer (see the sketch after this list)
  • run multiple independent reducers (if you can slice your problem like that)
  • divide your problem so that you can run staggered map-reduce jobs
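As a sketch of the first option, a composite key combined with a custom partitioner and grouping comparator (the classic secondary-sort pattern) delivers each device’s values to a single reduce() call already sorted by timestamp, so they can be processed in one streaming pass without buffering. All the Device*/TimeSeries* class names below are hypothetical placeholders, not code from this article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TimeSeriesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "time series rollup");
        job.setJarByClass(TimeSeriesDriver.class);

        job.setMapperClass(TimeSeriesMapper.class);   // emits (DeviceTimeKey, TimeSeriesWritable)
        job.setReducerClass(TimeSeriesReducer.class); // one streaming pass per device

        // Route every record of a device to the same reducer, hashing only the
        // device part of the composite (device, timestamp) key.
        job.setPartitionerClass(DevicePartitioner.class);
        // Group keys by device only, so a single reduce() call receives the whole
        // series for that device, already ordered by the key's timestamp component.
        job.setGroupingComparatorClass(DeviceGroupingComparator.class);

        job.setMapOutputKeyClass(DeviceTimeKey.class);
        job.setMapOutputValueClass(TimeSeriesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TimeSeriesWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Because the values arrive in order, the reducer can compute its result incrementally and never needs the in-memory list copy shown earlier.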
