EventBus源码分析(二):事件分发及处理
前面了解了EventBus的构造及事件订阅,接下来看看EventBus事件分发的实现原理。
1、事件分发
使用过EventBus的同学都应该知道,EventBus提供两个事件发送API方法:通过post(event)可以发送普通一次性事件外,另外一个方法是postSticky(event)可以发送粘性事件。
所谓粘性事件其实和普通事件一样,只不过会将【粘性事件】存在stickyEvents映射集合中。当注册接收粘性事件的订阅者时,会将这里存储的粘性事件再次发送一遍,详见EventBus源码分析(一):构造及订阅第2.1节。
/**
* Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky
* event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.
*/
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
1.1、发送事件
不管粘性事件还是非粘性事件,在主动发送事件时都将通过post(event)方法分发。

就从这个方法开始,跟着源码去探索EventBus的事件分发过程。
/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
借助ThreadLocal,保持当前线程中的事件分发状态。其中存储在此线程中需要分发的事件集合、事件分发状态、是否是主线程等信息。
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
无论从哪个线程访问ThreadLocal对象,都是当前线程中的副本,关于ThreadLocal详见之前的一篇积累:Thread和ThreadLocal。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
用一个while循环遍历当前线程中所有的事件,并使用postSingleEvent(event,postingState)方法分发每一个事件:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//默认配置为false
if (eventInheritance) {
...
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
...
}
接下来,是根据事件类型分发事件,先从subscriptionsByEventType中拿出当前【事件】对用的所有【订阅者】。注册订阅者的时候会根据订阅者订阅方法中的事件类型,将其添加到对应事件映射的集合中。详见EventBus源码分析(一):构造及订阅第2.1节。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
...
for (Subscription subscription : subscriptions) {
...
postToSubscription(subscription, event, postingState.isMainThread);
...
}
...
}
postToSubscription(subscription, event, isMainThread)是事件分发过程中的最后一个方法,根据订阅者的订阅方法的线程模型选择在指定的线程中处理事件,在第2节中,将详细的讨论这5种线程模式。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
最后,事件分发给对应的订阅者处理,调用其订阅方法。
1.2、删除粘性事件
关于粘性事件,多提几个方法。因为粘性事件相当于内存缓存,将事件存储在内存中。每种类型的粘性事件只会缓存一个,并且是最近才发送的一个,因为stickyEvents映射使用事件的类型作为键值存储最新的该类型的粘性事件对象:
private final Map<Class<?>, Object> stickyEvents;
EventBus源码阅读的过程中就看到几个删除粘性事件的方法:
①removeAllStickyEvents():清空粘性事件,实际上就是清空stickyEvents映射。
/**
* Removes all sticky events.
*/
public void removeAllStickyEvents() {
synchronized (stickyEvents) {
stickyEvents.clear();
}
}
②removeStickyEvent(Class<T> eventType):删除这个类型的最近的那个粘性事件,并返回。
/**
* Remove and gets the recent sticky event for the given event type.
*
* @see #postSticky(Object)
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (stickyEvents) {
return eventType.cast(stickyEvents.remove(eventType));
}
}
③removeStickyEvent(Object event):从EventBus中删除某个具体的粘性事件对象,并返回boolean表示是否成功删除。
/**
* Removes the sticky event if it equals to the given event.
*
* @return true if the events matched and the sticky event was removed.
*/
public boolean removeStickyEvent(Object event) {
synchronized (stickyEvents) {
Class<?> eventType = event.getClass();
Object existingEvent = stickyEvents.get(eventType);
if (event.equals(existingEvent)) {
stickyEvents.remove(eventType);
return true;
} else {
return false;
}
}
}
getStickyEvent(Class<T> eventType)方法根据事件类型从EventBus中获取发送过的粘性事件(如果有):
/**
* Gets the most recent sticky event for the given type.
*
* @see #postSticky(Object)
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (stickyEvents) {
return eventType.cast(stickyEvents.get(eventType));
}
}
2、事件处理(五种线程模型)
在第一节中详细讲了事件的分发,最后在postToSubscription(subscription, event, isMainThread)方法中开始根据订阅方法中设置的不同线程模型来处理事件。用到的如下几个对象在EventBus构造的时候就已经初始化:
EventBus(EventBusBuilder builder) {
...
mainThreadSupport = builder.getMainThreadSupport();
//MAIN、MAIN_ORDERED
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
//BACKGROUND
backgroundPoster = new BackgroundPoster(this);
//ASYNC
asyncPoster = new AsyncPoster(this);
...
executorService = builder.executorService;
}
2.1、POSTING
直接在事件发送线程中处理。这个没啥好说的,直接invoke调用解析出的method方法,传入订阅者对象本身,及事件作为唯一的参数。
void invokeSubscriber(Subscription subscription, Object event) {
try {
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
2.2、MAIN
事件订阅方法在主线程执行,注意不能进行耗时操作!根据isMainThread判断是否在主线程,如果在主线程,那么和POSTING一样直接调用。
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
否则就需要借助Handler切回主线程执行,这里借助mainThreadPoster完成。mainThreadPoster在EventBus构造的时候就初始化:
EventBus(EventBusBuilder builder) {
...
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
...
}
Android平台MainThreadSupport接口实现为AndroidHandlerMainThreadSupport类:
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
//这里传入的是Android主线程Looper
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
HandlerPoster的实现如下,借助Handler在主线程执行订阅方法。关于Handler、Looper详见Handler、Looper、MessageQueue消息机制原理。
public class HandlerPoster extends Handler implements Poster {
...
public void enqueue(Subscription subscription, Object event) {
...
queue.enqueue(pendingPost);
...
}
@Override
public void handleMessage(Message msg) {
...
eventBus.invokeSubscriber(pendingPost);
...
}
}
在EvebtBus类中invokeSubscriber(PendingPost pendingPost)方法的定义如下:
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
invokeSubscriber(subscription, event);
}
}
最终调用的方法和POSTING还是一样的,都是invokeSubscriber(subscription, event)方法。
2.3、MAIN_ORDERED
无论事件发送方法在子线程还是已经在主线程中,都将通过Handler消息队列执行事件订阅方法,非阻塞。
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
2.4、BACKGROUND
在后台线程执行消息处理方法。如果事件发送线程不是主线程,那么直接调用事件订阅方法。
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
否则,借助backgroundPoster在线程中执行,backgroundPoster在EventBus构造时候初始化:
EventBus(EventBusBuilder builder) {
...
backgroundPoster = new BackgroundPoster(this);
...
executorService = builder.executorService;
}
线程池使用EventBusBuilder中默认的缓存线程池:
public class EventBusBuilder {
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
...
ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
...
}
从而完成在子线程中执行订阅方法:
final class BackgroundPoster implements Runnable, Poster {
...
public void enqueue(Subscription subscription, Object event) {
...
eventBus.getExecutorService().execute(this);
...
}
@Override
public void run() {
...
eventBus.invokeSubscriber(pendingPost);
...
}
}
2.5、ASYNC
无论事件发送在主线程还是已经在子线程中,总是在另外一个子线程中异步的执行事件订阅方法。
asyncPoster.enqueue(subscription, event);
借助AsyncPoster在子线程中执行订阅方法,这里用的线程池和前面BACKGROUND是一样的。
class AsyncPoster implements Runnable, Poster {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
...
eventBus.invokeSubscriber(pendingPost);
}
}
现在对EventBus事件分发及处理有更深刻的认识和理解。别看它只有1800余行代码,和网络请求库Volley一样,麻雀虽小,五脏俱全。