22 октября 2019

RxJS для продолжающих

Библиотека RxJS — мощный инструмент для реализации обработки синхронного и асинхронного кода. Мы предполагаем, что читатель, приступающий к прочтению этого материала, уже освоил базовые концепты — это наблюдатель (Observer), наблюдаемое (Observable), операторы (Operators) и подписка (Subscription), а также уже использовал RxJS в проекте и хочет расширить свой опыт и экспертизу по этому вопросу.

RxJS for intermediates Noveo

Когда начинаешь работать с RxJS, часто встречаешь термин «потоки». Прочитав пару-тройку статей, мы, вдохновившись идеологией реактивного программирования (все есть поток), начинаем строить «потоковую» архитектуру своего приложения. А используя термин «потоки», мы быстро попадаем в когнитивную ловушку: нам кажется что Observable работает (или должен работать) как EventEmitter…

…Байка…

Типичный Холодный.

Представьте что вы бармен из ТОГО САМОГО бара, который сгорел1. К вам заходит посетитель (Observer) и спрашивает (Subscription), где здесь туалет. Потом заходит следующий и спрашивает, где здесь туалет. И следующий… и следующий… Вы, сохраняя самообладание, отвечаете всегда одно и тоже, НО персонально каждому. Вы типичный холодный Observable.

…но на самом деле порядок вещей следующий: по умолчанию Observable работает как функция — то есть вызывается каждый раз при подписке (unicast), но может работать и как EventEmitter — регистрировать количество подписчиков и вещать всем (multicast).

…Байка…

Типичный Горячий.

Вам, как бармену, надоело работать попугаем, и вы решили автоматизировать процесс. Допустим, вы купили телевизор и стали с каким-то интервалом выводить на нем бегущую строку с сообщением, как дойти до туалета, а сами спрятались под стойку. Теперь каждый приходящий посетитель вынужден садиться за стол и ожидать очередного сообщения по ТВ. Немного неудобно, зато все присутствующие видят сообщение разом! Широковещательное ТВ: типичный горячий Observable.

Unicast (Cold) and Multicast (Hot) Observables

Наблюдаемые (Observables) по способу выполнения делятся на две категории: “холодные” (unicast) и “горячие” (multicast).

Rx for intermediates Noveo

Как мы уже знаем, «холодный» создает собственный поток для каждого нового подписчика и выдает значения лично для него.

RxJS for intermediates Noveo

В то время как «горячий» регистрирует слушателей и вещает всем.

RxJS for intermediates Noveo

Библиотека RxJS даёт возможность преобразовать холодный поток в горячий. Но прежде чем говорить о преобразованиях, необходимо рассказать о специальных объектах, которые позволяют это сделать.

4 типа Subjects

В RxJS существуют объекты, которые одновременно являются и наблюдателем, и наблюдаемым — Subjects. Subject’ы придумали для того, чтобы передавать значения сразу нескольким подписчикам. Поскольку Subject’ы по умолчанию являются «горячими», то мы можем быть уверены, что у нас создается один поток, значения из которого будут получать наши подписчики.

RxJS for intermediates Noveo

Subject

— это объект, который одновременно умеет как принимать значения, так и выдавать. Главное отличие Subject’a от обычного Observable — это то, что мы можем генерировать новое событие вручную, с помощью функции .next(), в то время как в Observable новые значение должны поступать из callback-функции. Также мы можем вручную завершить поток функцией .complete(); или вызвать функцию завершения потока с ошибкой .error();

Пример: В примере ниже мы создаем Subject и кладем в него значения от 0 до 4 через каждую секунду. Создаем подписку A сразу после объявления Subject, а подписку B через 2,5 секунды.

const subject = new Subject();
const source1 = timer(1000, 1000).pipe(
    take(5),
    tap( (x) => subject.next(x) )
).subscribe();
const sub1 = subject.subscribe(x => console.log('A: ', x));
setTimeout(() => {
    const sub2 = subject.subscribe(x => console.log('B: ', x));
}, 2500);

Результат: https://rxviz.com/v/6Jrgx0g8

RxJS for intermediates Noveo

BehaviorSubject

— всегда возвращает последнее значение из потока, при этом обязательно нужно указывать начальное значение. Также имеет статический метод getValue, который возвращает статическое значение без подписок и потоков.

Пример использования: создание стейта для приложения.

RxJS for intermediates by Noveo

Пример: В этом примере мы создаем точно такой же поток, как и с простым Subject, но только теперь используем BehaviorSubject. Как мы можем видеть, при старте подписки A новых значений еще не было получено, поэтому возвращается начальное значение 42. Когда создается подписка B, сначала возвращается последнее значение, и после продолжается с новыми значениями.

const subject = new BehaviorSubject(42);
const source1 = timer(1000, 1000).pipe(
    take(5),
    tap( (x) => subject.next(x) )
).subscribe();
const sub1 = subject.subscribe(x => console.log('A: ', x));
setTimeout(() => {
  const sub2 = subject.subscribe(x => console.log('B: ', x));
}, 2500);

Результат: https://rxviz.com/v/jOLXG6AO

RxJS for intermediates by Noveo

ReplaySubject

— имеет возможность указания размера буфера.

Пример использования: кеширование запросов к API.

RxJS for intermediates by Noveo

Пример: в данном случае мы указали размеры буфера 2. При создании подписки B мы сначала получили значения 0 и 1.

const subject = new ReplaySubject(2);
const source1 = timer(1000, 1000).pipe(
    take(5),
    tap( (x) => subject.next(x) )
).subscribe();
const sub1 = subject.subscribe(x => console.log('A: ', x));
setTimeout(() => {
const sub2 = subject.subscribe(x => console.log('B: ', x));
}, 2500);

Результат: https://rxviz.com/v/VJ4wBRpO

RxJS for intermediates by Noveo

AsyncSubject

— возвращает последнее значение после завершения потока. Частой ошибкой и непониманием работы с этим типом Subject’a является отсутствие вызова функции завершения потока.

RxJS for intermediates by Noveo

Пример: в этом примере мы кладем значения в AsyncSubject от 0 до 4 каждую секунду. Делаем подписку A сразу и подписку B через 2,5 секунды. И дополнительно вызываем метод complete через 4 секунды, тем самым преждевременно завершая наш поток. В результате в консоли получим значения 3 для подписки A и B.

const subject = new AsyncSubject();
const source1 = timer(1000, 1000).pipe(
    take(5),
    tap( (x) => subject.next(x) )
).subscribe();
const sub1 = subject.subscribe(x => console.log('A: ', x));
setTimeout(() => {
const sub2 = subject.subscribe(x => console.log('B: ', x));
}, 3000);
setTimeout(() => {subject.complete();}, 4000);

Результат: https://rxviz.com/v/7JXZxL0o

RxJS for intermediates by Noveo

From Cold to Hot Observable

Все наши преобразования холодного в горячее мы будем проводить на одном примере, который указан ниже.

Пример:

const source1 = interval(1000).pipe(
take(5)
);
const sub1 = source1.subscribe(x => console.log('A: ', x));
setTimeout(() => {
const sub2 = source1.subscribe(x => console.log('B: ', x));
}, 2500);

В данном примере мы создаем холодный поток, который каждую секунду выдаёт значение, начиная с 0 и увеличивая каждый раз на 1.

RxJS for intermediates by Noveo

Multicast

Оператор, который позволяет преобразовать холодное наблюдаемое в горячее, — это оператор multicast. Этот оператор в качестве параметра принимает объект типа subject. Это может быть любой из 4-х видов subject’ов в зависимости от ваших целей.

Пример:

const source1 = interval(1000).pipe(
    take(5),
    multicast(new Subject())
) as ConnectableObservable<number>;
const sub1 = source1.subscribe(x => console.log('A: ', x));
setTimeout(() => {
  const sub2 = source1.subscribe(x => console.log('B: ', x));
}, 2500);

Обратите внимание, что наш Observable возвращает объект с типом ConnectableObservable. Этот объект похож на обычный Observable, за исключением одного момента: он начинает производить элементы не тогда, когда на него подписываются, а только тогда, когда на нем вызвана функция .connect()

Поэтому представленный выше пример ничего не делает. Для того, чтобы наш Observer начал выдавать значение, нам необходимо вызвать метод connect.

Пример:

onst source1 = interval(1000).pipe(
    take(5),
    multicast(new Subject())
) as ConnectableObservable<number>;
source1.connect();
const sub1 = source1.subscribe(x => console.log('A: ', x));
setTimeout(() => {
  const sub2 = source1.subscribe(x => console.log('B: ', x));
}, 2500);

Publish

Следующий оператор, который мы рассмотрим, — это оператор publish. Является сокращенным типом записи оператора multicast(new Subject()). Также помимо оператора publish существуют другие операторы, который упрощают запись и сводят ее к одной строке.

  • publish =  multicast + Subject
  • publishReplay = multicast + ReplaySubject
  • publishBehavior = multicast + BehaviorSubject
  • publishLast = multicast + AsyncSubject

Пример:

const source1 = interval(1000).pipe(
    take(5),
    publish()
) as ConnectableObservable<number>;
source1.connect();
const sub1 = source1.subscribe(x => console.log('A: ', x));
setTimeout(() => {
  const sub2 = source1.subscribe(x => console.log('B: ', x));
}, 2500);

refCount

Каждый раз вызывать метод connect получается достаточно проблематично, мы можем с легкостью забыть его вызвать. Для того, чтобы об этом не волноваться, существует оператор refCount. Этот оператор сам следит за количеством подписчиков и вызывает метод connect(), когда количество подписчиков меняется с 0 на 1, и делает отписку, когда количество подписчиков меняется с 1 на 0.

Пример:

const source1 = interval(1000).pipe(
    take(5),
    publish(),
    refCount()
);
const sub1 = source1.subscribe(x => console.log('A: ', x));
setTimeout(() => {
  const sub2 = source1.subscribe(x => console.log('B: ', x));
}, 2500);

Все вышеперечисленные методы являются устаревшими. Им на смену пришли новые операторы, которые еще больше упрощают процесс преобразования Observable (текущая версия на момент написания статьи RxJS 6.5.3).

Share

Первый оператор — это Share. Это вызов операторов publish и refCount, но только вызов будет состоять из одного оператора.

Пример:

const  source1 = interval(1000).pipe(
take(5),
share()
);
const sub1 = source1.subscribe(x => console.log('A: ', x));
setTimeout(() => {
const sub2 = source1.subscribe(x => console.log('B: ', x));
}, 2500);

Также существует оператор shareReplay. Это вызов операторов publishReplay и refCount. С ним есть особенность использования в зависимости от версии библиотеки. Подробнее о его непростой судьбе можно прочитать здесь: https://blog.angularindepth.com/rxjs-whats-changed-with-sharereplay-65c098843e95.

Memory Leaks

Вспомни про историю с баром. К сожалению, каждый посетитель занимает в баре место, а каждый включенный телевизор кушает электричество. Следовательно, когда бару пора закрываться, эти ресурсы нужно уметь освобождать.

RxJS for intermediates by Noveo

То есть каждый вызов .subscribe() и каждый вызов .connect() создает объект Subscription. Также стоит учитывать что функция продюсера данных в Observable может выполняться асинхронно (промис, аякс) или бесконечно (SetInterval, EventListener).

При использовании RxJS в ваших компонентах очищать ресурсы (делать .unsubscribe()) надо при каждом дестрое компонента, в котором они занимались, иначе они останутся в памяти, и с браузером вашего пользователя будет тоже самое, что и с баром!

Про необходимость и способы контроля за подписками в Ангуляре вы можете прочитать в этой замечательной статье: https://medium.com/ngx/why-do-you-need-unsubscribe-ee0c62b5d21f.

Conclusions

Вот мы и стали на шаг ближе к званию мастера RxJS! Осталось лишь освоить Schedulers, написание своих кастомных операторов и их тестирование. Всем удачи на этом извилистом пути!

RxJS for intermediates by Noveo

Заходит однажды тестировщик в бар.
Забегает в бар.
Пролезает в бар.
Танцуя, проникает в бар.
Прокрадывается в бар.
Врывается в бар.
Прыгает в бар
и заказывает:
кружку пива,
2 кружки пива,
0 кружек пива,
999999999 кружек пива,
ящерицу в стакане,
–1 кружку пива,
qwertyuip кружек пива.
Первый реальный клиент заходит в бар и спрашивает, где туалет. Бар вспыхивает пламенем, все погибают.

Если вы нашли ошибку, пожалуйста, выделите фрагмент текста и нажмите Ctrl+Enter.

Читайте в нашем блоге

Сообщить об опечатке

Текст, который будет отправлен нашим редакторам: