[Java] asyncrones Multi-Threading mit Futures, ExecutorService und Callable

Häufig ist es problematisch, Rückgabewerte aus Threads zu bekommen. Es gibt daher in Java Objekte, die diese Funktionalität auf einfache Weise kapseln.

Zunächst muss man sich ein Worker-Objekt für die spätere Thread-Abarbeitung anlegen, welches das Callable-Interface implementiert:

/**
 * Worker-Object zum Sortieren von byte-Arrays
 */
public class Sorter implements Callable<byte[]>
{
    private final byte[] _bytearray;

    Sorter(byte[] b)
    {
        this._bytearray = b;
    }

    @Override
    /**
     * überschriebene worker call-Funktion mit Rückgabewert
     */
    public byte[] call()
    {
        // do something here: sorts byte-arrays
        Arrays.sort(_bytearray);
        return _bytearray;
    }
}

Die callable-Objekte werden dann einem ExecutorService übergeben und von diesem abgearbeitet. Es kann während der Abarbeitung asyncron auf die Fertigstellung mit der Funktion get() gewartet werden. Wenn die Threads fertig sind, wird das Ergebnis in der vereinbarten Form zurückgegeben:

public class FutureTest
{
    /**
     * @param args the command line arguments
     * @throws java.lang.InterruptedException
     * @throws java.util.concurrent.ExecutionException
     * @throws java.util.concurrent.TimeoutException
     */
    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException
    {
        SorterTest();
    }

    /**
     * testet asynchrones sortieren
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException 
     */
    private static void SorterTest() throws InterruptedException, ExecutionException, TimeoutException
    {
        System.out.println("--> SorterTest\n");
        
        // byte Arrays verschiedener Länge
        byte[] b1 = new byte[40000000];
        byte[] b2 = new byte[4000000];
        byte[] b3 = new byte[400000];

        // mit Zufallsdaten füllen
        new Random().nextBytes(b1);
        new Random().nextBytes(b2);
        new Random().nextBytes(b3);

        // callables generieren
        Callable<byte[]> c1 = new Sorter(b1);
        Callable<byte[]> c2 = new Sorter(b2);
        Callable<byte[]> c3 = new Sorter(b3);
        
        // worker-Threads erzeugen
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        // und diesem dem ExecutorService übergeben
        // die möglichen Ergebnise werden (später) in einem Future-Objekt gespeichert
        Future<byte[]> result1 = executor.submit(c1);
        Future<byte[]> result2 = executor.submit(c2);
        Future<byte[]> result3 = executor.submit(c3);
        
        // zyklische Abarbeitung
        while (true)
        {
            System.out.println("Working.");

            try
            {
                // Wenn alle drei Threads fertig sind, Applikation beenden
                if (result1.isDone() && result2.isDone() && result3.isDone())
                {
                    // am Ende nochmal die Ergebnisse anzeigen
                    byte[] bs1 = result1.get();
                    System.out.println("Done: Thread 1 result: " + String.valueOf(bs1.length));
                    byte[] bs2 = result2.get();
                    System.out.println("Done: Thread 2 result: " + String.valueOf(bs2.length));
                    byte[] bs3 = result3.get();
                    System.out.println("Done: Thread 3 result: " + String.valueOf(bs3.length));
                    
                    System.out.println("Ready.");

                    // nicht vergessen den ExecutorService zu beenden
                    executor.shutdown();
                   
                    break;
                }
                else
                {
                    // Prüfen, ob die Threads schon fertig sind
                    if (!result1.isDone())
                    {
                        // asynchron 50ms warten,
                        // wenn der Thread noch nicht fertig ist, wird einen TimeoutException geworfen
                        byte[] bs = result1.get(50, TimeUnit.MILLISECONDS);
                        // wenn der Thread fertig ist, hier eine kurze Ausgabe des Ergebnisses
                        System.out.println("Thread 1: " + String.valueOf(bs.length));
                    }
                    else
                    {
                        System.out.println("Thread 1: Done");
                    }

                    if (!result2.isDone())
                    {
                        byte[] bs = result2.get(50, TimeUnit.MILLISECONDS);
                        System.out.println("Thread 2: " + String.valueOf(bs.length));
                    }
                    else
                    {
                        System.out.println("Thread 2: Done\n");
                    }
                    
                    if (!result3.isDone())
                    {
                        byte[] bs = result3.get(50, TimeUnit.MILLISECONDS);
                        System.out.println("Thread 3: " + String.valueOf(bs.length));
                    }
                    else
                    {
                        System.out.println("Thread 3: Done");
                    }
                }
            }
            catch (TimeoutException e)
            {
                // die TimeoutException abfangen
                System.out.println("--> Timeout.");
            }
        } 
    }
}