В этом уроке пробежимся по нескольким несложным оператором, чтобы понять, что они из себя представляют. И используем Action вместо Observer. 

 

Подключение RxJava к проекту

Рекомендую вам создать тестовый проект и выполнять примеры. Так вы сможете экспериментировать и лучше понять работу операторов.

Чтобы подключить RxJava к проекту, необходимо добавить в gradle-файл строку:
compile 'io.reactivex:rxjava:1.2.3'

 

 

Операторы

В RxJava есть операторы, с помощью которых вы можете создавать новые Observable или менять уже существующие. Операторы можно разделить на категории. В этом уроке мы рассмотрим самые простые операторы из нескольких категорий.

 

Операторы создания

Эти операторы позволяют создать Observable.

 

from

Этот оператор мы уже встречали в первом уроке. Он создает Observable из массива или коллекции.

        // create observable
        Observable<String> observable = Observable.from(new String[]{"one", "two", "three"});

        // create observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

 

Результат
onNext: one
onNext: two
onNext: three
onCompleted

 

 

range

Оператор range выдаст последовательность чисел

        // create observable
        Observable<Integer> observable = Observable.range(10, 4);

        // create observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Integer s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

Мы указываем, что начать необходимо с 10, а кол-во элементов 4

 

Результат:
onNext: 10
onNext: 11
onNext: 12
onNext: 13
onCompleted

 

 

interval

Оператор interval выдает последовательность long чисел начиная с 0. Мы можем указать временной интервал, через который числа будут приходить. Укажем 500 мсек.

        // create observable
        Observable<Long> observable = Observable.interval(500, TimeUnit.MILLISECONDS);

        // create observer
        Observer<Long> observer = new Observer<Long>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Long s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

 

Теперь каждые 500 мсек в Observer будет приходить все увеличивающееся значение, начиная с 0.

 

Результат:
onNext: 0
onNext: 1
onNext: 2
onNext: 3
...

Обратите внимание, что в логах не будет метода onCompleted. Вернее, когда нибудь он, наверно, все таки придет, когда достигнет значения, максимально доступного для Long. Но ждать придется долго.

 

fromCallable

Если у вас есть синхронный метод, который вам надо сделать асинхронным, то оператор fromCallable поможет вам

 

Например, есть метод longAction

private int longAction(String text) {
   log("longAction");
 
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       e.printStackTrace();
   }
 
   return Integer.parseInt(text);
}

 

Необходимо обернуть его в Callable

class CallableLongAction implements Callable<Integer> {
 
   private final String data;
 
   public CallableLongAction(String data) {
       this.data = data;
   }
 
   @Override
   public Integer call() throws Exception {
       return longAction(data);
   }
}

 

И затем можно создавать Observable из Callable

Observable.fromCallable(new CallableLongAction("5"))
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe(new Action1<Integer>() {
           @Override
           public void call(Integer integer) {
               log("onNext " + integer);
           }
       });

Получившийся Observable запустит метод longAction и вернет вам результат в onNext.

Операторы observeOn и subscribeOn здесь управляют потоками (thread) и обеспечивают асинхронность вызова. Подробно мы поговорим о них в одном из следующих уроков.

 

 

 

Операторы преобразования

Эти операторы позволяют преобразовывать данные, которые генерирует Observable.

 

map

Оператор map преобразует все элементы последовательности. Для этого нам необходимо написать функцию преобразования. Например конвертация из String в Integer. Создаем Func1

        Func1<String, Integer> stringToInteger = new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.parseInt(s);
            }
        };

 

Объект Func1 - это функция, через которую будет проходить каждый элемент последовательности. Этот объект требует от нас указания входного и выходного типов. Мы указали ему, что на вход придет String, а на выходе нам нужно получить Integer. И в его методе call мы написали код преобразования.

Теперь эту функцию мы передаем в оператор map 

        // create observable
        Observable<Integer> observable = Observable
                .from(new String[]{"1", "2", "3", "4", "5", "6"})
                .map(stringToInteger);


        // create observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Integer s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

Обратите внимание, оператор map мы вызываем сразу после оператора from. Тем самым на вход map придет последовательнось строк сгенерированная в from. А в результате работы map мы уже получим последовательность чисел. В Observer данные придут уже как Integer

 

Результат
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onCompleted

 

Еще раз разберем, что получилось. Оператором from мы создали Observable с типом данных String. А оператором map мы из Observable<String> получили новый Observable с типом данных Integer. Т.е. оператор map не изменил исходный Observable<String>, а создал новый поверх него. Этот созданный Observable<Integer> получает данные из Observable<String>, преобразует их в Integer и шлет дальше, как будто он сам их сгенерировал.

 

 

Попробуем спровоцировать ошибку преобразования. Заменим число 4 на букву а.

        // create observable
        Observable<Integer> observable = Observable
                .from(new String[]{"1", "2", "3", "a", "5", "6"})
                .map(stringToInteger);

 

Результат:
onNext: 1
onNext: 2
onNext: 3
onError: java.lang.NumberFormatException: Invalid int: "a"

Мы получили ошибку в метод onError, и, тем самым, последовательность завершилась. Есть, конечно, специальные операторы, которые умеют обрабатывать ошибку и продолжать работу, но об этом я расскажу позже, чтобы сейчас не усложнять.

 

 

buffer

Оператор buffer собирает элементы и по мере накопления заданного кол-ва отправляет их дальше одним пакетом.

Создадим Observable из 8 чисел, и добавим к нему буфер с количеством элементов = 3. 

        // create observable
        Observable<List<Integer>> observable = Observable
                .from(new Integer[]{1,2,3,4,5,6,7,8})
                .buffer(3);

        // create observer
        Observer<List<Integer>> observer = new Observer<List<Integer>>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(List<Integer> s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

 

Результат:
onNext: [1, 2, 3]
onNext: [4, 5, 6]
onNext: [7, 8]
onCompleted

Оператор разбил данные на блоки по 3 элемента. Обратите внимание, тип данных Observable в случае буфера будет не Integer, а List<Integer>.

 

Существуют и более сложные операторы преобразования, которые из каждого элемента генерируют отдельную последовательность данных. Я не рассматриваю их пока, чтобы не усложнять материал.

 

 

Операторы фильтрации

 

take

Оператор take возьмет только указанное количество первых элементов из переданной ему последовательности и сформирует из них новую последовательность. Возьмем первые три:

        // create observable
        Observable<Integer> observable = Observable
                .from(new Integer[]{5,6,7,8,9})
                .take(3);

        // create observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Integer s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

  

Результат:
onNext: 5
onNext: 6
onNext: 7
onCompleted

 

 

skip

Оператор skip пропустит первые элементы. Пропустим первые 2

        // create observable
        Observable<Integer> observable = Observable
                .from(new Integer[]{5,6,7,8,9})
                .skip(2);

 

Результат:
onNext: 7
onNext: 8
onNext: 9
onCompleted

 

 

distinct

Оператор distinct отсеет дубликаты

        // create observable
        Observable<Integer> observable = Observable
                .from(new Integer[]{5,9,7,5,8,6,7,8,9})
                .distinct();

 

Результат:
onNext: 5
onNext: 9
onNext: 7
onNext: 8
onNext: 6
onCompleted

 

 

filter

Оператор filter может отсеять только нужные элементы. Для этого необходимо создать функцию, в которой будет описан алгоритм фильтрации. Например, оставим только строки содержащие 5.

        Func1<String, Boolean> filterFiveOnly = new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s.contains("5");
            }
        };

В качестве типов мы указали String и Boolean. Данные типа String будут приходить на вход функции, а возвращать она должна Boolean - т.е. прошел элемент фильтр или нет.

Используем функцию в операторе filter

        // create observable
        Observable<String> observable = Observable
                .from(new String[]{"15", "27", "34", "46", "52", "63"})
                .filter(filterFiveOnly);
        
        // create observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

 

Результат:
onNext: 15
onNext: 52
onCompleted

 

 

Операторы объединения

 

merge

Оператор merge объединит элементы из двух Observable в один Observable

        // create observable
        Observable<Integer> observable = Observable
                .from(new Integer[]{1,2,3})
                .mergeWith(Observable.from(new Integer[]{6,7,8,9}));

        // create observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Integer s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

 

Результат:
onNext: 1
onNext: 2
onNext: 3
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onCompleted

 

Есть еще оператор concat, который делает примерно то же самое, но чуть по другому. Позже я расскажу об этом подробнее.

 

 

zip

Оператор zip попарно сопоставит элементы из двух Observable. Из каждой пары элементов с помощью функции будет получен один элемент, который будет добавлен в итоговый Observable.

Сначала нам необходимо создать функцию, в которой мы задаем как из двух элементов получить один. В нашем примере мы просто соединим их в одну строку через двоеточие.

        Func2<Integer, String, String> zipIntWithString = new Func2<Integer, String, String>() {
            @Override
            public String call(Integer i, String s) {
                return s + ": " + i;
            }
        };

В качестве типов мы указали <Integer, String, String>. Первые два, Integer и String - это типы данных двух Observable, которые мы будем склеивать. Третий тип, String - это какой тип данных мы хотим получить на выходе, в итоговом Observable.

Методом from создаем первую последовательность, с типом Integer. Затем в вызове zipWith создаем вторую последовательность с типом String и указываем созданную ранее функцию zipIntWithString

        // create observable
        Observable<String> observable = Observable
                .from(new Integer[]{1,2,3})
                .zipWith(Observable.from(new String[]{"One", "Two", "Three"}), zipIntWithString);

        // create observer
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

В результате получим одну последовательность с типом String, состоящую из результатов работы функции с парами элементов из двух последовательностей Integer и String.

 

Результат:
onNext: One: 1
onNext: Two: 2
onNext: Three: 3
onCompleted

 

 

Операторы условий

 

takeUntil

Оператор takeUntil будет брать элементы пока не попадется элемент, удовлетворяющий определенному условию. Это условие нам необходимо оформить в виде функции.

Например, создадим условие, что элемент равен 5.

        Func1<Integer, Boolean> isFive = new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) {
                return i == 5;
            }
        };

И используем в операторе 

        // create observable
        Observable<Integer> observable = Observable
                .from(new Integer[]{1,2,3,4,5,6,7,8})
                .takeUntil(isFive);

        // create observer
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Integer s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

Новая последовательность содержит те же элементы, но заканчивается на элементе 5.

 

Результат:
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onCompleted

 

 

all

Оператор all позволяет узнать все ли элементы удовлетворяют указанному условию. Условие нам необходимо оформить в виде функции.
Например, создадим проверку, что все элементы меньше 10.

        Func1<Integer, Boolean> lessThanTen = new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer i) {
                return i < 10;
            }
        };

 

Применим ее к последовательности чисел

        // create observable
        Observable<Boolean> observable = Observable
                .from(new Integer[]{1,2,3,4,5,6,7,8})
                .all(lessThanTen);

        // create observer
        Observer<Boolean> observer = new Observer<Boolean>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e);
            }

            @Override
            public void onNext(Boolean s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(observer);

 

В результате мы получим Boolean последовательность из одного элемента. Этот элемент скажет нам, все ли элементы последовательности подошли под условие.

Результат:
onNext: true
onCompleted

 

 

Я рассмотрел лишь малую часть всех операторов. К тому же некоторые операторы имеют несколько вариантов использования, с различными параметрами. Полный список можно посмотреть здесь.

Цель этого урока - показать, что такое операторы, как они могут создавать новые Observable и работать с уже существующими. Вы можете комбинировать операторы, используя их один за другим. Например, сначала отфильтровать данные, затем преобразовать их, затем объединить с другими данными. В коде все это будет выглядеть как одна последовательность операторов.

Повторюсь, оператор не меняет Observable, а создает новый, поверх данного. Соответственно, в цепочке операторов, каждый следующий оператор берет данные предыдущего, меняет их заданным образом и возвращает вам новый Observable.

 

 

Action

В наших примерах мы создавали Observer с тремя методами. И этот Observer умел ловить все три типа событий. Но бывают случаи, когда нам требуется, например, только событие Next и вместо Observer мы можем использовать его сокращенную версию - Action.

Пример:

        // create observable
        Observable<String> observable = Observable.from(new String[]{"one", "two", "three"});

        // create action
        Action1<String> action = new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d(TAG, "onNext: " + s);
            }
        };

        // subscribe
        observable.subscribe(action);

 

Мы подписываем не Observer, а Action. И этот Action будет получать только Next события.

 

Результат:
onNext: one
onNext: two
onNext: three

 

Всего есть три варианта метода subscribe, в которых мы можем использовать Action:
- subscribe(Action1<? super T> onNext)
- subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
- subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onCompleted)

Мы использовали первый. Соответственно, если вам нужно добавить обработку Error и Completed, используйте второй и третий вариант.

Но учитывайте, что если вы не ловите событие Error, то в случае какой-либо ошибки у вас вылетит Exception.

 

Курс RxJava

 


Комментарии   

# RxJava2Дмитрий 12.05.2017 05:09
compile 'io.reactivex.rxjava2:rxjava:2.1.0' - как на счёт этого добра: рано ещё или это что-то другое концептуально ?
# RE: RxJava2Dmitry Vinogradov 14.05.2017 21:43
Это будет в одном из ближайших уроков
# СэмплSheikh 27.07.2017 09:26
Хорошо бы добавить вступление про то, где и в чём писать сэмплы: Android studio или IDEA, создайте проект и добавьте в gradle скрипт такую строку. Или кинуть проект для этого урока на гитхаб.
Такое встпуление в бесплатном уроке подкупало бы читателя купить курс ;-)
# RE: СэмплSheikh 27.07.2017 16:12
Ещё заметил, что в начале уроке предлагают включить в градл
compile 'io.reactivex:rxjava:1.2.3'
А в сниппетах строки кода
'AndroidSchedulers.mainThread()'
Тогда вроде надо бы ещё добавить
`compile 'io.reactivex:rxandroid:1.2.1'
# ObservableVladislav 28.11.2017 22:24
Подскажите в чем ошибка?
В строке
Observable observable = Observable.from(new String[]{"one", "two", "three"});
выдает 2 ошибки:
1) Error:(21, 19) error: type Observable does not take parameters
2) Error:(21, 51) error: cannot find symbol method from(String[])
# вопрос отпалVladislav 28.11.2017 22:35
вопрос отпал, как спросил - так сразу разобрался :lol:

Language

Система Orphus

Социальные сети

 

Telegram канал



Android чат в Telegram



Группа ВКонтакте



Страница в Facebook

Поддержка проекта

Яндекс
410011180491924

WebMoney
R248743991365
Z551306702056

Paypal

Яндекс.Метрика