В этом уроке пробежимся по нескольким несложным оператором, чтобы понять, что они из себя представляют. И используем Action вместо Observer.
Подключение RxJava к проекту
Рекомендую вам создать тестовый проект и выполнять примеры. Так вы сможете экспериментировать и лучше понять работу операторов.
Чтобы подключить RxJava к проекту, необходимо добавить в gradle-файл строки:
compile 'io.reactivex:rxjava:1.2.3' compile 'io.reactivex:rxandroid:1.2.1'
Операторы
В RxJava есть операторы, с помощью которых вы можете создавать новые Observable или менять уже существующие. Операторы можно разделить на категории. В этом уроке мы рассмотрим самые простые операторы из нескольких категорий.
Операторы создания
Эти операторы позволяют создать Observable.
from
Этот оператор мы уже встречали в первом уроке. Он создает Observable из массива или коллекции.
// create observable Observable<String> observable = Observable.from(new String[]{"one", "two", "three"}); // create observer Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(String s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Результат
onNext: one
onNext: two
onNext: three
onCompleted
range
Оператор range выдаст последовательность чисел
// create observable Observable<Integer> observable = Observable.range(10, 4); // create observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Integer s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Мы указываем, что начать необходимо с 10, а кол-во элементов 4
Результат:
onNext: 10
onNext: 11
onNext: 12
onNext: 13
onCompleted
interval
Оператор interval выдает последовательность long чисел начиная с 0. Мы можем указать временной интервал, через который числа будут приходить. Укажем 500 мсек.
// create observable Observable<Long> observable = Observable.interval(500, TimeUnit.MILLISECONDS); // create observer Observer<Long> observer = new Observer<Long>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Long s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Теперь каждые 500 мсек в Observer будет приходить все увеличивающееся значение, начиная с 0.
Результат:
onNext: 0
onNext: 1
onNext: 2
onNext: 3
...
Обратите внимание, что в логах не будет метода onCompleted. Вернее, когда нибудь он, наверно, все таки придет, когда достигнет значения, максимально доступного для Long. Но ждать придется долго.
fromCallable
Если у вас есть синхронный метод, который вам надо сделать асинхронным, то оператор fromCallable поможет вам
Например, есть метод longAction
private int longAction(String text) { log("longAction"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.parseInt(text); }
Необходимо обернуть его в Callable
class CallableLongAction implements Callable<Integer> { private final String data; public CallableLongAction(String data) { this.data = data; } @Override public Integer call() throws Exception { return longAction(data); } }
И затем можно создавать Observable из Callable
Observable.fromCallable(new CallableLongAction("5")) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { log("onNext " + integer); } });
Получившийся Observable запустит метод longAction и вернет вам результат в onNext.
Операторы observeOn и subscribeOn здесь управляют потоками (thread) и обеспечивают асинхронность вызова. Подробно мы поговорим о них в одном из следующих уроков.
Операторы преобразования
Эти операторы позволяют преобразовывать данные, которые генерирует Observable.
map
Оператор map преобразует все элементы последовательности. Для этого нам необходимо написать функцию преобразования. Например конвертация из String в Integer. Создаем Func1
Func1<String, Integer> stringToInteger = new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.parseInt(s); } };
Объект Func1 - это функция, через которую будет проходить каждый элемент последовательности. Этот объект требует от нас указания входного и выходного типов. Мы указали ему, что на вход придет String, а на выходе нам нужно получить Integer. И в его методе call мы написали код преобразования.
Теперь эту функцию мы передаем в оператор map
// create observable Observable<Integer> observable = Observable .from(new String[]{"1", "2", "3", "4", "5", "6"}) .map(stringToInteger); // create observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Integer s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Обратите внимание, оператор map мы вызываем сразу после оператора from. Тем самым на вход map придет последовательнось строк сгенерированная в from. А в результате работы map мы уже получим последовательность чисел. В Observer данные придут уже как Integer
Результат
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onCompleted
Еще раз разберем, что получилось. Оператором from мы создали Observable с типом данных String. А оператором map мы из Observable<String> получили новый Observable с типом данных Integer. Т.е. оператор map не изменил исходный Observable<String>, а создал новый поверх него. Этот созданный Observable<Integer> получает данные из Observable<String>, преобразует их в Integer и шлет дальше, как будто он сам их сгенерировал.
Попробуем спровоцировать ошибку преобразования. Заменим число 4 на букву а.
// create observable Observable<Integer> observable = Observable .from(new String[]{"1", "2", "3", "a", "5", "6"}) .map(stringToInteger);
Результат:
onNext: 1
onNext: 2
onNext: 3
onError: java.lang.NumberFormatException: Invalid int: "a"
Мы получили ошибку в метод onError, и, тем самым, последовательность завершилась. Есть, конечно, специальные операторы, которые умеют обрабатывать ошибку и продолжать работу, но об этом я расскажу позже, чтобы сейчас не усложнять.
buffer
Оператор buffer собирает элементы и по мере накопления заданного кол-ва отправляет их дальше одним пакетом.
Создадим Observable из 8 чисел, и добавим к нему буфер с количеством элементов = 3.
// create observable Observable<List<Integer>> observable = Observable .from(new Integer[]{1,2,3,4,5,6,7,8}) .buffer(3); // create observer Observer<List<Integer>> observer = new Observer<List<Integer>>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(List<Integer> s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Результат:
onNext: [1, 2, 3]
onNext: [4, 5, 6]
onNext: [7, 8]
onCompleted
Оператор разбил данные на блоки по 3 элемента. Обратите внимание, тип данных Observable в случае буфера будет не Integer, а List<Integer>.
Существуют и более сложные операторы преобразования, которые из каждого элемента генерируют отдельную последовательность данных. Я не рассматриваю их пока, чтобы не усложнять материал.
Операторы фильтрации
take
Оператор take возьмет только указанное количество первых элементов из переданной ему последовательности и сформирует из них новую последовательность. Возьмем первые три:
// create observable Observable<Integer> observable = Observable .from(new Integer[]{5,6,7,8,9}) .take(3); // create observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Integer s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Результат:
onNext: 5
onNext: 6
onNext: 7
onCompleted
skip
Оператор skip пропустит первые элементы. Пропустим первые 2
// create observable Observable<Integer> observable = Observable .from(new Integer[]{5,6,7,8,9}) .skip(2);
Результат:
onNext: 7
onNext: 8
onNext: 9
onCompleted
distinct
Оператор distinct отсеет дубликаты
// create observable Observable<Integer> observable = Observable .from(new Integer[]{5,9,7,5,8,6,7,8,9}) .distinct();
Результат:
onNext: 5
onNext: 9
onNext: 7
onNext: 8
onNext: 6
onCompleted
filter
Оператор filter может отсеять только нужные элементы. Для этого необходимо создать функцию, в которой будет описан алгоритм фильтрации. Например, оставим только строки содержащие 5.
Func1<String, Boolean> filterFiveOnly = new Func1<String, Boolean>() { @Override public Boolean call(String s) { return s.contains("5"); } };
В качестве типов мы указали String и Boolean. Данные типа String будут приходить на вход функции, а возвращать она должна Boolean - т.е. прошел элемент фильтр или нет.
Используем функцию в операторе filter
// create observable Observable<String> observable = Observable .from(new String[]{"15", "27", "34", "46", "52", "63"}) .filter(filterFiveOnly); // create observer Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(String s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Результат:
onNext: 15
onNext: 52
onCompleted
Операторы объединения
merge
Оператор merge объединит элементы из двух Observable в один Observable
// create observable Observable<Integer> observable = Observable .from(new Integer[]{1,2,3}) .mergeWith(Observable.from(new Integer[]{6,7,8,9})); // create observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Integer s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Результат:
onNext: 1
onNext: 2
onNext: 3
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onCompleted
Есть еще оператор concat, который делает примерно то же самое, но чуть по другому. Позже я расскажу об этом подробнее.
zip
Оператор zip попарно сопоставит элементы из двух Observable. Из каждой пары элементов с помощью функции будет получен один элемент, который будет добавлен в итоговый Observable.
Сначала нам необходимо создать функцию, в которой мы задаем как из двух элементов получить один. В нашем примере мы просто соединим их в одну строку через двоеточие.
Func2<Integer, String, String> zipIntWithString = new Func2<Integer, String, String>() { @Override public String call(Integer i, String s) { return s + ": " + i; } };
В качестве типов мы указали <Integer, String, String>. Первые два, Integer и String - это типы данных двух Observable, которые мы будем склеивать. Третий тип, String - это какой тип данных мы хотим получить на выходе, в итоговом Observable.
Методом from создаем первую последовательность, с типом Integer. Затем в вызове zipWith создаем вторую последовательность с типом String и указываем созданную ранее функцию zipIntWithString
// create observable Observable<String> observable = Observable .from(new Integer[]{1,2,3}) .zipWith(Observable.from(new String[]{"One", "Two", "Three"}), zipIntWithString); // create observer Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(String s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
В результате получим одну последовательность с типом String, состоящую из результатов работы функции с парами элементов из двух последовательностей Integer и String.
Результат:
onNext: One: 1
onNext: Two: 2
onNext: Three: 3
onCompleted
Операторы условий
takeUntil
Оператор takeUntil будет брать элементы пока не попадется элемент, удовлетворяющий определенному условию. Это условие нам необходимо оформить в виде функции.
Например, создадим условие, что элемент равен 5.
Func1<Integer, Boolean> isFive = new Func1<Integer, Boolean>() { @Override public Boolean call(Integer i) { return i == 5; } };
И используем в операторе
// create observable Observable<Integer> observable = Observable .from(new Integer[]{1,2,3,4,5,6,7,8}) .takeUntil(isFive); // create observer Observer<Integer> observer = new Observer<Integer>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Integer s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
Новая последовательность содержит те же элементы, но заканчивается на элементе 5.
Результат:
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onCompleted
all
Оператор all позволяет узнать все ли элементы удовлетворяют указанному условию. Условие нам необходимо оформить в виде функции.
Например, создадим проверку, что все элементы меньше 10.
Func1<Integer, Boolean> lessThanTen = new Func1<Integer, Boolean>() { @Override public Boolean call(Integer i) { return i < 10; } };
Применим ее к последовательности чисел
// create observable Observable<Boolean> observable = Observable .from(new Integer[]{1,2,3,4,5,6,7,8}) .all(lessThanTen); // create observer Observer<Boolean> observer = new Observer<Boolean>() { @Override public void onCompleted() { Log.d(TAG, "onCompleted"); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: " + e); } @Override public void onNext(Boolean s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(observer);
В результате мы получим Boolean последовательность из одного элемента. Этот элемент скажет нам, все ли элементы последовательности подошли под условие.
Результат:
onNext: true
onCompleted
Я рассмотрел лишь малую часть всех операторов. К тому же некоторые операторы имеют несколько вариантов использования, с различными параметрами. Полный список можно посмотреть здесь.
Цель этого урока - показать, что такое операторы, как они могут создавать новые Observable и работать с уже существующими. Вы можете комбинировать операторы, используя их один за другим. Например, сначала отфильтровать данные, затем преобразовать их, затем объединить с другими данными. В коде все это будет выглядеть как одна последовательность операторов.
Повторюсь, оператор не меняет Observable, а создает новый, поверх данного. Соответственно, в цепочке операторов, каждый следующий оператор берет данные предыдущего, меняет их заданным образом и возвращает вам новый Observable.
В отдельной статье я выкладываю реальные рабочие примеры использования RxJava. Рекомендую посмотреть, когда закончите с теорией.
Action
В наших примерах мы создавали Observer с тремя методами. И этот Observer умел ловить все три типа событий. Но бывают случаи, когда нам требуется, например, только событие Next и вместо Observer мы можем использовать его сокращенную версию - Action.
Пример:
// create observable Observable<String> observable = Observable.from(new String[]{"one", "two", "three"}); // create action Action1<String> action = new Action1<String>() { @Override public void call(String s) { Log.d(TAG, "onNext: " + s); } }; // subscribe observable.subscribe(action);
Мы подписываем не Observer, а Action. И этот Action будет получать только Next события.
Результат:
onNext: one
onNext: two
onNext: three
Всего есть три варианта метода subscribe, в которых мы можем использовать Action:
- subscribe(Action1<? super T> onNext)
- subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError)
- subscribe(Action1<? super T> onNext, Action1<java.lang.Throwable> onError, Action0 onCompleted)
Мы использовали первый. Соответственно, если вам нужно добавить обработку Error и Completed, используйте второй и третий вариант.
Но учитывайте, что если вы не ловите событие Error, то в случае какой-либо ошибки у вас вылетит Exception.
Комментарии
Такое встпуление в бесплатном уроке подкупало бы читателя купить курс
compile 'io.reactivex:rxjava:1.2.3'
А в сниппетах строки кода
'AndroidSchedulers.mainThread()'
Тогда вроде надо бы ещё добавить
`compile 'io.reactivex:rxandroid:1.2.1'
В строке
Observable observable = Observable.from(new String[]{"one", "two", "three"});
выдает 2 ошибки:
1) Error:(21, 19) error: type Observable does not take parameters
2) Error:(21, 51) error: cannot find symbol method from(String[])
Observable observable = Observable.from(new Integer[] {1,2,2,1,3});
observable.distinct();
если так, то не работает.
Observable observable = Observable.from(new Integer[] {1,2,2,1,3});
observable = observable.distinct();
1. В Gradle-файле надо прописать вместо устаревших Compile...
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
2. В Observable использую вмето оператора from оператор fromArray
3. в конструкторе должен быть @Override
public void onSubscribe(Disposable d) {
}
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
Func1 filterFiveOnly = new Func1() {
@Override
public Boolean call(String s) {
return s.contains("5");
}
};
Создаю абстрактный класс:
public abstract class Func1 implements Predicate {
V output;
public V call (T input) {
return output;
}
@Override
public boolean test(String s) throws Exception {
return false;
}
}
Но в логах выдает только:
D/myLogs: onCompleted
и все. Фильтрации не происходит
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
log("onNext " + integer);
}
});
У Action вроде метод run, или это какой-то другой Action1?
RSS лента комментариев этой записи