Asynchronous và khái quát về RxJava

1. Asynchronous là gì ? Asynchronous là xử lý bất đồng bộ , nghĩa là chương trình thực hiện các tác vụ không theo một thứ tự . Vì thế có thể xử lí nhiều tác vụ cùng lúc và tiết kiệm thời gian. Tuy nhiên , nếu chương trình yêu cầu thực hiện lần

1. Asynchronous là gì ?

Asynchronous là xử lý bất đồng bộ , nghĩa là chương trình thực hiện các tác vụ không theo một thứ tự . Vì thế có thể xử lí nhiều tác vụ cùng lúc và tiết kiệm thời gian.

Tuy nhiên , nếu chương trình yêu cầu thực hiện lần lượt hay theo thứ tự thì Asynchronous là không phù hợp , khó kiểm soát. Ví dụ như sản xuất dây chuyền trong 1 nhà máy.

2. Observer pattern

Observer pattern là một “mẫu thiết kế phần mềm” giúp bạn đăng kí một cơ chế có thể nhận thông báo từ một hoặc nhiều đối tượng về tất cả các sự kiện xảy ra với đối tượng
mà chúng đang quan sát (observing).

Giả sử ở đây có 2 thực thể là “Customer” và “Store” . Customer rất muốn biết khi nào cửa hàng có mẫu áo mới hay sự kiện giảm giá , voucher,… Customer có thể đến cửa hàng
và kiểm tra mỗi ngày và cập nhật thông tin mới nhất, tuy nhiên việc này khá tốn thời gian vì có thể cửa hàng chỉ sale 1 năm 1 lần hay vài năm mới có mẫu áo mới. Thay vào đó Customer sẽ
đăng kí như 1 subscriber , bất cứ khi nào cửa hàng có thông báo mới qua mail , hay social network , customer đều nhận được thông báo. Họ cũng có thể tùy ý hủy đăng kí khi thấy không
còn nhu cầu nữa.

3. RxJava & RxAndroid

  • RxJava là một trong những Reactive Extension dành cho ngôn ngữ Java. Được triển khai theo Observer pattern . Bạn có thể tạo ra bất kì luồng dữ liệu không đồng bộ trên bất kỳ thread nào,
    chuyển đổi dữ liệu và dữ liệu này được sử dụng bởi Observer trên bất kỳ thread nào.

  • RxAndroid là một Extension của Rxjava sử dụng riêng cho Android platform. Cung cấp thêm AndroidScheduler (Dùng cho xử lý đa luồng trong Android)

4. Observable

Observables cung cấp dữ liệu một lần và và các subscribers bắt đầu lắng nghe. Khi muốn dừng lắng nghe , ta sử dụng dispose() để dừng quá trình lại

  • Một số phương thức trong Observable:
  1. Just()

Lấy một list các phần tử và chuyển đổi sang các observable items. Không thể pass quá 10 phần tử

Observable.just(1,2,3,4,5,6,7,8,9,10)
onNext:1
onNext:2...
onNext:9
onNext:10
  1. From()

Khởi tạo một observable từ một danh sách các item sử dụng vòng lặp . Nó trả về từng phần tử một trong 1 lần

Integer[] numbers ={1,2,3,4,5,6,7,8,9,10}
Observable.fromArray(numbers)
  1. Range()

Khởi tạo một observable từ một chuỗi các generated items. Nó nhận tham số start numberlength

Observable.range(1,10)
  1. Repeat()
Observable.range(1,4).repeat(3)
onNext:1
onNext:2
onNext:3
onNext:4
onNext:1
onNext:2
onNext:3
onNext:4
onNext:1
onNext:2
onNext:3
onNext:4
  1. Buffer()

Nhóm các item vào 1 “batch” và emit từng batch thay vì emit lần lượt từng item

Observable<Integer> observable = Observable.just(1,2,3,4,5,6,7,8,9)
observable.subscribeOn(Schedulers.io()).observeOn(AndroidScheduler.mainThread()).buffer(3)
onNext:
item:1
item:2
item:3
onNext:
item:4
item:5
item:6
onNext:
item:7
item:8
item:9

5. Scheduler

Scheduler chịu trách nhiệm phân bổ tác vụ thực hiện trên các thread khác nhau dựa trên các phương thức subscriberOn() và observeOn()

  • Một số loại Scheduler:
  1. Schedulers.io()

Đây là loại scheduler phổ biến nhất của rxJava , sử dụng cho network request , được lưu trữ bơi Thread-pool.

observable.subscribeOn(Schedulers.io())
  1. Schedulers.computation()

Loại scheduler này gần giống với Schedulers.io() , sử dụng cho bitmap ,… Số lượng thread tạo ra phụ thuộc vào số lõi của thiết bị . Vì thế nên nếu thiết bị có 2 lõi và chúng đều bận , tác vụ sẽ
được chờ tới khi chúng rảnh và được thự thi.

observable.subscribeOn(Schedulers.computation())
  1. Schedulers.single()

Scheduler này được back bởi duy nhất 1 thread , không quan trọng có bao nhiêu observable , nó được coi như sự thay thế của main thread

observable.subscribeOn(Schedulers.single())
  1. Schedulers.newThread()

Mỗi thread sẽ được tạo ra cho mỗi observable thực thi. Vì thế phải kiểm soát chặt chẽ số observable trong trường hợp có quá nhiều observable thực thi

observable.subscribeOn(Schedulers.newThrea())

6. Observer

Tiếp theo về observer , chúng ta cùng điểm qua một vài loại Observer hay sử dụng hiện nay

  1. Observer

Được dùng khi muốn emit nhiều hơn 1 giá trị, ví dụ như update tiến trình khi download một file.

Observer<Integer> observer =newObserver<Integer>(){@OverridepublicvoidonSubscribe(Disposable d){System.out.println("onSubscribe");}@OverridepublicvoidonNext(Integer o){System.out.println("onNext "+ o);}@OverridepublicvoidonError(Throwable e){}@OverridepublicvoidonComplete(){System.out.println("onComplete");}};
Output:
onSubscribe
onNext 10
onNext 20
onComplete
  1. SingleObserver

Sử dụng khi muốn emit duy nhất 1 giá trị , ví dụ như network call trong Android

singleObservable.subscribe(newSingleObserver<User>(){@OverridepublicvoidonSubscribe(Disposable d){}@OverridepublicvoidonSuccess(User user){System.out.println(String.format("User with name %s successfully created: ", user.getName()));}@OverridepublicvoidonError(Throwable e){}});
Userwithname'Anitaa' successfully created
  1. MaybeObserver

Sử dụng trong trường hợp có thể tác vụ không trả về giá trị. Ví dụ method POST api.

val maybeObservable = Maybe.create<Unit>{ emitter ->postRequestApi()
emitter.onComplete()}

maybeObservable.subscribeOn(Schedulers.io()).observeOn(AndroidScheduler.mainThread()).subscribe(maybeObserver)
  1. CompletableObserver

CompletableObserver chỉ quan tâm tới kết quả trả về ở onComplete() hoặc lỗi ở onError() , không quan trọng data trả về là dạng nào

observable.subscribe(newCompletableObserver(){@OverridepublicvoidonSubscribe(Disposable d){}@OverridepublicvoidonComplete(){System.out.println("onComplete is called");}@OverridepublicvoidonError(Throwable e){System.out.println("onError is called"+ e.getMessage());}});
onComplete is called

7. Disposable

Sử dụng disposable để handle observer tránh hiện tượng “Memory leak” , khi subscribers muốn dừng lắng nghe observables.

var myObserver: Observer<Int>=object: Observer<Int>{privatevar disposable: Disposable?=nulloverridefunonSubscribe(disposable: Disposable){this.disposable = disposable
        }overridefunonNext(value:Int){//Has access to Disposable}overridefunonError(e:Throwable){//Has access to Disposable}overridefunonComplete(){//Has access to Disposable}}
overridefunonStop(){super.onStop()
disposable?.dispose()}

8. Tổng kết

Trong bài viết này , chúng ta đã tìm hiểu khái niệm về Asynchronous , Observer pattern và Rxjava cùng các thành phần của nó

Để có thêm thông tin , hãy đọc thêm về các bài viết tham khảo sau:

https://refactoring.guru/design-patterns/observer

https://www.journaldev.com/22594/rxjava-observables-observers

https://blog.mindorks.com/rxjava-for-android-rxandroid

Nguồn: viblo.asia

Bài viết liên quan

Sự Khác Nhau Giữa Domain và Hosting Là Gì?

Sự khác nhau giữa domain và hosting là gì? Bài này giải thích ngắn và dễ hiểu nh

Shared Hosting hay VPS Hosting: Lựa chọn nào dành cho bạn?

Bài viết giải thích rõ shared hosting và vps hosting là gì và hướng dẫn chọn lựa

Thay đổi Package Name của Android Studio dể dàng với plugin APR

Nếu bạn đang gặp khó khăn hoặc bế tắc trong việc thay đổi package name trong And

Lỗi không Update Meta_Value Khi thay thế hình ảnh cũ bằng hình ảnh mới trong WordPress

Mã dưới đây hoạt động tốt có 1 lỗi không update được postmeta ” meta_key=