Valid setup for Reactive data polling in C#

0

I need to implement periodic polling of some data from remote component.
It should poll data next time after previous polling is done + wait let's say 5sec.
I came up with the code below but later figured out app throws Exception upon close - Event viewer has a record for 0xc00000fd which is stack overflow exception.
It's worth to mention that this exception only happens when app was open and polling data for quite a while (it takes time for stack to overflow).
All this is a WPF app and code below is in ViewModel.

I realize why exception is happening in this code (probably shouldn't call OnNext in Subscribe) but what is the correct way to implement?

_ctrlSubj = new Subject<ControllerInfo>();
_ctrlSubj.SelectMany(async _ => 
{
    // CurrentController is of type ControllerInfo
    // next line can take various amount of time
    var jobDetails = await Library.GetJobsAsync(CurrentController);
    return jobDetails;
})
.ObserveOnDispatcher()
.Subscribe(async e =>
{
    // Jobs is bound to View
    Jobs = new ObservableCollection<JobDetail>(jobDetails);

    await Task.Delay(TimeSpan.FromSeconds(5));
    _ctrlSubj.OnNext(CurrentController);
});
c#
wpf
system.reactive
polling
reactive
asked on Stack Overflow Dec 26, 2018 by IgorStack

2 Answers

2

2nd edit:

@Aron's answer reminded me of multi subscription problems, which the below answer has. I recommend a helper function IntervalAsync looking like this:

public static class RxExtensions
{
    public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period)
    {
        return IntervalAsync(f, period, Scheduler.Default);
    }

    public static IObservable<TResult> IntervalAsync<TResult>(Func<IObservable<TResult>> f, TimeSpan period, IScheduler scheduler)
    {
        return Observable.Create<TResult>(o =>
        {
            var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
            var observable = q
                .Delay(t => Observable.Timer(t))
                .SelectMany(_ => f())
                .Do(t => q.OnNext(period));
            return observable.Subscribe(o);
        });
    }
}

with the final code looking like this:

var subscription = RxExtensions.IntervalAsync(
        () => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController)), 
        TimeSpan.FromSeconds(5)
    )
    .ObserveOnDispatcher()
    .Subscribe(i =>
    {
        Jobs = new ObservableCollection<JobDetail>(jobDetails);
    });

@Aron's answer works. I don't find it simpler because you have more Rx-TPL mixing, though I'l admit that 'simpler' is in the eye of the beholder.


First edit: (Not recommended, multiple subscription bugs are present).

Your concern is valid: With a synchronous reactive pipeline, Interval would wait for the full pipeline to finish. But with an async pipeline, Interval won't hold up. So if the first async task took 4.5 seconds, the next task would start .5 seconds after the first finished.

If you want the end-to-begin delay to be a set timespan, I think it would be best to do a queueing mechanism, similar to what you set up. I would do something similar to this:

var q = new BehaviorSubject<TimeSpan>(TimeSpan.Zero);
var subscription = q
    .Delay(t => Observable.Timer(t))
    .SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
    .Subscribe(i =>
    {
        Jobs = new ObservableCollection<JobDetail>(jobDetails);
    });

Rx's thread & stack management I think works better here than TPL, and this won't lead to an infinite stack. However, I haven't tested it.


Original answer:

This may do it, but I can't test because no types.

var subscription = Observable.Interval(TimeSpan.FromSeconds(5))
    .SelectMany(_ => Observable.FromAsync(() => Library.GetJobsAsync(CurrentController))
    .ObserveOnDispatcher()
    .Subscribe(jobDetails =>
    {
        Jobs = new ObservableCollection<JobDetail>(jobDetails);
    });

If no good, then please modify your answer to include a mcve.


Testing code:

This is the code I'm using to test for async-TPL/RX mix. Doesn't full replicate @IgorStack's environment because no WPF (no .ObserveOnDispatcher()):

var f = new Func<Task<int>>(async () => {
    await Task.Delay(TimeSpan.FromSeconds(1));
    return 3;
});

var scheduler = new EventLoopScheduler(); //or Scheduler.Default
var o1 = RxExtensions.IntervalAsync(() => Observable.FromAsync(() => f()), TimeSpan.FromSeconds(5), scheduler)
    .Timestamp();
var subscription1 = o1.Subscribe(i =>
    {
        Console.WriteLine("s1: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
    });
var subscription2 = o1.Subscribe(i =>
    {
        Console.WriteLine("s2: " + i.Timestamp.ToString("hh:mm:ss.ffff"));
    });
answered on Stack Overflow Dec 27, 2018 by Shlomo • edited Dec 28, 2018 by Shlomo
0

Sorry @Shlomo, simplest way to achieve this is the following (this will give you end-to-start delay of 5 seconds).

var jobs = Observable.Create<List<Job>>(async (observer, cancel) => {
   while(cancel.IsCancellationRequested == false)
   {
      try
      {
         var ret = await Library.GetJobsAsync(CurrentController);
         observer.OnNext(ret);
         await Task.Delay(5000, cancel);
      }
      catch(Exception ex)
      {
         observer.OnError(ex);
         return;
      }
   }
   observer.OnCompleted();

});

If instead you want start-to-start delay of 5 seconds you can exchange the body of the code simply with:

var delay = Task.Delay(5000, cancel);
var ret = await Library.GetJobsAsync(CurrentController);
observer.OnNext(ret);
await delay;

Here is the Code rewritten into a LinqPad "UnitTest"...You can confirm the results.

void Main()
{
    var scheduler = new TestScheduler();

    var foo = Observable.Create<int>(async (observer, cancellationToken) => {
        while(!cancellationToken.IsCancellationRequested){
            var ret = await DoStuff(scheduler);
            observer.OnNext(ret);
            await Observable.Delay(
                Observable.Return(Unit.Default), 
                TimeSpan.FromSeconds(5), 
                scheduler)
            .ToTask();
        }
    });

    using(foo.Timestamp(scheduler).Subscribe(f => Console.WriteLine(f.Timestamp))){
        scheduler.AdvanceBy(TimeSpan.FromSeconds(120).Ticks);
    }

}

// Define other methods and classes here


public Task<int> DoStuff(IScheduler scheduler){
    return Observable.Delay(Observable.Return(1), TimeSpan.FromSeconds(1), scheduler).ToTask();
}

The output is the following:

01/01/0001 00:00:01 +00:00
01/01/0001 00:00:07 +00:00
01/01/0001 00:00:13 +00:00
01/01/0001 00:00:19 +00:00
01/01/0001 00:00:25 +00:00
01/01/0001 00:00:31 +00:00
01/01/0001 00:00:37 +00:00
01/01/0001 00:00:43 +00:00
01/01/0001 00:00:49 +00:00
01/01/0001 00:00:55 +00:00
01/01/0001 00:01:01 +00:00
01/01/0001 00:01:07 +00:00
01/01/0001 00:01:13 +00:00
01/01/0001 00:01:19 +00:00
01/01/0001 00:01:25 +00:00
01/01/0001 00:01:31 +00:00
01/01/0001 00:01:37 +00:00
01/01/0001 00:01:43 +00:00
01/01/0001 00:01:49 +00:00
01/01/0001 00:01:55 +00:00
answered on Stack Overflow Dec 28, 2018 by Aron • edited Dec 28, 2018 by Aron

User contributions licensed under CC BY-SA 3.0