Häufig ist es problematisch, Rückgabewerte aus Threads zu bekommen. Es gibt daher in Java Objekte, die diese Funktionalität auf einfache Weise kapseln.
Zunächst muss man sich ein Worker-Objekt für die spätere Thread-Abarbeitung anlegen, welches das Callable-Interface implementiert:
/**
* Worker-Object zum Sortieren von byte-Arrays
*/
public class Sorter implements Callable<byte[]>
{
private final byte[] _bytearray;
Sorter(byte[] b)
{
this._bytearray = b;
}
@Override
/**
* überschriebene worker call-Funktion mit Rückgabewert
*/
public byte[] call()
{
// do something here: sorts byte-arrays
Arrays.sort(_bytearray);
return _bytearray;
}
}
Die callable-Objekte werden dann einem ExecutorService übergeben und von diesem abgearbeitet. Es kann während der Abarbeitung asyncron auf die Fertigstellung mit der Funktion get() gewartet werden. Wenn die Threads fertig sind, wird das Ergebnis in der vereinbarten Form zurückgegeben:
public class FutureTest
{
/**
* @param args the command line arguments
* @throws java.lang.InterruptedException
* @throws java.util.concurrent.ExecutionException
* @throws java.util.concurrent.TimeoutException
*/
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException
{
SorterTest();
}
/**
* testet asynchrones sortieren
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
private static void SorterTest() throws InterruptedException, ExecutionException, TimeoutException
{
System.out.println("--> SorterTest\n");
// byte Arrays verschiedener Länge
byte[] b1 = new byte[40000000];
byte[] b2 = new byte[4000000];
byte[] b3 = new byte[400000];
// mit Zufallsdaten füllen
new Random().nextBytes(b1);
new Random().nextBytes(b2);
new Random().nextBytes(b3);
// callables generieren
Callable<byte[]> c1 = new Sorter(b1);
Callable<byte[]> c2 = new Sorter(b2);
Callable<byte[]> c3 = new Sorter(b3);
// worker-Threads erzeugen
int threads = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(threads);
// und diesem dem ExecutorService übergeben
// die möglichen Ergebnise werden (später) in einem Future-Objekt gespeichert
Future<byte[]> result1 = executor.submit(c1);
Future<byte[]> result2 = executor.submit(c2);
Future<byte[]> result3 = executor.submit(c3);
// zyklische Abarbeitung
while (true)
{
System.out.println("Working.");
try
{
// Wenn alle drei Threads fertig sind, Applikation beenden
if (result1.isDone() && result2.isDone() && result3.isDone())
{
// am Ende nochmal die Ergebnisse anzeigen
byte[] bs1 = result1.get();
System.out.println("Done: Thread 1 result: " + String.valueOf(bs1.length));
byte[] bs2 = result2.get();
System.out.println("Done: Thread 2 result: " + String.valueOf(bs2.length));
byte[] bs3 = result3.get();
System.out.println("Done: Thread 3 result: " + String.valueOf(bs3.length));
System.out.println("Ready.");
// nicht vergessen den ExecutorService zu beenden
executor.shutdown();
break;
}
else
{
// Prüfen, ob die Threads schon fertig sind
if (!result1.isDone())
{
// asynchron 50ms warten,
// wenn der Thread noch nicht fertig ist, wird einen TimeoutException geworfen
byte[] bs = result1.get(50, TimeUnit.MILLISECONDS);
// wenn der Thread fertig ist, hier eine kurze Ausgabe des Ergebnisses
System.out.println("Thread 1: " + String.valueOf(bs.length));
}
else
{
System.out.println("Thread 1: Done");
}
if (!result2.isDone())
{
byte[] bs = result2.get(50, TimeUnit.MILLISECONDS);
System.out.println("Thread 2: " + String.valueOf(bs.length));
}
else
{
System.out.println("Thread 2: Done\n");
}
if (!result3.isDone())
{
byte[] bs = result3.get(50, TimeUnit.MILLISECONDS);
System.out.println("Thread 3: " + String.valueOf(bs.length));
}
else
{
System.out.println("Thread 3: Done");
}
}
}
catch (TimeoutException e)
{
// die TimeoutException abfangen
System.out.println("--> Timeout.");
}
}
}
}