cat

dabaicai has a cat

0%

2.bloc-on分析

  • 1.分析代码
1
2
3
4
5
6
7
8
9
10
    on<TTInitEvent>(request);

on<TTRequestEvent>((event, emit) {
request(event, emit);
});
<!--more-->

FutureOr<void> request(TTRequestEvent event, Emitter<TTState> emit) {
emit.call(TTRequestSuccessState());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// 第一个入参类型
typedef EventHandler<Event, State> = FutureOr<void> Function(
Event event,
Emitter<State> emit,
);


typedef EventMapper<Event> = Stream<Event> Function(Event event);
// 第二个入参类型
typedef EventTransformer<Event> = Stream<Event> Function(
Stream<Event> events,
EventMapper<Event> mapper,
);
// 默认的第二个入参
static EventTransformer<dynamic> transformer = (events, mapper) {
return events
// _MapStream<T, S>(StreamController<Event>.broadcast().stream, mapper); ,单纯赋值
.map(mapper)
// _FlatMapStreamTransformer.bind(_MapStream)
.transform<dynamic>(const _FlatMapStreamTransformer<dynamic>());
};

Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
//
streamTransformer.bind(this);
class _FlatMapStreamTransformer<T> extends StreamTransformerBase<Stream<T>, T> {
const _FlatMapStreamTransformer();

@override
Stream<T> bind(Stream<Stream<T>> stream) {
final controller = StreamController<T>.broadcast(sync: true);
// Bloc初始化的stream 监听 这里新建 steam 的发生,错误,完成事件,并在监听一次完整事件后移除自己,
// _MapStream监听 Bloc的stream 变化(向Bloc的stream里面传数据,如add Event),错误(新建的)事件
// 这里新建的stream监听的是
// 新建的_MapStream,也就是前面on函数第二个参数新建的_MapStream
controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];

final outerSubscription = stream.listen( // stream = _MapStream
(inner) { // inner = Bloc初始化的 StreamController<Event>.broadcast().stream
// Bloc初始化的stream 监听 这里新建 steam 的事件
final subscription = inner.listen(
controller.add,
onError: controller.addError,
);
// Bloc初始化的stream 监听一次完整的事件后从容器里面移除自己
subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});

subscriptions.add(subscription);
},
onError: controller.addError,
);
// _MapStream 监听一次完整的事件后从容器里面移除自己
outerSubscription.onDone(() {
subscriptions.remove(outerSubscription);
if (subscriptions.isEmpty) controller.close();
});

subscriptions.add(outerSubscription);

controller.onCancel = () {
if (subscriptions.isEmpty) return null;
final cancels = [for (final s in subscriptions) s.cancel()];
return Future.wait(cancels).then((_) {});
};
};

return controller.stream;
}
}


void on<E extends Event>(
EventHandler<E, State> handler, {
EventTransformer<E>? transformer,
}) {

final subscription = (transformer ?? _eventTransformer)(
_eventController.stream.where((event) => event is E).cast<E>(),
(dynamic event) {
void onEmit(State state) {
if (isClosed) return;
if (this.state == state && _emitted) return;
onTransition(
Transition(
currentState: this.state,
event: event as E,
nextState: state,
),
);
emit(state);
}

final emitter = _Emitter(onEmit);
final controller = StreamController<E>.broadcast(
sync: true,
onCancel: emitter.cancel,
);

Future<void> handleEvent() async {
void onDone() {
emitter.complete();
_emitters.remove(emitter);
if (!controller.isClosed) controller.close();
}

try {
_emitters.add(emitter);
await handler(event as E, emitter);
} catch (error, stackTrace) {
onError(error, stackTrace);
rethrow;
} finally {
onDone();
}
}

handleEvent();
return controller.stream;
},
).listen(null);
_subscriptions.add(subscription);
}