简介

提到EventBus同样是每个Android 开发所必须掌握的一个开源库,它是一个事件发布订阅系统,用法十分简单,但是能够在很大程度上解决模块间存在的耦合问题,当某个模块的某个事件产生的时候,对应的事件通过post方法将其发布到Eventbus上,再由EventBus对该事件进行分发,如果某个类需要响应某个事件,必须事先通过register方法进行注册将自己订阅到总线上,这样EventBus就会根据实际的情况将事件源post出来的事件分发到有处理指定事件类型能力的订阅者方法上。一旦订阅者订阅了对应的事件,订阅者将会接收到对应类型的事件,直到调用unregitser方法取消注册。事件订阅方法必须使用@Subscribe注释,并且必须是public方法。返回值必须是void,并且只能有一个参数,该参数的类型为事件对象类型,用法就这么简单,三言两语就搞定了。但是这篇博客的关注点不在于它的使用上,这篇博客想从源码角度来对EventBus原理进行分析看下上述提到的功能是如何实现的。

但是在分析源码之前还是先要熟悉下EventBus是如何使用的,毕竟我们熟悉源码的目的也是为了应用。
首先我们先看下下面两张对比图:

第一张是没有使用EventBus的项目结构图,可以看出整个结构几乎呈网状,这样的结构相对来说耦合度就相对高。
而第二张整个结构呈现的是星型结构,EventBus处于星型结构的核心位置,主要负责事件的接送与调度。事件的产生源和事件的消费者耦合度就大大得降低了,大家只和EventBus进行交互,事件源将事件分发到事件总线,订阅者不会相互交互而是监听事件总线分发的事件。所以订阅者和事件源避免了之间的强约束。

这篇博客介绍的是EventBus 3.xEventBus 3 相对于之前的版本引入了EventBusAnnotationProcessor,我们可以使用编译时注解的方式来使用Eventbus了。在EventBus 早期的版本中事件注册信息的获取采用的是反射机制,这样就会导致效率上的降低,在EventBus 3的版本上并行使用反射和编译时注解两种方式,我们可以根据自己的实际需求来选择采用哪种方式。这个在后面源代码分析的时候会进行介绍。

在项目中引入EventBus 3.x

这里推荐大家使用编译时注解方式。我这边为了方便起见,也只介绍使用编译时注解的方式引入。
这里需要注意的是目前很多教程注解预编译所采用的是android-apt的方式,不过随着Android Gradle 插件 2.2 版本的发布,Android Studio推出了编译时预处理官方插件,所以Apt工具的作者也就宣布不再维护该工具了。目前使用的是annotationProcessor来取代android-apt方式。
如果大家使用的是还是android-apt方式的话,建议通过如下方式来切换到annotationProcess方式。

切换步骤:
首先要确保Android Gradle插件版本是2.2以上:

  1. 修改Project 的build.gradle配置

android-apt方式

dependencies {
classpath 'com.android.tools.build:gradle:2.2.3'
classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8'
}

修改后annotationProcessor 方式

dependencies {
classpath 'com.android.tools.build:gradle:2.2.3'
}

也就是把原先的

classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8' 去掉
  1. 修改module的build.gradle配置

android-apt方式

buildscript {
repositories {
jcenter()
}
dependencies {
classpath 'com.neenbedankt.gradle.plugins:android-apt:1.8'
}
}
````
apply plugin: 'com.neenbedankt.android-apt'

dependencies {
compile ‘org.greenrobot:eventbus:3.0.0’
apt’org.greenrobot:eventbus-annotation-processor:3.0.1’
}



修改后annotationProcessor 方式,只保留dependencies 里面的引用并且把apt 换成annotationProcessor就可以了

dependencies {
compile ‘org.greenrobot:eventbus:3.0.0’
annotationProcessor ‘org.greenrobot:eventbus-annotation-processor:3.0.1’
}


3. EventBus 3.0 Index类的制定方式

android-apt方式

apt {
arguments {
eventBusIndex “org.greenrobot.eventbus.demo.MyEventBusIndex”
}
}

修改后annotationProcessor  方式

defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [ eventBusIndex : ‘org.greenrobot.eventbus.demo.MyEventBusIndex’ ]
}
}
}


紧接着rebuild下项目,就会发现在build目录下上面指定报名下生成了一个MyEventBusIndex,这个文件是怎么生成的我们在介绍源码的时候会进行介绍,这里先不管这些。我们接下来就需要将生成的索引添加到EventBus中供事件分发是时候查找。我们知道EventBus默认有一个单例,可以通过getDefault()获取,但是这里我要做的是使用EventBusBuilder对其进行配置后再将Builder生成的EventBus作为默认的EventBus.

EventBus.builder().addIndex(new MyEventBusIndex()).installDefaultEventBus();



#### EventBus 的使用:

1. 定义订阅方法:
EventBus使用注解@Subscribe的方式来定义订阅方法。这里面我们需要重点了解下threadMode,sticky,priority这三个属性的含义:

@Subscribe(threadMode = ThreadMode.POSTING,sticky = false,priority = 1)
public void onMessageEvent(MessageEvent event) {
tv.setText(event.message);
}


(1)ThreadMode :

这时候订阅者执行的线程与事件的发布者所在的线程为同一个线程。也就是说


ThreadMode: POSTING
事件由哪个线程发布的,订阅者就在哪个线程中执行。这时候订阅者执行的线程与事件的发布者所在的线程为同一个线程,这个是EventBus默认的线程模式

ThreadMode: MAIN
订阅方法是在Android的主线程中运行的。如果提交的线程也是主线程,那么他就和ThreadMode.POSTING一样了。当然在由于是在主线程中运行的,所以在这里就不能执行一些耗时的任务。

ThreadMode: BACKGROUND
这种模式下,我们的订阅者将会在后台线程中执行。如果发布者是在主线程中进行的事件发布,那么订阅者将会重新开启一个子线程运行,若是发布者在不是在主线程中进行的事件发布,那么这时候订阅者就在发布者所在的线程中执行任务。

ThreadMode: ASYNC
这种模式下,订阅者将会独立运行在一个线程中。不管发布者是在主线程还是在子线程中进行事件的发布,订阅者都是在重新开启一个线程来执行任务。


(2)priority:

在订阅者中我们也可以为其设置优先级,优先级高的将会首先接收到发布者所发布的事件。并且我们还能在高优先中取消事件,这时候的优先级的订阅者将接收不到事件。这类似于BroadcastReceiver中的取消广播。不过这里有一点我们要注意,对于订阅者的优先级只是针对于相同的ThreadMode中。

(3) sticky
sticky与一般的事件的区别是sticky事件发送事件之后再订阅该事件还能收到所订阅事件的最新事件。一般事件在事件发生后才订阅是不会收到之前发送的事件的。
注解方法定义还需要注意,必须是public方法,必须只有一个参数,不能是抽象或者static方法。

2. 订阅事件:
register(this) 这个就不需要介绍了。对于sticky事件在注册的时候会立刻收到最新的事件。
3. 取消订阅:
unregister(this) 一定要记得取消订阅否则会继续接收到事件。这是一个很尴尬的事情。
4. 发布事件:
post(xxxx) postSticky 将事件发布到事件总线上让EventBus来调度。


##### EventBus源码解析:

1. EventBus 实例的创建
EventBus 的实例创建使用的是单例模式 + 建造者模式:

public static EventBus getDefault() {
//使用单例的方法创建eventbus
//getDefault方法使用了double check(双重检查锁定模式),多了一层判断,故可以减少上锁开销。
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}

在创建的时候我们会通过丢进去一个设置好各种属性后的Builder,然后在EventBus构造函数上从Builder上获取已经设置好的各种属性,这个在很多开源代码中都使用过这种方式,比如picasso Okhttp等,这种比较适合于有多个属性需要设置的情况。但是我们这里的重点在于EventBus有哪些属性,在下面代码中对一些部件进行了注释,还有一些没有注释的是比较重要的,需要在后面分析中重点提到的,我们接着往下看。

public EventBus() {
this(DEFAULT_BUILDER);
}


private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

EventBus(EventBusBuilder builder) {

//这三个变量很重要后面会重点介绍
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();

//三个事件分发器对应不同的threadMode
//主线程分发器
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
//后台线程分发器
backgroundPoster = new BackgroundPoster(this);
//异步线程分发器
asyncPoster = new AsyncPoster(this);
//这个是执行任务的线程池
executorService = builder.executorService;


//这个是我们上面提到的在使用EventBus编译时注解方式的时候会通过addIndex将编译时生成的Index注入。subscriberInfoIndexes就是用于存放这些Index的。
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
//这个类负责查找订阅者方法
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex);


//下面是和调试相关的开关:
//是否打印订阅异常
logSubscriberExceptions = builder.logSubscriberExceptions;
//是否打印没有订阅者的Log
logNoSubscriberMessages = builder.logNoSubscriberMessages;
//是否发送订阅者异常
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
//是否发送没有订阅者的事件
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
//是否抛出订阅异常
throwSubscriberException = builder.throwSubscriberException;

//eventInheritance 设置为true的时候会发送事件以及当前事件所实现的接口以及当前事件的父类事件。
eventInheritance = builder.eventInheritance;

}


2. 注册订阅者

要将当前某个类作为某个事件的订阅者需要调用EventBus的register方法:

public void register(Object subscriber) {
//获取订阅者的class对象
Class<?> subscriberClass = subscriber.getClass();
//在这个类中查找对应的订阅方法以及父类的订阅方法
List subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
//针对每个订阅方法调用subscribe进行订阅
subscribe(subscriber, subscriberMethod);
}
}
}

这里会使用SubscriberMethodFinder对作为参数传入对象的class进行查找,找到对应的订阅方法以及父类的订阅方法。我们先看下这部分代码:

List findSubscriberMethods(Class subscriberClass) { //先从缓存中获取某个订阅类的订阅方法 List subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { //如果缓存中有那么就直接使用缓存中的缓存方法 return subscriberMethods; } //使用不同的方式来寻找订阅方法,EventBus3.0版本提供了EventBusAnnotationProcessor这个类,用于在编译期获取并缓存@Subscribe注解的方法 //ignoreGeneratedIndex = false 的时候使用编译期间获取到的Subscribe注释的方法 if (ignoreGeneratedIndex) { //使用’findUsingReflection(Class subscriberClass)‘方法,进行反射来获取
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//使用编译期间获取到的Subscribe注释方法
//通过 findUsingInfo(Class<?> subscriberClass) 在apt中进行查找获取
subscriberMethods = findUsingInfo(subscriberClass);
}

//如果在当前类以及父类中没找到任何的订阅方法抛出异常
if (subscriberMethods.isEmpty()) {
    throw new EventBusException("Subscriber " + subscriberClass
            + " and its super classes have no public methods with the @Subscribe annotation");
} else {
    //如果有找到那么添加到缓存中以便后续查找使用,存储的方式为订阅类----->订阅方法
    METHOD_CACHE.put(subscriberClass, subscriberMethods);
    return subscriberMethods;
}

}


查找订阅方法有两类一种是使用反射的方式来获取到@SubScriber注解的方法,另一种是EventBus3.0 版本之后会使用编译时注解方式在编译的时候获取并缓存@Subscribe注解的方法,后一种大家在上一篇介绍Butterknife源码的时候介绍了大致的方式,这里我还会带大家对EventBusAnnotationProcessor 这个注解处理器进行分析。

找到当前类包含@Subscribe方法后会将这些方法添加到内存缓存中,以便后续查找方便,因为特别是反射方式是一种十分耗性能的方式,这个解决方案在Butterknife上我们也看到过。
METHOD_CACHE 这个缓冲是以当前类的class对象作为key,每个key对应的值是当前类的所有使用@Subscribe注解的方法,也就是事件订阅方法。这里需要注意的是如果在当前类以及父类中没找到任何的订阅方法抛出异常。这是我第一次使用EventBus遇到的一个问题,记忆犹新。哈。


接下来我们来看下这两种获取订阅方法的流程,首先我们看下通过反射的方式 --- findUsingReflection


private List findUsingReflection(Class<?> subscriberClass) {
/** 为FindState创建一个对象池,复用FindState对象,防止对象被多次new或者gc. */
FindState findState = prepareFindState();
//将订阅方法赋给FindState对象
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//查找当前订阅类的订阅方法
findUsingReflectionInSingleClass(findState);
//查找当前订阅类父类的订阅方法
findState.moveToSuperclass();
}
//将所有的订阅方法从findState中取出并返回
return getMethodsAndRelease(findState);
}

首先在查找的时候,每次查找会对应一个FindState,这里为了避免频繁创建FindState对象,使用了复用对象池的方法,每次使用先在对象池中查找,如果有之前用过的就直接使用,避免了重新创建一个对象。实在没有的情况再通过new的方式来创建,用完后并不是立即就释放不用,而是放到缓存中供下一次使用。

private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}


我们先看下FindState的结构
![](/开源代码分析之EventBus/findstate.png)
FindState用于记录当前查的结果,我们首先会调用initForSubscriber来给FindState“打个标签”用于说明当前的FindState对象用于存放哪个订阅类的查找结果,然后在查找过程中会将查找结果先调用checkAdd以及checkAddWithMethodSignature对查找到的结果进行两级检测,然后再添加到subscriberMethods 中。

我们接下来看下如何使用反射来查找订阅方法:

//对注册对类的方法进行遍历,必须是public 必须只有一个参数,必须使用@SubScribe注释,并且当前事件类型
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
//获取全部的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
for (Method method : methods) {
//获取方法的修饰符
int modifiers = method.getModifiers();
//在需要检查修饰符的情况下需要修饰符为public
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
//获取参数
Class[] parameterTypes = method.getParameterTypes(); //如果参数为1个那么满足要求 if (parameterTypes.length == 1) { //查看当前的注释释放包括Subscribe Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { //如果有Subscribe注释,那么parameterTypes[0]就是事件类型 Class eventType = parameterTypes[0];
//通过上面的层层筛选,获取到通过上述筛选的方法,以及方法中作为参数的事件类型,将其作为参数传递到findState进行检查
//检查分两级,一般只需要一级检查,检查当前类中当前事件是否被某个方法已经注册处理,如果没有那么就添加到subscriberMethods中。
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//获取注释中的线程模型,事件类型,方法,优先级,是否是sticky方法
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + “.” + method.getName();
throw new EventBusException(“@Subscribe method “ + methodName +
“must have exactly 1 parameter but has “ + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + “.” + method.getName();
throw new EventBusException(methodName +
“ is a illegal @Subscribe method: must be public, non-static, and non-abstract”);
}
}
}

首先会检查方法的修饰符是否是public,是否只有一个参数,是否目前已经有订阅方法订阅了该事件,订阅该事件的订阅方法是否是同一个方法。如果通过上述的检查,就会将这个订阅方法添加到FindState中的subscriberMethods。查找完当前类后会继续查找其父类。最后调用getMethodsAndRelease将FindState中存放的找到的订阅方法取出,然后将FindState添加到对象池中。

private List findUsingInfo(Class<?> subscriberClass) {
//从池中取出一个不为空的FindState对象,避免了重新创建
FindState findState = prepareFindState();
//将当前的subscriberClass赋给FindState
findState.initForSubscriber(subscriberClass);
//如果事件对象不为空
while (findState.clazz != null) {
//从订阅者类到其父类,逐步获取订阅者信息
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
//如果存在订阅者信息,那么获取订阅者方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//检查是否已经可以添加
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
//添加
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
//在当个类中使用反射来获取对应的订阅方法
findUsingReflectionInSingleClass(findState);
}
//移动到它的父类继续查找
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

上面的流程和之前介绍的用反射的流程一致,只不过在获取方式上存在差别:
我们重点看下getSubscriberInfo 方法:

private SubscriberInfo getSubscriberInfo(FindState findState) {

//这部分很重要回头要认真看下有部分与EventBusAnnotationProcessor相关
if (subscriberInfoIndexes != null) {
    for (SubscriberInfoIndex index : subscriberInfoIndexes) {
        SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
        if (info != null) {
            return info;
        }
    }
}
return null;

}

getSubscriberInfo 中会遍历subscriberInfoIndexes 取出其中的SubscribeInfo,subscriberInfoIndexes还记得是怎么来的吧,就是我们之前通过addIndex将编译生成的类添加进来。

接下来我们就需要深究EventBusAnnotationProcessor 以及它生成的代码了:
我们先看它的process方法,这个方法会在编译的时候执行。首先它会要求我们在build.gradle中有如下配置:

defaultConfig {
javaCompileOptions {
annotationProcessorOptions {
arguments = [ eventBusIndex : ‘org.greenrobot.eventbus.demo.MyEventBusIndex’ ]
}
}
}

否则就不会继续进行,eventBusIndex主要用于指定要生成哪个类。

@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment env) {
Messager messager = processingEnv.getMessager();
try {
//首先需要在build.gradle中配置生成的EventBusIndex类的名字,如果没有那么就会打印出错误消息
String index = processingEnv.getOptions().get(OPTION_EVENT_BUS_INDEX);
if (index == null) {
messager.printMessage(Diagnostic.Kind.ERROR, “No option “ + OPTION_EVENT_BUS_INDEX +
“ passed to annotation processor”);
return false;
}
verbose = Boolean.parseBoolean(processingEnv.getOptions().get(OPTION_VERBOSE));
int lastPeriod = index.lastIndexOf(‘.’);
String indexPackage = lastPeriod != -1 ? index.substring(0, lastPeriod) : null;
round++;
if (verbose) {
messager.printMessage(Diagnostic.Kind.NOTE, “Processing round “ + round + “, new annotations: “ +
!annotations.isEmpty() + “, processingOver: “ + env.processingOver());
}
if (env.processingOver()) {
if (!annotations.isEmpty()) {
messager.printMessage(Diagnostic.Kind.ERROR,
“Unexpected processing state: annotations still available after processing over”);
return false;
}
}
//如果当前方法没有注解那么就返回false
if (annotations.isEmpty()) {
return false;
}

    //收集订阅者
    collectSubscribers(annotations, env, messager);
    checkForSubscribersToSkip(messager, indexPackage);
    //如果不为空那么就创建Index文件
    if (!methodsByClass.isEmpty()) {
        createInfoIndexFile(index);
    } else {
        messager.printMessage(Diagnostic.Kind.WARNING, "No @Subscribe annotations found");
    }
    writerRoundDone = true;
} catch (RuntimeException e) {
    // IntelliJ does not handle exceptions nicely, so log and print a message
    e.printStackTrace();
    messager.printMessage(Diagnostic.Kind.ERROR, "Unexpected error in EventBusAnnotationProcessor: " + e);
}
return true;

}


collectSubscribers 方法检查并收集订阅方法,如果有订阅方法那么就调用createInfoIndexFile生成对应的index类。

首先看下collectSubscribers 方法,我们会对传入注解处理器的所有方法注解进行便利。然后调用checkHasNoErrors对其进行检查。这个和FindState对象的checkAdd类似。如果检查通过了就会将其添加到methodsByClass中。其中该注解所处的类作为key。对应的方法作为value

private void collectSubscribers(Set<? extends TypeElement> annotations, RoundEnvironment env, Messager messager) {
for (TypeElement annotation : annotations) {
Set<? extends Element> elements = env.getElementsAnnotatedWith(annotation);
for (Element element : elements) {
//如果是可执行的元素
if (element instanceof ExecutableElement) {
ExecutableElement method = (ExecutableElement) element;
//检查方法的修饰符以及参数个数
if (checkHasNoErrors(method, messager)) {
TypeElement classElement = (TypeElement) method.getEnclosingElement();
//方法所处的类 ——- 方法
methodsByClass.putElement(classElement, method);
}
} else {
messager.printMessage(Diagnostic.Kind.ERROR, “@Subscribe is only valid for methods”, element);
}
}
}
}

紧接着我们看下checkHasNoErrors这个方法,这个方法主要检查方法的修饰符以及方法的参数个数。

//检测方法的修饰符,以及方法的参数个数
private boolean checkHasNoErrors(ExecutableElement element, Messager messager) {
if (element.getModifiers().contains(Modifier.STATIC)) {
messager.printMessage(Diagnostic.Kind.ERROR, “Subscriber method must not be static”, element);
return false;
}

if (!element.getModifiers().contains(Modifier.PUBLIC)) {
    messager.printMessage(Diagnostic.Kind.ERROR, "Subscriber method must be public", element);
    return false;
}

List<? extends VariableElement> parameters = ((ExecutableElement) element).getParameters();
if (parameters.size() != 1) {
    messager.printMessage(Diagnostic.Kind.ERROR, "Subscriber method must have exactly 1 parameter", element);
    return false;
}
return true;

}

最后我们看下如何创建Index文件:

private void createInfoIndexFile(String index) {
BufferedWriter writer = null;
try {
JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index);
int period = index.lastIndexOf(‘.’);
String myPackage = period > 0 ? index.substring(0, period) : null;
String clazz = index.substring(period + 1);
writer = new BufferedWriter(sourceFile.openWriter());
if (myPackage != null) {
writer.write(“package “ + myPackage + “;\n\n”);
}
writer.write(“import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n”);
writer.write(“import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n”);
writer.write(“import org.greenrobot.eventbus.meta.SubscriberInfo;\n”);
writer.write(“import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n”);
writer.write(“import org.greenrobot.eventbus.ThreadMode;\n\n”);
writer.write(“import java.util.HashMap;\n”);
writer.write(“import java.util.Map;\n\n”);
writer.write(“/** This class is generated by EventBus, do not edit. */\n”);
writer.write(“public class “ + clazz + “ implements SubscriberInfoIndex {\n”);
writer.write(“ private static final Map<Class, SubscriberInfo> SUBSCRIBER_INDEX;\n\n"); writer.write(" static {\n"); writer.write(" SUBSCRIBER_INDEX = new HashMap, SubscriberInfo>();\n\n”);
writeIndexLines(writer, myPackage);
writer.write(“ }\n\n”);
writer.write(“ private static void putIndex(SubscriberInfo info) {\n”);
writer.write(“ SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n”);
writer.write(“ }\n\n”);
writer.write(“ @Override\n”);
writer.write(“ public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {\n”);
writer.write(“ SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n”);
writer.write(“ if (info != null) {\n”);
writer.write(“ return info;\n”);
writer.write(“ } else {\n”);
writer.write(“ return null;\n”);
writer.write(“ }\n”);
writer.write(“ }\n”);
writer.write(“}\n”);
} catch (IOException e) {
throw new RuntimeException(“Could not write source for “ + index, e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
//Silent
}
}
}
}


这样代码看起开很难看,我们直接看生成的代码样子吧,大体长这样:

/** This class is generated by EventBus, do not edit. */
public class MyEventBusIndex implements SubscriberInfoIndex {
private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;

static {
    SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
    putIndex(new SimpleSubscriberInfo(com.idealist.tbfungeek.core.mvp.view.core.BaseWebViewActivity.class, true,
            new SubscriberMethodInfo[] {
        new SubscriberMethodInfo("onProcessMainEvent", BaseMainEvent.class, ThreadMode.MAIN),
        new SubscriberMethodInfo("onProcessAsyncEvent", BaseAsyncEvent.class, ThreadMode.ASYNC),
        new SubscriberMethodInfo("onProcessBgEvent", BaseBackgroundEvent.class, ThreadMode.BACKGROUND),
        new SubscriberMethodInfo("onProcessPostEvent", BasePostEvent.class),
    }));

    putIndex(new SimpleSubscriberInfo(com.idealist.tbfungeek.core.mvp.view.core.BaseActivity.class, true,
            new SubscriberMethodInfo[] {
        new SubscriberMethodInfo("onProcessMainEvent", BaseMainEvent.class, ThreadMode.MAIN),
        new SubscriberMethodInfo("onProcessAsyncEvent", BaseAsyncEvent.class, ThreadMode.ASYNC),
        new SubscriberMethodInfo("onProcessBgEvent", BaseBackgroundEvent.class, ThreadMode.BACKGROUND),
        new SubscriberMethodInfo("onProcessPostEvent", BasePostEvent.class),
    }));

    putIndex(new SimpleSubscriberInfo(com.idealist.tbfungeek.core.mvp.view.core.BaseFragment.class, true,
            new SubscriberMethodInfo[] {
        new SubscriberMethodInfo("onProcessMainEvent", BaseMainEvent.class, ThreadMode.MAIN),
        new SubscriberMethodInfo("onProcessAsyncEvent", BaseAsyncEvent.class, ThreadMode.ASYNC),
        new SubscriberMethodInfo("onProcessBgEvent", BaseBackgroundEvent.class, ThreadMode.BACKGROUND),
        new SubscriberMethodInfo("onProcessPostEvent", BasePostEvent.class),
    }));

}

private static void putIndex(SubscriberInfo info) {
    SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
}

@Override
public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
    SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
    if (info != null) {
        return info;
    } else {
        return null;
    }
}

}

这里有个SUBSCRIBER_INDEX,每个当中存放着一个SimpleSubscriberInfo对象,每个SimpleSubscriberInfo对象由一个全路径名表示的class对象。这个class对象指明了当前是哪个订阅者,然后还包含着一个SubscriberMethodInfo数组,这里面存放的是每个订阅者的信息,包括方法名,事件类。threadMode等信息。我们上面也看到了在创建EventBusBuilder的时候通过addindex将这个生成的索引类对象注入。在findUsingInfo方法中调用索引类的getSubscriberInfo方法从SUBSCRIBER_INDEX中取出对应的SubscriberInfo也就是刚刚存放到里面的SimpleSubscriberInfo,我们看下SimpleSubscriberInfo的定义:

public class SimpleSubscriberInfo extends AbstractSubscriberInfo {

private final SubscriberMethodInfo[] methodInfos;

public SimpleSubscriberInfo(Class subscriberClass, boolean shouldCheckSuperclass, SubscriberMethodInfo[] methodInfos) {
    super(subscriberClass, null, shouldCheckSuperclass);
    this.methodInfos = methodInfos;
}

@Override
public synchronized SubscriberMethod[] getSubscriberMethods() {
    int length = methodInfos.length;
    SubscriberMethod[] methods = new SubscriberMethod[length];
    for (int i = 0; i < length; i++) {
        SubscriberMethodInfo info = methodInfos[i];
        methods[i] = createSubscriberMethod(info.methodName, info.eventType, info.threadMode,
                info.priority, info.sticky);
    }
    return methods;
}

}


这里的SubscriberMethodInfo 就是该订阅类的订阅订阅方法。在回头看下findUsingInfo方法。findUsingInfo中会继续调用SubscriberMethodInfo的getSubscriberMethods
来获得SubscriberMethod。如果认真看的话我们会发现刚刚添加SubscriberMethodInfo的时候方法实用的是方法名称,而SubscriberMethod中存放的是Method对象,也就是说这里还必须有个转换。这个转换就发生在createSubscriberMethod中。

protected SubscriberMethod createSubscriberMethod(String methodName, Class<?> eventType,
ThreadMode threadMode,int priority, boolean sticky) {
try {
//从订阅类中获取指定方法名和事件类型的方法
Method method = subscriberClass.getDeclaredMethod(methodName, eventType);
return new SubscriberMethod(method, eventType, threadMode, priority, sticky);
} catch (NoSuchMethodException e) {
throw new EventBusException(“Could not find subscriber method in “ + subscriberClass +
“. Maybe a missing ProGuard rule?”, e);
}
}


之后的逻辑就和之前的一样了,先进行检查后添加到FindState中的scribeMethod列表中,然后再拷贝出来并将FindState放回对象池。

整个SubscriberFinder的流程如下图所示:
不论是通过反射还是编译时注解大体的流程都一致的,都是先从FindState对象池中获取一个FindState用于存放查找的中间结果,然后调用SubscribeFinder针对订阅类中的订阅方法进行查找,查找到对应的订阅方法后,调用checkAdd进行检查是否有相同的方法订阅同一个事件,经过上述的步骤后获取到我们想要的订阅方法列表,最后将这个列表从FindState中拷贝出来,存放到subscribeMethods列表中,并将FindState放回到对象池中,以便下一次使用。
![](/开源代码分析之EventBus/subscribe_finder.png)


我们获得当前订阅类中的订阅方法后我们还需要将其添加到事件总线中,以便事件总线根据对应的关系来分发事件源发送到事件。我们就来看下这部分代码:

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
/某某类中的某某方法 用于处理某个事件/
//获取当前订阅方法的事件类型(事件类型为订阅方法的参数)
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber/订阅类/, subscriberMethod/该订阅类的某个订阅方法/);

//从subscriptionsByEventType获取事件类型为eventType的订阅者,看下该事件是否已经订阅了
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
    //如果为空(表示该事件还没找到订阅者)那么新建一个空的传进去
    subscriptions = new CopyOnWriteArrayList<>();
    subscriptionsByEventType.put(eventType, subscriptions);
} else {
    //如果不为空表示该事件已经有订阅者了,如果当前订阅者信息中已经有同样方法已经订阅了,那么抛出异常
    if (subscriptions.contains(newSubscription)) {
        throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                + eventType);
    }
}

//subscriptionsByEventType  按照事件类别对订阅者进行分类
// 事件类1
//    |-----订阅类1  ---- 方法1
//    |-----订阅类2  ---- 方法2
//    |-----订阅类3  ---- 方法3
//    |-----订阅类4  ---- 方法4
//    |-----订阅类5  ---- 方法5
//    |-----订阅类6  ---- 方法6
//    |-----订阅类7  ---- 方法7

//优先级从大到小,适当的位置插入
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
    if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
        subscriptions.add(i, newSubscription);
        break;
    }
}

//typesBySubscriber
//       |--------事件类型
//       |--------事件类型
//       |--------事件类型
//       |--------事件类型
//       |--------事件类型
//当前订阅者订阅了哪些事件集合.
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
    subscribedEvents = new ArrayList<>();
    typesBySubscriber.put(subscriber, subscribedEvents);
}
//将当前的事件类型添加到当前订阅者订阅的事件集合
subscribedEvents.add(eventType);

if (subscriberMethod.sticky) {
    if (eventInheritance) {
        // Existing sticky events of all subclasses of eventType have to be considered.
        // Note: Iterating over all events may be inefficient with lots of sticky events,
        // thus data structure should be changed to allow a more efficient lookup
        // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
        Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
        for (Map.Entry<Class<?>, Object> entry : entries) {
            Class<?> candidateEventType = entry.getKey();
            if (eventType.isAssignableFrom(candidateEventType)) {
                Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    } else {
        //如果当前事件类型是属于sticky事件类型,那么在注册的时候将其发送到订阅者
        Object stickyEvent = stickyEvents.get(eventType);
        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
    }
}

}

看完上面代码注释后我们来介绍下EventBus中三个重要的列表:

subscriptionsByEventType: key是某个事件类型,value是订阅这个事件类型的订阅者列表(按照优先级顺序排列) 这个是最重要,事件分发到时候就是依赖这个表
typesBySubscriber:key 是某个订阅者,value是这个订阅者所订阅的事件类型。
stickyEvents:sticky事件列表


关于这些列表的实际作用我们介绍完事件分发后再来看看这个在事件分发过程中是如何起作用的。

3. 事件源发布事件:
事件的发布是通过post以及postSticky来发布的,我们先来看下通过post发布普通事件的流程:

public void post(Object event) {
//获取当前posting线程的状态
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件队列
List eventQueue = postingState.eventQueue;
//将事件添加到事件队列中
eventQueue.add(event);
if (!postingState.isPosting) {
//是否是在主线程中
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
//将postingState置为true
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException(“Internal error. Abort state was not reset”);
}
try {
while (!eventQueue.isEmpty()) {
//将队头的元素发布出去
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

发布过程如下:
(1) 将当前事件添加到事件队列列表中
(2) 查看事件队列列表是否有缓存的事件,如果有那么将队头的事件发布出去


private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//事件类
Class eventClass = event.getClass(); //是否找到订阅者 boolean subscriptionFound = false; /*如果eventInheritance 为true 那么当前事件以及接口,子接口父类事件都会被post*/ if (eventInheritance) { //获取当前事件以及接口,子接口,以及父类,比如当前的事件类型为MotionEvents ,那么MotionEvents本身,以及它的接口子接口,以及父类都会被添加到eventTypes List> eventTypes = lookupAllEventTypes(eventClass);
//找到对应的事件类型以及子类型调用postSingleEventForEventType
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//调用postSingleEventForEventType
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有找到订阅者
if (!subscriptionFound) {
//如果打开没有订阅者的Log发出对应的Log
if (logNoSubscriberMessages) {
Log.d(TAG, “No subscribers registered for event “ + eventClass);
}
//发送NoSubscriberEvent,我们可以注册这个来处理这个事件
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}


首先这里面有个关键变量eventInheritance 这个变量如果是true,那么调用post的时候不但会讲当前事件发布出去,也会将其父类以及接口类作为事件发布出去,这个值默认是true,这个需要注意下。
紧接着最关键的代码在postSingleEventForEventType以及postToSubscription 中

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList subscriptions;
synchronized (this) {
//获取订阅这个事件的订阅者们,可以有多个
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//将事件传递给各个订阅类
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
//重置状态
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}


private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据订阅线程模式使用不同的poster进行订阅
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
//当前进程中执行
invokeSubscriber(subscription, event);
break;
case MAIN:
//在主线程中执行
if (isMainThread) {
//如果当前线程是主线程那么直接在当前线程中运行
invokeSubscriber(subscription, event);
} else {
//如果当前线程不是主线程,那么使用主线程Handler运行
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
//如果当前线程是主线程,那么使用backgroundPoster运行
backgroundPoster.enqueue(subscription, event);
} else {
//如果当前是后台线程,那么直接在这个后台线程中运行
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
//如果是异步的都归到asyncPoster运行
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException(“Unknown thread mode: “ + subscription.subscriberMethod.threadMode);
}
}

当事件发送到事件总线中后会使用事件类型从我们第一阶段形成的subscriptionsByEventType列表中获取对应的subscription,需要注意的是一个事件可能有多个subscription,所以获取到的是subscription列表。然后再根据subscription中的threadMode的情况来触发invokeSubscriber方法。在这个方法中实际上是调用subscription中method对象的invoke方法来出发点对应的订阅方法。

我们来详细看下不同threadMode是如何处理的:
(1)POSTING
这种情况最为简单就是直接调用invokeSubscriber

void invokeSubscriber(Subscription subscription, Object event) {
try {
//触发订阅方法,并将事件类型作为参数传给它
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException(“Unexpected exception”, e);
}
}

(2) MAIN:
这种情况会先判断当前线程是否是主线程,如果是的话就直接调用invokeSubscriber,如果不是的话就必须依靠mainThreadPoster了,我们看下mainThreadPoster:

final class HandlerPoster extends Handler {

//……………..
void enqueue(Subscription subscription, Object event) {
//从池中获取一个PendingPost
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//入队
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException(“Could not send handler message”);
}
}
}
}

@Override
public void handleMessage(Message msg) {
    boolean rescheduled = false;
    try {
        long started = SystemClock.uptimeMillis();
        while (true) {
            PendingPost pendingPost = queue.poll();
            if (pendingPost == null) {
                synchronized (this) {
                    // Check again, this time in synchronized
                    pendingPost = queue.poll();
                    if (pendingPost == null) {
                        handlerActive = false;
                        return;
                    }
                }
            }
            eventBus.invokeSubscriber(pendingPost);
            long timeInMethod = SystemClock.uptimeMillis() - started;
            if (timeInMethod >= maxMillisInsideHandleMessage) {
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
                rescheduled = true;
                return;
            }
        }
    } finally {
        handlerActive = rescheduled;
    }
}

}


在介绍mainPoster之前我们先来看下PendingPost,这里有个对象池,最大的大小为10000,每次发布事件的时候不是直接通过new一个PendingPost而是从对象池中取出一个已经存在的PendingPost,通过这种复用对象池来缓解频繁创建对象带来的性能问题。

final class PendingPost {
private final static List pendingPostPool = new ArrayList();

Object event;
Subscription subscription;
PendingPost next;

private PendingPost(Object event, Subscription subscription) {
    this.event = event;
    this.subscription = subscription;
}

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
    synchronized (pendingPostPool) {
        int size = pendingPostPool.size();
        if (size > 0) {
            //从尾部取出后填充并返回
            PendingPost pendingPost = pendingPostPool.remove(size - 1);
            pendingPost.event = event;
            pendingPost.subscription = subscription;
            pendingPost.next = null;
            return pendingPost;
        }
    }
    //否则新建一个返回
    return new PendingPost(event, subscription);
}

static void releasePendingPost(PendingPost pendingPost) {
    pendingPost.event = null;
    pendingPost.subscription = null;
    pendingPost.next = null;
    synchronized (pendingPostPool) {
        //释放后放入池中
        // Don't let the pool grow indefinitely
        if (pendingPostPool.size() < 10000) {
            pendingPostPool.add(pendingPost);
        }
    }
}

}

我们回过头看mainThreadPoster,当我们post一个事件到mainThreadPoster的时候,会触发向Handler 发送消息,收到消息后会在handleMessage不断循环调用invokeScriber.直到队列中的PandingPost处理完,或者处理事件超过设定的最大事件处理时间。整个流程如下所示:
![](/开源代码分析之EventBus/mainposter.png)

接着我们看下BackgroundPoster,这里会从线程池中获取一个线程,然后调用execute(this) 触发BackgroundPoster的run方法,在run方法中会从PendingPostQueue中不断取出PendingPost调用invokeSubscriber

final class BackgroundPoster implements Runnable {

//..................
public void enqueue(Subscription subscription, Object event) {
    //从池中获取一个可复用的PendingPost
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    synchronized (this) {
        //将其添加到队列中
        queue.enqueue(pendingPost);
        if (!executorRunning) {
            executorRunning = true;
            //将线程池设置为阻塞状态
            eventBus.getExecutorService().execute(this);
        }
    }
}

@Override
public void run() {
    try {
        try {
            //这里会不断从队列中获取可执行的,一直执行直到队列为空的时候退出。
            while (true) {
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
        }
    } finally {
        executorRunning = false;
    }
}

}

![](/开源代码分析之EventBus/background.png)


最后我们再看AsyncPoster:
整个大体的结构和backgroundPoster很相似,但是它们之间的区别在于,AsyncPoster每次会从线程池中获取一个线程来执行,而backgroundPoster会在同一个现场中完成。

class AsyncPoster implements Runnable {

//............
public void enqueue(Subscription subscription, Object event) {
    //从池中取出一个可复用的PendingPost对象
    PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
    //插入到队列中
    queue.enqueue(pendingPost);
    //调用线程池从线程池中取出一个运行
    eventBus.getExecutorService().execute(this);
}

@Override
public void run() {
    //取出队列中的第一个
    PendingPost pendingPost = queue.poll();
    if(pendingPost == null) {
        throw new IllegalStateException("No pending post available");
    }
    //出发订阅者订阅方法
    eventBus.invokeSubscriber(pendingPost);
}

}


![](/开源代码分析之EventBus/async.png)

到此整个EventBus的源码分析结束:
老规矩最后以一个图来总结整个流程:
首先我们需要先通过register来订阅某些事件,在调用register的时候,SubscriberFinder从订阅类中找出所有的订阅处理方法,并将其挂接在事件分发中心EventBus的subscriptionsByEventType列表中,如果有事件触发那么就会将事件发送到事件分发中心,事件分发中心就会从subscriptionsByEventType列表中查找订阅者方法。然后根据对应的threadMode在不同的poster中调用invokeScriber来执行订阅者方法。
![](/开源代码分析之EventBus/final.png)






















Contents
  1. 1. 简介
  2. 2. 在项目中引入EventBus 3.x