Реактивное программирование на Java: как, зачем и стоит ли? Часть I
Идея реактивного программирования появилась сравнительно недавно, лет 10 назад. Что вызвало популярность этого относительно нового подхода и почему сейчас он в тренде, рассказал на конференции РИТ++ 2020 эксперт и тренер Luxoft Training Владимир Сонькин.
В режиме мастер-класса он продемонстрировал, почему так важен неблокирующий ввод-вывод, в чем минусы классической многопоточности, в каких ситуациях нужна реактивность, и что она может дать. А еще описал недостатки реактивного подхода.
В этой статье мы поговорим о том, что такое реактивное программирование, и зачем оно нужно, обсудим подходы и посмотрим примеры.
Почему реактивное программирование получило такую популярность? В какой-то момент перестала расти скорость процессоров, а значит разработчикам уже не приходится рассчитывать на то, что скорость их программ станет увеличиваться сама по себе: теперь их нужно распараллеливать.
На рисунке видно, что график частоты процессоров рос в 90-х, а в начале 2000-х частота резко увеличилась. Оказалось, что это был потолок.
Почему же рост частоты остановился?
Транзисторы начали делать максимально малого размера. PN-переход, который используется в них, получился настолько тоненьким, насколько это вообще возможно. На графике размеров транзисторов внутри процессора мы видим: размер все меньше и меньше, транзисторов в процессоре все больше и больше.
Такая миниатюризация раньше приводила к тому, что росла частота. Поскольку электроны бегут со скоростью света, за счет уменьшения размера, время, которое требовалось электрону, чтобы пробежать весь путь внутри процессора, уменьшалось. Но техпроцессы уткнулись в физический потолок. Пришлось придумывать что-то другое.
Многопоточность
И мы уже знаем, что удалось придумать: начали делать многоядерные процессоры. Вместо того, чтобы опираться на то, что производительность процессора будет расти, стали рассчитывать увеличение их количества. Но для эффективного использования множества процессоров нужна многопоточность.
Тема многопоточности — сложная, но неизбежная в современном мире. Типичный современный компьютер имеет от 4 ядер и множество потоков. В современном мощном сервере может быть и 100 ядер. Если в вашей программе не используется многопоточность, вы не получаете никаких преимуществ. Поэтому все мировые индустрии постепенно двигаются к тому, чтобы задействовать эти возможности.
При этом нас подстерегает множество опасностей. Программировать с учетом многопоточности сложно: синхронизации, гонки, затрудненная отладка и т.д. попортили немало крови разработчикам. К тому же, стоимость такой разработки становится выше.
В Java многопоточность появилась давным-давно, она существует с самой первой версии.
Выглядит она так:
Писать большую систему, используя примитивы многопоточности, мягко говоря, сложно. Сейчас так уже никто не делает. Это все равно, что кодить на Ассемблере.
Во многих случаях эффект, который приносит многопоточность, не улучшает производительность, а ухудшает ее.
Что же с этим делать?
Параллельное программирование во многих ситуациях можно заменить асинхронностью. Посмотрите на иллюстрацию. На левой картинке малыш очень хочет помогать маме в домашних делах. Мама достает из стиральной машинки белье, дает ребенку, и он его укладывает в корзину. Так работает программа на 2 потока: поток-мама и поток-малыш. Теоретически производительность в этом случае должна возрастать: два человека лучше, чем один, ведь мы задействовали два ядра. Но представьте себе такую ситуацию в реальной жизни: мама подает ребенку белье и ждет, пока он его уложит в машинку. Или ребенок ждет белье от мамы. На деле они постоянно мешают друг другу. Плюс, нужно отвести время на передачу белья. Мама быстрее разобралась бы с бельем сама.
Примерно такая же ситуация происходит в компьютере. Поэтому работа с параллельностью совсем не так проста, как кажется. Все синхронизации между потоками выполнения на самом деле занимают кучу времени.
На картинке справа одинокий парень, который купил себе автоматическую машинку. Она стирает, а он в это время может почитать книжку. Здесь однозначно есть преимущество для юноши, потому что он занимается своим делом и при этом не следит за тем, завершилась ли стирка. Когда стирка завершится, он услышит звуковой сигнал и отреагирует на него. То есть параллельность есть, а синхронизации нет. Стало быть нет и траты времени на синхронизацию, сплошная выгода!
Это и есть подход с асинхронностью. Есть отдельный исполнитель, и мы дали ему не часть нашей задачи, а свою собственную. На левой картинке мама и мальчик делают общую задачу, а на правой стиральная машина и парень делают разные, каждый свою. В какой-то момент они соединятся: когда стиральная машина достирает, юноша отложит свою книгу. Но все 1,5 часа, пока белье стиралось, он прекрасно себя чувствовал, читал и ни о чем не думал.
Примеры параллельного и асинхронного подходов
Рассмотрим 2 варианта выполнения потоков: параллельный и асинхронный.
- Потоки выполняются параллельно;
Потокам thread 1 и 2 нужно обращаться к одному и тому же общему разделяемому ресурсу. Допустим, это какая-то база данных, и она не позволяет потокам подключаться к ней одновременно. Или позволяет, но это сразу снижает скорость ее работы, поэтому потокам лучше обращаться к ней по очереди. Никакой параллельности здесь нет: потокам приходится работать по очереди. А третий поток ждет ответа от базы данных, и тоже заблокирован — такая система малоэффективна.
Вроде бы параллельность есть, а преимуществ от нее не так много.
- Потоки выполняются асинхронно.
Если использовать асинхронность, мы ставим задачу, и она выполняется где-то в другом потоке. Например, другим ядром процессора или другим процессором. Мы поставили задачу и занимаемся другими делами, а потом в какой-то момент, когда эта задача завершится, получим результаты. Это можно проиллюстрировать работой организации. Начальник — поток main — ставит задачу Пете, и говорит: «Как только ты ее выполнишь, передай Коле, а тот после завершения работы над задачей пусть доложит мне. В результате Петя и Коля заняты работой, а начальник может ставить новые задачи другим сотрудникам».
Еще один пример: конкуренция и параллелизм.
Представим себе офис, утро, всем хочется выпить кофе. Concurrency (конкуренция) — это когда выстраивается очередь к одной на всех кофемашине. Люди конкурируют: «Эй, я тут первый стоял!» — «Нет, я!». Они друг другу мешают.
В параллелизме есть две кофемашины и две очереди: каждый стоит в своей. Но все равно сотрудники тратят время на то, чтобы постоять там.
Как найти правильное решение для этого сценария, если использовать асинхронность?
Доставка кофе прямо к столу — хороший вариант, в очереди вы не стоите, но придется нанимать официанта, который будет разносить напитки.
Другой возможный вариант — фиксированный график. Например, один сотрудник подходит за кофе в 11:10, следующий — в 11:20 и т.д. Но это не асинхронность. Будут происходить простои, а значит это не полная загрузка кофемашины. Кто-то не успел к своему времени, а кому-то не хватило 10 минут, чтобы сделать себе кофе, и в итоге весь график сдвигается. А если сделать
большие зазоры, кофемашина будет недогружена. И потом, все хотят прийти в 10 утра и выпить кофе, а это растягивается на 2 часа, и кому-то его чашка достанется только в 12.
Еще один вариант — записывать всех желающих в «виртуальную очередь». Когда кофемашина освободится от предыдущих любителей кофе, человек получает уведомление и подходит к кофемашине без очереди. Сейчас во многих организациях так делают. Например, в интернет-магазинах с самовывозом. Берешь талончик и занимаешься своими делами, а когда приходит время, подходишь и получаешь товар. Вот это и есть асинхронность: никто никого не ждет, все работают и получают свой кофе настолько быстро, насколько возможно. И кофемашина тоже не простаивает.
С асинхронностью разобрались. Но есть еще одна важная проблема: блокирующий ввод-вывод.
Блокирующий ввод-вывод
Традиционный ввод-вывод — блокирующий. А что же такое блокирующий ввод-вывод?
Допустим, вы читаете файл или базу данных. Вызывается метод, который это делает, и он блокирует поток: мы больше ничего не делаем, мы ждем. Например, вы вызвали readFile() и ждете, когда это наконец произойдет. Поток блокируется и не прогрессирует: он находится в ожидании. Но на самом деле процессор не занят.
В этом примере заблокированы потоки:
- На чтение файла (blocked on reading file);
- На чтение из базы данных (blocked on reading from DB);
- На сложных вычислениях (blocked on heavy calculations);
- На ответе от клиента (blocked on responding the client).
Эта ситуация сродни той, когда вы пришли в супермаркет, там есть четыре кассы, но система обслуживания касс тормозит. Кассирши сидят, ничего не делают и просто ждут, пока на кассе сработает нажатие на кнопку.
Что делать, если все потоки заблокированы? Как подобные проблемы решаются в супермаркете?
Synchronous I/O
Вариант обычный: синхронный ввод-вывод. Хорошего мало, в этом варианте образуются очереди к кассам.
Что сделать, чтобы возле касс не собирались огромные очереди? Например, можно открыть больше касс, или создать больше потоков.
Больше потоков — больше касс. Это рабочий вариант. Но нагрузка получается неравномерной.
Мы открыли много касс (создали много потоков), и получается, что кто-то простаивает. На самом деле, это не просто простой: когда у нас много потоков, есть дополнительный расход ресурсов. Увеличивается расход памяти. Кроме того, процессору нужно переключаться между потоками.
Чем больше потоков, тем чаще между ними нужно переключаться. Получается, что потоков у нас гораздо больше, чем ядер. Допустим, у нас 4 ядра, а потоков мы насоздавали сотню, потому что все остальные были заблокированы чтением данных. Соответственно, происходит переключение, так называемый context switching, чтобы разные потоки получали свою порцию машинного времени.
Но у такого подхода есть минусы. Context switching не бесплатен. Он занимает время. Плодить неограниченное количество потоков было бы неплохим вариантом в теории. Но на практике мы получаем упадок скорости работы и рост потребляемой памяти.
В Java есть разные подходы, которые позволяют с этим бороться — это блокирующие очереди и пулы потоков (ThreadPool). Можно ограничивать количество потоков, и тогда все остальные клиенты встают в очередь. При старте у нас может быть минимальное количество потоков, потом их количество растет.
Вернемся к примеру магазина: если народа нет, в магазине открыты две кассы, а в час пик работает десять. Но больше мы открыть не можем, потому что тогда пришлось бы арендовать дополнительную площадь и нанимать людей. А это ударит по бизнесу.
Теперь поговорим о более современных подходах: кассах самообслуживания, предзаказах и так далее. А значит, мы подбираемся к асинхронному подходу.
Asynchronous I/O
Асинхронный ввод-вывод известен достаточно давно, ведь асинхронность необходима, когда мы работаем с самым медленным инструментом ввода-вывода. А наиболее медленным девайсом ввода-вывода, с которым постоянно приходится работать, является не консоль или клавиатура, а человек. В системах, которые работают с человеком, асинхронность появилась давным-давно.
Блокирующие интерфейсы использовались когда-то и при работе с человеком. Например, старый DOS’овский интерфейс командной строки. И сейчас существуют такие утилиты, которые задают вопрос, блокируются и больше ничего не делают, а ждут, пока человек ответит. С тех пор, как стали появляться оконные интерфейсы, появился асинхронный ввод-вывод. В настоящее время большинство интерфейсов именно асинхронные.
Как работает асинхронность?
Мы регистрируем функцию-callback, но на сей раз не говорим: «Человек, введи данные, а я буду ждать». Это звучит иначе: «Когда человек введет данные, вызови, пожалуйста, эту функцию — callback». Такой подход используется в любых библиотеках пользовательского интерфейса. Но в JavaScript он был изначально. В 2009 году, когда движок JavaScript стал работать гораздо быстрее, умные ребята решили использовать его на сервере, и сделали инструмент под названием Node.js.
Node.js
Идея Node.js в том, что на серверную часть переносится JavaScript, и весь ввод-вывод становится асинхронным. То есть вместо того, чтобы поток блокировался, например, при обращении к файлу, мы получаем асинхронный ввод-вывод. Обращение к файлу тоже становится асинхронным. Например, если потоку нужно получить содержимое файла, он говорит: «Дайте мне, пожалуйста, содержимое файла, а когда оно будет прочитано, вызовите эту функцию». Мы поставили задачу и занимаемся своими делами.
Такой асинхронный подход оказался весьма действенным, и Node.js быстро набрал популярность.
Как работает Node.js?
На входе есть приемщик — это цикл. JavaScript однопоточный язык. Но это не значит, что там ничего нельзя делать в других потоках. В нем поддерживаются потоки через Web Workers и т.д. Но на входе стоит один поток.
Вычислительные задачи для Node.js обычно очень маленькие. Основная работа идет с вводом-выводом (в базу данных, в файловую систему, в сторонние сервисы и т.д.). Сами вычисления занимают мало времени. Когда данные получили из базы или из файловой системы, вызывается callback, то есть какая-то функция, в которую передаются данные.
Но в этой схеме нет ожидания. Сравним ее с традиционной моделью многопоточного сервера в Java.
What happens in Java?
Здесь есть пул потоков. Сначала обращение попадает в первый поток, потом какой-то поток заблокировался, и мы создали еще один. Он тоже заблокировался, создаем следующий. А блокируются они потому, что обращаются к блокирующим операциям ввода-вывода. Например, поток запросил файл или данные из БД и ждет, когда эти данные придут.
Модель Node.js очень быстро стала популярной. Естественно, в этот момент люди стали переписывать ее на других языках. Node.js в какой-то момент вырвался вперед в нагруженных системах с большим объемом ввода-вывода. Но подходит он не для любых систем. Если у вас много вычислений или небольшое количество запросов, то большого преимущества вы не увидите. Соответственно, в Java стали появляться аналогичные решения, в том числе платформа для работы с асинхронным вводом-выводом Vert.x. Сервер Vert.x построен на таком же принципе, что и Node.js.
Решение Node.js интересное, оно действительно помогает повышать производительность. Когда пришла реактивность, стали применять сервер, который называется Netty. Такой подход оказался очень выгодным.
История многопоточности
Как работает многопоточность в Java? Старая добрая многопоточность в Java — это базовые примитивы многопоточности:
- Threads (потоки);
- Synchronization (синхронизация);
- Wait/notify (ожидание/уведомление).
Сложно писать, сложно отлаживать, сложно тестировать.
Java 5
- Future interface:
- V get()
- boolean cancel()
- boolean isCancelled()
- boolean isDone()
- Executors
- Callable interface
- BlockingQueue
В пятой версии появился интерфейс Future. Future — это обещание, или будущее: что-то, что мы можем получить из функции, результат какой-то еще не завершенной асинхронной работы.
У интерфейса Future появился метод get. Он блокирует вызов до завершения вычисления. Например, у нас есть Future, который возвращает данные из БД, и мы обращаемся к методу get:
В этом месте возникает блокировка. На самом деле никакого преимущества от того, что мы использовали Future, нет. Когда можно получить преимущество? Например, мы ставим какую-то задачу, выполняем ее, обращаемся к методу get и в этот момент блокируемся:
Future f = getDBData();
Если вернуться к метафоре начальника и подчиненных, мы поставили задачу, сделали какую-то работу и дальше ждем, пока задача завершится. А если результат этой работы нужно передать еще какому-то человеку? Ранее мы рассматривали такой сценарий: сделай задачу, передай результаты, а потом уже вернись. В случае параллелизма нет такой возможности.
Возможности интерфейса Future очень ограничены. Например, можно узнать, выполнилась ли эта задача:
Future f = getDBData();
if (!f.isDone) doOtherJob();
Если задача еще не доделана, мы можем еще что-то предпринять. Но в любом случае пропустим этот момент: либо задача опять не успеет завершиться, либо, наоборот, она уже давно завершена, а мы занимались другой работой. Моменты синхронизации здесь получаются не очень четкими.
Интерфейс Future имел очень ограниченные возможности в Java 5, поэтому использовать его было неудобно.
Давайте подумаем, какие бизнес-задачи стоят перед типичным приложением?
Data flow
Обычно типичная задача приложения: прочитать, обработать и записать данные. Если мы хотим это делать асинхронно, нужно использовать асинхронные чтение, обработку и запись.
Например, если асинхронная функция блокируется, мы написали прекрасный код:
readData.get() и заблокировались,
processData.get() и заблокировались,
writeData.get() и тут тоже заблокировались.
Получаем синхронный код на выходе. Асинхронности здесь нет, использовать это неудобно.
Рассмотрим типичную задачу, когда есть асинхронное чтение данных, а потом мы хотим обрабатывать их «в три горла»:
Для того, чтобы дождаться результата чтения, много потоков не нужно. Мы просто должны получить данные. Потом их нужно обрабатывать, а обработка — ресурсоемкая задача с точки зрения процессора, и хорошо бы ее распараллелить. Мы говорим: «Прочитай данные. Когда сделаешь это, обработай их в три потока, после этого соедини результаты выполнения и запиши данные». Хотелось бы, чтобы все это делалось асинхронно.
CompletableFuture brings us to the Async world
В Java 8 появился CompletableFuture. Он построен на базе Fork/Join framework. Так же, кстати, как и распараллеливание потоков. Fork/Join framework появился еще в Java 7, но его было сложно использовать. В 8 версии CompletableFuture стал шагом вперед: в сторону асинхронного мира.
Рассмотрим простенький пример.
В коде оранжевым выделены методы CompletableFuture из стандартного JDK.
Допустим, у нас есть API, который позволяет:
- Читать данные (readData) из источника и возвращать CompletableFuture, потому что он асинхронный;
- Обрабатывать данные, для чего есть два обработчика: processData1 и processData2;
- Объединять данные (mergeData) после того, как они обработаны;
- Записать данные (writeData) в приемник (Destination).
Это типичная задача — прочитать данные, обработать их «в два горла», потом соединить результаты этой обработки и куда-то записать.
Мы прочитали данные:
CompletableFuture data = readData(source);
Дальше говорим: когда прочитаем данные, нужно отправить их на обработку:
CompletableFuture processData1 = data.thenApplyAsync(this::processData1);
Это значит, что нужно запустить их обработку в отдельном потоке. Так как у нас здесь используется Async постфикс, стартует обработка в двух разных потоках:
CompletableFuture processData2 = data.thenApplyAsync(this::processData2);
То есть функции this::processData1 и this::processData2 будут запущены в двух разных потоках и будут выполняться параллельно. Но после параллельного выполнения их результаты должны соединиться. Это делает thenCombine.
Мы здесь запустили два потока выполнения, и, когда они завершились, скомбинировали их. thenCombine работает так: он дожидается, когда и processData1, и processData2 завершатся, и после этого вызывает функцию объединения данных:
То есть мы объединяем результаты первой и второй обработки, и после этого записываем данные:
Здесь получается цепочка, которая по сути является бизнес-процессом. Мы как бы говорим: «Таня, забери данные из архива, отдай Лене и Грише на обработку. Когда и Леня, и Гриша принесут результаты, передай их Вере, чтобы она соединила их, а потом отдай Вите, чтобы он написал отчет по этим данным».
У нас здесь нет четкого графика, о котором мы говорили в начале: есть возможность передать данные сразу же, как только сможем. Единственный, кто здесь ждет — это thenCombine. Он ожидает, когда оба процесса, результат которых он объединяет, завершатся.
CompletableFuture — это действительно крутой подход, который помогает делать асинхронные системы.
В следующей части нашей статьи поговорим о более высокоуровневых подходах к асинхронности и разберем операторы реактивного программирования.
Конференция HighLoad++ 2020 пройдет 20 и 21 мая 2021 года. Приобрести билеты можно уже сейчас.
Хотите бесплатно получить материалы конференции мини-конференции Saint HighLoad++ 2020? Подписывайтесь на нашу рассылку.
- java
- асинхронное программирование
- реактивное программирование
- highload
- конференции
- Блог компании Конференции Олега Бунина (Онтико)
- Высокая производительность
- Программирование
- Java
- Параллельное программирование
Реактивное программирование на Java: как, зачем и стоит ли? Часть II
Реактивное программирование — один из самых актуальных трендов современности. Обучение ему — сложный процесс, особенно если нет подходящих материалов. В качестве своеобразного дайджеста может выступить эта статья. На конференции РИТ++ 2020 эксперт и тренер Luxoft Training Владимир Сонькин рассказал о фишках управления асинхронными потоками данных и подходах к ним, а также показал на примерах, в каких ситуациях нужна реактивность, и что она может дать.
В первой части статьи рассказывалось о том, что привело к появлению реактивного программирования, где оно применяется, и что нам может дать асинхронность. Пришло время рассказать о следующем шаге, позволяющем получить максимум преимуществ от асинхронности, и это — реактивное программирование.
Reactivity
Реактивное программирование — это асинхронность, соединенная с потоковой обработкой данных. То есть если в асинхронной обработке нет блокировок потоков, но данные обрабатываются все равно порциями, то реактивность добавляет возможность обрабатывать данные потоком. Помните тот пример, когда начальник поручает задачу Васе, тот должен передать результат Диме, а Дима вернуть начальнику? Но у нас задача — это некая порция, и пока она не будет сделана, дальше передать ее нельзя. Такой подход действительно разгружает начальника, но Дима и Вася периодически простаивают, ведь Диме надо дождаться результатов работы Васи, а Васе — дождаться нового задания.
А теперь представьте, что задачу разбили на множество подзадач. И теперь они плывут непрерывным потоком:
Говорят, когда Генри Форд придумал свой конвейер, он повысил производительность труда в четыре раза, благодаря чему ему удалось сделать автомобили доступными. Здесь мы видим то же самое: у нас небольшие порции данных, а конвейер с потоком данных, и каждый обработчик пропускает через себя эти данные, каким-то образом их преобразовывая. В качестве Васи и Димы у нас выступают потоки выполнения (threads), обеспечивая, таким образом, многопоточную обработку данных.
На этой схеме показаны разные технологии распараллеливания, добавлявшиеся в Java в разных версиях. Как мы видим, спецификация Reactive Streams на вершине — она не заменяет всего, что было до нее, но добавляет самый высокий уровень абстракции, а значит ее использование просто и эффективно. Попробуем в этом разобраться.
Идея реактивности построена на паттерне проектирования Observer.
Давайте вспомним, что это за паттерн. У нас есть подписчики и то, на что мы подписываемся. В качестве примера здесь рассмотрен Твиттер, но подписаться на какое-то сообщество или человека, а потом получать обновления можно в любой соцсети. После подписки, как только появляется новое сообщение, всем подписчикам приходит notify, то есть уведомление. Это базовый паттерн.
В данной схеме есть:
- Publisher — тот, кто публикует новые сообщения;
- Observer — тот, кто на них подписан. В реактивных потоках подписчик обычно называется Subscriber. Термины разные, но по сути это одно и то же. В большинстве сообществ более привычны термины Publisher/Subscriber.
Это базовая идея, на которой все строится.
Один из жизненных примеров реактивности — система оповещения при пожаре. Допустим, нам надо сделать систему, включающую тревогу в случае превышения задымленности и температуры.
У нас есть датчик дыма и градусник. Когда дыма становится много и/или температура растет, на соответствующих датчиках увеличивается значение. Когда значение и температура на датчике дыма оказываются выше пороговых, включается колокольчик и оповещает о тревоге.
Если бы у нас был традиционный, а не реактивный подход, мы бы писали код, который каждые пять минут опрашивает детектор дыма и датчик температуры, и включает или выключает колокольчик. Однако в реактивном подходе за нас это делает реактивный фреймворк, а мы только прописываем условия: колокольчик активен, когда детектор больше X, а температура больше Y. Это происходит каждый раз, когда приходит новое событие.
От детектора дыма идет поток данных: например, значение 10, потом 12, и т.д. Температура тоже меняется, это другой поток данных — 20, 25, 15. Каждый раз, когда появляется новое значение, результат пересчитывается, что приводит к включению или выключению системы оповещения. Нам достаточно сформулировать условие, при котором колокольчик должен включиться.
Если вернуться к паттерну Observer, у нас детектор дыма и термометр — это публикаторы сообщений, то есть источники данных (Publisher), а колокольчик на них подписан, то есть он Subscriber, или наблюдатель (Observer).
Немного разобравшись с идеей реактивности, давайте углубимся в реактивный подход. Мы поговорим об операторах реактивного программирования. Операторы позволяют каким-либо образом трансформировать потоки данных, меняя данные и создавая новые потоки. Для примера рассмотрим оператор distinctUntilChanged. Он убирает одинаковые значения, идущие друг за другом. Действительно, если значение на детекторе дыма не изменилось — зачем нам на него реагировать и что-то там пересчитывать:
Reactive approach
Рассмотрим еще один пример: допустим, мы разрабатываем UI, и нам нужно отслеживать двойные нажатия мышкой. Тройной клик будем считать как двойной.
Клики здесь — это поток щелчков мышкой (на схеме 1, 2, 1, 3). Нам нужно их сгруппировать. Для этого мы используем оператор throttle. Говорим, что если два события (два клика) произошли в течение 250 мс, их нужно сгруппировать. На второй схеме представлены сгруппированные значения (1, 2, 1, 3). Это поток данных, но уже обработанных — в данном случае сгрупированных.
Таким образом начальный поток преобразовался в другой. Дальше нужно получить длину списка ( 1, 2, 1, 3). Фильтруем, оставляя только те значения, которые больше или равны 2. На нижней схеме осталось только два элемента (2, 3) — это и были двойные клики. Таким образом, мы преобразовали начальный поток в поток двойных кликов.
Это и есть реактивное программирование: есть потоки на входе, каким-то образом мы пропускаем их через обработчики, и получаем поток на выходе. При этом вся обработка происходит асинхронно, то есть никто никого не ждет.
Еще одна хорошая метафора — это система водопровода: есть трубы, одна подключена к другой, есть какие-то вентили, может быть, стоят очистители, нагреватели или охладители (это операторы), трубы разделяются или объединяются. Система работает, вода льется. Так и в реактивном программировании, только в водопроводе течет вода, а у нас — данные.
Можно придумать потоковое приготовление супа. Например, есть задача максимально эффективно сварить много супа. Обычно берется кастрюля, в нее наливается порция воды, овощи нарезаются и т.д. Это не потоковый, а традиционный подход, когда мы варим суп порциями. Сварили эту кастрюлю, потом нужно ставить следующую, а после — еще одну. Соответственно, надо дождаться, пока в новой кастрюле снова закипит вода, растворится соль, специи и т.д. Все это занимает время.
Представьте себе такой вариант: в трубе нужного диаметра (достаточного, чтобы заполнялась кастрюля) вода сразу подогревается до нужной температуры, есть нарезанная свекла и другие овощи. На вход они поступают целыми, а выходят уже шинкованными. В какой-то момент все смешивается, вода подсаливается и т.д. Это максимально эффективное приготовление, супоконвейер. И именно в этом идея реактивного подхода.
Observable example
Теперь посмотрим на код, в котором мы публикуем события:
Observable.just позволяет положить в поток несколько значений, причем если обычные реактивные потоки содержат значения, растянутые во времени, то тут мы их кладем все сразу — то есть синхронно. В данном случае это названия городов, на которые в дальнейшем можно подписаться (тут для примера взяты города, в которых есть учебный центр Люксофт).
Девушка (Publisher) опубликовала эти значения, а Observers на них подписываются и печатают значения из потока.
Это похоже на потоки данных (Stream) в Java 8. И тут, и там синхронные потоки. И здесь, и в Java 8 список значений нам известен сразу. Но если бы использовался обычный для Java 8 поток, мы не могли бы туда что-то докладывать. В стрим ничего нельзя добавить: он синхронный. В нашем примере потоки асинхронные, то есть в любой момент времени в них могут появляться новые события — скажем, если через год откроется учебный центр в новой локации — она может добавиться в поток, и реактивные операторы правильно обработают эту ситуацию. Мы добавили события и сразу же на них подписались:
Мы можем в любой момент добавить значение, которое через какое-то время выводится. Когда появляется новое значение, мы просим его напечатать, и на выходе получаем список значений:
При этом есть возможность не только указать, что должно происходить, когда появляются новые значения, но и дополнительно отработать такие сценарии, как возникновение ошибок в потоке данных или завершение потока данных. Да-да, хотя часто потоки данных не завершаются (например, показания термометра или датчика дыма), многие потоки могут завершаться: например, поток данных с сервера или с другого микросервиса. В какой-то момент сервер закрывает соединение, и появляется потребность на это как-то отреагировать.
Implementing and subscribing to an observer
В Java 9 нет реализации реактивных потоков — только спецификация. Но есть несколько библиотек — реализаций реактивного подхода. В этом примере используется библиотека RxJava. Мы подписываемся на поток данных, и определяем несколько обработчиков, то есть методы, которые будут запущены в начале обработки потока (onSubscribe), при получении каждого очередного сообщения (onNext), при возникновении ошибки (onError) и при завершении потока (onComplete):
Давайте посмотрим на последнюю строчку.
locations.map(String::length).filter(l -> l >= 5).subscribe(observer);
Мы используем операторы map и filter. Если вы работали со стримами Java 8, вам, конечно, знакомы map и filter. Здесь они работают точно так же. Разница в том, что в реактивном программировании эти значения могут появляться постепенно. Каждый раз, когда приходит новое значение, оно проходит через все преобразования. Так, String::length заменит строчки на длину в каждой из строк.
В данном случае получится 5 (Minsk), 6 (Krakow), 6 (Moscow), 4 (Kiev), 5 (Sofia). Фильтруем, оставляя только те, что больше 5. У нас получится список длин строк, которые больше 5 (Киев отсеется). Подписываемся на итоговый поток, после этого вызывается Observer и реагирует на значения в этом итоговом потоке. При каждом следующем значении он будет выводить длину:
public void onNext(Integer value) System.out.println(«Length: » + value);
То есть сначала появится Length 5, потом — Length 6. Когда наш поток завершится, будет вызван onComplete, а в конце появится надпись «Done.»:
public void onComplete() System.out.println(«Done.»);
Не все потоки могут завершаться. Но некоторые способны на это. Например, если мы читали что-то из файла, поток завершится, когда файл закончится.
Если где-то произойдет ошибка, мы можем на нее отреагировать:
public void onError(Throwable e) e.printStackTrace();
Таким образом мы можем реагировать разными способами на разные события: на следующее значение, на завершение потока и на ошибочную ситуацию.
Reactive Streams spec
Реактивные потоки вошли в Java 9 как спецификация.
Если предыдущие технологии (Completable Future, Fork/Join framework) получили свою имплементацию в JDK, то реактивные потоки имплементации не имеют. Есть только очень короткая спецификация. Там всего 4 интерфейса:
Если рассматривать наш пример из картинки про Твиттер, мы можем сказать, что:
Publisher — девушка, которая постит твиты;
Subscriber — подписчик. Он определяет , что делать, если:
- Начали слушать поток (onSubscribe). Когда мы успешно подписались, вызовется эта функция;
- Появилось очередное значение в потоке (onNext);
- Появилось ошибочное значение (onError);
- Поток завершился (onComplete).
Subscription — у нас есть подписка, которую можно отменить (cancel) или запросить определенное количество значений (request(long n)). Мы можем определить поведение при каждом следующем значении, а можем забирать значения вручную.
Processor — обработчик — это два в одном: он одновременно и Subscriber, и Publisher. Он принимает какие-то значения и куда-то их кладет.
Если мы хотим на что-то подписаться, вызываем Subscribe, подписываемся, и потом каждый раз будем получать обновления. Можно запросить их вручную с помощью request. А можно определить поведение при приходе нового сообщения (onNext): что делать, если появилось новое сообщение, что делать, если пришла ошибка и что делать, если Publisher завершил поток. Мы можем определить эти callbacks, или отписаться (cancel).
PUSH / PULL модели
Существует две модели потоков:
- Push-модель — когда идет «проталкивание» значений.
Например, вы подписались на кого-то в Telegram или Instagram и получаете оповещения (они так и называются — push-сообщения, вы их не запрашиваете, они приходят сами). Это может быть, например, всплывающее сообщение. Можно определить, как реагировать на каждое новое сообщение.
- Pull-модель — когда мы сами делаем запрос.
Например, мы не хотим подписываться, т.к. информации и так слишком много, а хотим сами заходить на сайт и узнавать новости.
Для Push-модели мы определяем callbacks, то есть функции, которые будут вызваны, когда придет очередное сообщение, а для Pull-модели можно воспользоваться методом request, когда мы захотим узнать, что новенького.
Pull-модель очень важна для Backpressure — «напирания» сзади. Что же это такое?
Вы можете быть просто заспамленными своими подписками. В этом случае прочитать их все нереально, и есть шанс потерять действительно важные данные — они просто утонут в этом потоке сообщений. Когда подписчик из-за большого потока информации не справляется со всем, что публикует Publisher, получается Backpressure.
В этом случае можно использовать Pull-модель и делать request по одному сообщению, прежде всего из тех потоков данных, которые наиболее важны для вас.
Implementations
Давайте рассмотрим существующие реализации реактивных потоков:
- RxJava. Эта библиотека реализована для разных языков. Помимо RxJava существует Rx для C#, JS, Kotlin, Scala и т.д.
- Reactor Core. Был создан под эгидой Spring, и вошел в Spring 5.
- Akka-стримы от создателя Scala Мартина Одерски. Они создали фреймворк Akka (подход с Actor), а Akka-стримы — это реализация реактивных потоков, которые дружат с этим фреймворком.
Во многом эти реализации похожи, и все они реализуют спецификацию реактивных потоков из Java 9.
Посмотрим подробнее на Spring’овский Reactor.
Function may return…
Давайте обобщим, что может возвращать функция:
- Single/Synchronous;
Обычная функция возвращает одно значение, и делает это синхронно.
- Multipple/Synchronous;
Если мы используем Java 8, можем возвращать из функции поток данных Stream. Когда вернулось много значений, их можно отправлять на обработку. Но мы не можем отправить на обработку данные до того, как все они получены — ведь Stream работают только синхронно.
- Single/Asynchronous;
Здесь уже используется асинхронный подход, но функция возвращает только одно значение:
- либо CompletableFuture (Java), и через какое-то время приходит асинхронный ответ;
- либо Mono, возвращающая одно значение в библиотеке Spring Reactor.
- Multiple/Asynchronous.
А вот тут как раз — реактивные потоки. Они асинхронные, то есть возвращают значение не сразу, а через какое-то время. И именно в этом варианте можно получить поток значений, причем эти значения будут растянуты во времени Таким образом, мы комбинируем преимущества потоков Stream, позволяющих вернуть цепочку значений, и асинхронности, позволяющей отложить возврат значения.
Например, вы читаете файл, а он меняется. В случае Single/Asynchronous вы через какое-то время получаете целиком весь файл. В случае Multiple/Asynchronous вы получаете поток данных из файла, который сразу же можно начинать обрабатывать. То есть можно одновременно читать данные, обрабатывать их, и, возможно, куда-то записывать. . Реактивные асинхронные потоки называются:
- Publisher (в спецификации Java 9);
- Observable (в RxJava);
- Flux (в Spring Reactor).
Netty as a non-blocking server
Рассмотрим пример использования реактивных потоков Flux вместе со Spring Reactor. В основе Reactor лежит сервер Netty. Spring Reactor — это основа технологии, которую мы будем использовать. А сама технология называется WebFlux. Чтобы WebFlux работал, нужен асинхронный неблокирующий сервер.
Схема работы сервера Netty похожа на то, как работает Node.js. Есть Selector — входной поток, который принимает запросы от клиентов и отправляет их на выполнение в освободившиеся потоки. Если в качестве синхронного сервера (Servlet-контейнера) используется Tomcat, то в качестве асинхронного используется Netty.
Давайте посмотрим, сколько вычислительных ресурсов расходуют Netty и Tomcat на выполнение одного запроса:
Throughput — это общее количество обработанных данных. При небольшой нагрузке, до первых 300 пользователей у RxNetty и Tomcat оно одинаковое, а после Netty уходит в приличный отрыв — почти в 2 фраза.
Blocking vs Reactive
У нас есть два стека обработки запросов:
- Традиционный блокирующий стек.
- Неблокирующий стек — в нем все происходит асинхронно и реактивно.
В блокирующем стеке все строится на Servlet API, в реактивном неблокирующем стеке — на Netty.
Сравним реактивный стек и стек Servlet.
В Reactive Stack применяется технология Spring WebFlux. Например, вместо Servlet API используются реактивные стримы.
Чтобы мы получили ощутимое преимущество в производительности, весь стек должен быть реактивным. Поэтому чтение данных тоже должно происходить из реактивного источника.
Например, если у нас используется стандартный JDBC, он является не реактивным блокирующим источником, потому что JDBC не поддерживает неблокирующий ввод/вывод. Когда мы отправляем запрос в базу данных, приходится ждать, пока результат этого запроса придет. Соответственно, получить преимущество не удается.
В Reactive Stack мы получаем преимущество за счет реактивности. Netty работает с пользователем, Reactive Streams Adapters — со Spring WebFlux, а в конце находится реактивная база: то есть весь стек получается реактивным. Давайте посмотрим на него на схеме:
Data Repo — репозиторий, где хранятся данные. В случае, если есть запросы, допустим, от клиента или внешнего сервера, они через Flux поступают в контроллер, обрабатываются, добавляются в репозиторий, а потом ответ идет в обратную сторону.
При этом все это делается неблокирующим способом: мы можем использовать либо Push-подход, когда мы определяем, что делать при каждой следующей операции, либо Pull-подход, если есть вероятность Backpressure, и мы хотим сами контролировать скорость обработки данных, а не получать все данные разом.
Операторы
В реактивных потоках огромное количество операторов. Многие из них похожи на те, которые есть в обычных стримах Java. Мы рассмотрим только несколько самых распространенных операторов, которые понадобятся нам для практического примера применения реактивности.
Filter operator
Скорее всего, вы уже знакомы с фильтрами из интерфейса Stream.
По синтаксису этот фильтр точно такой же, как обычный. Но если в стриме Java 8 все данные есть сразу, здесь они могут появляться постепенно. Стрелки вправо — это временная шкала, а в кружочках находятся появляющиеся данные. Мы видим, что фильтр оставляет в итоговом потоке только значения, превышающие 10.
Take 2 означает, что нужно взять только первые два значения.
Map operator
Оператор Map тоже хорошо знаком:
Это действие, происходящее с каждым значением. Здесь — умножить на десять: было 3, стало 30; было 2, стало 20 и т.д.
Delay operator
Задержка: все операции сдвигаются. Этот оператор может понадобиться, когда значения уже генерируются, но подготовительные процессы еще происходят, поэтому приходится отложить обработку данных из потока.
Reduce operator
Еще один всем известный оператор:
Он дожидается конца работы потока (onComplete) — на схеме она представлена вертикальной чертой. После чего мы получаем результат — здесь это число 15. Оператор reduce сложил все значения, которые были в потоке.
Scan operator
Этот оператор отличается от предыдущего тем, что не дожидается конца работы потока.
Оператор scan рассчитывает текущее значение нарастающим итогом: сначала был 1, потом прибавил к предыдущему значению 2, стало 3, потом прибавил 3, стало 6, еще 4, стало 10 и т.д. На выходе получили 15. Дальше мы видим вертикальную черту — onComplete. Но, может быть, его никогда не произойдет: некоторые потоки не завершаются. Например, у термометра или датчика дыма нет завершения, но scan поможет рассчитать текущее суммарное значение, а при некоторой комбинации операторов — текущее среднее значение всех данных в потоке.
Merge operator
Объединяет значения двух потоков.
Например, есть два температурных датчика в разных местах, а нам нужно обрабатывать их единообразно, в общем потоке .
Combine latest
Получив новое значение, комбинирует его с последним значением из предыдущего потока.
Если в потоке возникает новое событие, мы его комбинируем с последним полученным значением из другого потока. Скажем, таким образом мы можем комбинировать значения от датчика дыма и термометра: при появлении нового значения температуры в потоке temperatureStream оно будет комбинироваться с последним полученным значением задымленности из smokeStream. И мы будем получать пару значений. А уже по этой паре можно выполнить итоговый расчет:
temperatureStream.combineLatest(smokeStream).map((x, y) -> x > X && y > Y)
В итоге на выходе у нас получается поток значений true или false — включить или выключить колокольчик. Он будет пересчитываться каждый раз, когда будет появляться новое значение в temperatureStream или в smokeStream.
FlatMap operator
Этот оператор вам, скорее всего, знаком по стримам Java 8. Элементами потока в данном случае являются другие потоки. Получается поток потоков. Работать с ними неудобно, и в этих случаях нам может понадобиться «уплостить» поток.
Можно представить такой поток как конвейер, на который ставят коробки с запчастями. До того, как мы начнем их применять, запчасти нужно достать из коробок. Именно это делает оператор flatMap.
Flatmap часто используется при обработке потока данных, полученных с сервера. Т.к. сервер возвращает поток, чтобы мы смогли обрабатывать отдельные данные, этот поток сначала надо «развернуть». Это и делает flatMap.
Buffer operator
Это оператор, который помогает группировать данные. На выходе Buffer получается поток, элементами которого являются списки (List в Java). Он может пригодиться, когда мы хотим отправлять данные не по одному, а порциями.
Мы с самого начала говорили, что реактивные потоки позволяют разбить задачу на подзадачи, и обрабатывать их маленькими порциями. Но иногда лучше наоборот, собрать много маленьких частей в блоки. Скажем, продолжая пример с конвейером и запчастями, нам может понадобиться отправлять запчасти на другой завод (другой сервер). Но каждую отдельную запчасть отправлять неэффективно. Лучше их собрать в коробки, скажем по 100 штук, и отправлять более крупными партиями.
На схеме выше мы группируем отдельные значения по три элемента (так как всего их было пять, получилась «коробка» из трех, а потом из двух значений). То есть если flatMap распаковывает данные из коробок, buffer, наоборот, упаковывает их.
Всего существует более сотни операторов реактивного программирования. Здесь разобрана только небольшая часть.
Итого
Есть два подхода:
- Spring MVC — традиционная модель, в которой используется JDBC, императивная логика и т.д.
- Spring WebFlux, в котором используется реактивный подход и сервер Netty.
Есть кое-что, что их объединяет. Tomcat, Jetty, Undertow могут работать и со Spring MVC, и со Spring WebFlux. Однако дефолтным сервером в Spring для работы с реактивным подходом является именно Netty.
Заинтересовались темой?
Новый практический online-курс Java Advanced: функциональное, асинхронное и реактивное программирование по изучению современных функциональных, асинхронных и реактивных подходов к разработке на Java. Включает изучение NIO2, CompletableFurure, RxJava, Reactor, R2DBC, SSE, Spring Data reactive, WebClient, reactive WebSocket, RSocket.
Расскажи друзьям:
Как не пропустить самое интересное?
Подписывайтесь на наш ежемесячный дайджест!
Оценка и обучение ИТ-специалистов по ключевым направлениям разработки программного обеспечения. Курсы от экспертов-практиков по языкам программирования, системному и бизнес-анализу, архитектуре ПО, ручному и автоматизированному тестированию ПО, Big Data и машинному обучению, управлению проектами и Agile. Действует скидка 10% на обучение физических лиц.
Остались вопросы?
IBS Training Center Контакты: +7 (495) 609-6967 education@ibs.ru Адрес:
127018 , Москва , ул. Складочная, д. 3, стр. 1
© 2024 IBS, all rights reserved
Пользователь только что записался на курс » »
Сайт IBS Training Center использует cookie. Это дает нам возможность следить за корректной работой сайта, а также анализировать данные, чтобы развивать наши продукты и сервисы. Посещая сайт, вы соглашаетесь с обработкой ваших персональных данных. В случае несогласия вам следует покинуть его
Реактивное программирование на Java. Будущее, настоящее и прошлое
Разберемся с парадигмой реактивного программирования. Какие есть плюсы и минусы по сравнению с императивным подходом.
28 мая 2023 · 14 минуты на чтение
В современном мире разработки, где требования к производительности и отзывчивости системы становятся все более строгими, реактивное программирование выделяется как ключевой подход.
Что же такое реактивное программирование, и почему оно становится всё более популярным? В чём недостаток императивного подхода? И, что самое важное, как реактивные потоки помогают нам создавать более производительные и эффективные системы?
Целью статьи является погружение в контекст реактивной разработки и объяснение основных механик. Это не гайд по написанию реактивных приложений. Это будет в следующих статьях.
Спонсор поста
Реактивная система
Так как реактивный подход помогает создавать реактивные системы, неплохо сначала разобраться, что это за системы такие.
Представим себе систему управления таксопарка. Владелец таксопарка идёт в ногу со временем и решил заказать разработку системы для управления заказами. Система включает в себя множество подсистем: работа с заказами, управлением автопарком и так далее.
Перед разработкой владелец подсчитал среднее количество заказов в день, аналитики по этим данным рассчитали необходимое количество железа, заложили в эти данные избыток в 50% на будущий рост и закупили сервера. Система была написана и введена в эксплуатацию.
Всё было хорошо, пока в городе не объявили проведение чемпионата мира по футболу. Толпы туристов, многие из которых решили воспользоваться удобным способом заказа такси. В какой-то момент нагрузка превзошла все самые смелые ожидания и система полностью развалилась. В итоге таксопарк потерял клиентов и прибыль, а его рейтинг в AppStore обвалился.
Кажется, что было бы неплохо, чтобы система как-то динамически реагировала на изменения, будь то резко выросшая нагрузка или недоступность внешних служб.
В этом примере стоит задуматься об увеличении эластичности. Пропускная способность вашей системы должна автоматически увеличиваться при увеличении нагрузки, и автоматически уменьшаться со снижением нагрузки.
Это одно из свойств, которыми должна обладать реактивная система. Поговорим обо всех характеристиках.
Отзывчивость
Система способна быстро обрабатывать запросы пользователей даже при высокой нагрузке. Это требует соблюдения нескольких ключевых принципов проектирования.
Неблокирующий ввод/вывод: Использование неблокирующего ввода-вывода, позволит минимизировать время, которое потоки тратят на ожидание завершения операций ввода-вывода. Более эффективное использование потоков снижает вероятность «голодания» потоков и увеличивает производительность сервиса.
Про неблокирующий ввод/вывод я расскажу ниже более подробно.
Балансировка нагрузки гарантирует, что ни один экземпляр не будет перегружен трафиком. Таким образом увеличивая пропускную способность и производительность системы.
Существуют различные алгоритмы распределения запросов, но их цель одна: распределить запросы равномерно на существующие и работоспособные экземпляры системы.
Кэширование: система должна использовать методы кэширования для сокращения времени, затрачиваемого на обработку запросов, и повышения общей производительности системы.
Допустим, у вас есть сервис справочников, который содержит значения для различных списков: статусы заказа, названия категорий товаров. Нет смысла каждый раз обращаться в сервис справочной информации, если данные там меняются нечасто. Кэширование этой информации позволит снизить затраты на межсервисное взаимодействие, а также позволит сервису продолжить работу, даже если сервис справочников будет недоступен.
Для больших систем отличным решением будет использование CDN для кэширования статического контента и снижения сетевых задержек.
Устойчивость
Система должна продолжать работать во время сбоев и автоматически восстанавливаться после ошибок, а не полностью выходить из строя.
Существует несколько ключевых стратегий, которые могут помочь обеспечить устойчивость:
Отказоустойчивость: Реактивная система должна быть спроектирована таким образом, чтобы выдерживать сбои на всех уровнях, включая аппаратные, сетевые и программные сбои. Это может быть достигнуто благодаря избыточности, репликации и механизмов автоматического распределения нагрузки с отказавших экземпляров сервисов на здоровые экземпляры.
Автоматическое восстановление: Реактивная система должна уметь автоматически обнаруживать и диагностировать ошибки, а также предпринимать корректирующие действия для восстановления после сбоев без вмешательства разработчиков.
Вынужденная деградация (Graceful degradation): Система должна продолжать работать, даже если некоторые её компоненты или функции недоступны или работают неправильно.
Вместо того чтобы полностью разрушиться, система плавно деградирует, отключая несущественные функции, снижая функциональность или предоставляя запасные варианты. Это позволяет системе продолжать работать, хотя и с ограниченными возможностями, пока проблема не будет устранена.
Предохранители (Circuit breakers): Модель проектирования, которая может помочь предотвратить каскадные отказы в системе. Работает путём мониторинга количества отказов, происходящих в сервисе за определённый период времени, и автоматически отключает предохранитель, если количество отказов превышает заданный порог.
Можно реализовать различные предохранители. Например, если сервис не отвечает на запрос, начать возвращать дефолтное значение или кэшированное значение. Все зависит от ваших сценариев.
Контроль потока данных (Backpressure): Механизм, позволяющий получателю управлять скоростью получения данных от отправителя. Иными словами, это метод контроля потока информации между отправителем и получателем.
Включив эти стратегии в конструкцию реактивной системы, можно достичь высокого уровня устойчивости и гарантировать, что система способна быстро восстанавливаться после ошибок и продолжать бесперебойную работу.
Эластичность
Реактивная система должна иметь возможность масштабирования для обработки растущих рабочих нагрузок без снижения производительности и доступности.
Существует несколько методов, которые могут быть использованы для достижения эластичности в реактивной системе:
Горизонтальное масштабирование: Предполагает добавление дополнительных экземпляров сервиса для распределения рабочей нагрузки на несколько машин или узлов.
Этот подход может использоваться для обработки растущего трафика или запросов пользователей и может быть достигнут при использовании технологий контейнеризации, таких как Docker, и инструментов оркестрации, таких как Kubernetes.
Вертикальное масштабирование: Предполагает добавление дополнительных ресурсов (процессор, память или дисковое пространство) к существующему сервису для увеличения его мощности.
Этот подход может быть использован для обработки возросших объёмов данных или требований к обработке, и может быть реализован с помощью облачных инфраструктурных сервисов, таких как Amazon EC2 или Microsoft Azure.
Автомасштабирование: Предполагает использование автоматизированных инструментов или алгоритмов для динамической корректировки количества экземпляров или ресурсов, выделяемых сервису, на основе показателей, собираемых в реальном времени, таких как использование процессора, памяти или сетевого трафика.
Динамическая корректировка может увеличить количество экземпляров сервиса при увеличении количества запросов, а также уменьшить количество экземпляров при уменьшении нагрузки. Увеличение позволяет обработать непредсказуемый рост нагрузки, а уменьшение позволяет эффективнее потреблять доступные ресурсы и не платить накладные расхода за не используемые.
Достижение эластичности в реактивной системе требует сочетания архитектурного проектирования, управления инфраструктурой и инструментов автоматического масштабирования для обеспечения того, чтобы система могла адаптироваться к изменяющимся рабочим нагрузкам и поддерживать свою производительность и доступность в течение долгого времени.
Управление сообщениями
Message Driven — шаблон проектирования, используемый в реактивных системах для обеспечения асинхронной связи между различными компонентами. Этот паттерн позволяет сервисам отправлять и получать сообщения без блокировки, ожидания или удержания ресурсов, что помогает минимизировать потребление ресурсов и максимизировать пропускную способность.
Для обеспечения коммуникации на основе сообщений реактивные системы обычно используют брокер сообщений или очередь сообщений, которая выступает в качестве посредника между различными сервисами. Когда сервис посылает сообщение другому сервису, он помещает его в очередь сообщений, а принимающий компонент может получить сообщение асинхронно, когда он будет готов обработать его.
Такой подход позволяет сервисам работать независимо друг от друга, без необходимости знать о состоянии или доступности других сервисов. Он также позволяет сервисам обрабатывать большие объёмы сообщений или событий, не перегружаясь и не блокируясь входящим трафиком.
Некоторые популярные брокеры сообщений и системы очередей, используемые в реактивных системах: Apache Kafka, RabbitMQ и AWS SQS.
Подписывайся на Telegram
Реактивное программирование
Манифест реактивных систем гласит: «Большие системы состоят из подсистем, имеющих те же свойства и, следовательно, зависят от их реактивных характеристик. Это означает, что принципы Реактивных Систем применяются на всех уровнях.» Таким образом, каждый отдельный сервис должен также следовать принципам реактивной системы.
Реактивное программирование — это парадигма программирования, ориентированная на работу с потоками данных и распространение изменений в этих потоках. Какие-то данные поступают в систему, и как реакция на них, система выполняет какие-то действия. Вот отсюда и название — «реактивное».
Хотя реактивное программирование часто используется для создания реактивных систем, технически возможно достичь определённой степени реактивности системы, используя другие методы программирования, такие как многопоточность, асинхронное программирование и событийно-ориентированные архитектуры. Эти методы могут помочь улучшить отзывчивость и масштабируемость системы, но они могут не обеспечить тот уровень устойчивости и отказоустойчивости, который призваны обеспечить реактивные системы.
Проблемы императивного программирования
Разберёмся в недостатках императивного подхода. Зачем понадобилось выдумывать какое-то реактивное программирование, почему сложно написать реактивную систему на существующих технологиях?
Реализуем небольшой пример, который состоит из двух сервисов.
@Service @RequiredArgsConstructor public class PassengerServiceImpl implements PassengerService < private final RideService rideService; @Override public void requestRide(Location pickupLocation, Location dropoffLocation) < RideRequest rideRequest = new RideRequest(pickupLocation, dropoffLocation); Ride ride = rideService.processRideRequest(rideRequest); // other logic >>
PassengerServiceImpl представляет API для запроса поездки, ориентированный на пассажира. Метод requestRide() принимает в качестве параметров место посадки и высадки пассажира и инициирует процесс запроса поездки.
@Service @RequiredArgsConstructor public class RideServiceImpl implements RideService < private final DriverService driverService; @Override public Ride processRideRequest(RideRequest rideRequest) < Driver driver = driverService.findAvailableDriver(rideRequest.getPickupLocation()); // other logic Ride ride = new . return ride; >>
RideServiceImpl представляет внутреннюю службу, которая обрабатывает запросы на поездки. Метод processRideRequest() принимает объект RideRequest в качестве параметра и инициирует процесс поиска водителя и назначения поездки.
Представим, что DriverService при вызове метода findAvailableDriver() обращается к базе данных или посылает сетевой запрос в другой сервис. Что будет, если БД будет выполнять запрос 30 секунд или другой сервис ответит спустя 5 минут?
Одна из основных проблем императивного подхода это ожидание потоков выполнения какой-либо задачи, то есть блокировка. Например, для выполнения запроса к БД из пула потоков берётся поток, далее он ожидает , пока БД выполнит запрос и вернёт результат. Если вычисление результата займёт 5 минут, то поток всё это время будет недоступен для других операций.
Это может привести к снижению производительности сервиса, особенно если многие потоки будут блокироваться в ожидании завершения долго выполняющихся запросов к базе данных. В какой-то момент у вас просто могут закончиться потоки в пуле, и обработка новых запросов просто остановится.
Такая же проблема может возникнуть, когда выполняется запрос к внешнему сервису. Например, вы посылаете запрос используя RestTemplate . Если внешний ресурс будет отвечать 5 минут, то всё это время поток будет находится в ожидании ответа, то есть простаивать.
Почему простаивание потока — это проблема?
Каждый поток нуждается в памяти для хранения своего стека вызовов и других связанных с ним структур данных. Когда поток простаивает, он продолжает потреблять ресурсы для поддержания своего состояния.
Кроме того, процессорное время, которое выделяется неработающим потокам, могло бы быть использовано для других задач. Если большое количество потоков простаивает, это может привести к увеличению загрузки процессора и снижению производительности, так как операционная система будет тратить больше времени на переключение между потоками.
Попробуем решить проблему блокировки потоков доступными способами. Добавим ExecutorService в RideService и будем возвращать не Ride , а Future .
public inteface RideService < public FutureprocessRideRequest(RideRequest rideRequest); >
@Service public class PassengerServiceImpl implements PassengerService < private final RideService rideService; @Override public void requestRide(Location pickupLocation, Location dropoffLocation) < RideRequest rideRequest = new RideRequest(pickupLocation, dropoffLocation); Futurefuture = rideService.processRideRequest(rideRequest); // other logic Ride ride = future.get(); // other logic > >
Теперь мы выполняем асинхронный вызов к RideService и получаем объект Future . Далее мы можем продолжить выполнять другие операции, пока выполняется обработка Future .
Мы можем выполнить какую-то другую логику, но в какой-то момент необходимо вызывать метод Future.get() , который потенциально также является блокирующим, если Future ещё не закончил работу, то мы заблокируем поток.
Эта реализация позволила нам сократить время блокировки потока, однако полностью эта проблема не решена, мы всё ещё с большой вероятностью будем получать блокировку потока.
Более высокоуровневым решением может быть использование CompletionStage и его реализации CompletableFuture . CompletionStage позволяет писать код в функциональном стиле, который выполняется асинхронно.
public inteface RideService < public CompletionStageprocessRideRequest(RideRequest rideRequest); >
@Service public class PassengerServiceImpl implements PassengerService < private final RideService rideService; public PassengerServiceImpl(RideService rideService) < this.rideService = rideService; >@Override public void requestRide(Location pickupLocation, Location dropoffLocation) < RideRequest rideRequest = new RideRequest(pickupLocation, dropoffLocation); rideService.processRideRequest(rideRequest) .thenApply(a ->< . >) .thenCombine(b -> < . >) .thenAccept(c -> < . >) > >
Однако, реализации с использованием Future , CompletionStage и им подобных требуют от разработчика глубокого понимания многопоточного программирования: доступ к общей памяти, синхронизация, обработка ошибок и так далее.
Но и это ещё не всё. Дизайн многопоточности в Java не предполагает, что мы будем создавать поток на каждый чих. Создание потока дорогостоящая операция. Да, пул потоков частично решает эту проблему, но есть ещё одна проблема: несколько потоков могут использовать один процессор для выполнения задач одновременно. При такой ситуации, процессорное время распределяется между несколькими потоками, что вызывает необходимость переключения контекста. Для возобновления выполнения потока позже, необходимо сохранять и загружать регистры, карты памяти и выполнить другие операции с высоким объёмом вычислений. Из-за этого снижается эффект от использования большого количества потоков при небольшом количестве процессоров.
Паттерн Наблюдатель (Observer Pattern)
Вспомним паттерн «Наблюдатель». Он поможет нам лучше понять концепцию реактивных потоков.
Наблюдатель — это поведенческий паттерн проектирования, который создаёт механизм подписки, позволяющий одним объектам следить и реагировать на события, происходящие в других объектах.
В этом паттерне есть два ключевых участника: Издатель и Подписчик (Наблюдатель). Издатель обновляет состояние и оповещает всех своих подписчиков об этих изменениях. Подписчики, в свою очередь, реагируют на эти уведомления.
Ключевые характеристики
- Декаплинг: Субъекты и наблюдатели функционируют независимо друг от друга. Это означает, что они не должны знать друг о друге. Субъекты просто отправляют уведомления, а наблюдатели просто реагируют на них.
- Динамичность: Подписчики могут подписываться и отписываться от субъектов в любое время.
- Многопоточность: Паттерн Наблюдатель позволяет обрабатывать события асинхронно и в различных потоках исполнения.
В контексте реактивного программирования паттерн Наблюдатель становится основой для создания реактивных потоков, обеспечивающих эффективную обработку данных и событий. Это также служит фундаментом для различных библиотек и фреймворков, таких как RxJava и Project Reactor.
Реактивные потоки (стримы)
Спецификация Reactive Streams впервые была опубликована в 2015 году. Она была разработана для стандартизации модели асинхронного потокового программирования с контролем потока данных в JVM и была принята в качестве основы для обработки асинхронного потока в JDK 9.
В отличие от обычных Java Stream не было предоставлено стандартных реализаций реактивных стримов, поэтому в последующие годы в Java-сообществе появилось несколько библиотек и фреймворков, которые реализуют и расширяют спецификацию Reactive Streams, таких как: RxJava, Vert.x, ProjectReactor, Akka Streams.
Базовая схема работы стримов
Рекомендую посмотреть доклад «Олег Докука — Реактивный хардкор: как построить свой Publisher», который закрепит понимание данного механизма взаимодействия.
Продолжая историю с паттерном Наблюдатель, теперь у нас есть интерфейсы Publisher , представляет источник данных, и Subscriber — наблюдатель, который подписывается на поток и получает уведомления об изменении состояния потока. Есть ещё один важный интерфейс, который является «посредником» между первыми двумя — это Subscription .
Давайте взглянем, как выглядят данные интерфейсы:
package org.reactivestreams; public interface Publisher < public void subscribe(Subscribers); >
package org.reactivestreams; public interface Subscriber
package org.reactivestreams; public interface Subscription
Все начинается с подписки Subscriber на Publisher посредством вызова метода Publisher.subscribe() . Publisher использует переданный объект Subscriber , вызывая метод onSubscribe() , передавая в Subscriber объект Subscription . Через этот объект Subscriber будет взаимодействовать с Publisher .
Теперь Subscriber , используя полученный Subscription , будет запрашивать значения у Publisher . Это важный момент, не Publisher инициализирует отправку данных подписчикам когда хочет, это подписчики запрашивают необходимое количество данных у Publisher . Таким образом реализуется контроль потока данных (Backpressure).
Метод onNext(T t) вызывается, когда Subcriber запрашивает значения у Publisher , используя метод Subscription.request() . Он передаёт данные по одному, но не больше, чем было запрошено подписчиком.
Метод onError() вызывается, когда ошибка происходит на стороне Publisher , оповещая таким образом о проблеме Subscriber , передавая объект исключения.
Метод onComplete() оповещает подписчиков, что у Publisher не осталось элементов для передачи. Данный метод, как и onError() вызывается лишь один раз.
Пример реактивного потока
Представим, что у нас есть набор чисел, и мы хотим получить квадрат каждого числа. При этом числа для расчёта могут поступать из разных систем или из бд, или из БД редиса и других систем.
В императивном стиле программирования мы бы обработали эти данные следующим образом:
List numbers = externalServices.getNumbers(); for (int number : numbers)
Код обрабатывает данные в определённом порядке и от начала до конца. Мы должны дождаться пока метод getNumbers() отправит нам все данные для расчёта.
Давайте рассмотрим тот же пример с использованием Project Reactor:
Flux numbers = externalServices.getNumber(); // Flux это реализация Publisher numbers .map(number -> number * number) .subscribe( data -> System.out.println("Получены данные: " + data), error -> System.out.println("Произошла ошибка: " + error), () -> System.out.println("Поток данных завершен") );
Этот код напоминает Stream API. Только вместо Stream используется Flux — это тип данных из Project Reactor, который представляет собой поток данных.
Мы используем метод map , чтобы преобразовать каждое число в его квадрат, и затем подписываемся на поток, чтобы вывести результат. Подписка очень важна, без неё ничего не произойдёт. Только в отличие от Stream API на Flux можно подписаться несколько раз.
В данном примере мы будем обрабатывать данные асинхронно по мере их поступления. Одна система ответила быстрее другой, сразу обработали эту часть данных.
Ограничения и недостатки
Не бывает идеального решения и реактивные потоки не исключение.
Чтобы получить преимущество реактивных потоков, весь стек должен быть реактивным: доступ к БД, операции чтения/записи файлов и так далее. Всё должно работать в реактивной парадигме, иначе вы получите блокировки, которые ухудшат производительность всей системы.
Например, стандартный JDBC не является реактивным. Если использовать его в реактивном сервисе, то придётся ждать ответ, когда мы отправляем запрос в базу данных. Соответственно, вся реактивность тут же ломается.
Весь технологический стек должен быть реактивным
Сложность: Реактивное программирование может быть сложным для понимания, особенно для новичков. Это связано с необходимостью работы с асинхронностью, обработкой ошибок и механизмами подобными backpressure.
Отладка и тестирование: Отладка и тестирование реактивных систем сложная задача, так как асинхронная природа реактивного кода делает отслеживание исполнения программы менее прямолинейным и понятным.
Требования к проектированию: Реактивные системы требуют продуманного проектирования, чтобы обеспечить их эффективность. Без правильного проектирования система может столкнуться с проблемами производительности или недостаточной отзывчивости.
Поддержи автора
Event Loop
Рассуждая на тему реактивного программирования, нельзя пройти мимо такого понятия, как Event Loop.
Это реактивная асинхронная модель программирования для серверов. Она позволяет достичь более высокого уровня параллелизма при меньшем количестве потоков.
По сути, Event Loop — это реализация шаблона Reactor. Является неблокирующим потоком ввода-вывода, который работает непрерывно. Его основная задача — проверка новых событий. И как только событие пришло перенаправлять его тому, кто в данный момент может его обработать. Иногда их может быть несколько для увеличения производительности.
Выше приведён абстрактный дизайн цикла событий, который представляет идеи реактивного асинхронного программирования:
- Цикл событий выполняется непрерывно в одном потоке, хотя у нас может быть столько циклов событий, сколько доступно ядер.
- Цикл событий последовательно обрабатывает события из очереди событий и возвращается сразу после регистрации обратного вызова в платформе.
- Платформа может инициировать завершение операции, такой как вызов базы данных или вызов внешней службы.
- Цикл событий может запускать обратный вызов при уведомлении о завершении операции и отправлять результат обратно исходному вызывающему.
В своей работе механизм Event Loop использует Netty — клиент-серверная среда ввода-вывода для разработки сетевых приложений Java. Также этот механизм использует Vert.x – это полифункциональная библиотека для построения реактивных приложений на JVM.
Реактивные фреймворки в Java
Теперь немного поговорим про имплементации реактивной спецификации, коих уже появилось достаточное количество.
Spring WebFlux (Project Reactor)
Project Reactor – это библиотека для реактивного программирования на Java, которая полностью поддерживает Reactive Streams. Она предлагает два основных типа данных – Flux и Mono .
Flux представляет поток ноль или более элементов, а Mono представляет один или ноль элементов. Оба типа предоставляют обширный набор операторов для трансформации и комбинирования этих потоков.
Spring WebFlux – это веб-фреймворк, который является частью экосистемы Spring и использует Project Reactor для обработки реактивных потоков. В отличие от Spring MVC, который предназначен для синхронного веб-программирования и блокирования I/O, Spring WebFlux предназначен для асинхронного и неблокирующего веб-программирования.
Вместо Tomcat используется неблокирующий Netty. Сервлеты тоже ушли в прошлое.
Quarkus (Vert.x)
Не спрингом единым. Активно использую его в работе и в пет-проектах.
Реактивное программирование в Quarkus основано на библиотеке SmallRye Mutiny. Mutiny предлагает два основных типа: Uni и Multi , которые представляют собой типы реактивных потоков, обеспечивающих обработку одного или множества элементов соответственно.
Множество знакомых коллег, которым довелось поработать и со Spring WebFlux и с Quarkus, отмечают, что реактивный Quarkus API более приятный для работы.
Quarkus также оптимизирован для работы в контейнеризированных средах и облачных приложениях, что, в сочетании с его реактивной моделью программирования, делает его отличным выбором для создания современных микросервисов.
Кроме того, Quarkus предоставляет возможность компиляции приложений в нативный код при помощи GraalVM, что позволяет уменьшить время запуска и использование памяти, что является ещё одним преимуществом при использовании в облачных средах.
Project Loom
Некоторые разработчики предрекают скорую смерть таким проектам, как Project Reactor, ведь уже близится релиз Project Loom. Давайте порассуждаем на эту тему.
Цель Project Loom добавить в Java так называемые виртуальные потоки или нити (fibers). Одной из ключевых особенностей виртуальных потоков является их «непрерывность». Виртуальный поток может быть приостановлен, а его ресурсы могут быть возвращены в пул потоков, что позволяет использовать поток операционной системы для другой работы. Когда придёт ответ от внешнего сервиса, виртуальный поток может быть возобновлен и продолжить свою работу.
Хотя такие реактивные библиотеки предоставляют мощные абстракции для управления асинхронным и неблокирующим кодом, они всё ещё полагаются на традиционную модель потоков Java, которая может быть сложной и трудной для понимания. С Project Loom разработчики получат более простой способ написания неблокирующего кода, без необходимости использования сложных абстракций или пулов потоков.
Однако, на данный момент Project Loom находится в разработке, а вот реактивные фреймворки уже есть и успешно используются.
Вот аргументы, почему существующие фреймворки останутся в строю:
- Project Loom находится в разработке, и его окончательная форма и влияние на экосистему Java не полностью понятны.
- Ничто не мешает реактивным фреймворкам использовать под капотом новые виртуальные потоки. Предоставляя мощные абстракции и API для работы.
- Когда требуется тип обработки в стиле событий (Event-Driven Architecture), то их API очень удобен, странно от него отказываться.
Рекомендую посмотреть следующие доклады на тему Project Loom:
- Иван Углянский — Thread Wars: проект Loom наносит ответный удар
- Олег Докука, Андрей Родионов — Project Loom — друг или враг Reactive?
Заключение
Мы разобрались, что такое реактивные системы, и какими свойствами система должна обладать, чтобы называться реактивной. Разработка реактивной системы — сложный процесс и очевидно, что не каждой системе необходимо быть реактивной.
Однако, системы написанные с применением реактивных подходов и реактивного стека позволяют выдерживать большую нагрузку, чем системы, написанные на стандартном императивном стеке, а также быть более эффективными с точки зрения потребления ресурсов системы и дальнейшего её масштабирования.
Создать реактивную систему проще всего с помощью реактивных фремворков, которые позволяют не блокировать потоки и обрабатывать данные по мере их поступления.
Дополнительные материалы
- Зачем нам Reactive и как его готовить. Команда разработки делится своим опытом перевода сервисов на реактивный стек. Они используют Spring WebFlux.
Реактивное программирование на Java: как, зачем и стоит ли? Часть II
Реактивное программирование — один из самых актуальных трендов современности. Обучение ему — сложный процесс, особенно если нет подходящих материалов. В качестве своеобразного дайджеста может выступить эта статья. На конференции РИТ++ 2020 эксперт и тренер Luxoft Training Владимир Сонькин рассказал о фишках управления асинхронными потоками данных и подходах к ним, а также показал на примерах, в каких ситуациях нужна реактивность, и что она может дать.
В первой части статьи рассказывалось о том, что привело к появлению реактивного программирования, где оно применяется, и что нам может дать асинхронность. Пришло время рассказать о следующем шаге, позволяющем получить максимум преимуществ от асинхронности, и это — реактивное программирование.
Reactivity
Реактивное программирование — это асинхронность, соединенная с потоковой обработкой данных. То есть если в асинхронной обработке нет блокировок потоков, но данные обрабатываются все равно порциями, то реактивность добавляет возможность обрабатывать данные потоком. Помните тот пример, когда начальник поручает задачу Васе, тот должен передать результат Диме, а Дима вернуть начальнику? Но у нас задача — это некая порция, и пока она не будет сделана, дальше передать ее нельзя. Такой подход действительно разгружает начальника, но Дима и Вася периодически простаивают, ведь Диме надо дождаться результатов работы Васи, а Васе — дождаться нового задания.
А теперь представьте, что задачу разбили на множество подзадач. И теперь они плывут непрерывным потоком:
Говорят, когда Генри Форд придумал свой конвейер, он повысил производительность труда в четыре раза, благодаря чему ему удалось сделать автомобили доступными. Здесь мы видим то же самое: у нас небольшие порции данных, а конвейер с потоком данных, и каждый обработчик пропускает через себя эти данные, каким-то образом их преобразовывая. В качестве Васи и Димы у нас выступают потоки выполнения (threads), обеспечивая, таким образом, многопоточную обработку данных.
На этой схеме показаны разные технологии распараллеливания, добавлявшиеся в Java в разных версиях. Как мы видим, спецификация Reactive Streams на вершине — она не заменяет всего, что было до нее, но добавляет самый высокий уровень абстракции, а значит ее использование просто и эффективно. Попробуем в этом разобраться.
Идея реактивности построена на паттерне проектирования Observer.
Давайте вспомним, что это за паттерн. У нас есть подписчики и то, на что мы подписываемся. В качестве примера здесь рассмотрен Твиттер, но подписаться на какое-то сообщество или человека, а потом получать обновления можно в любой соцсети. После подписки, как только появляется новое сообщение, всем подписчикам приходит notify, то есть уведомление. Это базовый паттерн.
В данной схеме есть:
- Publisher — тот, кто публикует новые сообщения;
- Observer — тот, кто на них подписан. В реактивных потоках подписчик обычно называется Subscriber. Термины разные, но по сути это одно и то же. В большинстве сообществ более привычны термины Publisher/Subscriber.
Это базовая идея, на которой все строится.
Один из жизненных примеров реактивности — система оповещения при пожаре. Допустим, нам надо сделать систему, включающую тревогу в случае превышения задымленности и температуры.
У нас есть датчик дыма и градусник. Когда дыма становится много и/или температура растет, на соответствующих датчиках увеличивается значение. Когда значение и температура на датчике дыма оказываются выше пороговых, включается колокольчик и оповещает о тревоге.
Если бы у нас был традиционный, а не реактивный подход, мы бы писали код, который каждые пять минут опрашивает детектор дыма и датчик температуры, и включает или выключает колокольчик. Однако в реактивном подходе за нас это делает реактивный фреймворк, а мы только прописываем условия: колокольчик активен, когда детектор больше X, а температура больше Y. Это происходит каждый раз, когда приходит новое событие.
От детектора дыма идет поток данных: например, значение 10, потом 12, и т.д. Температура тоже меняется, это другой поток данных — 20, 25, 15. Каждый раз, когда появляется новое значение, результат пересчитывается, что приводит к включению или выключению системы оповещения. Нам достаточно сформулировать условие, при котором колокольчик должен включиться.
Если вернуться к паттерну Observer, у нас детектор дыма и термометр — это публикаторы сообщений, то есть источники данных (Publisher), а колокольчик на них подписан, то есть он Subscriber, или наблюдатель (Observer).
Немного разобравшись с идеей реактивности, давайте углубимся в реактивный подход. Мы поговорим об операторах реактивного программирования. Операторы позволяют каким-либо образом трансформировать потоки данных, меняя данные и создавая новые потоки. Для примера рассмотрим оператор distinctUntilChanged. Он убирает одинаковые значения, идущие друг за другом. Действительно, если значение на детекторе дыма не изменилось — зачем нам на него реагировать и что-то там пересчитывать:
Reactive approach
Рассмотрим еще один пример: допустим, мы разрабатываем UI, и нам нужно отслеживать двойные нажатия мышкой. Тройной клик будем считать как двойной.
Клики здесь — это поток щелчков мышкой (на схеме 1, 2, 1, 3). Нам нужно их сгруппировать. Для этого мы используем оператор throttle. Говорим, что если два события (два клика) произошли в течение 250 мс, их нужно сгруппировать. На второй схеме представлены сгруппированные значения (1, 2, 1, 3). Это поток данных, но уже обработанных — в данном случае сгрупированных.
Таким образом начальный поток преобразовался в другой. Дальше нужно получить длину списка ( 1, 2, 1, 3). Фильтруем, оставляя только те значения, которые больше или равны 2. На нижней схеме осталось только два элемента (2, 3) — это и были двойные клики. Таким образом, мы преобразовали начальный поток в поток двойных кликов.
Это и есть реактивное программирование: есть потоки на входе, каким-то образом мы пропускаем их через обработчики, и получаем поток на выходе. При этом вся обработка происходит асинхронно, то есть никто никого не ждет.
Еще одна хорошая метафора — это система водопровода: есть трубы, одна подключена к другой, есть какие-то вентили, может быть, стоят очистители, нагреватели или охладители (это операторы), трубы разделяются или объединяются. Система работает, вода льется. Так и в реактивном программировании, только в водопроводе течет вода, а у нас — данные.
Можно придумать потоковое приготовление супа. Например, есть задача максимально эффективно сварить много супа. Обычно берется кастрюля, в нее наливается порция воды, овощи нарезаются и т.д. Это не потоковый, а традиционный подход, когда мы варим суп порциями. Сварили эту кастрюлю, потом нужно ставить следующую, а после — еще одну. Соответственно, надо дождаться, пока в новой кастрюле снова закипит вода, растворится соль, специи и т.д. Все это занимает время.
Представьте себе такой вариант: в трубе нужного диаметра (достаточного, чтобы заполнялась кастрюля) вода сразу подогревается до нужной температуры, есть нарезанная свекла и другие овощи. На вход они поступают целыми, а выходят уже шинкованными. В какой-то момент все смешивается, вода подсаливается и т.д. Это максимально эффективное приготовление, супоконвейер. И именно в этом идея реактивного подхода.
Observable example
Теперь посмотрим на код, в котором мы публикуем события:
Observable.just позволяет положить в поток несколько значений, причем если обычные реактивные потоки содержат значения, растянутые во времени, то тут мы их кладем все сразу — то есть синхронно. В данном случае это названия городов, на которые в дальнейшем можно подписаться (тут для примера взяты города, в которых есть учебный центр Люксофт).
Девушка (Publisher) опубликовала эти значения, а Observers на них подписываются и печатают значения из потока.
Это похоже на потоки данных (Stream) в Java 8. И тут, и там синхронные потоки. И здесь, и в Java 8 список значений нам известен сразу. Но если бы использовался обычный для Java 8 поток, мы не могли бы туда что-то докладывать. В стрим ничего нельзя добавить: он синхронный. В нашем примере потоки асинхронные, то есть в любой момент времени в них могут появляться новые события — скажем, если через год откроется учебный центр в новой локации — она может добавиться в поток, и реактивные операторы правильно обработают эту ситуацию. Мы добавили события и сразу же на них подписались:
Мы можем в любой момент добавить значение, которое через какое-то время выводится. Когда появляется новое значение, мы просим его напечатать, и на выходе получаем список значений:
При этом есть возможность не только указать, что должно происходить, когда появляются новые значения, но и дополнительно отработать такие сценарии, как возникновение ошибок в потоке данных или завершение потока данных. Да-да, хотя часто потоки данных не завершаются (например, показания термометра или датчика дыма), многие потоки могут завершаться: например, поток данных с сервера или с другого микросервиса. В какой-то момент сервер закрывает соединение, и появляется потребность на это как-то отреагировать.
Implementing and subscribing to an observer
В Java 9 нет реализации реактивных потоков — только спецификация. Но есть несколько библиотек — реализаций реактивного подхода. В этом примере используется библиотека RxJava. Мы подписываемся на поток данных, и определяем несколько обработчиков, то есть методы, которые будут запущены в начале обработки потока (onSubscribe), при получении каждого очередного сообщения (onNext), при возникновении ошибки (onError) и при завершении потока (onComplete):
Давайте посмотрим на последнюю строчку.
locations.map(String::length).filter(l -> l >= 5).subscribe(observer);
Мы используем операторы map и filter. Если вы работали со стримами Java 8, вам, конечно, знакомы map и filter. Здесь они работают точно так же. Разница в том, что в реактивном программировании эти значения могут появляться постепенно. Каждый раз, когда приходит новое значение, оно проходит через все преобразования. Так, String::length заменит строчки на длину в каждой из строк.
В данном случае получится 5 (Minsk), 6 (Krakow), 6 (Moscow), 4 (Kiev), 5 (Sofia). Фильтруем, оставляя только те, что больше 5. У нас получится список длин строк, которые больше 5 (Киев отсеется). Подписываемся на итоговый поток, после этого вызывается Observer и реагирует на значения в этом итоговом потоке. При каждом следующем значении он будет выводить длину:
public void onNext(Integer value) <
System.out.println(«Length: » + value);
То есть сначала появится Length 5, потом — Length 6. Когда наш поток завершится, будет вызван onComplete, а в конце появится надпись «Done.»:
public void onComplete() System.out.println(«Done.»);
Не все потоки могут завершаться. Но некоторые способны на это. Например, если мы читали что-то из файла, поток завершится, когда файл закончится.
Если где-то произойдет ошибка, мы можем на нее отреагировать:
public void onError(Throwable e) e.printStackTrace();
Таким образом мы можем реагировать разными способами на разные события: на следующее значение, на завершение потока и на ошибочную ситуацию.
Reactive Streams spec
Реактивные потоки вошли в Java 9 как спецификация.
Если предыдущие технологии (Completable Future, Fork/Join framework) получили свою имплементацию в JDK, то реактивные потоки имплементации не имеют. Есть только очень короткая спецификация. Там всего 4 интерфейса:
Если рассматривать наш пример из картинки про Твиттер, мы можем сказать, что:
Publisher — девушка, которая постит твиты;
Subscriber — подписчик. Он определяет , что делать, если:
- Начали слушать поток (onSubscribe). Когда мы успешно подписались, вызовется эта функция;
- Появилось очередное значение в потоке (onNext);
- Появилось ошибочное значение (onError);
- Поток завершился (onComplete).
Subscription — у нас есть подписка, которую можно отменить (cancel) или запросить определенное количество значений (request(long n)). Мы можем определить поведение при каждом следующем значении, а можем забирать значения вручную.
Processor — обработчик — это два в одном: он одновременно и Subscriber, и Publisher. Он принимает какие-то значения и куда-то их кладет.
Если мы хотим на что-то подписаться, вызываем Subscribe, подписываемся, и потом каждый раз будем получать обновления. Можно запросить их вручную с помощью request. А можно определить поведение при приходе нового сообщения (onNext): что делать, если появилось новое сообщение, что делать, если пришла ошибка и что делать, если Publisher завершил поток. Мы можем определить эти callbacks, или отписаться (cancel).
PUSH / PULL модели
Существует две модели потоков:
- Push-модель — когда идет «проталкивание» значений.
Например, вы подписались на кого-то в Telegram или Instagram и получаете оповещения (они так и называются — push-сообщения, вы их не запрашиваете, они приходят сами). Это может быть, например, всплывающее сообщение. Можно определить, как реагировать на каждое новое сообщение.
- Pull-модель — когда мы сами делаем запрос.
Например, мы не хотим подписываться, т.к. информации и так слишком много, а хотим сами заходить на сайт и узнавать новости.
Для Push-модели мы определяем callbacks, то есть функции, которые будут вызваны, когда придет очередное сообщение, а для Pull-модели можно воспользоваться методом request, когда мы захотим узнать, что новенького.
Pull-модель очень важна для Backpressure — «напирания» сзади. Что же это такое?
Вы можете быть просто заспамленными своими подписками. В этом случае прочитать их все нереально, и есть шанс потерять действительно важные данные — они просто утонут в этом потоке сообщений. Когда подписчик из-за большого потока информации не справляется со всем, что публикует Publisher, получается Backpressure.
В этом случае можно использовать Pull-модель и делать request по одному сообщению, прежде всего из тех потоков данных, которые наиболее важны для вас.
Implementations
Давайте рассмотрим существующие реализации реактивных потоков:
- RxJava. Эта библиотека реализована для разных языков. Помимо RxJava существует Rx для C#, JS, Kotlin, Scala и т.д.
- Reactor Core. Был создан под эгидой Spring, и вошел в Spring 5.
- Akka-стримы от создателя Scala Мартина Одерски. Они создали фреймворк Akka (подход с Actor), а Akka-стримы — это реализация реактивных потоков, которые дружат с этим фреймворком.
Во многом эти реализации похожи, и все они реализуют спецификацию реактивных потоков из Java 9.
Посмотрим подробнее на Spring’овский Reactor.
Function may return…
Давайте обобщим, что может возвращать функция:
- Single/Synchronous;
Обычная функция возвращает одно значение, и делает это синхронно.
- Multipple/Synchronous;
Если мы используем Java 8, можем возвращать из функции поток данных Stream. Когда вернулось много значений, их можно отправлять на обработку. Но мы не можем отправить на обработку данные до того, как все они получены — ведь Stream работают только синхронно.
- Single/Asynchronous;
Здесь уже используется асинхронный подход, но функция возвращает только одно значение:
- либо CompletableFuture (Java), и через какое-то время приходит асинхронный ответ;
- либо Mono, возвращающая одно значение в библиотеке Spring Reactor.
- Multiple/Asynchronous.
А вот тут как раз — реактивные потоки. Они асинхронные, то есть возвращают значение не сразу, а через какое-то время. И именно в этом варианте можно получить поток значений, причем эти значения будут растянуты во времени Таким образом, мы комбинируем преимущества потоков Stream, позволяющих вернуть цепочку значений, и асинхронности, позволяющей отложить возврат значения.
Например, вы читаете файл, а он меняется. В случае Single/Asynchronous вы через какое-то время получаете целиком весь файл. В случае Multiple/Asynchronous вы получаете поток данных из файла, который сразу же можно начинать обрабатывать. То есть можно одновременно читать данные, обрабатывать их, и, возможно, куда-то записывать. . Реактивные асинхронные потоки называются:
- Publisher (в спецификации Java 9);
- Observable (в RxJava);
- Flux (в Spring Reactor).
Netty as a non-blocking server
Рассмотрим пример использования реактивных потоков Flux вместе со Spring Reactor. В основе Reactor лежит сервер Netty. Spring Reactor — это основа технологии, которую мы будем использовать. А сама технология называется WebFlux. Чтобы WebFlux работал, нужен асинхронный неблокирующий сервер.
Схема работы сервера Netty похожа на то, как работает Node.js. Есть Selector — входной поток, который принимает запросы от клиентов и отправляет их на выполнение в освободившиеся потоки. Если в качестве синхронного сервера (Servlet-контейнера) используется Tomcat, то в качестве асинхронного используется Netty.
Давайте посмотрим, сколько вычислительных ресурсов расходуют Netty и Tomcat на выполнение одного запроса:
Throughput — это общее количество обработанных данных. При небольшой нагрузке, до первых 300 пользователей у RxNetty и Tomcat оно одинаковое, а после Netty уходит в приличный отрыв — почти в 2 фраза.
Blocking vs Reactive
У нас есть два стека обработки запросов:
- Традиционный блокирующий стек.
- Неблокирующий стек — в нем все происходит асинхронно и реактивно.
В блокирующем стеке все строится на Servlet API, в реактивном неблокирующем стеке — на Netty.
Сравним реактивный стек и стек Servlet.
В Reactive Stack применяется технология Spring WebFlux. Например, вместо Servlet API используются реактивные стримы.
Чтобы мы получили ощутимое преимущество в производительности, весь стек должен быть реактивным. Поэтому чтение данных тоже должно происходить из реактивного источника.
Например, если у нас используется стандартный JDBC, он является не реактивным блокирующим источником, потому что JDBC не поддерживает неблокирующий ввод/вывод. Когда мы отправляем запрос в базу данных, приходится ждать, пока результат этого запроса придет. Соответственно, получить преимущество не удается.
В Reactive Stack мы получаем преимущество за счет реактивности. Netty работает с пользователем, Reactive Streams Adapters — со Spring WebFlux, а в конце находится реактивная база: то есть весь стек получается реактивным. Давайте посмотрим на него на схеме:
Data Repo — репозиторий, где хранятся данные. В случае, если есть запросы, допустим, от клиента или внешнего сервера, они через Flux поступают в контроллер, обрабатываются, добавляются в репозиторий, а потом ответ идет в обратную сторону.
При этом все это делается неблокирующим способом: мы можем использовать либо Push-подход, когда мы определяем, что делать при каждой следующей операции, либо Pull-подход, если есть вероятность Backpressure, и мы хотим сами контролировать скорость обработки данных, а не получать все данные разом.
Операторы
В реактивных потоках огромное количество операторов. Многие из них похожи на те, которые есть в обычных стримах Java. Мы рассмотрим только несколько самых распространенных операторов, которые понадобятся нам для практического примера применения реактивности.
Filter operator
Скорее всего, вы уже знакомы с фильтрами из интерфейса Stream.
По синтаксису этот фильтр точно такой же, как обычный. Но если в стриме Java 8 все данные есть сразу, здесь они могут появляться постепенно. Стрелки вправо — это временная шкала, а в кружочках находятся появляющиеся данные. Мы видим, что фильтр оставляет в итоговом потоке только значения, превышающие 10.
Take 2 означает, что нужно взять только первые два значения.
Map operator
Оператор Map тоже хорошо знаком:
Это действие, происходящее с каждым значением. Здесь — умножить на десять: было 3, стало 30; было 2, стало 20 и т.д.
Delay operator
Задержка: все операции сдвигаются. Этот оператор может понадобиться, когда значения уже генерируются, но подготовительные процессы еще происходят, поэтому приходится отложить обработку данных из потока.
Reduce operator
Еще один всем известный оператор:
Он дожидается конца работы потока (onComplete) — на схеме она представлена вертикальной чертой. После чего мы получаем результат — здесь это число 15. Оператор reduce сложил все значения, которые были в потоке.
Scan operator
Этот оператор отличается от предыдущего тем, что не дожидается конца работы потока.
Оператор scan рассчитывает текущее значение нарастающим итогом: сначала был 1, потом прибавил к предыдущему значению 2, стало 3, потом прибавил 3, стало 6, еще 4, стало 10 и т.д. На выходе получили 15. Дальше мы видим вертикальную черту — onComplete. Но, может быть, его никогда не произойдет: некоторые потоки не завершаются. Например, у термометра или датчика дыма нет завершения, но scan поможет рассчитать текущее суммарное значение, а при некоторой комбинации операторов — текущее среднее значение всех данных в потоке.
Merge operator
Объединяет значения двух потоков.
Например, есть два температурных датчика в разных местах, а нам нужно обрабатывать их единообразно, в общем потоке .
Combine latest
Получив новое значение, комбинирует его с последним значением из предыдущего потока.
Если в потоке возникает новое событие, мы его комбинируем с последним полученным значением из другого потока. Скажем, таким образом мы можем комбинировать значения от датчика дыма и термометра: при появлении нового значения температуры в потоке temperatureStream оно будет комбинироваться с последним полученным значением задымленности из smokeStream. И мы будем получать пару значений. А уже по этой паре можно выполнить итоговый расчет:
temperatureStream.combineLatest(smokeStream).map((x, y) -> x > X && y > Y)
В итоге на выходе у нас получается поток значений true или false — включить или выключить колокольчик. Он будет пересчитываться каждый раз, когда будет появляться новое значение в temperatureStream или в smokeStream.
FlatMap operator
Этот оператор вам, скорее всего, знаком по стримам Java 8. Элементами потока в данном случае являются другие потоки. Получается поток потоков. Работать с ними неудобно, и в этих случаях нам может понадобиться «уплостить» поток.
Можно представить такой поток как конвейер, на который ставят коробки с запчастями. До того, как мы начнем их применять, запчасти нужно достать из коробок. Именно это делает оператор flatMap.
Flatmap часто используется при обработке потока данных, полученных с сервера. Т.к. сервер возвращает поток, чтобы мы смогли обрабатывать отдельные данные, этот поток сначала надо «развернуть». Это и делает flatMap.
Buffer operator
Это оператор, который помогает группировать данные. На выходе Buffer получается поток, элементами которого являются списки (List в Java). Он может пригодиться, когда мы хотим отправлять данные не по одному, а порциями.
Мы с самого начала говорили, что реактивные потоки позволяют разбить задачу на подзадачи, и обрабатывать их маленькими порциями. Но иногда лучше наоборот, собрать много маленьких частей в блоки. Скажем, продолжая пример с конвейером и запчастями, нам может понадобиться отправлять запчасти на другой завод (другой сервер). Но каждую отдельную запчасть отправлять неэффективно. Лучше их собрать в коробки, скажем по 100 штук, и отправлять более крупными партиями.
На схеме выше мы группируем отдельные значения по три элемента (так как всего их было пять, получилась «коробка» из трех, а потом из двух значений). То есть если flatMap распаковывает данные из коробок, buffer, наоборот, упаковывает их.
Всего существует более сотни операторов реактивного программирования. Здесь разобрана только небольшая часть.
Итого
Есть два подхода:
- Spring MVC — традиционная модель, в которой используется JDBC, императивная логика и т.д.
- Spring WebFlux, в котором используется реактивный подход и сервер Netty.
Есть кое-что, что их объединяет. Tomcat, Jetty, Undertow могут работать и со Spring MVC, и со Spring WebFlux. Однако дефолтным сервером в Spring для работы с реактивным подходом является именно Netty.
Конференция HighLoad++ Весна 2021 пройдет 20 и 21 мая 2021 года. Приобрести билеты можно уже сейчас.
А совсем скоро состоится еще одно интересное событие, на сей раз онлайн: 18 марта в 17:00 МСК пройдет митап «Как устроена самая современная платежная система в МИРе: архитектура и безопасность».
Вместе с разработчиками Mир Plat.Form будем разбираться, как обеспечить устойчивость работы всех сервисов уже на этапе проектирования и как сделать так, чтобы система могла развиваться, не затрагивая бизнес-процессы. Митап будет интересен разработчикам, архитекторам и специалистам по безопасности.
Хотите бесплатно получить материалы конференции мини-конференции Saint HighLoad++ 2020? Подписывайтесь на нашу рассылку.
- реактивное программирование
- java
- конференция
- highload
- Блог компании Конференции Олега Бунина (Онтико)
- Высокая производительность
- Программирование
- Java
- Параллельное программирование