Use of the Subject type is often frowned upon in reactive programming. In the following situation, I use a Subject to allow subscribing to notifications before the underlying source of the notification is created. Is there an alternative to accomplish this without using Subject?
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
using Windows.Networking.Sockets;
using Windows.Storage.Streams;
class Program
{
public static void Main()
{
var socket = new ObservableMessageWebSocket();
socket.Messages.Subscribe(Print); // Caller is allowed to subscribe before connect
var uri = new Uri("ws://mydomain.com/messages");
socket.ConnectAsync(uri).Wait(); // Caller is allowed to connect after subscribe
Console.ReadLine();
}
public static void Print(string message)
{
Console.WriteLine(message);
}
}
class ObservableMessageWebSocket
{
// Is there a way to get rid of this Subject?
private readonly Subject<string> subject = new Subject<string>();
private MessageWebSocket webSocket;
public IObservable<string> Messages => subject;
public async Task ConnectAsync(Uri uri)
{
webSocket = new MessageWebSocket();
webSocket.Control.MessageType = SocketMessageType.Utf8;
Observable
.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(webSocket, nameof(webSocket.MessageReceived))
.Select(ReadString)
.Subscribe(subject);
await webSocket.ConnectAsync(uri);
}
private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
{
using (var reader = pattern.EventArgs.GetDataReader())
{
reader.UnicodeEncoding = UnicodeEncoding.Utf8;
return reader.ReadString(reader.UnconsumedBufferLength);
}
}
}
EDIT: To clarify, I have several software components that subscribe to ObservableMessageWebSocket.Messages for push notifications. Some components subscribe before ObservableMessageWebSocket.ConnectAsync is called, and some subscribe after.
The code below avoids Subject, but does not function correctly. A component subscribes after connect, and never receives notifications.
class ObservableMessageWebSocket
{
private MessageWebSocket WebSocket { get; }
public IObservable<string> Messages { get; }
public ObservableMessageWebSocket()
{
WebSocket = new MessageWebSocket();
WebSocket.Control.MessageType = SocketMessageType.Utf8;
Messages = Observable
.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(WebSocket, nameof(WebSocket.MessageReceived))
.Select(ReadString);
}
private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
{
using (var reader = pattern.EventArgs.GetDataReader())
{
reader.UnicodeEncoding = UnicodeEncoding.Utf8;
return reader.ReadString(reader.UnconsumedBufferLength);
}
}
public async Task ConnectAsync(Uri uri)
{
await WebSocket.ConnectAsync(uri);
}
}
The code below does not work either. Same symptom.
class ObservableMessageWebSocket
{
private MessageWebSocket WebSocket { get; }
public IObservable<string> Messages { get; }
public ObservableMessageWebSocket()
{
WebSocket = new MessageWebSocket();
WebSocket.Control.MessageType = SocketMessageType.Utf8;
Messages = Observable.Create<string>(o => Observable
.FromEventPattern<MessageWebSocketMessageReceivedEventArgs>(WebSocket, nameof(WebSocket.MessageReceived))
.Select(ReadString)
.Subscribe(o));
}
private static string ReadString(EventPattern<MessageWebSocketMessageReceivedEventArgs> pattern)
{
using (var reader = pattern.EventArgs.GetDataReader())
{
reader.UnicodeEncoding = UnicodeEncoding.Utf8;
return reader.ReadString(reader.UnconsumedBufferLength);
}
}
public async Task ConnectAsync(Uri uri)
{
await WebSocket.ConnectAsync(uri);
}
}
Somehow, the code below works.
class ObservableMessageWebSocket
{
private MessageWebSocket WebSocket { get; }
private event EventHandler<string> StringReceived;
public IObservable<string> Messages { get; }
public ObservableMessageWebSocket()
{
WebSocket = new MessageWebSocket();
WebSocket.Control.MessageType = SocketMessageType.Utf8;
WebSocket.MessageReceived += HandleEvent;
Messages = Observable
.FromEventPattern<string>(this, nameof(StringReceived))
.Select(p => p.EventArgs);
}
private void HandleEvent(MessageWebSocket sender, MessageWebSocketMessageReceivedEventArgs args)
{
var handler = StringReceived;
if (handler == null) return;
string message;
using (var reader = args.GetDataReader())
{
reader.UnicodeEncoding = UnicodeEncoding.Utf8;
message= reader.ReadString(reader.UnconsumedBufferLength);
}
handler.Invoke(this, message);
}
public async Task ConnectAsync(Uri uri)
{
await WebSocket.ConnectAsync(uri);
}
}
To me, all three seem similar. How come only the last one works?