Concurrent интерфейсы Callable, Future
При работе многопоточного приложения часто необходимо получение от потока результата его деятельности в виде некоторого объекта. Эту задачу можно решить с использованием интерфейсов Callable и Future. Совместное использование двух реализаций данных интерфейсов позволяет получить результат в виде некоторого объекта.
Интерфейс Callable
Интерфейс Callable очень похож на интерфейс Runnable. Объекты, реализующие данные интерфейсы, исполняются другим потоком. Однако, в отличие от Runnable, интерфейс Callable использует Generic’и для определения типа возвращаемого объекта. Runnable содержит метод run(), описывающий действие потока во время выполнения, а Callable – метод call().
С документацией интерфейса Callable можно познакомиться здесь.
Интерфейс Future
Интерфейс Future также, как и интерфейс Callable, использует Generic’и. Методы интерфейса можно использовать для проверки завершения работы потока, ожидания завершения и получения результата. Результат выполнения может быть получен методом get, если поток завершил работу. Прервать выполнения задачи можно методом cancel. Дополнительные методы позволяют определить завершение задачи : нормальное или прерванное. Если задача завершена, то прервать ее уже невозможно.
Методы интерфейса Future
Метод | Описание |
---|---|
cancel (boolean mayInterruptIfRunning) | попытка завершения задачи |
V get() | ожидание (при необходимости) завершения задачи, после чего можно будет получить результат |
V get(long timeout, TimeUnit unit) | ожидание (при необходимости) завершения задачи в течение определенного времени, после чего можно будет получить результат |
isCancelled() | вернет true, если выполнение задачи будет прервано прежде завершения |
isDone() | вернет true, если задача завершена |
С документацией интерфейса Future можно познакомиться здесь.
Пример использования интерфейсов Callable, Future
Рассмотрим простейший пример использования интерфейсов Callable и Future. Основная идея данного примера – показать, как можно, используя Future, узнать статус Callable потока и получить возвращенный объект. В примере используется объект executor типа ExecutorService, формирующий пул из трех потоков. Метод submit с параметром Callable возвращает объект Future для каждого из стартуемого потоков.
import java.util.Date; import java.util.List; import java.util.ArrayList; import java.util.concurrent.Future; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException; import java.text.SimpleDateFormat; public class CallableExample < public CallableExample() < // Определяем пул из трех потоков ExecutorService executor; executor = Executors.newFixedThreadPool(3); // Список ассоциированных с Callable задач Future List> futures; futures = new ArrayList>(); // Создание экземпляра Callable класса Callable callable = new CallableClass(); for (int i = 0; i < 3; i++)< /* * Стартуем возвращаюший результат исполнения * в виде объекта Future поток */ Futurefuture; future = executor.submit(callable); /* * Добавляем объект Future в список для * отображения результат выполнения (получение * наименования потока) */ futures.add(future); > SimpleDateFormat sdf; sdf = new SimpleDateFormat("HH:mm:ss "); for (Future future : futures) < try < // Выводим в консоль полученное значение String text = sdf.format(new Date()) + future.get(); System.out.println(text); >catch (InterruptedException | ExecutionException e) <> > // Останавливаем пул потоков executor.shutdown(); > //----------------------------------------------------- // Класс, реализующий интерфейс Callable class CallableClass implements Callable < @Override public String call() throws Exception < Thread.sleep(1000); // наименование потока, выполняющего // callable задачу return Thread.currentThread().getName(); >> //----------------------------------------------------- public static void main(String args[]) < new CallableExample(); >>
Класс CallableClass, реализующий интерфейс Callable, использует объект String в качестве generic’a. Соответственно и каждый объект Future также должен использовать тип объекта String.
Результат выполнения
Перед остановкой пула потоков в консоль выводятся наименования потока. Т.е. в примере демонстрируется возможность не прямого обращения к методу call класса, реализующего интерфейс Callable, а косвенно через объект Future, полученного при старте потока.
17:41:16 pool-1-thread-1 17:41:19 pool-1-thread-2 17:41:19 pool-1-thread-3
Описание и пример FutureTask
Одной из «неприятных» задач многопоточного приложения является определение состояния потока (работает или нет, реагирует на внешние воздействия или нет) и останов потока. Если рассматривать обычный поток Thread, то с ним могут возникнуть такие проблемы. Ведь в Java (JDK 1.5 и выше) нет механизма останова потока. Как известно метод Thread.stop() объявлен как Deprecated поскольку «не потокобезопасен», а безопасная инструкция Thread.interrupt() только сообщает потоку о необходимости остановки. Но если данное сообщение проигнорировано, т.е. разработчик не вставил обработку, то и поток не остановится.
И вот здесь на помощь разработчику приходит пакет Concurrent со своими интерфейсами java.util.concurrent.Callable, java.util.concurrent.Future. Данные интерфейсы (подробно представленные на сайте здесь ), а также их различные реализации позволяют решать задачи «внешнего» управления потоками, т.е. не в методе выполнения потока run() или call(). В данной статье будет рассмотрен класс FutureTask и приведен пример контроля состояния потоков и останова одного потока из другого.
Класс FutureTask
Класс-оболочка FutureTask базируется на конкретной реализации интерфейса Future. Чтобы создать реализацию данного класса необходим объект Callable; после этого можно использовать Java Thread Pool Executor для асинхронной обработки. Таким образом, FutureTask представляет удобный механизм для превращения Callable одновременно в Future и Runnable, реализуя оба интерфейса. Объект класса FutureTask может быть передан на выполнение классу, реализующему интерфейс Executor, либо запущен в отдельном потоке, как класс, реализующий интерфейс Runnable.
Если посмотреть в исходники класса FutureTask, то можно увидеть, что он реализует интерфейс RunnableFuture, который, в свою очередь, наследует свойства интерфейсов Runnable и Future.
Конструкторы класса FutureTask
Класс FutureTask содержит следующие два конструктора :
/* * Конструктор создания FutureTask, который * после старта выполнит callable */ FutureTask(Callable callable) /* * Конструктор создания FutureTask, который после старта * выполнит runnable и при успешном завершении вернет * объект result */ FutureTask(Runnable runnable, V result)
Методы класса FutureTask
Метод | Описание |
---|---|
V get() | получение результата выполнения потока; вызов метода блокирует дальнейшее выполнения до окончания вычислений |
V get(long timeout, TimeUnit unit) | получение результата до окончания вычислений или до истечения указанного интервала времени; если в течение указанного времени вычисления не завершились, то вызывается исключение TimeoutException |
boolean cancel(boolean mayInterrupt) | отмена выполнения задачи; если задача уже стартована и параметр mayInterrupt равен true, то она прерывается, в противном случае, если вычисления еще не начаты, то они и не начнутся. При успешной отмене выполнения задачи метод возвращает значение true |
boolean isCancelled() | метод возвращает true, если задача была отменена до ее нормального завершения |
boolean isDone() | метод возвращает true, если выполнение задачи завершено, прервано или если в процессе ее выполнения возникло исключение |
Пример использования FutureTask
Рассмотрим простой пример использования FutureTask, в котором создается пул из 3-х потоков, реализующих интерфейс Callable в виде класса CallableDelay. Основная идея примера связана с проверками выполняемых потоков и отмены выполнения задачи одного из потоков.
Конструктор класса CallableDelay в качестве параметров получает временно́й размер задержки delay и идентификатор потока. В зависимости от значения идентификатора потока в методе call() выполняется соответствующее количество циклов с заданной задержкой, после чего поток завершает работу. Второй поток на первом цикле прерывает выполнение 3-го потока вызовом метода cancel. Метод call потока возвращает текстовый объект String в виде наименования потока.
Метод areTasksDone() проверяет завершение выполнения всех задач/потоков вызовом методов isDone() объектов futureTask. Если выполнение всех потоков завершены, то сервис executor останавливает свою работу методом shutdown().
В конструкторе примера создаются два массива из объектов типа CallableDelay и FutureTask. После этого формируется пул для трех потоков сервиса типа ExecutorService и потоки стартуют методом execute сервиса executor. В цикле выполняются различного рода проверки : завершения работы потока методом isDone(), ожидания завершения потока методом get() с временны́ми параметрами и отмены выполнения потока методом isCancelled().
import java.util.concurrent.TimeUnit; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.concurrent.ExecutionException; public class FutureTaskExample < CallableDelay[] callable = null; FutureTask[] futureTask = null; ExecutorService executor = null; private final int THREAD_COUNT = 3; //----------------------------------------------------- class CallableDelay implements Callable < private long delay; private int idx ; private int cycle; public CallableDelay(int delay, int idx) < this.delay = delay; this.idx = idx; this.cycle = idx; >@Override public String call() throws Exception < while (cycle >0) < Thread.sleep(delay); cycle--; if ((idx == 2) && (cycle >0)) futureTask[futureTask.length - 1] .cancel(true); > /* * Идентификатор и наименование потока, * выполняющего данную callable задачу */ return ""+ idx + ". " + Thread.currentThread().getName(); > > //----------------------------------------------------- private boolean areTasksDone() < boolean isDone = true; for (int i = 0; i < THREAD_COUNT; i++) < if (!futureTask[i].isDone()) < isDone = false; break; >> return isDone; > //----------------------------------------------------- FutureTaskExample () < callable = new CallableDelay[THREAD_COUNT]; futureTask = new FutureTask [THREAD_COUNT]; // Сервис исполнения executor=Executors.newFixedThreadPool(THREAD_COUNT); for (int i = 0; i < THREAD_COUNT; i++) < callable [i] = new CallableDelay(1000, (i + 1)); futureTask[i] = new FutureTask( callable[i]); executor.execute(futureTask[i]); > // Цикл работы executor'а while (true) < try < if (areTasksDone()) < // Завершение работы executor'а executor.shutdown(); System.out.println("\nexecutor shutdown"); return; >// Проверка завершения выполнения задачи 1-м // потоком if (!futureTask[0].isDone()) System.out.println( "1-ый поток завершен : " + futureTask[0].get()); System.out.println( "Ожидание завершения 2-го потока"); String txt = futureTask[1].get(200L, TimeUnit.MILLISECONDS); if(txt != null) System.out.println( "2-ой поток завершен : " + txt); System.out.println( "Проверка завершения 3-го потока"); if (futureTask[2].isCancelled()) System.out.println( "3-ой поток был прерван . "); else if (!futureTask[2].isDone()) < txt = futureTask[2].get(); System.out.println( "3-ий поток завершен : "+txt); >Thread.sleep(200); > catch (InterruptedException | ExecutionException e) < System.err.println(e.getMessage()); >catch(TimeoutException e) < /* * 2-ой поток вызывает TimeoutException, * если задача не завершена за указанное * время */ System.err.println("TimeoutException"); >> > //----------------------------------------------------- public static void main(String[] args) < new FutureTaskExample(); >>
Результат работы примера
При выполнении задачи информационные сообщения, представленные ниже, выводятся в консоль. Согласно последовательности вывода сообщений можно сказать, что при вызове метода isDone() объекта FutureTask 1-го потока программа перешла в режим ожидания завершения работы потока.
После завершения выполнения 1-го потока программа перешла к проверке 2-го потока методом get() с временно́й задержкой.Так как за предоставленное время поток не смог завершить работу, то был вызван TimeoutException и цикл проверки повторился снова.
Только после завершения работы 2-го потока программа перешла к проверке отмены/завершения 3-го потока. Метод isCancelled() подтвердил, что выполнение потока было прервано. Только после этого метод areTasksDone() подтвердил, что все потоки завершили выполнение и цикл проверок был прерван, сервис executor завершил выполнение методом shutdown().
1-ый поток завершен : 1. pool-1-thread-1 Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока TimeoutException Ожидание завершения 2-го потока 2-ой поток завершен : 2. pool-1-thread-2 Проверка завершения 3-го потока 3-ий поток был прерван . executor shutdown
Именование потоков
Стандартная схема именования потока соответствует формату pool-N-thread-M, где N обозначает последовательный номер пула, а M – порядковый номер потока в пуле. Так наименование pool-1-thread-2 означает второй поток в первом пуле жизненного цикла JVM. Каждый раз, когда создается новый пул, глобальный счетчик N инкрементится.
Интерфейсы Callable и Future в Java
Интерфейс Java Callable(java.util.concurrent.Callable) представляет асинхронную задачу, которая может быть выполнена отдельным потоком. Например, можно передать объект Callable в Java ExecutorService, который затем выполнит его асинхронно. Метод call() вызывается для выполнения асинхронной задачи.
Интерфейс Callable довольно прост. Он содержит единственный метод с именем call().
public interface Callable
Если задача выполняется асинхронно, результат обычно передается обратно через Java Future. Это тот случай, когда Callable передается в ExecutorService для одновременного выполнения.
Callable использует Generic для определения типа возвращаемого объекта. Класс Executors предоставляет полезные методы для выполнения Java Callable в пуле потоков. Поскольку вызываемые задачи выполняются параллельно, нам нужно дождаться возвращенного объекта.
Callable задачи возвращают объект java.util.concurrent.Future. Используя объект Java Future, мы можем узнать состояние задачи Callable и получить возвращенный объект. Он предоставляет метод get(), который может ожидать завершения Callable и затем возвращать результат.
Future предоставляет метод cancel() для отмены связанной задачи Callable. Существует версия метода get(), в которой мы можем указать время ожидания результата, поэтому полезно избегать блокировки текущего потока на более длительное время.
Существуют методы isDone() и isCancelled() для определения текущего состояния связанной вызываемой задачи.
Вот простой пример задачи с Callable, которая возвращает имя потока, выполняющего задачу через одну секунду. Мы используем платформу Executor для параллельного выполнения 100 задач и используем Java Future для получения результата представленных задач.
package com.journaldev.threads; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class MyCallable implements Callable < @Override public String call() throws Exception < Thread.sleep(1000); //вернуть имя потока, выполняющего вызываемую задачу return Thread.currentThread().getName(); >public static void main(String args[]) < //Получить ExecutorService из служебного класса Executors //размер пула потоков равен 10 ExecutorService executor = Executors.newFixedThreadPool(10); //создать список для хранения объекта Future, связанного с Callable Listlist = new ArrayList(); //Create MyCallable instance Callable callable = new MyCallable(); for(int i=0; i < 100; i++)< Future future = executor.submit(callable); //добавив Future в список, мы можем получить возвращаемое значение list.add(future); >for(Future fut : list) < try < // выводим возвращаемое значение Future, замечаем задержку вывода в консоли // потому что Future.get() ожидает завершения задачи System.out.println(new Date()+ "::"+fut.get()); >catch (InterruptedException | ExecutionException e) < e.printStackTrace(); >> //закрыть службу executor.shutdown(); > >
После того, как мы выполним вышеуказанную программу, вы заметите задержку вывода, потому что метод get() ожидает завершения задачи, вызываемой Java. Также обратите внимание, что есть только 10 потоков, выполняющих эти задачи.
Вот фрагмент вывода вышеуказанной программы.
Mon Dec 31 20:40:15 PST 2012::pool-1-thread-1
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-2
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-3
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-4
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-5
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-6
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-7
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-8
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-9
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-10
Mon Dec 31 20:40:16 PST 2012::pool-1-thread-2
.
Что делать, если мы хотим переопределить некоторые методы, например, переопределить метод get() для тайм-аута через некоторое время по умолчанию, а не ждать бесконечно?
В этом случае пригодится класс Java FutureTask, который является базовой реализацией Future.
Средняя оценка 4.3 / 5. Количество голосов: 13
Пример вызываемого будущего Java
Java Callable и Future часто используются в многопоточном программировании. В последних нескольких сообщениях мы многое узнали о потоках Java, но иногда нам хочется, чтобы поток мог возвращать какое-то значение, которое мы можем использовать. Java 5 представила интерфейс java.util.concurrent.Callable в пакете параллелизма, который похож на интерфейс Runnable, но может возвращать любой объект и может вызывать исключение.
Java вызываемый
Будущее Java
Задачи Java Callable возвращают объект java.util.concurrent.Future. Используя объект Java Future, мы можем узнать статус задачи Callable и получить возвращенный объект. Он предоставляет метод get(), который может дождаться завершения Callable, а затем вернуть результат. Java Future предоставляет метод cancel() для отмены связанной вызываемой задачи. Существует перегруженная версия метода get(), в которой мы можем указать время ожидания результата, это полезно, чтобы избежать блокировки текущего потока на более длительное время. Существуют методы isDone() и isCancelled() для определения текущего состояния связанной вызываемой задачи. Вот простой пример задачи Java Callable, которая возвращает имя потока, выполняющего задачу, через одну секунду. Мы используем платформу Executor для параллельного выполнения 100 задач и используем Java Future для получения результата отправленных задач.
package com.journaldev.threads; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class MyCallable implements Callable < @Override public String call() throws Exception < Thread.sleep(1000); //return the thread name executing this callable task return Thread.currentThread().getName(); >public static void main(String args[])< //Get ExecutorService from Executors utility class, thread pool size is 10 ExecutorService executor = Executors.newFixedThreadPool(10); //create a list to hold the Future object associated with Callable List list = new ArrayList(); //Create MyCallable instance Callable callable = new MyCallable(); for(int i=0; i < 100; i++)< //submit Callable tasks to be executed by thread pool Futurefuture = executor.submit(callable); //add Future to the list, we can get return value using Future list.add(future); > for(Future fut : list) < try < //print the return value of Future, notice the output delay in console // because Future.get() waits for task to get completed System.out.println(new Date()+ "::"+fut.get()); >catch (InterruptedException | ExecutionException e) < e.printStackTrace(); >> //shut down the executor service now executor.shutdown(); > >
Как только мы выполним приведенную выше программу, вы заметите задержку вывода, потому что метод java Future get() ожидает завершения вызываемой задачи java. Также обратите внимание, что эти задачи выполняются только 10 потоками. Вот фрагмент вывода вышеуказанной программы.
Mon Dec 31 20:40:15 PST 2012::pool-1-thread-1 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-2 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-3 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-4 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-5 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-6 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-7 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-8 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-9 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-10 Mon Dec 31 20:40:16 PST 2012::pool-1-thread-2 .
Совет: что, если мы хотим переопределить некоторые методы интерфейса Java Future, например переопределить метод get() для тайм-аута после некоторого времени по умолчанию, а не ждать бесконечно, в этом случае класс Java FutureTask пригодится. это базовая реализация интерфейса Future. Ознакомьтесь с примером Java FutureTask, чтобы узнать больше об этом классе.