I. Introduction
RxJava is a reactive programming library for the Java Virtual Machine (JVM), originally developed by Netflix. It is part of the ReactiveX open-source project and aims to simplify asynchronous and event-driven programming. The library extends the observer pattern so that observables can be composed to process data streams and event sequences, and it provides declarative operators for orchestrating logic.
Its core design reduces the complexity of concurrent programming by encapsulating low-level concerns such as thread management, synchronization, and non-blocking I/O, so developers can focus on business logic. The library adopts a functional style and provides a rich set of stream-processing operators for building asynchronous task chains and handling concerns such as backpressure.
II. RxJava Basic Concepts
(i) Core Roles
1. Observable (the object being observed)
- Function: The data source, responsible for generating and emitting the event (data) stream.
- Features: It can emit any number of data items (onNext) and then terminates either with an error (onError) or with normal completion (onComplete).
- Common creation methods:
  - Observable.create(): Manually controls emission; the most flexible but also the most complex method.
  - Observable.just(...): Quickly creates an Observable that emits a fixed set of items (up to 10 arguments).
  - Observable.fromIterable() / Observable.fromArray(): Creates an Observable from a collection or an array.
  - Observable.interval() / Observable.timer(): Used for periodic and delayed tasks.
- Example
    Observable<String> observable = Observable.just("A", "B", "C");
2. Observer
- Function: Receives the data emitted by an Observable and processes it (displaying it, storing it, etc.).
- Callback methods:
  - onSubscribe(Disposable d): Called when the subscription is established; the Disposable can be saved to cancel the subscription later.
  - onNext(T t): Called each time a data item is received.
  - onError(Throwable e): Called when an error occurs; no more data is received afterward.
  - onComplete(): Called when the data stream ends normally; no more data is received afterward.
- Example
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            // Save d here if you need to unsubscribe later
        }

        @Override
        public void onNext(String s) {
            System.out.println("Received data: " + s);
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("An error occurred: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("End of data flow");
        }
    };
3. Subscription (subscription relationship)
- Function: Represents the subscription relationship between an Observable and an Observer, acting as a bridge between the two.
- In RxJava 1.x it was called Subscription; in RxJava 2.x and later this role is mainly taken over by Disposable.
- Main functions:
- Unsubscribing (dispose()) to prevent memory leaks and to stop receiving unwanted events.
- Managing the lifecycle and controlling when to stop receiving data.
- Example
    // subscribe(Observer) returns void in RxJava 2.x and later; the lambda overloads return a Disposable
    Disposable disposable = observable.subscribe(item -> System.out.println(item));
    // Unsubscribe
    disposable.dispose();
4. Subject
- Function: It is both an Observable (it emits data) and an Observer (it receives data), acting as a bridge or proxy.
- Common implementation classes:
- PublishSubject: Emits only the items it receives after the observer subscribes.
- BehaviorSubject: Emits the most recent item received before the subscription (if any), then all subsequent items.
- ReplaySubject: Replays all previously emitted items to new subscribers, then subsequent items.
- AsyncSubject: Emits only the last item, and only when the source completes (onComplete).
- Typical applications: multicasting (one data source with multiple observers), event buses, etc.
- Example
    PublishSubject<String> subject = PublishSubject.create();
    subject.subscribe(data -> System.out.println("Observer 1: " + data));
    subject.onNext("Hello"); // received by Observer 1 only
    subject.subscribe(data -> System.out.println("Observer 2: " + data));
    subject.onNext("World"); // received by Observer 1 and Observer 2
(ii) Subscription Process
1. Create the observed object
Use a factory method to create an object that can emit data, for example:
    Observable<String> observable = Observable.just("Data1", "Data2");
The more flexible create() approach can also be used:
    Observable<String> created = Observable.create(emitter -> {
        emitter.onNext("A");
        emitter.onNext("B");
        emitter.onComplete();
    });
2. Create an observer
Define the data processing logic: receiving data, handling errors, and reacting to the completion notification.
3. Subscribe
Calling observable.subscribe(observer) links the two together and triggers the flow of data. Lambda callbacks can be passed instead of a full Observer:
    observable.subscribe(
        data -> System.out.println("received: " + data),   // onNext
        error -> System.err.println("error: " + error),    // onError
        () -> System.out.println("complete")               // onComplete
    );
III. Detailed Explanation of Core Operators
(i) Creation Operators
- just(): Emits fixed values directly (e.g., just(1, 2, 3)).
- fromIterable() / fromArray(): Emits the items of a collection or array.
- create(): Custom emission logic (requires calling onNext() / onError() / onComplete() manually).
- defer(): Delays creating the Observable until an observer subscribes, and creates a fresh one for each subscription (to avoid emitting stale data).
- interval(): Emits incrementing integers periodically (e.g., 0, 1, 2, ... every 1 second).
- timer(): Emits a single item after a specified delay (e.g., emits 0 after 2 seconds).
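The difference between just() and defer() is easiest to see when the underlying state changes between assembly and subscription. The following is a minimal sketch, assuming RxJava 3 (package io.reactivex.rxjava3) is on the classpath; the class name DeferExample and the counter are hypothetical and used only for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.atomic.AtomicInteger;

public class DeferExample {
    /** Returns {value seen through just(), value seen through defer()}. */
    public static int[] run() {
        AtomicInteger counter = new AtomicInteger(1);

        // just() captures the value when the Observable is assembled.
        Observable<Integer> eager = Observable.just(counter.get());
        // defer() builds a fresh Observable each time someone subscribes.
        Observable<Integer> lazy = Observable.defer(() -> Observable.just(counter.get()));

        counter.set(2); // the state changes before anyone subscribes

        return new int[] { eager.blockingFirst(), lazy.blockingFirst() };
    }

    public static void main(String[] args) {
        int[] seen = run();
        System.out.println("just: " + seen[0] + ", defer: " + seen[1]); // just: 1, defer: 2
    }
}
```

The just() variant keeps emitting the stale value 1, while the defer() variant reads the counter at subscription time.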
(ii) Transformation Operators
- map(): One-to-one transformation (e.g., converting a string into a user object).
- flatMap(): One-to-many transformation; each item is mapped to an inner Observable and the results are flattened into a single stream, with no ordering guarantee (e.g., user ID to a list of user details).
- concatMap(): Like flatMap(), but preserves order (the next inner Observable is subscribed only after the previous one completes).
- switchMap(): Keeps only the latest inner Observable and cancels earlier ones (e.g., only the last query typed into a search box is used).
- scan(): Running accumulation (e.g., a running sum of the values in a stream).
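A few of the transformation operators can be sketched with small synchronous streams. This example assumes RxJava 3 on the classpath; the class and method names are hypothetical. It uses concatMap() rather than flatMap() so that the output order is guaranteed.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class TransformExample {
    /** map(): one item in, one item out. */
    public static List<String> mapped() {
        return Observable.just(1, 2, 3)
                .map(n -> "item-" + n)
                .toList()
                .blockingGet();
    }

    /** concatMap(): each item expands to an inner stream, kept in source order. */
    public static List<String> concatMapped() {
        return Observable.just("a", "b")
                .concatMap(s -> Observable.just(s + "1", s + "2"))
                .toList()
                .blockingGet();
    }

    /** scan(): emits the running total after every item. */
    public static List<Integer> runningSum() {
        return Observable.just(1, 2, 3, 4)
                .scan((acc, n) -> acc + n)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(mapped());       // [item-1, item-2, item-3]
        System.out.println(concatMapped()); // [a1, a2, b1, b2]
        System.out.println(runningSum());   // [1, 3, 6, 10]
    }
}
```

Replacing concatMap() with flatMap() has the same shape, but the inner results may interleave when the inner sources are asynchronous.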
(iii) Filtering Operators
- filter(): Filters data based on a condition (e.g., keep only even numbers).
- take(n): Receives only the first n items (e.g., only the first 3 click events).
- skip(n): Skips the first n items (e.g., skip the first 2 stale items loaded initially).
- distinct(): Deduplication (e.g., to avoid duplicate user IDs).
- debounce(time, unit): Debouncing (e.g., waiting until search box input has been idle for 300 ms before sending a request).
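The non-timed filtering operators can be illustrated as follows. This is a sketch assuming RxJava 3 on the classpath; the class and method names are hypothetical. debounce() is left out because it depends on timing.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class FilterExample {
    /** Keeps even numbers, then takes the first two of them. */
    public static List<Integer> firstTwoEvens() {
        return Observable.range(1, 10)
                .filter(n -> n % 2 == 0)
                .take(2)
                .toList()
                .blockingGet();
    }

    /** Skips the first two items, then removes duplicates from the rest. */
    public static List<Integer> skipThenDistinct() {
        return Observable.just(9, 9, 1, 2, 2, 3, 1)
                .skip(2)
                .distinct()
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(firstTwoEvens());    // [2, 4]
        System.out.println(skipThenDistinct()); // [1, 2, 3]
    }
}
```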
(iv) Combination Operators
- merge(): Merges multiple streams; items are interleaved in the order they are emitted, with no ordering between streams.
- zip(): Pairs items from multiple streams by position (e.g., pairing user information with avatar URLs).
- concat(): Joins multiple streams sequentially (the next stream is subscribed only after the previous one completes).
- amb(): Mirrors whichever stream emits first and ignores the others (e.g., take the fastest response from multiple requests).
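The positional pairing of zip() and the strict ordering of concat() can be sketched like this, assuming RxJava 3 on the classpath; the class name, method names, and sample data are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

public class CombineExample {
    /** zip(): pairs items by position and stops when the shorter source ends. */
    public static List<String> zipped() {
        Observable<String> names = Observable.just("alice", "bob");
        Observable<Integer> ages = Observable.just(30, 25, 99); // 99 has no partner
        return Observable.zip(names, ages, (name, age) -> name + ":" + age)
                .toList()
                .blockingGet();
    }

    /** concat(): the second source starts only after the first completes. */
    public static List<Integer> concatenated() {
        return Observable.concat(Observable.just(1, 2), Observable.just(3, 4))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(zipped());       // [alice:30, bob:25]
        System.out.println(concatenated()); // [1, 2, 3, 4]
    }
}
```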
IV. Thread Scheduling
(i) Commonly Used Schedulers
- Schedulers.io(): I/O-bound operations (network requests, file reads and writes).
- Schedulers.computation(): CPU-bound work (data computation, image processing).
- AndroidSchedulers.mainThread(): The Android main thread (UI updates); provided by the RxAndroid library.
- Schedulers.newThread(): Creates a new thread for each unit of work (not recommended, since threads are not reused and their number is unbounded).
(ii) Dispatch and Control
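- subscribeOn(): Specifies the thread on which the source Observable is subscribed to and emits data (affects the upstream; when called several times, only the call closest to the source takes effect).
- observeOn(): Specifies the thread on which downstream operators and the Observer process the data (affects everything below it in the chain and can be used several times).
- Example: The request runs on an I/O thread and the main thread updates the UI.
    api.getUser()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(user -> updateUI(user));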
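The effect of subscribeOn() and observeOn() can be observed outside Android by recording thread names. This is a minimal sketch assuming RxJava 3 on the classpath; Schedulers.single() stands in for the Android main thread, and the class name is hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class SchedulerExample {
    /** Returns {thread that ran map(), thread that ran doOnNext()}. */
    public static String[] run() {
        String[] threads = new String[2];
        Observable.just(1)
                .subscribeOn(Schedulers.io())          // the source and map() run on an I/O thread
                .map(n -> {
                    threads[0] = Thread.currentThread().getName();
                    return n;
                })
                .observeOn(Schedulers.single())        // everything below runs on the single scheduler
                .doOnNext(n -> threads[1] = Thread.currentThread().getName())
                .blockingSubscribe();                  // wait for completion on the calling thread
        return threads;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("upstream ran on:   " + threads[0]);
        System.out.println("downstream ran on: " + threads[1]);
    }
}
```

The two recorded names differ from each other and from the calling thread, which shows that the chain switched threads twice.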
V. Practical Application Scenarios
(i) Chained Network Requests
- Serial requests: User login → get personal information → load preferences (serialized with flatMap).
    api.login(username, password)
        .flatMap(loginResponse -> api.getUserInfo(loginResponse.token))
        .flatMap(userInfo -> api.getPreferences(userInfo.userId))
        .subscribe(preferences -> {
            // Process the final preference settings
        }, throwable -> {
            // Handle errors
        });
- Parallel requests: Retrieve user information and the friend list simultaneously (combined with zip or merge).
    // Each source needs its own subscribeOn() for the two requests to actually run in parallel
    Observable<User> userObs = api.getUser(userId).subscribeOn(Schedulers.io());
    Observable<List<Friend>> friendObs = api.getFriends(userId).subscribeOn(Schedulers.io());
    Observable.zip(userObs, friendObs, (user, friends) -> new UserProfile(user, friends))
        .subscribe(profile -> {
            // Both the user and the friend list are available here
        });
(ii) Event Debouncing and Throttling
- Search box input: debounce(300, TimeUnit.MILLISECONDS) avoids sending a request on every keystroke.
    RxTextView.textChanges(searchEditText)
        .debounce(300, TimeUnit.MILLISECONDS)
        .switchMap(text -> api.search(text.toString()))
        .observeOn(AndroidSchedulers.mainThread()) // debounce() emits on a computation thread
        .subscribe(results -> showResults(results));
- Button click: throttleFirst(1, TimeUnit.SECONDS) responds to at most one click per second.
    RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(ignored -> performAction());
(iii) Lifecycle Management
- Manual management: collect subscriptions with compositeDisposable.add(observable.subscribe(...)) and release them when the component is destroyed.
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    compositeDisposable.add(
        api.getData()
            .subscribe(data -> updateUI(data))
    );
    // In onDestroy()
    compositeDisposable.clear(); // or dispose(), after which the container cannot be reused
- Automatic unsubscription: use RxLifecycle or AutoDispose (to avoid memory leaks after an Activity/Fragment is destroyed).
    api.getData()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // AutoDispose 1.x with RxJava 2; with RxJava 3 and AutoDispose 2.x the operator is to(...)
        .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(this)))
        .subscribe(data -> updateUI(data));
(iv) Reactive UI Interaction
- Listening to view state: RxView.clicks(button) converts clicks into an observable event stream.
    RxView.clicks(button)
        .subscribe(ignored -> doSomething());
- Input box monitoring: RxTextView.textChanges(editText) emits the text each time it changes.
    RxTextView.textChanges(editText)
        .skipInitialValue() // Ignore the value emitted immediately on subscription
        .debounce(500, TimeUnit.MILLISECONDS)
        .subscribe(charSequence -> {
            // Process the input
        });
(v) Data Caching and Synchronization
- Memory cache + network request: Read from the cache first; if the cache has no entry, request from the network (concat tries the local source first and firstElement() takes the first item that arrives; switchIfEmpty is an alternative way to fall back to the network).
    // fromCallable() completes empty when getFromCache() returns null; Observable.just(null) would throw
    Observable<Data> cacheObs = Maybe.fromCallable(() -> getFromCache()).toObservable();
    Observable<Data> networkObs = api.fetchData().doOnNext(this::saveToCache);
    cacheObs.concatWith(networkObs)
        .firstElement() // Take only the first emitted item
        .subscribe(data -> updateUI(data));
- Or, written more clearly:
    Observable.concat(getFromCacheAsync(), fetchFromNetworkAsync())
        .firstElement()
        .subscribe(data -> updateUI(data));
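The switchIfEmpty alternative mentioned above can be sketched with Maybe and Single. This example assumes RxJava 3 on the classpath; the class name, the load() method, and the "from-network" placeholder are hypothetical stand-ins for a real cache and a real network call.

```java
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;

public class CacheFirstExample {
    /**
     * Returns the cached value when present, otherwise the network value.
     * A null argument models a cache miss.
     */
    public static String load(String cachedValue) {
        // fromCallable() completes without a value when the callable returns null.
        Maybe<String> cache = Maybe.fromCallable(() -> cachedValue);
        // Stand-in for a real network request (assumption for illustration).
        Single<String> network = Single.fromCallable(() -> "from-network");

        // The network source is subscribed only if the cache turned out empty.
        return cache.switchIfEmpty(network).blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(load("from-cache")); // from-cache
        System.out.println(load(null));         // from-network
    }
}
```

Compared with concat() plus firstElement(), this form states the fallback intent directly and returns a Single, so the caller always gets exactly one value or an error.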
VI. Error Handling
(i) Basic Error Capture
- onErrorReturn(): Emits a default value when an error occurs (e.g., an empty list if the network fails).
- onErrorResumeNext(): Switches to a backup Observable when an error occurs (e.g., if the main endpoint fails, call the backup endpoint).
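Both fallback operators can be sketched against a source that always fails. This example assumes RxJava 3 on the classpath; the class name, failingRequest(), and the sample values are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import java.io.IOException;
import java.util.List;

public class ErrorFallbackExample {
    /** Hypothetical request that always fails. */
    private static Observable<String> failingRequest() {
        return Observable.error(new IOException("network down"));
    }

    /** onErrorReturn(): replace the error with a single default item. */
    public static List<String> withDefault() {
        return failingRequest()
                .onErrorReturn(error -> "default")
                .toList()
                .blockingGet();
    }

    /** onErrorResumeNext(): continue with a backup stream instead of failing. */
    public static List<String> withBackup() {
        return failingRequest()
                .onErrorResumeNext(error -> Observable.just("backup-1", "backup-2"))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(withDefault()); // [default]
        System.out.println(withBackup());  // [backup-1, backup-2]
    }
}
```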
(ii) Retry Mechanism
- retry(n): Resubscribes up to n times after a failure (e.g., retry twice if the network request fails).
- retryWhen(): Custom retry logic (e.g., deciding whether to retry based on the error type).
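The behavior of retry(n) can be sketched with a source that fails twice before succeeding. This example assumes RxJava 3 on the classpath; the class name and the attempt counter are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryExample {
    /** Fails on the first two attempts and succeeds on the third. */
    public static String fetchWithRetry() {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.fromCallable(() -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new IOException("attempt failed");
                    }
                    return "ok after " + attempts.get() + " attempts";
                })
                .retry(2) // resubscribe up to 2 times, so at most 3 attempts in total
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(fetchWithRetry()); // ok after 3 attempts
    }
}
```

Each retry resubscribes to the source, so the callable runs again from the start; with retry(1) the same source would end with the IOException instead.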
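(iii) Global Error Handling
- RxJavaPlugins.setErrorHandler(): Sets a global handler for errors that cannot be delivered to any observer (e.g., an exception raised after the subscription was disposed, or a subscriber that supplies no onError callback).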
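One way to see the global handler in action is to subscribe without an onError callback. This is a sketch assuming RxJava 3 on the classpath; the class name is hypothetical, and the handler is reset afterwards because it is process-wide state.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.AtomicReference;

public class GlobalErrorHandlerExample {
    /** Returns the message of the root cause received by the global handler. */
    public static String run() {
        AtomicReference<Throwable> captured = new AtomicReference<>();
        RxJavaPlugins.setErrorHandler(captured::set);
        try {
            // No onError callback is supplied, so the error is routed to the global handler.
            Observable.error(new IllegalStateException("boom")).subscribe();
        } finally {
            RxJavaPlugins.setErrorHandler(null); // restore the default behavior
        }

        // RxJava may wrap the original exception, so walk down to the root cause.
        Throwable root = captured.get();
        while (root != null && root.getCause() != null) {
            root = root.getCause();
        }
        return root == null ? null : root.getMessage();
    }

    public static void main(String[] args) {
        System.out.println("global handler saw: " + run());
    }
}
```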
VII. Advanced Features
(i) Backpressure and Flowable
- Backpressure addresses the situation where a producer emits much faster than its consumer can process.
- Use Flowable instead of Observable (which does not support backpressure in RxJava 2.x and later), together with a BackpressureStrategy (e.g., BUFFER to cache items, DROP to discard them).
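The DROP strategy can be sketched with a producer that emits 100 items to a consumer that has only requested 5. This example assumes RxJava 3 on the classpath; the class name is hypothetical, and test(5) is used only as a convenient way to subscribe with an initial request of 5.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import java.util.List;

public class BackpressureExample {
    /** Returns the items that reached a consumer whose demand was 5. */
    public static List<Integer> dropWhenNoDemand() {
        Flowable<Integer> source = Flowable.create(emitter -> {
            for (int i = 1; i <= 100; i++) {
                emitter.onNext(i); // emitted regardless of downstream demand
            }
            emitter.onComplete();
        }, BackpressureStrategy.DROP); // items without outstanding demand are discarded

        // Subscribe with an initial request of 5 and collect what was delivered.
        return source.test(5).values();
    }

    public static void main(String[] args) {
        System.out.println(dropWhenNoDemand()); // [1, 2, 3, 4, 5]
    }
}
```

With BackpressureStrategy.BUFFER the remaining 95 items would be held in memory until the consumer requests more, which trades dropped data for memory usage.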
(ii) Application of Subject
- PublishSubject: Emits to an observer only the items produced after it subscribed (e.g., real-time notification events).
- BehaviorSubject: Emits the most recent item to a new observer, followed by later items (e.g., holding the user login status).
- ReplaySubject: Caches all items; new observers receive the full history (e.g., log replay).
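The differences between the Subject types show up when an observer subscribes late. The sketch below assumes RxJava 3 on the classpath; the class name and the lateSubscriber() helper are hypothetical. Each subject receives A and B before the observer subscribes and C afterwards.

```java
import io.reactivex.rxjava3.subjects.AsyncSubject;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.subjects.ReplaySubject;
import io.reactivex.rxjava3.subjects.Subject;
import java.util.ArrayList;
import java.util.List;

public class SubjectExample {
    /** Emits A and B, subscribes, then emits C and completes; returns what the observer saw. */
    public static List<String> lateSubscriber(Subject<String> subject) {
        subject.onNext("A");
        subject.onNext("B");

        List<String> received = new ArrayList<>();
        subject.subscribe(received::add); // subscribes after A and B

        subject.onNext("C");
        subject.onComplete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriber(PublishSubject.create()));  // [C]
        System.out.println(lateSubscriber(BehaviorSubject.create())); // [B, C]
        System.out.println(lateSubscriber(ReplaySubject.create()));   // [A, B, C]
        System.out.println(lateSubscriber(AsyncSubject.create()));    // [C]
    }
}
```

PublishSubject and AsyncSubject both print [C] here, but for different reasons: the former missed A and B, while the latter delivers only the final item once onComplete() is called.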
VIII. Performance Optimization and Precautions
(i) Avoiding Memory Leaks
- Unsubscribe promptly: manage all subscriptions with a CompositeDisposable and call clear() in onDestroy().
- Avoid letting long-lived Observables hold references to short-lived objects (such as Activities).
(ii) Choosing the right operator
- Avoid deeply nested flatMap calls (they hurt readability and performance).
- Prefer pure functions without side effects in operators (e.g., do not modify external variables inside map).
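(iii) Debugging Techniques
- Use doOnNext() / doOnError() to print logs (to trace the state of the data flow).
- For debugging or tests, make I/O work run synchronously: RxJavaPlugins.setInitIoSchedulerHandler(scheduler -> Schedulers.trampoline()) replaces the io() scheduler with trampoline(), which avoids the difficulties caused by asynchronous execution. Call it before Schedulers is first used, since init handlers are applied only when the schedulers are initialized.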
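Tracing with doOnNext() and doOnError() can be sketched as follows. The example assumes RxJava 3 on the classpath; the class name is hypothetical, and the log is collected in a list rather than printed so that the order of events is easy to inspect.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

public class TraceExample {
    /** Records what passes each stage of the chain, including the failure. */
    public static List<String> trace() {
        List<String> log = new ArrayList<>();
        Observable.just(1, 2, 0)
                .doOnNext(n -> log.add("in: " + n))
                .map(n -> 10 / n)                      // fails with ArithmeticException for 0
                .doOnNext(n -> log.add("out: " + n))
                .doOnError(e -> log.add("error: " + e.getClass().getSimpleName()))
                .subscribe(n -> { }, e -> { });        // the error is handled, nothing else to do
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
        // in: 1, out: 10, in: 2, out: 5, in: 0, error: ArithmeticException
    }
}
```

Placing a doOnNext() before and after a suspicious operator narrows down where an item is lost or where an exception originates.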
IX. Conclusion
- RxJava’s core advantages include: chainable asynchronous operations, flattened logic, and powerful operator combination capabilities.
- Key points to note: thread scheduling, lifecycle management, error handling, and backpressure.