ReactiveUI Goodies – Merge

If you have ever written an app that involves some sort of messaging or real-time data, you have likely had to integrate data streams that originate from different sources. That can mean loading cached data from disk, fetching data via an API call, and/or opening a real-time connection over a web socket. For the end user none of this matters. They only want to see a nicely consolidated flow of information. So let’s look at how we can easily get data from multiple sources into one data stream with Rx.

First we need a little DataService for our demo app that provides data from two sources. One is a batch load, which could be an API call or a load from disk. The other simulates a real-time data source that pushes new messages as they arrive.

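using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;
 
// Message is the demo's simple model with Text and Date properties (defined elsewhere in the sample).
public class DataService
{
    private int _numberOfLoadedItems = 0;
 
    // A Subject is both observer and observable: the service pushes into it, callers subscribe to it.
    private readonly Subject<Message> _loadedObservable = new Subject<Message>();
    private readonly Subject<Message> _socketObservable = new Subject<Message>();
 
    // Stream of messages produced by the batch load.
    public IObservable<Message> Loaded()
    {
        return _loadedObservable;
    }
 
    // Stream of messages produced by the simulated real-time connection.
    public IObservable<Message> Listen()
    {
        return _socketObservable;
    }
 
 
    // Simulates a batch load: emits three batches of `number` messages, two seconds apart.
    public void Load(int number)
    {
        Task.Run(async () =>
        {
            // One Random instance for the whole load instead of a new one per batch.
            Random rand = new Random();
            for (int i = 0; i < 3; i++)
            {
                int toLoad = _numberOfLoadedItems + number;
                for (int j = _numberOfLoadedItems; j < toLoad; j++)
                {
                    // Random dates in the past, so the messages do not arrive in date order.
                    _loadedObservable.OnNext(new Message { Text = $"Message {j} from disk.", Date = DateTime.Now.AddMinutes(-rand.Next(0, 1000)) });
                    _numberOfLoadedItems++;
                }
                await Task.Delay(2000);
            }
        });
    }
 
    // Simulates a socket: emits one message every three seconds, forever.
    public void Connect()
    {
        Task.Run(async () =>
        {
            while (true)
            {
                _socketObservable.OnNext(new Message { Text = "Message from socket.", Date = DateTime.Now });
                await Task.Delay(3000);
            }
        });
    }
}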

Now that we have our data streams available, we can simply combine them with the Merge() operator. Merge() combines two or more observables of the same element type into a single observable and forwards each item as soon as its source emits it.

_dataService = new DataService();
_rootList = new ReactiveList<Message>();
 
// Identity projection, no filtering, newest message first.
Items = _rootList.CreateDerivedCollection(x => x, x => true, (x, y) => -1 * x.Date.CompareTo(y.Date));
 
// Merge both streams and add every message to the list on the UI thread.
_dataService.Loaded()
    .Merge(_dataService.Listen())
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(x =>
    {
        _rootList.Add(x);
    });
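
To see what Merge() does in isolation, here is a minimal sketch that needs only the System.Reactive package and no UI. The class name MergeSketch, the two string subjects, and the message texts are made up for illustration and are not part of the demo app. The subjects stand in for Loaded() and Listen(), and values are pushed by hand so the order is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of the demo app.
public static class MergeSketch
{
    // Pushes values into two subjects by hand and returns what a single
    // subscriber to the merged stream received, in arrival order.
    public static List<string> Run()
    {
        var received = new List<string>();
        var disk = new Subject<string>();    // stands in for Loaded()
        var socket = new Subject<string>();  // stands in for Listen()

        using (disk.Merge(socket).Subscribe(x => received.Add(x)))
        {
            disk.OnNext("disk 0");
            socket.OnNext("socket 0");
            disk.OnNext("disk 1");
        }

        // The subscription has been disposed, so this value is not recorded.
        socket.OnNext("socket 1");
        return received;
    }

    public static void Main()
    {
        // prints: disk 0, socket 0, disk 1
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The subscriber sees one stream in which items from both sources are interleaved in the order they were emitted. Merge() does not reorder anything by itself.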

Since the data arrive in an arbitrary order that depends on the timing of each data source, we apply what we learned in the previous post, Reactive Goodies – IReactiveDerivedList Basics, and use a derived collection to sort the messages by date before they are reported to the UI layer. As a result, we have a nicely aggregated and sorted message list to present to the user.
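
The comparer passed to the derived collection, (x, y) => -1 * x.Date.CompareTo(y.Date), is what puts the newest message on top. As a rough sketch of that ordering outside of ReactiveUI, the snippet below applies the same comparison to a plain List<DateTime>. The class name NewestFirstSketch and the sample dates are assumptions for illustration only, and plain DateTime values stand in for Message.Date.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper, not part of the demo app.
public static class NewestFirstSketch
{
    // Same comparison as in the derived collection: negating CompareTo
    // reverses the natural (oldest-first) DateTime order.
    public static int NewestFirst(DateTime x, DateTime y)
    {
        return -1 * x.CompareTo(y);
    }

    // Returns a sorted copy and leaves the input untouched.
    public static List<DateTime> Sorted(IEnumerable<DateTime> dates)
    {
        var copy = new List<DateTime>(dates);
        copy.Sort(NewestFirst);
        return copy;
    }

    public static void Main()
    {
        // Arbitrary sample dates, listed in "arrival" order.
        var noon = new DateTime(2017, 1, 1, 12, 0, 0);
        var arrivalOrder = new[] { noon.AddMinutes(-30), noon, noon.AddMinutes(-5) };

        // prints 12:00, 11:55, 11:30 (one per line)
        foreach (var d in Sorted(arrivalOrder))
            Console.WriteLine(d.ToString("HH:mm", CultureInfo.InvariantCulture));
    }
}
```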

MergeDemo

Please find the example code at https://github.com/bitdisaster/practicalcode

Happy Coding!
