响应式编程在C#生态中主要通过Rx.NET(也就是常说的C#Rx)实现,其所有功能都建立在几个核心接口之上,理解这些接口是掌握C#Rx的基础。

IObservable<T>接口
IObservable<T>是C#Rx中可观察序列的抽象,代表一个可以推送数据给观察者的数据源,其定义非常简洁:
public interface IObservable<T>
{
// 订阅观察者,返回可释放对象用于取消订阅
IDisposable Subscribe(IObserver<T> observer);
}所有Rx中的数据流,比如从事件转换来的序列、定时器生成的序列,最终都实现了这个接口。它的核心作用是定义数据推送的源头,外部不需要关心数据是怎么产生的,只需要通过Subscribe方法订阅就能接收数据。
IObserver<T>接口
IObserver<T>是观察者的抽象,用于接收IObservable<T>推送的数据和通知,定义如下:
public interface IObserver<T>
{
// 接收数据项
void OnNext(T value);
// 接收错误信息
void OnError(Exception error);
// 接收序列完成通知
void OnCompleted();
}这三个方法对应了数据流处理的三种情况:正常推送数据调用OnNext,出现错误调用OnError,序列全部数据推送完成调用OnCompleted。需要注意的是,一旦调用了OnError或者OnCompleted,这个观察者就不会再收到任何后续的OnNext调用。
ISubject<T>接口
ISubject<T>是一个特殊的接口,它同时继承了IObservable<T>和IObserver<T>,也就是说它既是可观察序列,又是观察者,相当于一个数据的中转站,定义如下:
public interface ISubject<T> : IObservable<T>, IObserver<T>
{
}常见的实现是Subject<T>,它可以接收外部推送的数据,同时把这些数据转发给所有订阅它的观察者,很多场景下可以用来做数据流的桥接。比如下面的例子,用Subject连接两个数据流:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
class Program
{
static void Main()
{
// 创建Subject实例
var subject = new Subject<int>();
// 订阅Subject,作为观察者接收数据
subject.Subscribe(
onNext: x => Console.WriteLine($"收到数据: {x}"),
onError: ex => Console.WriteLine($"发生错误: {ex.Message}"),
onCompleted: () => Console.WriteLine("序列完成")
);
// 作为观察者推送数据
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();
}
}三个核心接口的关系
这三个接口的关系可以通过下面的表格清晰展示:
| 接口名称 | 角色 | 核心方法 | 典型使用场景 |
|---|---|---|---|
| IObservable<T> | 数据推送方 | Subscribe | 定义数据源,供外部订阅 |
| IObserver<T> | 数据接收方 | OnNext、OnError、OnCompleted | 处理接收到的数据、错误和完成通知 |
| ISubject<T> | 数据中转方 | 同时拥有上述两类接口的方法 | 桥接不同数据流,手动控制数据推送 |
实际使用注意事项
在使用这些接口时,有几个点需要特别注意:
- 订阅IObservable<T>时返回的IDisposable对象,一定要在不需要订阅的时候调用Dispose取消订阅,否则可能会造成内存泄漏。
- IObserver<T>的OnError和OnCompleted是互斥的,调用其中一个之后,另一个就不会再被调用,也不需要再处理后续的OnNext。
- Subject<T>不是线程安全的,如果需要在多线程场景下使用,可以选择用Subject<T>.Synchronize()做线程同步,或者选择其他线程安全的实现。
除了上述三个核心接口,C#Rx中还有比如IConnectableObservable<T>这类扩展接口,用于实现可连接的可观察序列,处理冷热序列的转换问题,不过最基础的核心还是前面介绍的这三个,掌握它们之后再去学习其他扩展接口会容易很多。
C#RxIObservableIObserverISubject响应式编程修改时间:2026-05-29 14:27:36