本文基于 SDK-33
之前的文章,通过源码分析了 Handler 对象的创建、发送消息等一系列功能,在讲解发送消息的时候,Handler 通过调用 MessageQueue 的 enqueueMessage
方法将消息传入到消息队列中。接下来,我们分析一下 MessageQueue 的源码:
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
MessageQueue 的访问修饰符是 default 的,只允许保内访问,所以我们无法直接通过构造方法创建 MessageQueue,事实上,实际开发中也无须我们自己手动创建消息队列。
MessageQueue 是在 Looper 对象创建时候随之创建的,这个在 Looper 的源码分析中会讲到。
在 Handler 发送消息的一系列方法中,最终都是调用了 MessageQueue 的 enqueueMessage
方法:
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {//在 Handler 发送 Message 时,会设置 Message 的 target 为 Handler 对象本身
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {//由于有可能会有不同的线程在往同一个消息队列中传入消息,为保障数据安全,必须加锁来保证线程安全
if (msg.isInUse()) {//检查消息是否已经被回收了
throw new IllegalStateException(msg + " This message is already in use.");
}
if (mQuitting) {//检查消息队列是否已经退出,如果队列退出,则回收消息并返回 false,通知 Handler 消息入队失败
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;//设置 Message 的执行时间
Message p = mMessages;//队列中的头部消息
boolean needWake;
if (p == null || when == 0 || when < p.when) {//插入到队列头部
//如果头部为 null(队列为空)
//如果 when 字段为 0,表示消息是 Handler 的 sendMessageAtFrontOfQueue 方法发出的
//如果入队 Message 的执行时间小于原队列头部 Message 的执行时间
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;//如果目前是阻塞状态,则唤醒
} else {//插入到队列中间位置
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
//是否唤醒
//如果目前处于阻塞状态+队列头部消息是同步屏障消息+插入的消息是异步消息,则唤醒
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;//标记上一个元素,作为临时标记
for (;;) {//遍历队列,找到合适位置
prev = p;//头部元素指向临时元素
p = p.next;//原队列中第二个元素指向第一的位置
if (p == null || when < p.when) {//如果原队列第二个元素为空(只有一个元素)或者待插入消息的执行时间小于原队列第二个元素的执行之间
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//下面两步,将消息插入到队列中
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}
消息加入到队列之中后,就可以在合适的时间去取了,取出消息的操作是由 Looper 来进行的,调用了 MessageQueue 的 next 方法:
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {//mPtr 在队列销毁时,会设值为 0,如果此时为0,则表示队列已经释放,直接返回 null
return null;
}
//待处理 IdleHandler 数量,初次迭代时,设置为 -1
int pendingIdleHandlerCount = -1; // -1 only during first iteration
//下一次执行的唤醒时间
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//该方法的作用是根据 nextPollTimeoutMillis 来等待下一个 Message
//如果 nextPollTimeoutMillis > 0:最长等待 nextPollTimeoutMillis 毫秒,但中间有可能会被打断
//如果 nextPollTimeoutMillis == 0:立刻返回,不用等待,代码会继续往下执行
//如果 nextPollTimeoutMillis == -1:一直等待下去,直至被打断
//见:https://stackoverflow.com/questions/38818642/android-what-is-message-queue-native-poll-once-in-android
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {//保障安全,线程加锁
// Try to retrieve the next message. Return if found.
final long now = SystemClock.uptimeMillis();//当前时间
Message prevMsg = null;//临时消息
Message msg = mMessages;//队列头部消息
//【【这里需要重点注意】】
//如果发送了同步屏障消息,一定要记得删除,否则同步消息会一直得不到执行
if (msg != null && msg.target == null) {//如果头部是同步屏障消息
// Stalled by a barrier. Find the next asynchronous message in the queue.
//头部是屏障消息的话,则遍历整个队列,寻找队列中的异步消息
//如果队列中存在异步消息,则可以获取到
//如果队列中没有异步消息,则继续往下进行
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());//同步消息都略过,直到队尾或者找到异步消息
}
if (msg != null) {//此时 msg 为队列中的异步消息或者头部的同步消息(非同步屏障消息)
if (now < msg.when) {//还没到执行时间
// Next message is not ready. Set a timeout to wake up when it is ready.
//根据获取到的消息执行之间和当前时间,得到一个阻塞时间,以便到时唤醒
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// Got a message.
//获取到消息
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();//将消息标记为使用
return msg;//将消息返回
}
} else {
// No more messages.
//没有消息的时候,将 nextPollTimeoutMillis 设置为 -1,系统进入阻塞状态,等待被唤醒
nextPollTimeoutMillis = -1;
}
// Process the quit message now that all pending messages have been handled.
//队列退出,mPtr 设置为 0
if (mQuitting) {
dispose();
return null;
}
// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
//首次进入,并且消息队列为空或者消息未到执行时间时,初始化 pendingIdleHandlerCount
pendingIdleHandlerCount = mIdleHandlers.size();
}
//如果也没有 IdleHandler 需要处理,则跳出循环
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
mBlocked = true;//没有任何消息需要处理了,那就设置阻塞标志,进入阻塞状态
continue;//暂时没有消息处理,也没有 IdleHandler 需要处理,那么则跳出 for 循环,此时 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); 起作用。
}
//创建待处理 IdleHandler 数组
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// Run the idle handlers.
// We only ever reach this code block during the first iteration.
//开始执行 IdleHandler
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);//如果 idleHandler 返回 false,则执行一次之后就将其移除
}
}
}
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;//idleHandler 遍历执行完了,将数量归0
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
nextPollTimeoutMillis = 0;//设置阻塞时间为 0,执行到这里,证明 pendingIdleHandlerCount > 0,在执行 idle Handler的时候,有可能会有新消息进入,所以需要重置阻塞时间
//重新一轮的 for 循环去获取消息
}
}
在 Handler 对象中,有 removeCallbacks
、 removeMessages
和 removeCallbacksAndMessages
方法用以移除已经发送的消息,实际最终调用的是 MessageQueue 的移除方法。
MessageQueue 提供了以下从队列中移除消息的方法:
void removeMessages(Handler h, int what, Object object)
void removeMessages(Handler h, int what, Object object) {
if (h == null) { //Handler 为空,直接返回
return;
}
//因为可能存在不同线程调用 Handler 的移除消息方法,为保证线程安全,需要加锁
synchronized (this) {
Message p = mMessages;//拿到队列中头部消息
// Remove all messages at front.
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {//从头部开始遍历,如果头部就是符合删除条件的 Message,那么从头部开始,遍历全部队列,移除消息
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// Remove all messages after front.
//经过上一个前面的 while 循环,可以确定头部不是符合删除条件的 Message,那么遍历后面的队列内容,找到符合条件的 Message,删除之
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}
void removeEqualMessages(Handler h, int what, Object object)
删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。
void removeMessages(Handler h, Runnable r, Object object)
删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。
void removeEqualMessages(Handler h, Runnable r, Object object)
删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。
void removeCallbacksAndMessages(Handler h, Object object)
删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。
void removeCallbacksAndEqualMessages(Handler h, Object object)
删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。
private void removeAllMessagesLocked()
该方法在 MessageQueue 退出时,用于清空全部队列消息,就是一个简单的 while 循环,简单粗暴,在非安全退出时调用。
private void removeAllMessagesLocked() {
Message p = mMessages;
while (p != null) {
Message n = p.next;
p.recycleUnchecked();
p = n;
}
mMessages = null;
}
private void removeAllFutureMessagesLocked()
在安全退出 MessageQueue 时,清空消息队列:
private void removeAllFutureMessagesLocked() {
final long now = SystemClock.uptimeMillis();//系统时间
Message p = mMessages;//头消息
if (p != null) {
if (p.when > now) {//头消息大于当前时间,证明所有 Message 都还没到执行时间,那么就全部移除
removeAllMessagesLocked();
} else {
//遍历消息队列,找到大于当前时间的位置,将位置之后的 Message 移除。
Message n;
for (;;) {
n = p.next;
if (n == null) {
return;
}
if (n.when > now) {
break;
}
p = n;
}
p.next = null;
//移除操作
do {
p = n;
n = p.next;
p.recycleUnchecked();
} while (n != null);
}
}
}
在 Handler 对象中调用的 hasMessages
和 hasCallbacks
实际调用的也是 MessageQueue 方法,就是遍历整个队列,根据条件进行匹配:
boolean hasMessages(Handler h, int what, Object object) {
if (h == null) {
return false;
}
synchronized (this) {
Message p = mMessages;
while (p != null) {
if (p.target == h && p.what == what && (object == null || p.obj == object)) {
return true;
}
p = p.next;
}
return false;
}
}
boolean hasEqualMessages(Handler h, int what, Object object) {
if (h == null) {
return false;
}
synchronized (this) {
Message p = mMessages;
while (p != null) {
if (p.target == h && p.what == what && (object == null || object.equals(p.obj))) {
return true;
}
p = p.next;
}
return false;
}
}
@UnsupportedAppUsage(maxTargetSdk = Build.VERSION_CODES.R, trackingBug = 170729553)
boolean hasMessages(Handler h, Runnable r, Object object) {
if (h == null) {
return false;
}
synchronized (this) {
Message p = mMessages;
while (p != null) {
if (p.target == h && p.callback == r && (object == null || p.obj == object)) {
return true;
}
p = p.next;
}
return false;
}
}
boolean hasMessages(Handler h) {
if (h == null) {
return false;
}
synchronized (this) {
Message p = mMessages;
while (p != null) {
if (p.target == h) {
return true;
}
p = p.next;
}
return false;
}
}
在消息出列的代码中有这样一段:
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
可以看到显示在 if 条件中判断 Message 的 target
是否为 null
,正常发送的消息都会在 Handler 的 enqueueMessage
方法中将 Message 的 target 设置为 Handler 对象本身,那么什么时候 Message 的 target 为 null 呢?
实际上,在 Handler 中,消息分为以下三种:
同步/异步消息可以通过 Message 的
setAsynchronous
方法来设置,也可以通过 Handler 的创建方式来设置。
但不管是同步还是异步,都会为 Message 的 target 设置一个 Handler 对象,同步屏障消息无须设置 target 。正常情况下,对于消息的入列和出列,同步异步之间没有区别,但是当 MessageQueue 中插入一个同步屏障消息的时候,同步消息就会被阻拦,而异步消息却可以正常出列,就像刚才那段代码中展示的那样,在消息出列的过程中,遇到一个同步屏障时,会先判断屏障之后的消息是否是同步(msg != null && !msg.isAsynchronous()
),如果是同步,则略过,如果是异步,才会继续往下走,将消息出列返回。
在上面的代码中还可以看出一个问题,当同步屏障插入之后,如果没有移除,则会进行判断,后续的同步消息都会无法执行,所以要在适当的时机移除消息屏障。
系统提供了插入和移除同步屏障消息的方法:
@UnsupportedAppUsage
@TestApi
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}
@UnsupportedAppUsage
@TestApi
public void removeSyncBarrier(int token) {
// Remove a sync barrier token from the queue.
// If the queue is no longer stalled by a barrier then wake it.
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
// If the loop is quitting then it is already awake.
// We can assume mPtr != 0 when mQuitting is false.
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
但它们是无法直接调用的,只能通过反射的方式来调用,下面简单演示一下:
发送同步屏障:
MessageQueue queue = handler.getLooper().getQueue();
Method method = queue.getClass().getDeclaredMethod("postSyncBarrier");
method.setAccessible(true);
token = (int) method.invoke(queue);
移除同步屏障:
MessageQueue queue = handler.getLooper().getQueue();
Method method = queue.getClass().getDeclaredMethod("removeSyncBarrier", int.class);
method.setAccessible(true);
method.invoke(queue, token);
可以通过同步屏障消息,来进行一些 Message 优先级的操作。但是需要注意的是,不要往主线程的 MessageQueue 中发送同步屏障消息,否则会引起正常的系统同步消息的执行。
void quit(boolean safe) {
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}
synchronized (this) {
if (mQuitting) {
return;
}
mQuitting = true;
if (safe) {
removeAllFutureMessagesLocked();
} else {
removeAllMessagesLocked();
}
// We can assume mPtr != 0 because mQuitting was previously false.
nativeWake(mPtr);
}
}
调用的是移除消息的方法