开源代码分析之EventBus
简介
提到EventBus同样是每个Android 开发所必须掌握的一个开源库,它是一个事件发布订阅系统,用法十分简单,但是能够在很大程度上解决模块间存在的耦合问题,当某个模块的某个事件产生的时候,对应的事件通过post方法将其发布到Eventbus上,再由EventBus对该事件进行分发,如果某个类需要响应某个事件,必须事先通过register方法进行注册将自己订阅到总线上,这样EventBus就会根据实际的情况将事件源post出来的事件分发到有处理指定事件类型能力的订阅者方法上。一旦订阅者订阅了对应的事件,订阅者将会接收到对应类型的事件,直到调用unregitser方法取消注册。事件订阅方法必须使用@Subscribe注释,并且必须是public方法。返回值必须是void,并且只能有一个参数,该参数的类型为事件对象类型,用法就这么简单,三言两语就搞定了。但是这篇博客的关注点不在于它的使用上,这篇博客想从源码角度来对EventBus原理进行分析看下上述提到的功能是如何实现的。
但是在分析源码之前还是先要熟悉下EventBus是如何使用的,毕竟我们熟悉源码的目的也是为了应用。
首先我们先看下下面两张对比图:
第一张是没有使用EventBus的项目结构图,可以看出整个结构几乎呈网状,这样的结构相对来说耦合度就相对高。
而第二张整个结构呈现的是星型结构,EventBus处于星型结构的核心位置,主要负责事件的接送与调度。事件的产生源和事件的消费者耦合度就大大得降低了,大家只和EventBus进行交互,事件源将事件分发到事件总线,订阅者不会相互交互而是监听事件总线分发的事件。所以订阅者和事件源避免了之间的强约束。
这篇博客介绍的是EventBus 3.xEventBus 3 相对于之前的版本引入了EventBusAnnotationProcessor,我们可以使用编译时注解的方式来使用Eventbus了。在EventBus 早期的版本中事件注册信息的获取采用的是反射机制,这样就会导致效率上的降低,在EventBus 3的版本上并行使用反射和编译时注解两种方式,我们可以根据自己的实际需求来选择采用哪种方式。这个在后面源代码分析的时候会进行介绍。
在项目中引入EventBus 3.x
这里推荐大家使用编译时注解方式。我这边为了方便起见,也只介绍使用编译时注解的方式引入。
这里需要注意的是目前很多教程注解预编译所采用的是android-apt的方式,不过随着Android Gradle 插件 2.2 版本的发布,Android Studio推出了编译时预处理官方插件,所以Apt工具的作者也就宣布不再维护该工具了。目前使用的是annotationProcessor来取代android-apt方式。
如果大家使用的是还是android-apt方式的话,建议通过如下方式来切换到annotationProcess方式。
切换步骤:
首先要确保Android Gradle插件版本是2.2以上:
- 修改Project 的build.gradle配置
android-apt方式
dependencies { |
修改后annotationProcessor 方式
dependencies { |
也就是把原先的
classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8' 去掉 |
- 修改module的build.gradle配置
android-apt方式
buildscript { |
dependencies {
compile ‘org.greenrobot:eventbus:3.0.0’
apt’org.greenrobot:eventbus-annotation-processor:3.0.1’
}
|
dependencies {
compile ‘org.greenrobot:eventbus:3.0.0’
annotationProcessor ‘org.greenrobot:eventbus-annotation-processor:3.0.1’
}
|
apt {
arguments {
eventBusIndex “org.greenrobot.eventbus.demo.MyEventBusIndex”
}
}
修改后annotationProcessor 方式 |
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [ eventBusIndex : ‘org.greenrobot.eventbus.demo.MyEventBusIndex’ ]
}
}
}
|
EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();
|
@Subscribe(threadMode = ThreadMode.POSTING,sticky = false,priority = 1)
public void onMessageEvent(MessageEvent event) {
tv.setText(event.message);
}
|
public static EventBus getDefault() {
//使用单例的方法创建eventbus
//getDefault方法使用了double check(双重检查锁定模式),多了一层判断,故可以减少上锁开销。
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
在创建的时候我们会通过丢进去一个设置好各种属性后的Builder,然后在EventBus构造函数上从Builder上获取已经设置好的各种属性,这个在很多开源代码中都使用过这种方式,比如picasso Okhttp等,这种比较适合于有多个属性需要设置的情况。但是我们这里的重点在于EventBus有哪些属性,在下面代码中对一些部件进行了注释,还有一些没有注释的是比较重要的,需要在后面分析中重点提到的,我们接着往下看。 |
public EventBus() {
this(DEFAULT_BUILDER);
}
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
EventBus(EventBusBuilder builder) {
//这三个变量很重要后面会重点介绍
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
//三个事件分发器对应不同的threadMode
//主线程分发器
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
//后台线程分发器
backgroundPoster = new BackgroundPoster(this);
//异步线程分发器
asyncPoster = new AsyncPoster(this);
//这个是执行任务的线程池
executorService = builder.executorService;
//这个是我们上面提到的在使用EventBus编译时注解方式的时候会通过addIndex将编译时生成的Index注入。subscriberInfoIndexes就是用于存放这些Index的。
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//这个类负责查找订阅者方法
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex);
//下面是和调试相关的开关:
//是否打印订阅异常
logSubscriberExceptions = builder.logSubscriberExceptions;
//是否打印没有订阅者的Log
logNoSubscriberMessages = builder.logNoSubscriberMessages;
//是否发送订阅者异常
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
//是否发送没有订阅者的事件
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
//是否抛出订阅异常
throwSubscriberException = builder.throwSubscriberException;
//eventInheritance 设置为true的时候会发送事件以及当前事件所实现的接口以及当前事件的父类事件。
eventInheritance = builder.eventInheritance;
}
|
public void register(Object subscriber) {
//获取订阅者的class对象
Class<?> subscriberClass = subscriber.getClass();
//在这个类中查找对应的订阅方法以及父类的订阅方法
List
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//针对每个订阅方法调用subscribe进行订阅
subscribe(subscriber, subscriberMethod);
}
}
}
这里会使用SubscriberMethodFinder对作为参数传入对象的class进行查找,找到对应的订阅方法以及父类的订阅方法。我们先看下这部分代码: |
List
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//使用编译期间获取到的Subscribe注释方法
//通过 findUsingInfo(Class<?> subscriberClass) 在apt中进行查找获取
subscriberMethods = findUsingInfo(subscriberClass);
}
//如果在当前类以及父类中没找到任何的订阅方法抛出异常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
//如果有找到那么添加到缓存中以便后续查找使用,存储的方式为订阅类----->订阅方法
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
|
private List
/** 为FindState创建一个对象池,复用FindState对象,防止对象被多次new或者gc. */
FindState findState = prepareFindState();
//将订阅方法赋给FindState对象
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//查找当前订阅类的订阅方法
findUsingReflectionInSingleClass(findState);
//查找当前订阅类父类的订阅方法
findState.moveToSuperclass();
}
//将所有的订阅方法从findState中取出并返回
return getMethodsAndRelease(findState);
}
首先在查找的时候,每次查找会对应一个FindState,这里为了避免频繁创建FindState对象,使用了复用对象池的方法,每次使用先在对象池中查找,如果有之前用过的就直接使用,避免了重新创建一个对象。实在没有的情况再通过new的方式来创建,用完后并不是立即就释放不用,而是放到缓存中供下一次使用。 |
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}
|
//对注册对类的方法进行遍历,必须是public 必须只有一个参数,必须使用@SubScribe注释,并且当前事件类型
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//获取全部的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
//获取方法的修饰符
int modifiers = method.getModifiers();
//在需要检查修饰符的情况下需要修饰符为public
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取参数
Class>[] parameterTypes = method.getParameterTypes();
//如果参数为1个那么满足要求
if (parameterTypes.length == 1) {
//查看当前的注释释放包括Subscribe
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
//如果有Subscribe注释,那么parameterTypes[0]就是事件类型
Class> eventType = parameterTypes[0];
//通过上面的层层筛选,获取到通过上述筛选的方法,以及方法中作为参数的事件类型,将其作为参数传递到findState进行检查
//检查分两级,一般只需要一级检查,检查当前类中当前事件是否被某个方法已经注册处理,如果没有那么就添加到subscriberMethods中。
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//获取注释中的线程模型,事件类型,方法,优先级,是否是sticky方法
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + “.” + method.getName();
throw new EventBusException(“@Subscribe method “ + methodName +
“must have exactly 1 parameter but has “ + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + “.” + method.getName();
throw new EventBusException(methodName +
“ is a illegal @Subscribe method: must be public, non-static, and non-abstract”);
}
}
}
首先会检查方法的修饰符是否是public,是否只有一个参数,是否目前已经有订阅方法订阅了该事件,订阅该事件的订阅方法是否是同一个方法。如果通过上述的检查,就会将这个订阅方法添加到FindState中的subscriberMethods。查找完当前类后会继续查找其父类。最后调用getMethodsAndRelease将FindState中存放的找到的订阅方法取出,然后将FindState添加到对象池中。 |
private List
//从池中取出一个不为空的FindState对象,避免了重新创建
FindState findState = prepareFindState();
//将当前的subscriberClass赋给FindState
findState.initForSubscriber(subscriberClass);
//如果事件对象不为空
while (findState.clazz != null) {
//从订阅者类到其父类,逐步获取订阅者信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//如果存在订阅者信息,那么获取订阅者方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//检查是否已经可以添加
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
//添加
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//在当个类中使用反射来获取对应的订阅方法
findUsingReflectionInSingleClass(findState);
}
//移动到它的父类继续查找
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
上面的流程和之前介绍的用反射的流程一致,只不过在获取方式上存在差别: |
private SubscriberInfo getSubscriberInfo(FindState findState) {
//这部分很重要回头要认真看下有部分与EventBusAnnotationProcessor相关
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
getSubscriberInfo 中会遍历subscriberInfoIndexes 取出其中的SubscribeInfo,subscriberInfoIndexes还记得是怎么来的吧,就是我们之前通过addIndex将编译生成的类添加进来。 |
defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [ eventBusIndex : ‘org.greenrobot.eventbus.demo.MyEventBusIndex’ ]
}
}
}
否则就不会继续进行,eventBusIndex主要用于指定要生成哪个类。 |
@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment env) {
Messager messager = processingEnv.getMessager();
try {
//首先需要在build.gradle中配置生成的EventBusIndex类的名字,如果没有那么就会打印出错误消息
String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);
if (index == null) {
messager.printMessage(Diagnostic.Kind.ERROR, “No option “ + OPTION_EVENT_BUS_INDEX +
“ passed to annotation processor”);
return false;
}
verbose = Boolean.parseBoolean(processingEnv.getOptions().get(OPTION_VERBOSE));
int lastPeriod = index.lastIndexOf(‘.’);
String indexPackage = lastPeriod != -1 ? index.substring(0, lastPeriod) : null;
round++;
if (verbose) {
messager.printMessage(Diagnostic.Kind.NOTE, “Processing round “ + round + “, new annotations: “ +
!annotations.isEmpty() + “, processingOver: “ + env.processingOver());
}
if (env.processingOver()) {
if (!annotations.isEmpty()) {
messager.printMessage(Diagnostic.Kind.ERROR,
“Unexpected processing state: annotations still available after processing over”);
return false;
}
}
//如果当前方法没有注解那么就返回false
if (annotations.isEmpty()) {
return false;
}
//收集订阅者
collectSubscribers(annotations, env, messager);
checkForSubscribersToSkip(messager, indexPackage);
//如果不为空那么就创建Index文件
if (!methodsByClass.isEmpty()) {
createInfoIndexFile(index);
} else {
messager.printMessage(Diagnostic.Kind.WARNING, "No @Subscribe annotations found");
}
writerRoundDone = true;
} catch (RuntimeException e) {
// IntelliJ does not handle exceptions nicely, so log and print a message
e.printStackTrace();
messager.printMessage(Diagnostic.Kind.ERROR, "Unexpected error in EventBusAnnotationProcessor: " + e);
}
return true;
}
|
private void collectSubscribers(Set<? extends TypeElement> annotations, RoundEnvironment env, Messager messager) {
for (TypeElement annotation : annotations) {
Set<? extends Element> elements = env.getElementsAnnotatedWith(annotation);
for (Element element : elements) {
//如果是可执行的元素
if (element instanceof ExecutableElement) {
ExecutableElement method = (ExecutableElement) element;
//检查方法的修饰符以及参数个数
if (checkHasNoErrors(method, messager)) {
TypeElement classElement = (TypeElement) method.getEnclosingElement();
//方法所处的类 ——- 方法
methodsByClass.putElement(classElement, method);
}
} else {
messager.printMessage(Diagnostic.Kind.ERROR, “@Subscribe is only valid for methods”, element);
}
}
}
}
紧接着我们看下checkHasNoErrors这个方法,这个方法主要检查方法的修饰符以及方法的参数个数。 |
//检测方法的修饰符,以及方法的参数个数
private boolean checkHasNoErrors(ExecutableElement element, Messager messager) {
if (element.getModifiers().contains(Modifier.STATIC)) {
messager.printMessage(Diagnostic.Kind.ERROR, “Subscriber method must not be static”, element);
return false;
}
if (!element.getModifiers().contains(Modifier.PUBLIC)) {
messager.printMessage(Diagnostic.Kind.ERROR, "Subscriber method must be public", element);
return false;
}
List<? extends VariableElement> parameters = ((ExecutableElement) element).getParameters();
if (parameters.size() != 1) {
messager.printMessage(Diagnostic.Kind.ERROR, "Subscriber method must have exactly 1 parameter", element);
return false;
}
return true;
}
最后我们看下如何创建Index文件: |
private void createInfoIndexFile(String index) {
BufferedWriter writer = null;
try {
JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index);
int period = index.lastIndexOf(‘.’);
String myPackage = period > 0 ? index.substring(0, period) : null;
String clazz = index.substring(period + 1);
writer = new BufferedWriter(sourceFile.openWriter());
if (myPackage != null) {
writer.write(“package “ + myPackage + “;\n\n”);
}
writer.write(“import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n”);
writer.write(“import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n”);
writer.write(“import org.greenrobot.eventbus.meta.SubscriberInfo;\n”);
writer.write(“import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n”);
writer.write(“import org.greenrobot.eventbus.ThreadMode;\n\n”);
writer.write(“import java.util.HashMap;\n”);
writer.write(“import java.util.Map;\n\n”);
writer.write(“/** This class is generated by EventBus, do not edit. */\n”);
writer.write(“public class “ + clazz + “ implements SubscriberInfoIndex {\n”);
writer.write(“ private static final Map<Class>, SubscriberInfo> SUBSCRIBER_INDEX;\n\n");
writer.write(" static {\n");
writer.write(" SUBSCRIBER_INDEX = new HashMap
writeIndexLines(writer, myPackage);
writer.write(“ }\n\n”);
writer.write(“ private static void putIndex(SubscriberInfo info) {\n”);
writer.write(“ SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n”);
writer.write(“ }\n\n”);
writer.write(“ @Override\n”);
writer.write(“ public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {\n”);
writer.write(“ SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n”);
writer.write(“ if (info != null) {\n”);
writer.write(“ return info;\n”);
writer.write(“ } else {\n”);
writer.write(“ return null;\n”);
writer.write(“ }\n”);
writer.write(“ }\n”);
writer.write(“}\n”);
} catch (IOException e) {
throw new RuntimeException(“Could not write source for “ + index, e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
//Silent
}
}
}
}
|
/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
static {
SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
putIndex(new SimpleSubscriberInfo(com.idealist.tbfungeek.core.mvp.view.core.BaseWebViewActivity.class, true,
new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onProcessMainEvent", BaseMainEvent.class, ThreadMode.MAIN),
new SubscriberMethodInfo("onProcessAsyncEvent", BaseAsyncEvent.class, ThreadMode.ASYNC),
new SubscriberMethodInfo("onProcessBgEvent", BaseBackgroundEvent.class, ThreadMode.BACKGROUND),
new SubscriberMethodInfo("onProcessPostEvent", BasePostEvent.class),
}));
putIndex(new SimpleSubscriberInfo(com.idealist.tbfungeek.core.mvp.view.core.BaseActivity.class, true,
new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onProcessMainEvent", BaseMainEvent.class, ThreadMode.MAIN),
new SubscriberMethodInfo("onProcessAsyncEvent", BaseAsyncEvent.class, ThreadMode.ASYNC),
new SubscriberMethodInfo("onProcessBgEvent", BaseBackgroundEvent.class, ThreadMode.BACKGROUND),
new SubscriberMethodInfo("onProcessPostEvent", BasePostEvent.class),
}));
putIndex(new SimpleSubscriberInfo(com.idealist.tbfungeek.core.mvp.view.core.BaseFragment.class, true,
new SubscriberMethodInfo[] {
new SubscriberMethodInfo("onProcessMainEvent", BaseMainEvent.class, ThreadMode.MAIN),
new SubscriberMethodInfo("onProcessAsyncEvent", BaseAsyncEvent.class, ThreadMode.ASYNC),
new SubscriberMethodInfo("onProcessBgEvent", BaseBackgroundEvent.class, ThreadMode.BACKGROUND),
new SubscriberMethodInfo("onProcessPostEvent", BasePostEvent.class),
}));
}
private static void putIndex(SubscriberInfo info) {
SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}
@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
if (info != null) {
return info;
} else {
return null;
}
}
}
这里有个SUBSCRIBER_INDEX,每个当中存放着一个SimpleSubscriberInfo对象,每个SimpleSubscriberInfo对象由一个全路径名表示的class对象。这个class对象指明了当前是哪个订阅者,然后还包含着一个SubscriberMethodInfo数组,这里面存放的是每个订阅者的信息,包括方法名,事件类。threadMode等信息。我们上面也看到了在创建EventBusBuilder的时候通过addindex将这个生成的索引类对象注入。在findUsingInfo方法中调用索引类的getSubscriberInfo方法从SUBSCRIBER_INDEX中取出对应的SubscriberInfo也就是刚刚存放到里面的SimpleSubscriberInfo,我们看下SimpleSubscriberInfo的定义: |
public class SimpleSubscriberInfo extends AbstractSubscriberInfo {
private final SubscriberMethodInfo[] methodInfos;
public SimpleSubscriberInfo(Class subscriberClass, boolean shouldCheckSuperclass, SubscriberMethodInfo[] methodInfos) {
super(subscriberClass, null, shouldCheckSuperclass);
this.methodInfos = methodInfos;
}
@Override
public synchronized SubscriberMethod[] getSubscriberMethods() {
int length = methodInfos.length;
SubscriberMethod[] methods = new SubscriberMethod[length];
for (int i = 0; i < length; i++) {
SubscriberMethodInfo info = methodInfos[i];
methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
info.priority, info.sticky);
}
return methods;
}
}
|
protected SubscriberMethod createSubscriberMethod(String methodName, Class<?> eventType,
ThreadMode threadMode,int priority, boolean sticky) {
try {
//从订阅类中获取指定方法名和事件类型的方法
Method method = subscriberClass.getDeclaredMethod(methodName, eventType);
return new SubscriberMethod(method, eventType, threadMode, priority, sticky);
} catch (NoSuchMethodException e) {
throw new EventBusException(“Could not find subscriber method in “ + subscriberClass +
“. Maybe a missing ProGuard rule?”, e);
}
}
|
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
/某某类中的某某方法 用于处理某个事件/
//获取当前订阅方法的事件类型(事件类型为订阅方法的参数)
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber/订阅类/, subscriberMethod/该订阅类的某个订阅方法/);
//从subscriptionsByEventType获取事件类型为eventType的订阅者,看下该事件是否已经订阅了
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
//如果为空(表示该事件还没找到订阅者)那么新建一个空的传进去
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
//如果不为空表示该事件已经有订阅者了,如果当前订阅者信息中已经有同样方法已经订阅了,那么抛出异常
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
//subscriptionsByEventType 按照事件类别对订阅者进行分类
// 事件类1
// |-----订阅类1 ---- 方法1
// |-----订阅类2 ---- 方法2
// |-----订阅类3 ---- 方法3
// |-----订阅类4 ---- 方法4
// |-----订阅类5 ---- 方法5
// |-----订阅类6 ---- 方法6
// |-----订阅类7 ---- 方法7
//优先级从大到小,适当的位置插入
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//typesBySubscriber
// |--------事件类型
// |--------事件类型
// |--------事件类型
// |--------事件类型
// |--------事件类型
//当前订阅者订阅了哪些事件集合.
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//将当前的事件类型添加到当前订阅者订阅的事件集合
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//如果当前事件类型是属于sticky事件类型,那么在注册的时候将其发送到订阅者
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
看完上面代码注释后我们来介绍下EventBus中三个重要的列表: |
subscriptionsByEventType: key是某个事件类型,value是订阅这个事件类型的订阅者列表(按照优先级顺序排列) 这个是最重要,事件分发到时候就是依赖这个表
typesBySubscriber:key 是某个订阅者,value是这个订阅者所订阅的事件类型。
stickyEvents:sticky事件列表
|
public void post(Object event) {
//获取当前posting线程的状态
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件队列
List
发布过程如下: |
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//事件类
Class> eventClass = event.getClass();
//是否找到订阅者
boolean subscriptionFound = false;
/*如果eventInheritance 为true 那么当前事件以及接口,子接口父类事件都会被post*/
if (eventInheritance) {
//获取当前事件以及接口,子接口,以及父类,比如当前的事件类型为MotionEvents ,那么MotionEvents本身,以及它的接口子接口,以及父类都会被添加到eventTypes
List
//找到对应的事件类型以及子类型调用postSingleEventForEventType
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//调用postSingleEventForEventType
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有找到订阅者
if (!subscriptionFound) {
//如果打开没有订阅者的Log发出对应的Log
if (logNoSubscriberMessages) {
Log.d(TAG, “No subscribers registered for event “ + eventClass);
}
//发送NoSubscriberEvent,我们可以注册这个来处理这个事件
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
|
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList
synchronized (this) {
//获取订阅这个事件的订阅者们,可以有多个
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//将事件传递给各个订阅类
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
//重置状态
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据订阅线程模式使用不同的poster进行订阅
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//当前进程中执行
invokeSubscriber(subscription, event);
break;
case MAIN:
//在主线程中执行
if (isMainThread) {
//如果当前线程是主线程那么直接在当前线程中运行
invokeSubscriber(subscription, event);
} else {
//如果当前线程不是主线程,那么使用主线程Handler运行
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
//如果当前线程是主线程,那么使用backgroundPoster运行
backgroundPoster.enqueue(subscription, event);
} else {
//如果当前是后台线程,那么直接在这个后台线程中运行
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//如果是异步的都归到asyncPoster运行
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException(“Unknown thread mode: “ + subscription.subscriberMethod.threadMode);
}
}
当事件发送到事件总线中后会使用事件类型从我们第一阶段形成的subscriptionsByEventType列表中获取对应的subscription,需要注意的是一个事件可能有多个subscription,所以获取到的是subscription列表。然后再根据subscription中的threadMode的情况来触发invokeSubscriber方法。在这个方法中实际上是调用subscription中method对象的invoke方法来出发点对应的订阅方法。 |
void invokeSubscriber(Subscription subscription, Object event) {
try {
//触发订阅方法,并将事件类型作为参数传给它
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException(“Unexpected exception”, e);
}
}
(2) MAIN: |
final class HandlerPoster extends Handler {
//……………..
void enqueue(Subscription subscription, Object event) {
//从池中获取一个PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//入队
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException(“Could not send handler message”);
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
|
final class PendingPost {
private final static List
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
synchronized (pendingPostPool) {
int size = pendingPostPool.size();
if (size > 0) {
//从尾部取出后填充并返回
PendingPost pendingPost = pendingPostPool.remove(size - 1);
pendingPost.event = event;
pendingPost.subscription = subscription;
pendingPost.next = null;
return pendingPost;
}
}
//否则新建一个返回
return new PendingPost(event, subscription);
}
static void releasePendingPost(PendingPost pendingPost) {
pendingPost.event = null;
pendingPost.subscription = null;
pendingPost.next = null;
synchronized (pendingPostPool) {
//释放后放入池中
// Don't let the pool grow indefinitely
if (pendingPostPool.size() < 10000) {
pendingPostPool.add(pendingPost);
}
}
}
}
我们回过头看mainThreadPoster,当我们post一个事件到mainThreadPoster的时候,会触发向Handler 发送消息,收到消息后会在handleMessage不断循环调用invokeScriber.直到队列中的PandingPost处理完,或者处理事件超过设定的最大事件处理时间。整个流程如下所示: |
final class BackgroundPoster implements Runnable {
//..................
public void enqueue(Subscription subscription, Object event) {
//从池中获取一个可复用的PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//将其添加到队列中
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
//将线程池设置为阻塞状态
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
//这里会不断从队列中获取可执行的,一直执行直到队列为空的时候退出。
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
![](/开源代码分析之EventBus/background.png)
最后我们再看AsyncPoster:
整个大体的结构和backgroundPoster很相似,但是它们之间的区别在于,AsyncPoster每次会从线程池中获取一个线程来执行,而backgroundPoster会在同一个现场中完成。
class AsyncPoster implements Runnable {
//............
public void enqueue(Subscription subscription, Object event) {
//从池中取出一个可复用的PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
//插入到队列中
queue.enqueue(pendingPost);
//调用线程池从线程池中取出一个运行
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
//取出队列中的第一个
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
//出发订阅者订阅方法
eventBus.invokeSubscriber(pendingPost);
}
}
![](/开源代码分析之EventBus/async.png)
到此整个EventBus的源码分析结束:
老规矩最后以一个图来总结整个流程:
首先我们需要先通过register来订阅某些事件,在调用register的时候,SubscriberFinder从订阅类中找出所有的订阅处理方法,并将其挂接在事件分发中心EventBus的subscriptionsByEventType列表中,如果有事件触发那么就会将事件发送到事件分发中心,事件分发中心就会从subscriptionsByEventType列表中查找订阅者方法。然后根据对应的threadMode在不同的poster中调用invokeScriber来执行订阅者方法。
![](/开源代码分析之EventBus/final.png)