Java 7 Tage – Fork / Join Framework

Das Fork/Join Frameworki ist eine der Neuerungen in der Java Welt dar. Was davon in Java 7 auf uns zukommt, und was hinter diesem Framework steckt gibts hier.

Nachdem hier die letzten Tage die Änderungen aus dem Project Coin von Java 7 vorgestellt wurden, möchte ich heute auf das Fork / Join Framework eingehen. Eine Neuerung die das Mutli Threading vereinfachen soll.

Genauer gesagt ist das fork/join Framework eine Implementierung des ExecutorService Interfaces, welches beim Abarbeiten von eines Tasks die Verteilung dessen auf mehrere Prozessoren erleichtern soll. Dabei wird ein aufwendiger Task in immer kleinere Teile aufgebrochen bis die Teile klein genug sind um sie zu verarbeiten. Diese Teile werden dann über einen Thread Pool abgearbeitet und schlussendlich wieder zusammengesetzt zu einem fertigen Ergebnis.

Der oben genannte Vorgang hier noch verpackt in Pseudocode

if (aufgabe klein genug)
  erledige aufgabe
else
  teile die aufgabe in 2 gleich große teile
  starte verarbeitungsthread der 2 aufgaben und vereine die ergebnisse

Eigentlich recht simpel, nun wollen wir aber noch auf den Code dazu blicken.

Einfaches Beispiel

Nehmen wir an wir haben folgende Datenklasse

public class user{
    String name
    Map salesPerYear
    //...
}

In unserem System sind über 50.000 Kunden erfasst. Nun wollen wir den maximalen Umsatz den uns ein Kunde im Jahr 2010 beschert hat ermitteln.

Serielle Implementierung

Wenn wir nun ganz simpel jeden Kunden nach einander durchiterieren und deren Umsatz vergleichen schafft man damit einen lang werkenden Task.

User max = userList.get(0);
for(User u: userList){
    if(max.getSalesForYear(2010) < u.getSalesForYear(2010)){
        max = u;
    }
}
return max;

Zwar wird der Task fertig, doch je nach Datenanzahl ist die Verarbeitungsgeschwindigkeit nicht gerade berauschend.

Fork/Join Implementierung

Die selbe Aufgabe mit dem Rekursiven Ansatz des Fork/Join Frameworks sieht zwar nach mehr Code aus, doch sollte wesentlich performanter verarbeitet werden.

public class MaxSaleTask extends RecursiveAction {
    private List<User> userList;
    public User max;

    private int sThreshold = 10000;

    public MaxSaleTask(List<User> userList){
        this.userList = userList;
        this.max = userList.get(0); //vereinfacht
    }

    protected void computeDirectly(){
        for(User u: userList){
            if(max.getSalesForYear(2010) < u.getSalesForYear(2010)){
                max = u;
            }
        }
    }

    protected void compute(){
        int listSize = userList.size();
        if(listSize < sThreshold){
            computeDirectly();
        } else {
            int split = listSize / 2;
            MaxSaleTask left = new MaxSaleTask(userList.subList(0,split));
            MaxSaleTask right = new MaxSaleTask(userList.subList(split,listSize));
            invokeAll(left,right);
            if(left.max.getSalesForYear(2010) > right.max.getSalesForYear(2010)){
                max = left.max;
            } else {
                max = right.max;
            }
        }
    }    
}

Damit ein Task im Fork/Join Framework verarbeitet werden kann muss er eine RecursiveAction erweitern. Diese gibt eine compute() Methode zum erweitern vor, in der der Split ausprogrammiert werden muss. Ebenfalls stellt sie eine invokeAll Methode zur Verfügung, die etwaige weitere Tasks startet.

Bei unserem Beispiel mit über 50.000 Kunden gehen wir mal von exakt 53.432 aus und rechnen uns die Taskanzahl aus

1. Task mit 53432 Kunden startet 2 neue Tasks

2.-3. Task mit je 26716 Kunden starten je 2 neue Tasks

4.-7. Task mit je 13358 Kunden starten wiederum je 2 neue Tasks

8.-15. Task mit je 6679 Kunden fallen nun unter die Schranke sThreshold und werden direkt bearbeitet.

Sie geben das Resultat zurück an die Tasks 4-7, diese vereinen das Ergebnis geben es an 2 und 3 zurück, diese wiederum an den 1. Task, der am Schluss das Resultat, den max User zur Verfügung stellt.

Die Wahl der Schranke ist hierbei für die Performance entscheidend. Wählt man die Schranke zu hoch arbeiten zwar weniger Tasks, diese dafür aber länger, ist die Schranke zu niedrig ergeben sich eine enorme Anzahl kleinen Tasks und der Verwaltungsoverhead wird zu groß und Fressen den Vorteil der paralellen Verarbeitung wieder auf. Eine direkte Empfehlung seitens der Entwickler gibt es verständlicher weise nicht, kann sich diese Schranke je nach Art der Aufgabe unterschiedlich auswirken. Zur Wahl der Schanke muss man somit seinen Code profilen. Dabei sollte man nicht zwingend zu penibel sein, es reicht aus eine zu kleine oder zu große Schranke zu vermeiden.

Um diesen Prozess nun überhaupt anstoßen zu können benötigt man einen ForkJoinPool, dem man den initialen Task übergibt und diesen dann anstößt.

    ForkJoinPool pool = new ForkJoinPool();
    MaxSaleTask mst = new MaxSaleTask(userList);
    pool.invoke(mst);
    return mst.max;

Will man im Gegensatz zu dem hier gezeigten Beispiel ein Result per Return Aufruf zurückliefern muss man statt der RecursiveAction einen RecurisveTask erweitern.

Happy Coding

Quellen:

Java Fork Paper http://gee.cs.oswego.edu/dl/papers/fj.pdf

Fork and Join: Java Can Excel at Painless Paralell Programming Too! http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

Fork Join Tutorial http://download.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

Ähnliche Artikel: