I'm new to the world of Reactive Extensions and I'm still trying to learn.
I'm developing an app with a datagrid that displays certain running Windows processes and their memory usage. The memory usage of each process should be updated frequently i.e. every 200ms.
Requirements
- When checkbox is checked
- the datagrid should be filled with processes and the memory usage is updated by using a timer with an interval of 200 ms.
- monitor (all should be done on a background thread)
 
-- if a process has exited, it should be removed from the source.
-- if a process starts, it should be added to the source
-- monitor a log file for changes
- When checkbox is unchecked
- all the monitor activity should be stopped
- the datagrid is cleared
 
Any help would be greatly appreciated! Notes:
- In the past I tried several approaches, like using an ObservableConcurrentDictionary as a resource and a timer for periodically updating the resource, but I ran into trouble (concurrency, locking, etc.), so I would like to have a solution based on Rx/ReactiveUI
- Due to technical limitations I can only use .NET Framework 4.0 and Reactive-core.Net40
Update
ViewModel
// Backing list for Processes; change tracking is switched on in the initializer.
private ReactiveList<IProcessModel> _processes = new ReactiveList<IProcessModel> { ChangeTrackingEnabled = true };

/// <summary>
/// The process list exposed by the view model (read-only reference; the list itself is mutable).
/// </summary>
public ReactiveList<IProcessModel> Processes => _processes;
/// <summary>
/// Creates the view model and reacts to every change of <c>ShowProcessesIsChecked</c>
/// by calling <c>DoShowProcesses</c> with the new value.
/// </summary>
/// <param name="monitorService">Service used to enumerate the processes to display.</param>
public MainViewModel(IMonitorService monitorService)
{
    // DoShowProcesses reads _monitorService, so the injected instance has to be stored;
    // without this assignment the first "checked" notification dereferences null.
    // NOTE(review): assumes the _monitorService field is declared elsewhere in this class.
    _monitorService = monitorService;

    this.WhenAnyValue(vm => vm.ShowProcessesIsChecked).Subscribe((b) => DoShowProcesses(b));
}
// Subscription to the periodic memory refresh; null while monitoring is off.
// It must be a field: a local would be lost between the "checked" call that creates
// the subscription and the "unchecked" call that has to dispose it.
private IDisposable _refreshSubscription;

/// <summary>
/// Starts or stops the periodic refresh of <c>Processes</c>.
/// </summary>
/// <param name="checkboxChecked">
/// <c>true</c> to (re)load the process list and refresh it every 200 ms;
/// <c>false</c> to stop refreshing and leave the list empty.
/// </param>
private void DoShowProcesses(bool checkboxChecked)
{
    // Tear down any refresh that is still running, whichever way the checkbox went,
    // so toggling never leaves two timers alive.
    if (_refreshSubscription != null)
    {
        _refreshSubscription.Dispose();
        _refreshSubscription = null;
    }

    // Clear under the same lock the timer callback uses; an unlocked Clear() can
    // race with the enumeration below.
    lock (Processes)
    {
        Processes.Clear();

        if (!checkboxChecked)
        {
            return;
        }

        Processes.AddRange(_monitorService.GetProcesses());
    }

    // Interval ticks repeatedly; Timer(dueTime) would produce a single value and complete.
    _refreshSubscription = Observable.Interval(TimeSpan.FromMilliseconds(200.0))
        .Select(_ =>
        {
            // Work on a snapshot so that changes made to Processes while the (slow)
            // memory refresh runs cannot invalidate the enumerator.
            // NOTE(review): only protects against writers that also lock on Processes.
            IProcessModel[] snapshot;
            lock (Processes)
            {
                snapshot = Processes.ToArray();
            }

            foreach (var process in snapshot)
            {
                process.UpdateMemory();
            }

            return snapshot.Where(p => p.ProcessObject.HasExited).ToList();
        })
        .ObserveOnDispatcher()
        .Subscribe(processesExited =>
        {
            if (processesExited.Count > 0)
            {
                lock (Processes)
                {
                    Processes.RemoveAll(processesExited); //remove all processes that have exited
                }
            }
        });
}
I started a new thread
Original
ViewModel
/// <summary>
/// View model behind the process grid. Checking <see cref="ShowProcessesIsChecked"/>
/// loads the process list on a background scheduler; unchecking stops the monitor
/// service and empties the list.
/// </summary>
public class MainViewModel : ReactiveObject
{
    private readonly IMonitorService _monitorService;
    private bool _showProcessesIsChecked;

    /// <summary>Processes shown in the grid.</summary>
    public ReactiveList<IProcessModel> Processes { get; private set; }

    /// <summary>
    /// Wires the checkbox state to loading and clearing of <see cref="Processes"/>.
    /// </summary>
    /// <param name="monitorService">Service that enumerates and monitors processes.</param>
    public MainViewModel(IMonitorService monitorService)
    {
        _monitorService = monitorService;
        Processes = new ReactiveList<IProcessModel>() { ChangeTrackingEnabled = true };

        // NOTE(review): SynchronizationContext.Current is captured here, so this
        // constructor is assumed to run on the UI thread - confirm.
        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(isChecked => isChecked) //checkbox checked
            .ObserveOn(Scheduler.Default) //raise notifications on thread-pool thread to keep UI responsive
            .Select(_ => _monitorService.GetProcesses())
            .ObserveOn(SynchronizationContext.Current)
            .Subscribe(processes =>
            {
                // The box may have been unchecked while the list was being built;
                // dropping the stale result keeps the grid empty in that case.
                if (!ShowProcessesIsChecked)
                {
                    return;
                }

                // Replace rather than append, otherwise every re-check duplicates the rows.
                Processes.Clear();
                Processes.AddRange(processes);
            });

        //start the MonitorService with MonitorService.Start(Processes)
        //start a timer with an interval of 200ms --> at interval
        //- do UpdateMemory() foreach IProcessModel in Processes
        //- if ProcessObject.HasExited --> remove it from the collection source

        this.WhenAnyValue(vm => vm.ShowProcessesIsChecked)
            .Where(isChecked => !isChecked) //checkbox unchecked
            .Subscribe(_ =>
            {
                _monitorService.Stop(); //this stops monitoring for starting processes

                // Stop() only clears the list it was given through Start(); clear our own
                // list explicitly so the grid is emptied even when Start() was never called.
                Processes.Clear();
            });
    }

    /// <summary>Bound to the "Show Processes" checkbox; raises change notifications.</summary>
    public bool ShowProcessesIsChecked
    {
        get { return _showProcessesIsChecked; }
        set { this.RaiseAndSetIfChanged(ref _showProcessesIsChecked, value); }
    }
}
Model
/// <summary>
/// Row model wrapping a single <see cref="Process"/>.
/// </summary>
public class ProcessModel : ProcessModelBase, IProcessModel
{
    /// <summary>Stores <paramref name="process"/> as the wrapped <c>ProcessObject</c>.</summary>
    /// <param name="process">The process this row represents.</param>
    public ProcessModel(Process process)
    {
        ProcessObject = process;
    }

    /// <summary>
    /// Refreshes the wrapped process and raises a change notification for
    /// <c>ProcessObject</c> when its paged memory size differs from the value read
    /// before the refresh. Does nothing for a process that has exited; any exception
    /// is swallowed.
    /// </summary>
    public void UpdateMemory()
    {
        try
        {
            if (ProcessObject.HasExited)
            {
                return;
            }

            // Read the value before Refresh() so there is something to compare against.
            long pagedSizeBeforeRefresh = ProcessObject.PagedMemorySize64;
            ProcessObject.Refresh();
            long pagedSizeAfterRefresh = ProcessObject.PagedMemorySize64;

            if (pagedSizeBeforeRefresh != pagedSizeAfterRefresh)
            {
                OnPropertyChanged(nameof(ProcessObject));
            }
        }
        catch (Exception)
        {
            //log it
        }
    }
}
Service
/// <summary>
/// Enumerates running processes whose name contains <c>chrome</c> and watches WMI
/// for newly started ones, adding them to the list supplied to <see cref="Start"/>.
/// </summary>
public class MonitorService : IMonitorService
{
    // Shared by GetProcesses() and the WMI query so both select the same processes.
    private const string ProcessNameFilter = "chrome";

    ManagementEventWatcher managementEventWatcher;
    ReactiveList<IProcessModel> _processes;

    // Context captured in Start(); used to add new processes on the caller's thread
    // instead of the WMI callback thread. Null when the caller had no context.
    System.Threading.SynchronizationContext _callerContext;

    /// <summary>
    /// Returns a model for every running process whose name contains the filter text.
    /// </summary>
    /// <returns>A new list; empty when no process matches.</returns>
    public List<IProcessModel> GetProcesses()
    {
        List<IProcessModel> processes = new List<IProcessModel>();
        foreach (var process in Process.GetProcesses())
        {
            if (process.ProcessName.Contains(ProcessNameFilter))
            {
                processes.Add(new ProcessModel(process));
            }
            else
            {
                // Process is IDisposable; release the ones that are not handed out.
                process.Dispose();
            }
        }
        return processes;
    }

    /// <summary>
    /// Starts the manager. Monitor a starting process and changes in log file
    /// </summary>
    /// <param name="processes">List that receives a model for each newly started process.</param>
    public void Start(ReactiveList<IProcessModel> processes)
    {
        // A second Start() must not leave the previous watcher subscribed and running.
        StopWatcher();

        _processes = processes;

        // NOTE(review): assumes Start() is called on the thread that owns the list
        // (typically the UI thread) - confirm against the caller.
        _callerContext = System.Threading.SynchronizationContext.Current;

        // WQL LIKE without wildcards is an exact match, so 'chrome' would not match a
        // longer name such as "chrome.exe"; the wildcards mirror Contains() above.
        var qStart = "SELECT * FROM Win32_ProcessStartTrace WHERE ProcessName LIKE '%" + ProcessNameFilter + "%'";
        managementEventWatcher = new ManagementEventWatcher(new WqlEventQuery(qStart));
        managementEventWatcher.EventArrived += new EventArrivedEventHandler(OnProcessStarted);
        try
        {
            managementEventWatcher.Start();
        }
        catch (Exception)
        {
            //log it
        }
        Task.Factory.StartNew(() => MonitorLogFile());
    }

    /// <summary>
    /// Stops watching for new processes and clears the list given to <see cref="Start"/>.
    /// Safe to call when <see cref="Start"/> was never called.
    /// </summary>
    public void Stop()
    {
        StopWatcher();
        if (_processes != null)
        {
            _processes.Clear();
        }
    }

    // Unsubscribes, stops and disposes the current watcher, if any.
    private void StopWatcher()
    {
        var watcher = managementEventWatcher;
        if (watcher == null)
        {
            return;
        }

        managementEventWatcher = null;
        watcher.EventArrived -= OnProcessStarted;
        try
        {
            watcher.Stop();
        }
        finally
        {
            watcher.Dispose();
        }
    }

    private void MonitorLogFile()
    {
        //this code monitors a log file for changes. It is possible that the IsChecked property of a ProcessModel object is set in the Processes collection
    }

    // WMI callback: wraps the started process and adds it to the monitored list.
    private void OnProcessStarted(object sender, EventArrivedEventArgs e)
    {
        try
        {
            Process process = Process.GetProcessById(Convert.ToInt32(e.NewEvent.Properties["ProcessID"].Value));
            IProcessModel model = new ProcessModel(process);

            // This handler runs on a WMI thread; hand the mutation to the context
            // captured in Start() so the list is not modified while its owner
            // enumerates it.
            var context = _callerContext;
            if (context != null)
            {
                context.Post(_ => AddIfMonitoring(model), null);
            }
            else
            {
                AddIfMonitoring(model);
            }
        }
        catch (ArgumentException)
        {
            //log it
        }
        catch (InvalidOperationException)
        {
            //log it
        }
    }

    // Adds the model unless monitoring was stopped while the callback was in flight.
    private void AddIfMonitoring(IProcessModel model)
    {
        if (managementEventWatcher != null && _processes != null)
        {
            _processes.Add(model);
        }
    }
}
XAML
<!-- Toggles monitoring: bound to ShowProcessesIsChecked on the view model. -->
<CheckBox Content='Show Processes' IsChecked='{Binding ShowProcessesIsChecked}' />
<!-- Grid over the view model's Processes list. -->
<DataGrid  ItemsSource="{Binding Processes}">
    <DataGrid.Resources>
      <!-- The columns below are declared as keyed, non-shared resources, not inside DataGrid.Columns.
           NOTE(review): nothing in this snippet adds them to the grid; presumably done elsewhere. Confirm. -->
      <!-- Process column: per-row IsChecked checkbox next to the process name. -->
      <DataGridTemplateColumn Header='Process'
                              x:Key='dgProcessName'
                              IsReadOnly='True'
                              x:Shared='False'>
        <DataGridTemplateColumn.CellTemplate>
            <DataTemplate>
                <StackPanel Orientation='Horizontal' VerticalAlignment='Center'>
                    <CheckBox IsChecked='{Binding IsChecked, Mode=TwoWay, UpdateSourceTrigger=PropertyChanged}' HorizontalAlignment='Stretch' VerticalAlignment='Stretch'> </CheckBox>
                    <TextBlock Text='{Binding ProcessObject.ProcessName}' />
                </StackPanel>
            </DataTemplate>
        </DataGridTemplateColumn.CellTemplate>
          </DataGridTemplateColumn>
          <!-- PID column: reads ProcessObject.Id. -->
          <DataGridTextColumn Header="PID"
                              Binding="{Binding ProcessObject.Id}"
                              IsReadOnly='True'
                              x:Key='dgPID'
                              x:Shared='False' />
          <!-- Commit Size column: reads ProcessObject.PagedMemorySize64, the value ProcessModel.UpdateMemory compares. -->
          <DataGridTextColumn Header="Commit Size"
                              Binding='{Binding ProcessObject.PagedMemorySize64}'
                              IsReadOnly='True'
                              x:Key='dgCommitSize'
                              x:Shared='False' />
    </DataGrid.Resources>
</DataGrid>
 
     
    