Friday, November 21, 2008

Java MapReduce Improved

My MapReduce class is much improved, now fitting into a single class. I find that I mostly use the pmap() method for parallel processing of list elements.

Here is example usage using the same example as my previous Java MapReduce post:

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import java.util.Map.Entry;

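// No import needed: MapReduce is assumed to live in the same (default) package.
// Java does not allow importing a class from the unnamed package.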

public class WordCount {

static final String doc1 = "This is document 1";
static final String doc2 = "This is another document";
static final String doc3 = "Document 3";

public Object map(Object data) {
String doc = (String) data;
String[] tokens = doc.trim().split("\\s+");
HashMap results = new HashMap();
for (int i = 0; i < tokens.length; i++) {
accumulate(tokens[i], results);
}
return results;
}

void accumulate(String s, HashMap acc) {
String key = s.toLowerCase();
if (acc.containsKey(key)) {
Integer I = (Integer) acc.get(key);
int newval = I.intValue() + 1;
acc.put(key, new Integer(newval));
} else {
acc.put(key, new Integer(1));
}
}

public Object reduce(Object input, Object acc) {
HashMap h = (HashMap) acc;
Collection entries = ((HashMap) input).entrySet();
for (Iterator j = entries.iterator(); j.hasNext();) {
Entry e = (Entry) j.next();
Object key = e.getKey();
Integer val = (Integer) e.getValue();
if (h.containsKey(key)) {
Integer oldval = (Integer) h.get(key);
h.put(key, new Integer(val.intValue() + oldval.intValue()));
} else {
h.put(key, val);
}
}
return h;
}

public static void main(String[] args) {
Vector docs = new Vector();
docs.add(doc1);
docs.add(doc2);
docs.add(doc3);

HashMap results = new HashMap();
WordCount wc = new WordCount();

try {
results = (HashMap) MapReduce.mapReduce(docs, wc, "map", wc, "reduce", results);
} catch (Exception e) {
e.printStackTrace();
}

System.out.println(results.toString());
}
}

Pretty similar to the earlier usage, but now you can pass in an instance and have access to its data and methods from inside your map function. Just be careful about synchronization if you write to any shared variable inside the map function.
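To make the synchronization caution concrete, here is a minimal sketch. It uses plain threads rather than the MapReduce class so that it stands alone, and the class name SharedStateMapper and its counter field are made up for illustration. The map method has the same prototype pmap() expects, and the one piece of shared state it writes to is an AtomicInteger, so concurrent calls do not lose updates.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example class; not part of the MapReduce code in this post.
class SharedStateMapper {

    // Shared by every thread that calls map(). AtomicInteger makes the
    // increment safe without an explicit synchronized block.
    private final AtomicInteger docsSeen = new AtomicInteger();

    // Same prototype that pmap() expects: Object name(Object input).
    public Object map(Object data) {
        docsSeen.incrementAndGet();
        return ((String) data).toLowerCase();
    }

    public int getDocsSeen() {
        return docsSeen.get();
    }

    // Calls map() from n threads at once and returns the final counter value.
    static int runConcurrently(int n) throws InterruptedException {
        final SharedStateMapper mapper = new SharedStateMapper();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            final String doc = "Document " + i;
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    mapper.map(doc);
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < n; i++) {
            threads[i].join();
        }
        return mapper.getDocsSeen();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runConcurrently(100));
    }
}
```

With a plain int field and docsSeen++ in its place, the count could come up short because the increment is not atomic.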

You can also use a static method to do the mapping or folding.
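As far as I can tell, a static method works because the class looks methods up by name through reflection, and Method.invoke ignores the target object when the method it calls is static. The following is only a sketch of that lookup in isolation. ReflectiveCallDemo, shout, twice and call are hypothetical names, not part of the MapReduce class.

```java
import java.lang.reflect.Method;

// Hypothetical demo class; the method names are made up for illustration.
class ReflectiveCallDemo {

    // Instance method with the prototype Object name(Object).
    public Object shout(Object in) {
        return ((String) in).toUpperCase();
    }

    // Static method with the same prototype.
    public static Object twice(Object in) {
        return (String) in + in;
    }

    // Looks up a one-argument public method by name on target's class and
    // calls it, much as the MapReduce constructor and run() do.
    static Object call(Object target, String name, Object arg) throws Exception {
        Method m = target.getClass().getMethod(name, Object.class);
        // For a static method, invoke() ignores the target argument.
        return m.invoke(target, arg);
    }

    public static void main(String[] args) throws Exception {
        ReflectiveCallDemo demo = new ReflectiveCallDemo();
        System.out.println(call(demo, "shout", "map"));
        System.out.println(call(demo, "twice", "map"));
    }
}
```

Note that getMethod only finds public methods, so the map and reduce methods you pass by name need to be public.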

Here's the new MapReduce class:

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/**
* The MapReduce class provides static methods
* for encapsulating parallel processing.
* This class cannot be instantiated.
*
* The pmap() (parallel map) method in particular makes concurrent
* processing simple by abstracting away all the threading and
* synchronization.
*
* @author mike
*/
public class MapReduce extends Thread {

/**
* Concurrently maps each object A in the List inputList to a new object B by applying
* the method meth to every element in the list. Equivalent to calling
* pmap(inputList, obj, meth, 0), i.e. with no limit on the number of threads.
* @return ArrayList
* @param inputList - List of objects to be mapped
* @param obj - An instance of the class that defines meth
* @param meth - The name of the method to be run on each object in the data list.
* The method must have the prototype:
* Object method_name(Object input)
*/
public static List pmap(List inputList, Object obj, String meth) throws Exception {
return MapReduce.pmap(inputList, obj, meth, 0);
}

/**
* Concurrently maps each object A in the List inputList to a new object B by applying
* the method meth to every element in the list. Returns the new B objects in a
* new list. Mappings in the new list are in the same order and correspond to the
* objects in the original list but since each mapping is done in parallel,
* the evaluation order is undefined.
*
* @return ArrayList
* @param inputList - List of objects to be mapped
* @param obj - An instance of the class that defines meth
* @param meth - The name of the method to be run on each object in the data list.
* The method must have the prototype:
* Object method_name(Object input)
* @param maxThreads - The maximum number of threads to run at once.
* If 0, no limit. Use this limit to prevent OutOfMemoryErrors when
* processing large lists.
*/
public static List pmap(List inputList, Object obj, String meth, int maxThreads) throws Exception {
int size = inputList.size();
int inc = maxThreads <= 0 ? size : maxThreads;
ArrayList retval = new ArrayList(size);
for (int i = 0; i < size; i += inc) {
int end = (i + inc < size ? i + inc : size);
List threads = createThreads(inputList, i, end, obj, meth);
waitForThreads(threads);
for (int j = 0; j < threads.size(); j++) {
retval.add(((MapReduce)threads.get(j)).output);
}
}
return retval;
}


/**
* Calls meth(elem, accIn) on successive elements of list, starting with accIn == acc0.
* meth must return an accumulator which is passed to the next call.
* The function returns the final value of the accumulator.
* acc0 is returned if the list is empty.
* @param list - The list to be folded into a single object.
* @param obj - The instance of the class that defines meth.
* @param meth - The name of the accumulating method.
* The method must have the prototype
* Object method_name(Object input, Object accIn)
* @param acc0 - Initial accumulator
* @return Object
* @throws Exception
*/
public static Object fold(List list, Object obj, String meth, Object acc0) throws Exception {
Class[] types = {Object.class, Object.class};
Method m = obj.getClass().getMethod(meth, types);
for (int i = 0; i < list.size(); i++) {
Object[] args = {list.get(i), acc0};
acc0 = m.invoke(obj, args);
}
return acc0;
}

/**
* Combines the operations of pmap and fold with no limit on the number
* of concurrent threads.
*/
public static Object mapReduce(List list, Object mapObj, String mapMeth, Object foldObj, String foldMeth, Object foldAcc) throws Exception {
return mapReduce(list, mapObj, mapMeth, foldObj, foldMeth, foldAcc, 0);
}

/**
* Combines the operations of pmap and fold with a thread limit.
*/
public static Object mapReduce(List list, Object mapObj, String mapMeth, Object foldObj, String foldMeth, Object foldAcc, int maxThreads) throws Exception {
List mapResult = pmap(list, mapObj, mapMeth, maxThreads);
return fold(mapResult, foldObj, foldMeth, foldAcc);
}


static List createThreads(List list, int begin, int end, String obj, String meth) throws Exception {
return createThreads(list, begin, end, obj, meth, true);
}

static List createThreads(List list, int begin, int end, Object obj, String meth) throws Exception {
return createThreads(list, begin, end, obj, meth, false);
}

static List createThreads(List list, int begin, int end, Object obj, String meth, boolean isStaticMethod) throws Exception {
ArrayList threads = new ArrayList(end - begin);
for (int i = begin; i < end; i++) {
try {
MapReduce p = isStaticMethod ? new MapReduce((String)obj, meth, list.get(i)) : new MapReduce(obj, meth, list.get(i));
threads.add(p);
p.start();
} catch (java.lang.OutOfMemoryError e) {
System.err.println("Error: thread " + i);
throw e;
}
}
return threads;
}


static void waitForThreads(List threads) {
for (int i = 0; i < threads.size(); i++) {
Thread thread = (Thread) threads.get(i);
try {
thread.join();
} catch (InterruptedException e) {}
}
}

//
// Non-static instance methods and fields
//
Object obj;
Method meth;
Object input;
Object output;

// This class should never be instantiated except by its own static methods.
private MapReduce(String classname, String meth, Object in) throws Exception {
Class[] types = {Object.class};
this.meth = Class.forName(classname).getMethod(meth, types);
this.input = in;
}

// This class should never be instantiated except by its own static methods.
private MapReduce(Object obj, String meth, Object in) throws Exception {
this.obj = obj;
Class[] types = {Object.class};
this.meth = obj.getClass().getMethod(meth, types);
this.input = in;
}

public void run() {
Object[] args = {this.input};
try {
this.output = this.meth.invoke(this.obj, args);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

Enjoy!

Friday, November 7, 2008

Vector v. ArrayList

Vector has always been my favorite Java List implementation. With its auto-resizing and easy iteration features, it's always been my first choice for storing data in list form. (As an aside, the new version of Sun's Java, which requires Vector to be parameterized, seems broken to me. But I digress.) Another feature of Vector that I always regarded as a bonus is its built-in synchronization. However, I've begun to rethink that.

Only recently have I begun to do much concurrent programming, usually with my MapReduce class. (I have a new, much improved version of that, which I will blog about soon.) MapReduce uses Vector but not in a way that requires synchronization. In fact, I can't think of a single place where I'm using Vector that does.

Does this matter? I think so. Being synchronized, every addition to or removal from a Vector requires a lock to be obtained and then released. Not very noticeable overhead in a small program, but lately I've been trying to shave milliseconds off the running time in each module of a program that runs for over two hours.

Enter ArrayList. I never used ArrayList before but apparently it is basically equivalent to Vector except without synchronization. So I ran a quick test of adding and then removing a million Integer objects from both Vector and ArrayList. (Care had to be taken to avoid including garbage collection time in the results.) Guess what? ArrayList ran in about 3/4 of the time.
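For anyone who wants to try this, here is a rough sketch of that kind of test. It is not my original test code. The class name ListTimingDemo and the helper timeAddRemove are made up for illustration, and the warm-up loop and System.gc() calls are just one way to keep JIT and garbage collection noise down. Timings will vary by JVM and machine, so treat any numbers as indicative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;

// Hypothetical micro-benchmark sketch; single-threaded on purpose.
class ListTimingDemo {

    // Adds n Integers to the list, removes them again from the end,
    // and returns the elapsed time in nanoseconds.
    static long timeAddRemove(List<Integer> list, int n) {
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            list.add(Integer.valueOf(i));
        }
        for (int i = n - 1; i >= 0; i--) {
            list.remove(i); // remove by index, last element first
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        int n = 1000000;
        // Warm-up passes so JIT compilation is less likely to skew the timings.
        for (int i = 0; i < 5; i++) {
            timeAddRemove(new Vector<Integer>(), n);
            timeAddRemove(new ArrayList<Integer>(), n);
        }
        System.gc(); // only a hint: asks for a collection before timing
        long vectorNs = timeAddRemove(new Vector<Integer>(), n);
        System.gc();
        long arrayListNs = timeAddRemove(new ArrayList<Integer>(), n);
        System.out.println("Vector:    " + vectorNs / 1000000 + " ms");
        System.out.println("ArrayList: " + arrayListNs / 1000000 + " ms");
    }
}
```

Removing from the end keeps each removal cheap for both classes, so the comparison is mostly about the cost of Vector's locking rather than element shifting.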

I think I have a new favorite Java List implementation.

Monday, August 18, 2008

Erlang Mnesia or ETS?

Recently I've been working on a RESTful web app project in Erlang using Yaws appmods. I decided I needed a memory cache to cache a (MySQL) database table that is read often but updated infrequently. As an Erlang beginner, I decided to use Mnesia to see how it would go. Using Joe Armstrong's book Programming Erlang as a reference, I dove in and found Mnesia to be very intuitive and pretty easy to set up. I had my memory cache working pretty quickly.

Soon after, I decided I wanted to cache another table as well, but thought I'd try to do it using an ETS table instead of Mnesia just to become familiar with ETS. Long story short, I found it to be very frustrating. Once I got it working, I found that the ETS table would seem to randomly disappear. I then realized that, running under Yaws, the process that created the ETS table sometimes died, at which point the table would be deallocated and no other process could read it.

The solution was going to be to create a separate process to manage the ETS table and all other processes would have to communicate with this process to get at any data in the table. Not a terribly difficult task, but why reinvent the wheel? Mnesia most likely already does something similar.

Moral of the story: Skip ETS and just use Mnesia.

Friday, July 18, 2008

Java MapReduce

OK, this isn't quite MapReduce in its entirety. It lacks any facility for distributed computing across multiple machines. But what it does do is simplify concurrent programming by distributing your MapReduce across multiple threads, which means multiple processors on most SMP machines.

It pretty nicely abstracts away all the threading and synchronizing issues so you can focus only on the problem at hand.

To use it, you create a class to encapsulate your mapping and reducing functions. This class must extend the MapperThread class and implement the Reducer interface, which means you must implement two methods,
  • public Object map(Object data)
  • public Object getReduction(Collection results)
Then you call the static method MapReduce.mapReduce() passing in your data, as a Vector, and the name of your mapper-reducer class and it will return your results inside an object of your choice.

It's time for an example. Let's use a classic MapReduce example, the word counter. We'll put some "documents" as Strings into a Vector, pass this into mapReduce() which will concurrently process each document and return to us a HashMap where each key is a word and each value is the total count of that word in all our documents.

For example if our documents are {"This is document 1", "This is another document", "Document 3"}
then our output should be {this=2, is=2, document=3, 1=1, another=1, 3=1} (assuming case-insensitivity).

Here's the code to do it:

WordCount.java


package test.misc;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;
import java.util.Map.Entry;

// The three helper classes below are declared in package test.misc.util.
import test.misc.util.MapReduce;
import test.misc.util.MapperThread;
import test.misc.util.Reducer;

public class WordCount extends MapperThread implements Reducer {

static final String doc1 = "This is document 1";
static final String doc2 = "This is another document";
static final String doc3 = "Document 3";

public Object map(Object data) {
String doc = (String) data;
String[] tokens = doc.trim().split("\\s+");
HashMap results = new HashMap();
for (int i = 0; i < tokens.length; i++) {
accumulate(tokens[i], results);
}
return results;
}

void accumulate(String s, HashMap acc) {
String key = s.toLowerCase();
if (acc.containsKey(key)) {
Integer I = (Integer) acc.get(key);
int newval = I.intValue() + 1;
acc.put(key, new Integer(newval));
} else {
acc.put(key, new Integer(1));
}
}

public Object getReduction(Collection c) {
HashMap h = new HashMap();
for (Iterator i = c.iterator(); i.hasNext();) {
Collection entries = ((HashMap) i.next()).entrySet();
for (Iterator j = entries.iterator(); j.hasNext();) {
Entry e = (Entry) j.next();
Object key = e.getKey();
Integer val = (Integer) e.getValue();
if (h.containsKey(key)) {
Integer oldval = (Integer) h.get(key);
h.put(key, new Integer(val.intValue() + oldval.intValue()));
} else {
h.put(key, val);
}
}
}
return h;
}

public static void main(String[] args) {
Vector docs = new Vector();
docs.add(doc1);
docs.add(doc2);
docs.add(doc3);

HashMap results = null;

try {
results = (HashMap) MapReduce.mapReduce(docs, "test.misc.WordCount", "test.misc.WordCount");
} catch (Exception e) {
e.printStackTrace();
}

System.out.println(results.toString());
}
}


Notice what we're NOT doing. We're not starting any threads or worrying about synchronization.

We are processing documents concurrently but all the threading is abstracted away to the MapReduce and MapperThread classes.

We can re-use these classes again and again to perform any number of tasks in parallel. We could use them to make concurrent HTTP requests, for instance, and all of a sudden we are doing concurrent DISTRIBUTED programming.

There's also an optional argument to mapReduce, maxThreads, that limits the number of concurrently active threads. You'll need to use this limit when working with large data sets.
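The classes in this post cap the thread count by splitting the list into batches of at most maxThreads items. A different way to get a similar cap, and not what these classes do, is a fixed-size thread pool from java.util.concurrent. Here is a minimal sketch of that alternative. BoundedMapDemo and boundedMap are hypothetical names, and the lowercase "mapping" is just a stand-in for real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of a thread-limited parallel map using a fixed pool.
class BoundedMapDemo {

    // Lowercases every string using at most maxThreads worker threads.
    // maxThreads must be greater than zero. Results come back in the
    // same order as the input list.
    static List<String> boundedMap(List<String> docs, int maxThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        try {
            List<Callable<String>> tasks = new ArrayList<Callable<String>>();
            for (final String doc : docs) {
                tasks.add(new Callable<String>() {
                    public String call() {
                        return doc.toLowerCase();
                    }
                });
            }
            List<String> results = new ArrayList<String>(docs.size());
            // invokeAll blocks until every task has finished.
            for (Future<String> f : pool.invokeAll(tasks)) {
                results.add(f.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> docs = new ArrayList<String>();
        docs.add("This is document 1");
        docs.add("This is another document");
        docs.add("Document 3");
        System.out.println(boundedMap(docs, 2));
    }
}
```

The pool reuses its worker threads, so a long list never creates more than maxThreads threads in total, whereas the batching approach creates one new thread per item.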

Here are the three classes that handle the dirty work. You'll have to configure your own package and class paths of course. Enjoy!

MapReduce.java


package test.misc.util;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Vector;

public class MapReduce {

//
// If you get OutOfMemoryError, try setting maxThreads to something like 500 or 1000, or even lower if you're
// manipulating very large objects.
//
public static Object mapReduce(Vector datalist, String mapperClass, String reducerClass, int maxThreads) throws Exception {
Vector lists = new Vector();
splitList(datalist, maxThreads, lists);
HashMap map = new HashMap();
for (Iterator iter = lists.iterator(); iter.hasNext();) {
Vector list = (Vector) iter.next();
Vector threads = createThreads(list.size(), mapperClass);
initializeMap(map, threads);
startThreads(threads, list, map);
waitForThreads(threads);
}
Object results = runReducer(map, reducerClass);
return results;
}

public static Object mapReduce(Vector datalist, String mapperClass, String reducerClass) throws Exception {
return mapReduce(datalist, mapperClass, reducerClass, 1000000);
}

static void initializeMap(HashMap results, Vector threads) {
for (int i = 0; i < threads.size(); i++) {
results.put(threads.get(i), null);
}
}

static Vector createThreads(int numThreads, String mapperClass) throws Exception {
Vector threads = new Vector(numThreads);
for (int i = 0; i < numThreads; i++) {
try {
MapperThread thread = (MapperThread) Class.forName(mapperClass).newInstance();
threads.add(thread);
} catch (java.lang.OutOfMemoryError e) {
System.err.println("Error: thread " + i);
throw e;
}
}
return threads;
}

static Vector startThreads(Vector threads, Vector data, HashMap map) throws Exception {
for (int i = 0; i < data.size(); i++) {
try {
Object item = data.get(i);
MapperThread thread = (MapperThread) threads.get(i);
thread.setParams(item, map);
thread.start();
} catch (java.lang.OutOfMemoryError e) {
System.err.println("Error: thread " + i);
throw e;
}
}
return threads;
}

static void waitForThreads(Vector threads) {
for (Iterator i = threads.iterator(); i.hasNext();) {
Thread thread = (Thread) i.next();
try {
thread.join();
} catch (InterruptedException e) {}
}
}

static Object runReducer(HashMap map, String reducerClass) throws Exception {
Reducer reducer = (Reducer) Class.forName(reducerClass).newInstance();
return reducer.getReduction(map.values());
}

static Vector splitList(Vector in, int max, Vector acc) {
if (in.size() <= max) {
acc.add(in);
return acc;
} else {
acc.add(new Vector(in.subList(0, max)));
return splitList(new Vector(in.subList(max, in.size())), max, acc);
}
}
}

MapperThread.java


package test.misc.util;

import java.util.HashMap;

public class MapperThread extends Thread {
protected Object data;
protected HashMap mappings;

public void setParams(Object data, HashMap map) {
this.data = data;
this.mappings = map;
}

/**
* This is the method your subclass must override.
* @param data - This is taken from successive elements of the list
* you pass in to mapReduce.
* @return - Whatever is returned here will be added to the collection passed in to
* your Reducer.getReduction() method.
*/
public Object map(Object data) {
return data;
}

public void run() {
mappings.put(this, map(data));
}
}


Reducer.java


package test.misc.util;

import java.util.Collection;

public interface Reducer {

/**
* You'll override this in your own implementation.
* @param results - A collection of objects returned from your MapperThread subclass.
* @return - The return from this method is what is returned by mapReduce().
*/
public Object getReduction(Collection results);
}

Thursday, May 15, 2008

COMET: A Hack for the Broken Web

This post provides insight into some ingenious engineering at Facebook. Erlang is the perfect language to implement the server side of COMET's long-polling with all its simultaneous connections.

However, all of that, and COMET itself, is only necessary because the World Wide Web is unfinished! In its current state the Web is BROKEN!

Most of the clients out there are not network nodes. They are half-nodes. They can initiate connections out, but do not accept incoming connections (on the WWW port, 80). Unlike the good old telephone network where every phone can call every other phone, the Web is like a television network; you can request and receive their programming, but they are not interested in receiving your programming!

What does this mean for web applications? Well, a chat application is a perfect example. Each client wants real-time updates of the status of its buddies. But since a server can't really push data to a typical client, the client has to poll. Of course polling is so inefficient that brilliant engineers have had to come up with things like long-polling, COMET, hugely distributed web servers, and the like--all of which are still pretty inefficient.

Wouldn't it be nice if each client were a real node? It could PUT its status to each of its buddy nodes any time its status changed. It could POST new chat messages directly to the buddy node. The centralized server would have far fewer responsibilities, like maintaining your buddy list and the address of the node you're logged in at.

What would be necessary to get the Web finished? Among other things: IPv6 or some other addressing scheme to get all the nodes uniquely addressed; new firewall and security rules and protocols to keep the bad guys at bay; and renewed interest in using HTTP to its full potential (using REST of course).

Friday, May 9, 2008

Want to Be a Better/Faster/Smarter Programmer? Learn a New Language!

For years most of the projects I worked on were written in C. I thought I was a pretty good C programmer. Then I had to move fully into the Java world. I grumbled a little because I was no Java expert, but once I learned it I found I would much rather write Java code than C. But you know what else I discovered? When I did go back to writing or maintaining C code, I did it a little differently. I took a more object-oriented approach and wrote more re-usable libraries. (Like my vector.c module that emulated Java's Vector class.)

After a while I began to learn Ruby and that's when several ideas that should have already been ingrained really started to click. I realized that handling exceptions like this:

try {
some_method();
} catch (Exception e) {
write_to_logfile(e);
return false;
}

is almost always WRONG. You should probably be throwing that error up the stack to be handled at a high level.
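Here is a small sketch of what I mean by throwing the error up the stack. All the names (PropagateDemo, loadConfig, startModule, run) are made up for illustration. The low-level and mid-level methods simply declare the exception, and only the top level decides what a failure means.

```java
import java.io.IOException;

// Hypothetical example of letting an exception propagate to one handler.
class PropagateDemo {

    // Low-level method: declares the failure instead of hiding it.
    static String loadConfig(String name) throws IOException {
        if (name == null || name.length() == 0) {
            throw new IOException("no config name given");
        }
        return "config:" + name;
    }

    // Mid-level method: no try/catch, the exception passes straight through.
    static String startModule(String name) throws IOException {
        return "started with " + loadConfig(name);
    }

    // Top level: the one place that decides what a failure means.
    static String run(String name) {
        try {
            return startModule(name);
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("db"));
        System.out.println(run(""));
    }
}
```

The caller of startModule cannot mistake a failure for a normal false return, and the original exception message survives all the way to the top.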

Ruby's awesome Test::Unit module woke me up to the value of automated unit testing. Instead of doing unit testing as an afterthought, I began thinking about unit tests much earlier in the process. I've learned to use JUnit for Java and even written tests first and the class second a couple of times!

Coding Ruby has got me wanting to write small, easy to test classes made of small, easy to test methods, and modularize to a degree I haven't in the past.

As a result of better modularization and unit testing, my Java and Ruby classes are much more reliable from the moment of release and require far less debugging (and apologizing to users) post-release.

What next? Erlang! What a great language. And what a great book Joe Armstrong has written in Programming Erlang: Software for a Concurrent World.

I'm not yet writing production code in Erlang, but now, whatever the language, I'm seeking more elegant solutions using recursion, thinking more about concurrency and shared data, doing actual top-down coding, and trying to think of solutions functionally.

Thursday, March 27, 2008

How Software Development is Like Finishing a Basement

Last year Carrie and I bought and moved into a new house. Our strategy was to move in then finish the unfinished basement ourselves, building up some sweat equity. Now we're about halfway finished and I see some obvious similarities between building a basement and building a software application.

We decided to be agile in our basement finish, rather than work from a predefined blueprint. Some decisions would be put off until necessary; like the colors of carpet and paint, the style of lighting fixtures, and even the placement of some doorways and size of closets. But some things have to be decided right at the start. What will the end product look like generally? (Modern.) What features are definitely required? (Bar, pool table, salon, bathroom, two bedrooms, toy room.) This is like deciding the overall look and feel of your app and its required features.

Once you've got a handle on the overall look and feature set, it's time for the strategy of how to make it happen. How will you design it? You can design the whole project, creating a detailed blueprint before starting work, or wait to design the bathroom until you are ready to start work on it. But keep dependencies in mind. If you cover the main heating duct with drywall before you run a duct to the bathroom, you'll be redoing your ceiling. Likewise, if you write some Javascript that runs on IE then find out you've got some Linux users, you'll be right back into that code.

We ended up doing the basement much like I write software. Start with a good idea of what features must be in the end product, but save the details for later. Know what has to be done first, middle, and last, but break each part into its details when you come to it.

Test each part well before moving on to the next. You want to find out that one of the electrical outlets didn't get a wire run to it BEFORE you put drywall on the walls. And you want to know that your SQL stored procedure works correctly before you start calling it from several different web apps.

For me the greatest lesson learned from finishing a basement is that everything doesn't have to be perfect. A wall stud can be a half inch off and you can still make it work. A heating duct can be a few inches too long or short and be fine. Likewise, sometimes the quick and dirty code is good enough. No function will ever be elegant enough and fast enough and memory efficient enough for me to consider it perfect. Put down the level and tape measure, nail the board into place and move on.