I am implementing the repository pattern in RxJava, using SqlBrite/SqlDelight for offline data storage and Retrofit for HTTP requests.
Here's a sample of the current implementation:
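protected Observable<List<Item>> getItemsFromDb() {
    return database.createQuery(tableName(), selectAllStatement())
            .mapToList(cursor -> selectAllMapper().map(cursor));
}

public Observable<List<Item>> getItems() {
    Observable<List<Item>> server = getRequest()
            .doOnNext(items -> {
                // Write the server response to the database in a single transaction
                BriteDatabase.Transaction transaction = database.newTransaction();
                try {
                    for (Item item : items) {
                        database.insert(tableName(), contentValues(item));
                    }
                    transaction.markSuccessful();
                } finally {
                    // Always end the transaction, even if an insert throws
                    transaction.end();
                }
            })
            .flatMap(items -> getItemsFromDb())
            // Delay the request so an immediate network failure cannot win the amb race
            .delaySubscription(200, TimeUnit.MILLISECONDS);
    Observable<List<Item>> db = getItemsFromDb()
            // Ignore empty results so the db stream only wins when it has data
            .filter(items -> items != null && items.size() > 0);
    // amb mirrors whichever stream emits first; the extra subscribe runs the server request even if db wins
    return Observable.amb(db, server)
            .doOnSubscribe(() -> server.subscribe(items -> {}, throwable -> {}));
}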
The current implementation uses Observable.amb, which mirrors whichever of the two streams emits first: the db stream if the database already has data, the server stream otherwise. To keep an immediate failure (for example, no internet connection) from winning that race, the server stream has a delaySubscription of 200 ms.
I tried Observable.concat, but the SqlBrite stream never completes, so the server observable is never subscribed to.
I also tried Observable.combineLatest, which didn't work because it emits nothing until the server observable has returned data. Observable.switchOnNext didn't work either.
What I am looking for is a repository which:
- Keeps the subscription to SqlBrite (DB) open, in case of DB updates
- Always fetches data from server and writes it to database
- Does not emit an empty result when the database is empty and the network request is still in progress, because the user should see a progress bar during the first load
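
To make these requirements concrete, here is a minimal plain-Java model of the emissions I expect. It deliberately uses no RxJava, and every name in it (RepositoryModel, onServerResponse, insertLocally) is hypothetical and made up for illustration; none of it is SqlBrite, SqlDelight, or Retrofit API. A failed network request is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the desired behaviour. All names are hypothetical;
// nothing here is SqlBrite, SqlDelight, Retrofit or RxJava API.
final class RepositoryModel {
    private final List<String> table = new ArrayList<>();  // stands in for the DB table
    private final List<Consumer<List<String>>> subscribers = new ArrayList<>();
    private boolean serverResponded = false;

    // Requirement 1: the subscription stays open; there is no completion signal.
    void subscribe(Consumer<List<String>> subscriber) {
        subscribers.add(subscriber);
        emitTo(subscriber);
    }

    // Requirement 2: every server response is written to the table,
    // and the write notifies all open subscriptions.
    void onServerResponse(List<String> items) {
        serverResponded = true;
        table.addAll(items);
        notifySubscribers();
    }

    // Any other write to the table also reaches open subscriptions.
    void insertLocally(String item) {
        table.add(item);
        notifySubscribers();
    }

    private void notifySubscribers() {
        for (Consumer<List<String>> subscriber : subscribers) {
            emitTo(subscriber);
        }
    }

    // Requirement 3: suppress an empty result while the first request is in flight.
    private void emitTo(Consumer<List<String>> subscriber) {
        if (table.isEmpty() && !serverResponded) {
            return;
        }
        subscriber.accept(new ArrayList<>(table));  // hand out a copy of the current rows
    }

    public static void main(String[] args) {
        RepositoryModel repo = new RepositoryModel();
        // Empty DB and no server response yet: nothing is printed here.
        repo.subscribe(items -> System.out.println("emitted " + items));
        repo.onServerResponse(Arrays.asList("a", "b"));  // prints "emitted [a, b]"
        repo.insertLocally("c");                         // prints "emitted [a, b, c]"
    }
}
```

In this model the subscriber sees nothing until the first server response arrives, which is the window in which the progress bar should be shown. An empty list is only emitted once the server has responded and the table is still empty.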
 
    