Otto中的生产者和消费者
前面在Otto小巧的事件总线一文中了解了Otto事件总线这个库。本篇详细介绍一下Otto中生产者和消费者的实现,Otto中有两个注解:@Subscribe和@Produce。
@Subscribe注解大家比较熟悉,在EventBus中也有同名的注解,在订阅者类中定义接收事件的方法。
/**
* Marks a method as an event handler, as used by {@link AnnotatedHandlerFinder} and {@link Bus}.
*
* <p>The method's first (and only) parameter defines the event type.
* <p>If this annotation is applied to methods with zero parameters or more than one parameter, the object containing
* the method will not be able to register for event delivery from the {@link Bus}. Otto fails fast by throwing
* runtime exceptions in these cases.
*
* @author Cliff Biffle
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
}
而另一个注解不常用:@Produce。用来定义生产者中发送事件的方法,当生产者注册到Bus中会发送定义的事件。
/**
* Marks a method as an instance producer, as used by {@link AnnotatedHandlerFinder} and {@link Bus}.
* <p>
* Otto infers the instance type from the annotated method's return type. Producer methods may return null when there is
* no appropriate value to share. The calling {@link Bus} ignores such returns and posts nothing.
*
* @author Jake Wharton
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Produce {
}
同样需要反注册。这种操作确实不常用,所以@Produce注解用的较少。
同时Otto库还定义了HandlerFinder接口,通过findAllProducers(Object listener)方法和findAllSubscribers(Object listener)方法来查找所有生产者和消费者。
/** Finds producer and subscriber methods. */
interface HandlerFinder {
Map<Class<?>, EventProducer> findAllProducers(Object listener);
Map<Class<?>, Set<EventHandler>> findAllSubscribers(Object listener);
HandlerFinder ANNOTATED = new HandlerFinder() {
@Override
public Map<Class<?>, EventProducer> findAllProducers(Object listener) {
return AnnotatedHandlerFinder.findAllProducers(listener);
}
@Override
public Map<Class<?>, Set<EventHandler>> findAllSubscribers(Object listener) {
return AnnotatedHandlerFinder.findAllSubscribers(listener);
}
};
}
提供了内置也是默认的实现ANNOTATED实例,来查找注册的生产者和消费者。内部借助AnnotatedHandlerFinder工具类实现,该类内部定义了两个方法:findAllProducers(Object listener)和findAllSubscribers(Object listener)。
通过findAllProducers(Object listener)方法查找所有生产者:
static Map<Class<?>, EventProducer> findAllProducers(Object listener) {
final Class<?> listenerClass = listener.getClass();
Map<Class<?>, EventProducer> handlersInMethod = new HashMap<Class<?>, EventProducer>();
Map<Class<?>, Method> methods = PRODUCERS_CACHE.get(listenerClass);
if (null == methods) {
methods = new HashMap<Class<?>, Method>();
//通过loadAnnotatedProducerMethods方法
loadAnnotatedProducerMethods(listenerClass, methods);
}
if (!methods.isEmpty()) {
for (Map.Entry<Class<?>, Method> e : methods.entrySet()) {
EventProducer producer = new EventProducer(listener, e.getValue());
handlersInMethod.put(e.getKey(), producer);
}
}
return handlersInMethod;
}
紧接着在findAllProducers(Object listener)方法内部通过loadAnnotatedProducerMethods()私有方法解析:
private static void loadAnnotatedProducerMethods(Class<?> listenerClass,
Map<Class<?>, Method> producerMethods) {
Map<Class<?>, Set<Method>> subscriberMethods = new HashMap<Class<?>, Set<Method>>();
loadAnnotatedMethods(listenerClass, producerMethods, subscriberMethods);
}
而通过findAllSubscribers(Object listener)查找所有消费者:
static Map<Class<?>, Set<EventHandler>> findAllSubscribers(Object listener) {
Class<?> listenerClass = listener.getClass();
Map<Class<?>, Set<EventHandler>> handlersInMethod = new HashMap<Class<?>, Set<EventHandler>>();
Map<Class<?>, Set<Method>> methods = SUBSCRIBERS_CACHE.get(listenerClass);
if (null == methods) {
methods = new HashMap<Class<?>, Set<Method>>();
//通过loadAnnotatedSubscriberMethods方法
loadAnnotatedSubscriberMethods(listenerClass, methods);
}
if (!methods.isEmpty()) {
for (Map.Entry<Class<?>, Set<Method>> e : methods.entrySet()) {
Set<EventHandler> handlers = new HashSet<EventHandler>();
for (Method m : e.getValue()) {
handlers.add(new EventHandler(listener, m));
}
handlersInMethod.put(e.getKey(), handlers);
}
}
return handlersInMethod;
}
同样的在findAllSubscribers(Object listener)方法内部通过loadAnnotatedSubscriberMethods()私有方法解析:
private static void loadAnnotatedSubscriberMethods(Class<?> listenerClass,
Map<Class<?>, Set<Method>> subscriberMethods) {
Map<Class<?>, Method> producerMethods = new HashMap<Class<?>, Method>();
loadAnnotatedMethods(listenerClass, producerMethods, subscriberMethods);
}
loadAnnotatedProducerMethods()和loadAnnotatedSubscriberMethods()最终都是通过loadAnnotatedMethods()方法来解析注解,此方法是重点,解析注册对象类中的生产者和消费者:
/**
* Load all methods annotated with {@link Produce} or {@link Subscribe} into their respective caches for the
* specified class.
*/
private static void loadAnnotatedMethods(Class<?> listenerClass,
Map<Class<?>, Method> producerMethods, Map<Class<?>, Set<Method>> subscriberMethods) {
for (Method method : listenerClass.getDeclaredMethods()) {
// The compiler sometimes creates synthetic bridge methods as part of the
// type erasure process. As of JDK8 these methods now include the same
// annotations as the original declarations. They should be ignored for
// subscribe/produce.
if (method.isBridge()) {
continue;
}
if (method.isAnnotationPresent(Subscribe.class)) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 1) {
throw new IllegalArgumentException("Method " + method + " has @Subscribe annotation but requires "
+ parameterTypes.length + " arguments. Methods must require a single argument.");
}
Class<?> eventType = parameterTypes[0];
if (eventType.isInterface()) {
throw new IllegalArgumentException("Method " + method + " has @Subscribe annotation on " + eventType
+ " which is an interface. Subscription must be on a concrete class type.");
}
if ((method.getModifiers() & Modifier.PUBLIC) == 0) {
throw new IllegalArgumentException("Method " + method + " has @Subscribe annotation on " + eventType
+ " but is not 'public'.");
}
Set<Method> methods = subscriberMethods.get(eventType);
if (methods == null) {
methods = new HashSet<Method>();
subscriberMethods.put(eventType, methods);
}
methods.add(method);
} else if (method.isAnnotationPresent(Produce.class)) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 0) {
throw new IllegalArgumentException("Method " + method + "has @Produce annotation but requires "
+ parameterTypes.length + " arguments. Methods must require zero arguments.");
}
if (method.getReturnType() == Void.class) {
throw new IllegalArgumentException("Method " + method
+ " has a return type of void. Must declare a non-void type.");
}
Class<?> eventType = method.getReturnType();
if (eventType.isInterface()) {
throw new IllegalArgumentException("Method " + method + " has @Produce annotation on " + eventType
+ " which is an interface. Producers must return a concrete class type.");
}
if (eventType.equals(Void.TYPE)) {
throw new IllegalArgumentException("Method " + method + " has @Produce annotation but has no return type.");
}
if ((method.getModifiers() & Modifier.PUBLIC) == 0) {
throw new IllegalArgumentException("Method " + method + " has @Produce annotation on " + eventType
+ " but is not 'public'.");
}
if (producerMethods.containsKey(eventType)) {
throw new IllegalArgumentException("Producer for type " + eventType + " has already been registered.");
}
producerMethods.put(eventType, method);
}
}
//缓存生产者类型
PRODUCERS_CACHE.put(listenerClass, producerMethods);
//缓存消费者类型
SUBSCRIBERS_CACHE.put(listenerClass, subscriberMethods);
}
并且将已解析过的类缓存起来,加快注册过程中的解析。
最后,再介绍一下与@Subscribe和@Produce对应的两个封装类:EventHandler和EventProducer。
EventProducer这个类用来发送事件,构造的时候封装事件生产者和对应的生产方法(注意需要时无参的方法):
EventProducer(Object target, Method method) {
if (target == null) {
throw new NullPointerException("EventProducer target cannot be null.");
}
if (method == null) {
throw new NullPointerException("EventProducer method cannot be null.");
}
this.target = target;
this.method = method;
method.setAccessible(true);
// Compute hash code eagerly since we know it will be used frequently and we cannot estimate the runtime of the
// target's hashCode call.
final int prime = 31;
hashCode = (prime + method.hashCode()) * prime + target.hashCode();
}
当注册生产者,产生事件的时候触发调用produceEvent()方法,获取对应的事件,进而向Bus发送此事件。
/**
* Invokes the wrapped producer method.
*
* @throws java.lang.IllegalStateException if previously invalidated.
* @throws java.lang.reflect.InvocationTargetException if the wrapped method throws any {@link Throwable} that is not
* an {@link Error} ({@code Error}s are propagated as-is).
*/
public Object produceEvent() throws InvocationTargetException {
if (!valid) {
throw new IllegalStateException(toString() + " has been invalidated and can no longer produce events.");
}
try {
return method.invoke(target);
} catch (IllegalAccessException e) {
throw new AssertionError(e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
EventHandler用来处理事件,构造时缓存消费者和对应的事件处理方法:
EventHandler(Object target, Method method) {
if (target == null) {
throw new NullPointerException("EventHandler target cannot be null.");
}
if (method == null) {
throw new NullPointerException("EventHandler method cannot be null.");
}
this.target = target;
this.method = method;
method.setAccessible(true);
// Compute hash code eagerly since we know it will be used frequently and we cannot estimate the runtime of the
// target's hashCode call.
final int prime = 31;
hashCode = (prime + method.hashCode()) * prime + target.hashCode();
}
通过handleEvent(Object event)传入对应的事件,消费者从而处理此事件:
/**
* Invokes the wrapped handler method to handle {@code event}.
*
* @param event event to handle
* @throws java.lang.IllegalStateException if previously invalidated.
* @throws java.lang.reflect.InvocationTargetException if the wrapped method throws any {@link Throwable} that is not
* an {@link Error} ({@code Error}s are propagated as-is).
*/
public void handleEvent(Object event) throws InvocationTargetException {
if (!valid) {
throw new IllegalStateException(toString() + " has been invalidated and can no longer handle events.");
}
try {
method.invoke(target, event);
} catch (IllegalAccessException e) {
throw new AssertionError(e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}