UniRx入门系列一
# UniRx入门系列一
# 什么是UniRX
UniRX 是neuecc为Unity开发的一个Reactive Extensions程序库
什么是Reactive Extensions,概要如下:
- MicrosoftResearch开发的面向C#的异步处理库
- 基于设计模式中的Observer(观察者)模式设计的
- 用于描述与时间、执行定时等相关的操作
- 完成度较高,已被移植到各大主流的编程语言中(Java、JavaScript和Swift中)
UniRX这个基于ReactiveExtension移植到Unity的库,和.NET 版本的RX相比,有如下区别:
- UniRX基于Unity的编程思想进行过优化
- 基于Unity的开发习惯增加了一些方便的功能和操作符
- 增加了ReactiveProperty等响应式属性
- 相比原来的.NET RX 内存性能更好
# C#中的event和UniRX
我们在写C#程序时,会经常使用的一个功能——event,Event用于在某个时刻发出消息并让其在另外一个地方处理的功能。
using UnityEngine;
using UniRx;
using System.Collections;
public class TimeCounter : MonoBehaviour
{
public delegate void TimerEventHandler(float time);
public event TimerEventHandler OnTimeChanged;
void Start()
{
StartCoroutine(TimerCoroutine());
}
IEnumerator TimerCoroutine()
{
var time = 100.0f;
while (time > 0)
{
time -= Time.deltaTime;
OnTimeChanged(time);
yield return null;
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
事件订阅方,执行对应的事件操作
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using TMPro;
public class TimeView : MonoBehaviour
{
public TimeCounter timeCounter;
public TextMeshProUGUI textMeshProUGUI;
void Start()
{
timeCounter.OnTimeChanged += (time) =>
{
textMeshProUGUI.text = time.ToString("F3");
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
以上代码是一个倒计时的操作,事件发出方,将当前剩余时间对外广播,订阅了这个事件的接收方,接收这个事件,并在UGUI文本中显示当前剩余时间。
在注册事件处理程序时出现了一个名为Subject的类,而不是event;所以,我们可以看出Subject是UniRX的核心,通过Subject和订阅者连接起来。接下来,我们将详细介绍《OnNext和Subscribe》。
# UniRX中的《OnNext和SubScribe》
OnNext和Subscribe都是在Subject中实现的方法,它们具有如下行为。
- Subscribe 在接收到消息时执行注册的函数
- OnNext 将收到的消息传递给Subscribe注册的函数并执行
看一下下方的示例代码:
using System.Text;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using UniRx;
public class TimeTest : MonoBehaviour
{
// Start is called before the first frame update
void Start()
{
Subject<string> subject=new Subject<string>();
subject.Subscribe(msg=>Debug.Log("Subscribe1:"+msg+"\n"));
subject.Subscribe(msg=>Debug.Log("Subscribe2:"+msg+"\n"));
subject.Subscribe(msg=>Debug.Log("Subscribe3:"+msg+"\n"));
subject.OnNext("hello 001");
subject.OnNext("hello 002");
subject.OnNext("hello 003");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
输出结果如下
Subscribe1:hello 001
Subscribe2:hello 001
Subscribe3:hello 001
Subscribe1:hello 002
Subscribe2:hello 002
Subscribe3:hello 002
Subscribe1:hello 003
Subscribe2:hello 003
Subscribe3:hello 003
2
3
4
5
6
7
8
9
10
11
12
如上所述,你可能会发现,Subscribe是接收到消息时执行相对应的操作,OnNext是发出消息后,按顺序调用Subscribe注册的操作。
上图描述了Subject的的行为,解释了UniRX的基本动作机制以及核心思想;下一部分中,我们将要讨论一个更加抽象的话题,IObserver接口和IObservable接口。
# UniRX中的IObserver和IObservable
在Subject中实现了OnNext和Subscribe两个方法;这样说其实不并太准确,更确切地说,Subject实现了IObserver接口和IObservable两个接口
# UniRX中的IObserver接口
IObserver是一个接口,定义了RX中可以发布事件消息的行为,其定义如下:
using System;
namespace UniRx
{
public interface IObservable<T>
{
IDisposable Subscribe(IObserver<T> observer);
}
}
2
3
4
5
6
7
8
9
IObservable只简单的定义了一个Subscribe方法。
现在我们看一下刚才用到的Subject类的定义。
namespace UniRx
{
public sealed class Subject<T> : ISubject<T>, IDisposable, IOptimizedObservable<T> {/*省略具体实现*/}
}
2
3
4
5
Subject实现了ISubject接口,ISubject接口定义如下:
namespace UniRx
{
public interface ISubject<TSource, TResult> : IObserver<TSource>, IObservable<TResult>
{
}
public interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T>
{
}
}
2
3
4
5
6
7
8
9
10
可以看到Subject实现了IObservable和IObserver两个接口,也就是说Subject具有发布值和可以订阅值两个功能。 如下图:
# 事件的过滤、筛选
假设有如下实现:
void Start()
{
Subject<string> subjectStr = new Subject<string>();
subjectStr.Subscribe(x => Debug.Log(string.Format("Hello :{0} ", x)));
subjectStr.OnNext("Enemy");
subjectStr.OnNext("Wall");
subjectStr.OnNext("Wall");
subjectStr.OnNext("Enemy");
subjectStr.OnNext("Weapon");
}
2
3
4
5
6
7
8
9
10
11
12
输出如下:
Hello :Enemy
Hello :Wall
Hello :Wall
Hello :Enemy
Hello :Weapon
2
3
4
5
6
现在,我们的需求是,我们只想要输出Enemy,其它的消息需要被过滤掉(不输出);使用传统的编程思路,我们就得老老实实的在Subscribe的回调函数中写if条件判断语句,但是这样就失去了使用UniRX的意义。
# 是否可以在Subject和Subscribe之间加上过滤操作呢?
如之前所说,处理发出值的对象被定义为IObserver(观察者),订阅值的处理被定义为IObservable(被观察者),这意味着我们可以将两个实现类夹在Subject和Subscribe之间,然后在Subscribe中实现具体的细节。现在,我们尝试使用Where操作符过滤以下信息。
void Start()
{
Subject<string> subject = new Subject<string>();
subject
.Where(x => x == "Enemy")
.Subscribe(x => Debug.Log(string.Format("Hello : {0}", x)));
subject.OnNext("Wall");
subject.OnNext("Wall");
subject.OnNext("Enemy");
subject.OnNext("Enemy");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
输出如下所示:
Hello :Enemy
Hello : Enemy
2
3
通过Where操作符,就可以在IObservable和IObserver之间,对信息进行过滤。
# UniRX中的操作符
在UniRX中除了Where还有许多的操作符,这里介绍其中的一部分:
- Where 过滤操作符
- Select 转换操作符
- Distinct 排除重复元素操作符
- Buffer 等待缓存一定的数量
- ThrottleFirst 在给定条件内,只使用最前面那个 上述操作符只是众多操作员中的一小部分,UniRX提供了许多处理流的操作符,之后会出一个汇总的表格。
# 流
目前,我们使用”Subject、Subscribe“和"Subject 组合操作符来表达UniRX的执行过程。这种描述方式其实并不好,所以我们会用流形容这个过程。
在UniRX中,用流来表达从发出消息到Subscribe的一系列处理流程;组合操作符、构建流,执行Subscribe,启动流,传递流、发布OnCompleted、停止流等等。
流是整个UniRX的核心,以后我们会经常使用流这个词,所以需要熟悉他。
# 总结
- UniRX的核心是Subject
- Subject和Subscribe是UniRX中的核心
- 实际使用时需要注意IObserver和IObservable两个接口
- 利用操作符使事件的发布和订阅分离,使得事件的处理更加灵活。
- 使用操作符连接的一系列事件处理称之为流 在下一个系列中,我将和大家解释一下OnError、OnCompleted、Dispose等等。
# 写一个自己自定义的操作符
之前曾经说过,在有IObserver和IObservable类的Subject和Subscribe之间,通过过滤操作符,写入过滤条件,即可对流进行过滤。那么我们可以定义自己的操作符吗?答案是肯定的,下面我们将自己定义一个具有相同过滤行为的自定义过滤操作符。
using System;
public class MyFilter<T> : IObservable<T>
{
private IObservable<T> _source { get; }
private Func<T, bool> _conditionalFunc { get; }
public MyFilter(IObservable<T> source, Func<T, bool> conditionalFunc)
{
_source = source;
_conditionalFunc = conditionalFunc;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return new MyFilterInternal(this, observer).Run();
}
private class MyFilterInternal : IObserver<T>
{
private MyFilter<T> _parent;
private IObserver<T> _observer;
private object lockObject = new object();
public MyFilterInternal(MyFilter<T> parent, IObserver<T> observer)
{
_observer = observer;
_parent = parent;
}
public IDisposable Run()
{
return _parent._source.Subscribe(this);
}
public void OnCompleted()
{
lock (lockObject)
{
_observer.OnCompleted();
_observer = null;
}
}
public void OnError(Exception error)
{
lock (lockObject)
{
_observer.OnError(error);
_observer = null;
}
}
public void OnNext(T value)
{
lock ((lockObject))
{
if (_observer == null) return;
try
{
if (_parent._conditionalFunc(value))
{
_observer.OnNext(value);
}
}
catch (Exception e)
{
_observer.OnError(e);
_observer = null;
}
}
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
其中,OnNext是比较重要的一个部分。
if (_parent._conditionalFunc(value))
{
_observer.OnNext(value);
}
2
3
4
5
这里判断你自己定义的条件是否满足,满足就继续向流中传递值。
# 使用扩展方法,增强自定义操作符的可用性
按照现在的流程,我们需要每一次使用自定义操作符时,首先需要进行实例化操作,使用起来并不方便。为了能够以流式结构来使用MyFilter自定义过滤操作符,使用如下扩展方法来定义:
using System;
public static class ObservableOperators
{
public static IObservable<T> FilterOperator<T>(this IObservable<T> source, Func<T, bool> conditionalFunc)
{
return new MyFilter<T>(source, conditionalFunc);
}
}
2
3
4
5
6
7
8
9
接下来,让我们看一下如何使用我们自己定义的这个操作符吧:
// Start is called before the first frame update
void Start()
{
/// <summary>
/// 自定义的过滤操作符
/// </summary>
/// <typeparam name="string"></typeparam>
/// <returns></returns>
Subject<string> subject = new Subject<string>();
subject.FilterOperator(x => x == "Enemy")
.Subscribe(x => Debug.Log(string.Format("Hello :{0}", x)));
subject.OnNext("Wall");
subject.OnNext("Wall");
subject.OnNext("Enemy");
subject.OnNext("Weapon");
subject.OnNext("Player");
subject.OnNext("Enemy");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
输出如下:
Hello :Enemy
Hello :Enemy
2
正如我们期望的那样,与过滤条件不符的信息都被剔除了,只输出满足过滤条件的流信息。