Originally posted this article on the Wedding Party tech blog

This is a bonus RxJava post that I landed up writing along with my previous post on creating an event bus with RxJava. If you went through the code in the actual repo you would have noticed more than one version of the bottom fragment in the RxBus demo.

Originally I envisioned the RxBus example being a tad bit fanicer however as I coded up the example, I realized that too many concepts were getting conflated. The ridiculous simplicity of the RxBus implementation was lost. So I dumbed down the original example but left in the original code for the Rx padawans.

Original example:

Simple RxBus example

Fancier one:

Fancy RxBus example

The fanciness is basically in the numbers being accumulated and shown in “chunks”. In my head I thought I could simply use the debounce operator (like the debounce search example) and be on my jolly way, but this was not to be… From the always helpful RxJava wiki:

debounce - only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items

I wanted the “after a particular timespan has passed without the Observable emitting any other items” part of debounce, but I needed the whole list of emitted items (not just a single item). Thebufferoperator also seemed promising:

buffer - periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time

“emit as bundles”, perfect! uh… not exactly, it says “periodically” and that literally means periodically. So EVERY X seconds/minutes it will emit a list of objects regardless of whether there were any taps/events. This would mean a bunch of empty lists would periodically be emitted when no taps were registered. I could get the example working if I tweaked the time component for buffer just enough and filter out the empty results, but it was a hack and not the way of the Rx.

What I really needed was a selective combination of debounce and buffer - a “debouncedBuffer” operator. Such an operator doesn’t exist but you could achieve something similar by calling .buffer() with a special “boundry observable” parameter, as Jedi master Ben Christensen points out in this Stack Overflow post (the Rx is strong with this one).

Essentially, if you pass an observable as a parameter to buffer, every time this observable emits an item, buffer will take the source observable and emit a list of items from the source observable (instead of the items emitted by the boundary observable).

<Source Observable For Actual Events>.buffer(<Boundary Observable that only tells "when" to take items from the Source>)

So what we’re going to do is use a “debouncedEventEmitter” as our boundary observable. The “debouncedEventEmitter” will basically emit a single item-which we really don’t care about-only after a certain time has elapsed from the emission of the first item.

Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);

We now buffer our source observable again to give us a list of items (vs a single item) everytime the debouncedEventEmitter emits a single item.

Observable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEventEmitter);

Altogether now:

Observable<Object> tapEventEmitter = _rxBus.toObserverable().share();
Observable<Object> debouncedEventEmitter = tapEventEmitter.debounce(1, TimeUnit.SECONDS);
Observable<List<Object>> debouncedBufferEmitter = tapEventEmitter.buffer(debouncedEventEmitter);

debouncedBufferEmitter.buffer(debouncedEventEmitter)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Action1<List<Object>>() {
      @Override
      public void call(List<Object> taps) {
        _showTapCount(taps.size());
      }
    });

Notice the .share() operator? In my next post, I’ll go into the details of that operator along with .publish() and refcount().

[UPDATE: Ben pointed me to a niftier implementation of debounced buffer. I’ve added a third variant of the Bottom fragment that uses this approach. A subsequent post will go into the details.]

Follow discussion on Reddit.