EventBus源码分析​(二):事件分发及处理

Quibbler 2021-9-22 961

EventBus源码分析(二):事件分发及处理


        前面了解了EventBus的构造及事件订阅,接下来看看EventBus事件分发的实现原理。



1、事件分发

        使用过EventBus的同学都应该知道,EventBus提供两个事件发送API方法:通过post(event)可以发送普通一次性事件外,另外一个方法是postSticky(event)可以发送粘性事件。

        所谓粘性事件其实和普通事件一样,只不过会将【粘性事件】存在stickyEvents映射集合中。当注册接收粘性事件的订阅者时,会将这里存储的粘性事件再次发送一遍,详见EventBus源码分析(一):构造及订阅2.1节

    /**
     * Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky
     * event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.
     */
    public void postSticky(Object event) {
        synchronized (stickyEvents) {
            stickyEvents.put(event.getClass(), event);
        }
        // Should be posted after it is putted, in case the subscriber wants to remove immediately
        post(event);
    }

        

1.1、发送事件

        不管粘性事件还是非粘性事件,在主动发送事件时都将通过post(event)方法分发。


        就从这个方法开始,跟着源码去探索EventBus的事件分发过程。

    /** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);
        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

        借助ThreadLocal,保持当前线程中的事件分发状态。其中存储在此线程中需要分发的事件集合、事件分发状态、是否是主线程等信息。

    /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

        无论从哪个线程访问ThreadLocal对象,都是当前线程中的副本,关于ThreadLocal详见之前的一篇积累:Thread和ThreadLocal

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };


        用一个while循环遍历当前线程中所有的事件,并使用postSingleEvent(event,postingState)方法分发每一个事件:

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //默认配置为false
        if (eventInheritance) {
            ...
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        ...
    }

        接下来,是根据事件类型分发事件,先从subscriptionsByEventType中拿出当前【事件】对用的所有【订阅者】。注册订阅者的时候会根据订阅者订阅方法中的事件类型,将其添加到对应事件映射的集合中。详见EventBus源码分析(一):构造及订阅2.1节

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        ...
            for (Subscription subscription : subscriptions) {
                ...
                    postToSubscription(subscription, event, postingState.isMainThread);
                ...
            }
        ...
    }

        postToSubscription(subscription, event, isMainThread)是事件分发过程中的最后一个方法,根据订阅者的订阅方法的线程模型选择在指定的线程中处理事件,在第2节中,将详细的讨论这5种线程模式。

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

        最后,事件分发给对应的订阅者处理,调用其订阅方法。


1.2、删除粘性事件

        关于粘性事件,多提几个方法。因为粘性事件相当于内存缓存,将事件存储在内存中。每种类型的粘性事件只会缓存一个,并且是最近才发送的一个,因为stickyEvents映射使用事件的类型作为键值存储最新的该类型的粘性事件对象:

    private final Map<Class<?>, Object> stickyEvents;

        EventBus源码阅读的过程中就看到几个删除粘性事件的方法:

        removeAllStickyEvents():清空粘性事件,实际上就是清空stickyEvents映射。

    /**
     * Removes all sticky events.
     */
    public void removeAllStickyEvents() {
        synchronized (stickyEvents) {
            stickyEvents.clear();
        }
    }

        removeStickyEvent(Class<T> eventType):删除这个类型的最近的那个粘性事件,并返回。

    /**
     * Remove and gets the recent sticky event for the given event type.
     *
     * @see #postSticky(Object)
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (stickyEvents) {
            return eventType.cast(stickyEvents.remove(eventType));
        }
    }

        removeStickyEvent(Object event):从EventBus中删除某个具体的粘性事件对象,并返回boolean表示是否成功删除。

    /**
     * Removes the sticky event if it equals to the given event.
     *
     * @return true if the events matched and the sticky event was removed.
     */
    public boolean removeStickyEvent(Object event) {
        synchronized (stickyEvents) {
            Class<?> eventType = event.getClass();
            Object existingEvent = stickyEvents.get(eventType);
            if (event.equals(existingEvent)) {
                stickyEvents.remove(eventType);
                return true;
            } else {
                return false;
            }
        }
    }

        getStickyEvent(Class<T> eventType)方法根据事件类型从EventBus中获取发送过的粘性事件(如果有)

    /**
     * Gets the most recent sticky event for the given type.
     *
     * @see #postSticky(Object)
     */
    public <T> T getStickyEvent(Class<T> eventType) {
        synchronized (stickyEvents) {
            return eventType.cast(stickyEvents.get(eventType));
        }
    }



2、事件处理(五种线程模型)

        在第一节中详细讲了事件的分发,最后在postToSubscription(subscription, event, isMainThread)方法中开始根据订阅方法中设置的不同线程模型来处理事件。用到的如下几个对象在EventBus构造的时候就已经初始化:

    EventBus(EventBusBuilder builder) {
        ...
        mainThreadSupport = builder.getMainThreadSupport();
        //MAIN、MAIN_ORDERED
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        //BACKGROUND
        backgroundPoster = new BackgroundPoster(this);        
        //ASYNC
        asyncPoster = new AsyncPoster(this);
        ...
        executorService = builder.executorService;
    }


2.1、POSTING

        直接在事件发送线程中处理。这个没啥好说的,直接invoke调用解析出的method方法,传入订阅者对象本身,及事件作为唯一的参数。

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }


2.2、MAIN

        事件订阅方法在主线程执行,注意不能进行耗时操作!根据isMainThread判断是否在主线程,如果在主线程,那么和POSTING一样直接调用。

    if (isMainThread) {
        invokeSubscriber(subscription, event);
    } else {
        mainThreadPoster.enqueue(subscription, event);
    }

        否则就需要借助Handler切回主线程执行,这里借助mainThreadPoster完成。mainThreadPosterEventBus构造的时候就初始化:

    EventBus(EventBusBuilder builder) {
        ...
        mainThreadSupport = builder.getMainThreadSupport();
        mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
        ...
    }

        Android平台MainThreadSupport接口实现为AndroidHandlerMainThreadSupport类:

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {
        private final Looper looper;
        
        //这里传入的是Android主线程Looper
        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }
        
        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }
        
        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

        HandlerPoster的实现如下,借助Handler在主线程执行订阅方法。关于Handler、Looper详见Handler、Looper、MessageQueue消息机制原理

public class HandlerPoster extends Handler implements Poster {
    ...
    
    public void enqueue(Subscription subscription, Object event) {
        ...
         queue.enqueue(pendingPost);
        ...
    }
    
    @Override
    public void handleMessage(Message msg) {
        ...
        eventBus.invokeSubscriber(pendingPost);
        ...
    }
}

        在EvebtBus类中invokeSubscriber(PendingPost pendingPost)方法的定义如下:

    void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

        最终调用的方法和POSTING还是一样的,都是invokeSubscriber(subscription, event)方法。


2.3、MAIN_ORDERED

        无论事件发送方法在子线程还是已经在主线程中,都将通过Handler消息队列执行事件订阅方法,非阻塞。

    if (mainThreadPoster != null) {
        mainThreadPoster.enqueue(subscription, event);
    } else {
        // temporary: technically not correct as poster not decoupled from subscriber
        invokeSubscriber(subscription, event);
    }


2.4、BACKGROUND

        在后台线程执行消息处理方法。如果事件发送线程不是主线程,那么直接调用事件订阅方法。

    if (isMainThread) {
        backgroundPoster.enqueue(subscription, event);
    } else {
        invokeSubscriber(subscription, event);
    }

        否则,借助backgroundPoster在线程中执行,backgroundPoster在EventBus构造时候初始化:

    EventBus(EventBusBuilder builder) {
        ...
        backgroundPoster = new BackgroundPoster(this);
        ...
        executorService = builder.executorService;
    }

        线程池使用EventBusBuilder中默认的缓存线程池:

    public class EventBusBuilder {
        private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
        ...
        ExecutorService executorService = DEFAULT_EXECUTOR_SERVICE;
        ...
    }

        从而完成在子线程中执行订阅方法:

    final class BackgroundPoster implements Runnable, Poster {
        ...
        
        public void enqueue(Subscription subscription, Object event) {
            ...
                eventBus.getExecutorService().execute(this);
            ...
        }
        
        @Override
        public void run() {
            ...
                eventBus.invokeSubscriber(pendingPost);
            ...
        }
    }

        

2.5、ASYNC

        无论事件发送在主线程还是已经在子线程中,总是在另外一个子线程中异步的执行事件订阅方法。

    asyncPoster.enqueue(subscription, event);

        借助AsyncPoster在子线程中执行订阅方法,这里用的线程池和前面BACKGROUND是一样的。

    class AsyncPoster implements Runnable, Poster {
        ...
        public void enqueue(Subscription subscription, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
            queue.enqueue(pendingPost);
            eventBus.getExecutorService().execute(this);
        }
        
        @Override
        public void run() {
            ...
            eventBus.invokeSubscriber(pendingPost);
        }
    }

        


        现在对EventBus事件分发及处理有更深刻的认识和理解。别看它只有1800余行代码,和网络请求库Volley一样,麻雀虽小,五脏俱全。



不忘初心的阿甘
最新回复 (1)
  • shangxiaobo 2021-9-24
    2
    • 安卓笔记本
      3
        登录 注册 QQ
返回
仅供学习交流,切勿用于商业用途。如有错误欢迎指出:fluent0418@gmail.com