Java 7 Tage – Fork / Join Framework

Das Fork/Join Frameworki ist eine der Neuerungen in der Java Welt dar. Was davon in Java 7 auf uns zukommt, und was hinter diesem Framework steckt gibts hier.


Nachdem hier die letzten Tage die Änderungen aus dem Project Coin von Java 7 vorgestellt wurden, möchte ich heute auf das Fork / Join Framework eingehen. Eine Neuerung die das Mutli Threading vereinfachen soll.

Genauer gesagt ist das fork/join Framework eine Implementierung des ExecutorService Interfaces, welches beim Abarbeiten von eines Tasks die Verteilung dessen auf mehrere Prozessoren erleichtern soll. Dabei wird ein aufwendiger Task in immer kleinere Teile aufgebrochen bis die Teile klein genug sind um sie zu verarbeiten. Diese Teile werden dann über einen Thread Pool abgearbeitet und schlussendlich wieder zusammengesetzt zu einem fertigen Ergebnis.

Der oben genannte Vorgang hier noch verpackt in Pseudocode

if (aufgabe klein genug)
  erledige aufgabe
else
  teile die aufgabe in 2 gleich große teile
  starte verarbeitungsthread der 2 aufgaben und vereine die ergebnisse

Eigentlich recht simpel, nun wollen wir aber noch auf den Code dazu blicken.

Einfaches Beispiel

Nehmen wir an wir haben folgende Datenklasse

public class user{
    String name
    Map salesPerYear
    //...
}

In unserem System sind über 50.000 Kunden erfasst. Nun wollen wir den maximalen Umsatz den uns ein Kunde im Jahr 2010 beschert hat ermitteln.

Serielle Implementierung

Wenn wir nun ganz simpel jeden Kunden nach einander durchiterieren und deren Umsatz vergleichen schafft man damit einen lang werkenden Task.

User max = userList.get(0);
for(User u: userList){
    if(max.getSalesForYear(2010) < u.getSalesForYear(2010)){
        max = u;
    }
}
return max;

Zwar wird der Task fertig, doch je nach Datenanzahl ist die Verarbeitungsgeschwindigkeit nicht gerade berauschend.

Fork/Join Implementierung

Die selbe Aufgabe mit dem Rekursiven Ansatz des Fork/Join Frameworks sieht zwar nach mehr Code aus, doch sollte wesentlich performanter verarbeitet werden.

public class MaxSaleTask extends RecursiveAction {
    private List<User> userList;
    public User max;

    private int sThreshold = 10000;

    public MaxSaleTask(List<User> userList){
        this.userList = userList;
        this.max = userList.get(0); //vereinfacht
    }

    protected void computeDirectly(){
        for(User u: userList){
            if(max.getSalesForYear(2010) < u.getSalesForYear(2010)){
                max = u;
            }
        }
    }

    protected void compute(){
        int listSize = userList.size();
        if(listSize < sThreshold){
            computeDirectly();
        } else {
            int split = listSize / 2;
            MaxSaleTask left = new MaxSaleTask(userList.subList(0,split));
            MaxSaleTask right = new MaxSaleTask(userList.subList(split,listSize));
            invokeAll(left,right);
            if(left.max.getSalesForYear(2010) > right.max.getSalesForYear(2010)){
                max = left.max;
            } else {
                max = right.max;
            }
        }
    }    
}

Damit ein Task im Fork/Join Framework verarbeitet werden kann muss er eine RecursiveAction erweitern. Diese gibt eine compute() Methode zum erweitern vor, in der der Split ausprogrammiert werden muss. Ebenfalls stellt sie eine invokeAll Methode zur Verfügung, die etwaige weitere Tasks startet.

Bei unserem Beispiel mit über 50.000 Kunden gehen wir mal von exakt 53.432 aus und rechnen uns die Taskanzahl aus

1. Task mit 53432 Kunden startet 2 neue Tasks

2.-3. Task mit je 26716 Kunden starten je 2 neue Tasks

4.-7. Task mit je 13358 Kunden starten wiederum je 2 neue Tasks

8.-15. Task mit je 6679 Kunden fallen nun unter die Schranke sThreshold und werden direkt bearbeitet.

Sie geben das Resultat zurück an die Tasks 4-7, diese vereinen das Ergebnis geben es an 2 und 3 zurück, diese wiederum an den 1. Task, der am Schluss das Resultat, den max User zur Verfügung stellt.

Die Wahl der Schranke ist hierbei für die Performance entscheidend. Wählt man die Schranke zu hoch arbeiten zwar weniger Tasks, diese dafür aber länger, ist die Schranke zu niedrig ergeben sich eine enorme Anzahl kleinen Tasks und der Verwaltungsoverhead wird zu groß und Fressen den Vorteil der paralellen Verarbeitung wieder auf. Eine direkte Empfehlung seitens der Entwickler gibt es verständlicher weise nicht, kann sich diese Schranke je nach Art der Aufgabe unterschiedlich auswirken. Zur Wahl der Schanke muss man somit seinen Code profilen. Dabei sollte man nicht zwingend zu penibel sein, es reicht aus eine zu kleine oder zu große Schranke zu vermeiden.

Um diesen Prozess nun überhaupt anstoßen zu können benötigt man einen ForkJoinPool, dem man den initialen Task übergibt und diesen dann anstößt.

    ForkJoinPool pool = new ForkJoinPool();
    MaxSaleTask mst = new MaxSaleTask(userList);
    pool.invoke(mst);
    return mst.max;

Will man im Gegensatz zu dem hier gezeigten Beispiel ein Result per Return Aufruf zurückliefern muss man statt der RecursiveAction einen RecurisveTask erweitern.

Happy Coding

Quellen:

Java Fork Paper http://gee.cs.oswego.edu/dl/papers/fj.pdf

Fork and Join: Java Can Excel at Painless Paralell Programming Too! http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

Fork Join Tutorial http://download.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html

Ähnliche Artikel:

Autor: Thomas Pummer

Thomas Pummer ist Softwareentwickler an der Universität Wien. In seiner Freizeit betreut er das Open Source Projekt TrayRSS und werkt an diversen anderen kleinen Projekten. Er ist sehr interessiert an Open Source und Webentwicklung und testet gern neue Programmiersprachen.

2 Gedanken zu „Java 7 Tage – Fork / Join Framework“

  1. Ich bezweifle ob rekursiv erzeugte sublist Instanzen (die ja nur wrapper instanzen sind und operationen an die parent list instanz relayen) so eine gute Idee sind. Bei einer Rekursionstiefe von 5 hat man für jede einzelne Operation schon 5 field Zugriffe fürs Relaying. Bei Tiefe 10 hat man 10 Zugriffe, usw.
    Um eine collection effizient auf mehrere Tasks zu splitten muss man sie möglichst „flach“ aufteilen, d.h. maximal eine sublist Instanz, die direkt auf einem kleinen Teil der „original“ list arbeitet.
    Noch besser wäre natürlich, wenn die collections selbst ranged~ versionen ihrer operationen anbieten würden, so dass ein task mit seinem bereich der collection direkt auf deren storage iterieren kann. Leider bieten die java.util collections das nicht. Wenn man professionelle collections haben will muss man sich die selber bauen, aber das ist ein anderes Thema.
    Jedenfalls würde ich nicht rekursiv sublisten erzeugen.

    Btw da sind auch Syntaxfehler in dem Code. Fehlende Klammern in Zeile 25, 26.
    Das get(0) wirft bei einer leeren Liste eine Exception, die List hat keine Typparameter, usw.
    Hast du den Code überhaupt mal ausgeführt? Sieht nicht so aus.

    Nix für ungut, bin nur beim (regelmäßigen) googeln nach Java 7 news immer wieder auf diesen Blog gestoßen :-).

  2. Hallo! Danke für dein Kommentar, und danke für dein wachsames Auge!

    Es stimmt leider, ich hatte das Beispiel vorab getestet, doch exakt den Code, wie er oben angeführt ist, habe ich nicht erneut ausgeführt. Ich gebe der Tatsache die Schuld, dass ich den Artikel nicht auf meinem Entwicklungsrechner geschrieben und ich ihn aus meiner Erinnerung rekonstruiert habe. Ein Fehler meinerseits. Ich hab den Code auf alle Fälle ergänzt.

    Danke auch für deinen Einwurf bezüglich der Liste. Das angeführte Beispiel sollte auch nur zur Veranschaulichung möglichst simpel gehalten bleiben.

    Schöne Grüße
    Thomas

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.

*