ZionSoft     About     Archive     Feed     Privacy

Wrapping Existing Code With RxJava

We are using RxJava in Android a lot, with good reasons. However, we still need to use code that is not built with RxJava, so let’s wrap them.

Synchronous APIs

For very simple synchronous APIs, you can use Observable.just() to wrap them. e.g. you can use Observable.just(1, 2, 3, 4, 5) to emit an integer sequence from 1 to 5.

If the API is blocking, you can use Observable.defer() to wrap them:

Observable<String> observable = Observable.defer(
  new Func0<Observable<String>>() {
    @Override
    public Observable<String> call() {
      try {
        return Observable.just(getStringBlocking());
      } catch (Exception e) {
        return Observable.error(e);
      }
    }
  });

And if you want RxJava to do try...catch for you, you can use Observable.fromCallable():

Observable<String> observable = Observable.fromCallable(
  new Callable<String>() {
    @Override
    public String call() throws Exception {
      return getStringBlocking();
    }
  });

Asynchronous APIs

Usually, existing code uses callbacks to support asynchronous operations, e.g. in Android we can request location updates like this:

locationManager.requestLocationUpdates(
  LocationManager.GPS_PROVIDER, 1000L, 10.0F,
  new LocationListener() {
    @Override
    public void onLocationChanged(Location location) {
    }

    ...
  });

For advanced RxJava users, who don’t need to read this article, you can use Observable.create() to wrap it, and fullfil the contract by yourself. But for us, we can easily use Observable.fromEmitter() to handle the case, and let the framework to help us:

Observable<Location> observable = Observable.fromEmitter(
  new Action1<AsyncEmitter<Location>>() {
    @Override
    public void call(final AsyncEmitter<Location> emitter) {
      final LocationListener locationListener = new LocationListener() {
        @Override
        public void onLocationChanged(Location location) {
          // emits location
          emitter.onNext(location);
        }

        ...
      };

      emitter.setCancellation(new AsyncEmitter.Cancellable() { 
        @Override 
        public void cancel() throws Exception {
          // stops location updates when unsubscribed
          locationManager.removeUpdates(locationListener);
        });

      locationManager.requestLocationUpdates(LocationManager.GPS_PROVIDER,
        1000L, 10.0F, locationListener);

      // if you also emit onError() or onComplete(),
      // the framework will make sure the Observable
      // contract is fullfilled
    }
     // let the framework to worry about backpressure
  }, AsyncEmitter.BackpressureMode.BUFFER);

Enjoy.