源码分析 Handler 机制(二) —— MessageQueue

本文基于 SDK-33

之前的文章,通过源码分析了 Handler 对象的创建、发送消息等一系列功能,在讲解发送消息的时候,Handler 通过调用 MessageQueue 的 enqueueMessage 方法将消息传入到消息队列中。接下来,我们分析一下 MessageQueue 的源码:

MessageQueue 对象的创建

    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        mPtr = nativeInit();
    }

MessageQueue 的访问修饰符是 default 的,只允许保内访问,所以我们无法直接通过构造方法创建 MessageQueue,事实上,实际开发中也无须我们自己手动创建消息队列。

MessageQueue 是在 Looper 对象创建时候随之创建的,这个在 Looper 的源码分析中会讲到。

消息入列 enqueueMessage()

在 Handler 发送消息的一系列方法中,最终都是调用了 MessageQueue 的 enqueueMessage 方法:

    boolean enqueueMessage(Message msg, long when) {
        if (msg.target == null) {//在 Handler 发送 Message 时,会设置 Message 的 target 为 Handler 对象本身
            throw new IllegalArgumentException("Message must have a target.");
        }

        synchronized (this) {//由于有可能会有不同的线程在往同一个消息队列中传入消息,为保障数据安全,必须加锁来保证线程安全
            if (msg.isInUse()) {//检查消息是否已经被回收了
                throw new IllegalStateException(msg + " This message is already in use.");
            }

            if (mQuitting) {//检查消息队列是否已经退出,如果队列退出,则回收消息并返回 false,通知 Handler 消息入队失败
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG, e.getMessage(), e);
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;//设置 Message 的执行时间
            Message p = mMessages;//队列中的头部消息
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {//插入到队列头部
                //如果头部为 null(队列为空)
                //如果 when 字段为 0,表示消息是 Handler 的 sendMessageAtFrontOfQueue 方法发出的
                //如果入队 Message 的执行时间小于原队列头部 Message 的执行时间
                // New head, wake up the event queue if blocked.
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;//如果目前是阻塞状态,则唤醒
            } else {//插入到队列中间位置
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                //是否唤醒
                //如果目前处于阻塞状态+队列头部消息是同步屏障消息+插入的消息是异步消息,则唤醒
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;//标记上一个元素,作为临时标记
                for (;;) {//遍历队列,找到合适位置
                    prev = p;//头部元素指向临时元素
                    p = p.next;//原队列中第二个元素指向第一的位置
                    if (p == null || when < p.when) {//如果原队列第二个元素为空(只有一个元素)或者待插入消息的执行时间小于原队列第二个元素的执行之间
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                //下面两步,将消息插入到队列中
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }

            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
                nativeWake(mPtr);
            }
        }
        return true;
    }

消息出列 next()

消息加入到队列之中后,就可以在合适的时间去取了,取出消息的操作是由 Looper 来进行的,调用了 MessageQueue 的 next 方法:

@UnsupportedAppUsage
    Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        final long ptr = mPtr;
        if (ptr == 0) {//mPtr 在队列销毁时,会设值为 0,如果此时为0,则表示队列已经释放,直接返回 null
            return null;
        }

        //待处理 IdleHandler 数量,初次迭代时,设置为 -1
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        //下一次执行的唤醒时间
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
            //该方法的作用是根据 nextPollTimeoutMillis 来等待下一个 Message
            //如果 nextPollTimeoutMillis > 0:最长等待 nextPollTimeoutMillis 毫秒,但中间有可能会被打断
            //如果 nextPollTimeoutMillis == 0:立刻返回,不用等待,代码会继续往下执行
            //如果 nextPollTimeoutMillis == -1:一直等待下去,直至被打断
            //见:https://stackoverflow.com/questions/38818642/android-what-is-message-queue-native-poll-once-in-android
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {//保障安全,线程加锁
                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();//当前时间
                Message prevMsg = null;//临时消息
                Message msg = mMessages;//队列头部消息
                //【【这里需要重点注意】】
                //如果发送了同步屏障消息,一定要记得删除,否则同步消息会一直得不到执行
                if (msg != null && msg.target == null) {//如果头部是同步屏障消息
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    //头部是屏障消息的话,则遍历整个队列,寻找队列中的异步消息
                    //如果队列中存在异步消息,则可以获取到
                    //如果队列中没有异步消息,则继续往下进行
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());//同步消息都略过,直到队尾或者找到异步消息
                }

                if (msg != null) {//此时 msg 为队列中的异步消息或者头部的同步消息(非同步屏障消息)
                    if (now < msg.when) {//还没到执行时间
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        //根据获取到的消息执行之间和当前时间,得到一个阻塞时间,以便到时唤醒
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        //获取到消息
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        msg.markInUse();//将消息标记为使用
                        return msg;//将消息返回
                    }
                } else {
                    // No more messages.
                    //没有消息的时候,将 nextPollTimeoutMillis 设置为 -1,系统进入阻塞状态,等待被唤醒
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                //队列退出,mPtr 设置为 0
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    //首次进入,并且消息队列为空或者消息未到执行时间时,初始化 pendingIdleHandlerCount
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                //如果也没有 IdleHandler 需要处理,则跳出循环
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    mBlocked = true;//没有任何消息需要处理了,那就设置阻塞标志,进入阻塞状态
                    continue;//暂时没有消息处理,也没有 IdleHandler 需要处理,那么则跳出 for 循环,此时 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); 起作用。
                }

                //创建待处理 IdleHandler 数组
                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            //开始执行 IdleHandler
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);//如果 idleHandler 返回 false,则执行一次之后就将其移除
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            pendingIdleHandlerCount = 0;//idleHandler 遍历执行完了,将数量归0

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            nextPollTimeoutMillis = 0;//设置阻塞时间为 0,执行到这里,证明 pendingIdleHandlerCount > 0,在执行 idle Handler的时候,有可能会有新消息进入,所以需要重置阻塞时间

            //重新一轮的 for 循环去获取消息
        }
    }

从队列中移除消息

在 Handler 对象中,有 removeCallbacksremoveMessagesremoveCallbacksAndMessages 方法用以移除已经发送的消息,实际最终调用的是 MessageQueue 的移除方法。

MessageQueue 提供了以下从队列中移除消息的方法:

  • void removeMessages(Handler h, int what, Object object)

    void removeMessages(Handler h, int what, Object object) {
          if (h == null) { //Handler 为空,直接返回
              return;
          }
    
      //因为可能存在不同线程调用 Handler 的移除消息方法,为保证线程安全,需要加锁
          synchronized (this) {
              Message p = mMessages;//拿到队列中头部消息
    
              // Remove all messages at front.
              while (p != null && p.target == h && p.what == what
                     && (object == null || p.obj == object)) {//从头部开始遍历,如果头部就是符合删除条件的 Message,那么从头部开始,遍历全部队列,移除消息
                  Message n = p.next;
                  mMessages = n;
                  p.recycleUnchecked();
                  p = n;
              }
    
              // Remove all messages after front.
              //经过上一个前面的 while 循环,可以确定头部不是符合删除条件的 Message,那么遍历后面的队列内容,找到符合条件的 Message,删除之
              while (p != null) {
                  Message n = p.next;
                  if (n != null) {
                      if (n.target == h && n.what == what
                          && (object == null || n.obj == object)) {
                          Message nn = n.next;
                          n.recycleUnchecked();
                          p.next = nn;
                          continue;
                      }
                  }
                  p = n;
              }
          }
      }
    
  • void removeEqualMessages(Handler h, int what, Object object)

    删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。

  • void removeMessages(Handler h, Runnable r, Object object)

    删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。

  • void removeEqualMessages(Handler h, Runnable r, Object object)

    删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。

  • void removeCallbacksAndMessages(Handler h, Object object)

    删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。

  • void removeCallbacksAndEqualMessages(Handler h, Object object)

    删除方式同上,区别只是在于寻找 Message 的条件不同,不赘述。

  • private void removeAllMessagesLocked()

    该方法在 MessageQueue 退出时,用于清空全部队列消息,就是一个简单的 while 循环,简单粗暴,在非安全退出时调用

      private void removeAllMessagesLocked() {
          Message p = mMessages;
          while (p != null) {
              Message n = p.next;
              p.recycleUnchecked();
              p = n;
          }
          mMessages = null;
      }
    
  • private void removeAllFutureMessagesLocked()

    在安全退出 MessageQueue 时,清空消息队列:

    private void removeAllFutureMessagesLocked() {
          final long now = SystemClock.uptimeMillis();//系统时间
          Message p = mMessages;//头消息
          if (p != null) {
              if (p.when > now) {//头消息大于当前时间,证明所有 Message 都还没到执行时间,那么就全部移除
                  removeAllMessagesLocked();
              } else {
                  //遍历消息队列,找到大于当前时间的位置,将位置之后的 Message 移除。
                  Message n;
                  for (;;) {
                      n = p.next;
                      if (n == null) {
                          return;
                      }
                      if (n.when > now) {
                          break;
                      }
                      p = n;
                  }
                  p.next = null;
                  //移除操作
                  do {
                      p = n;
                      n = p.next;
                      p.recycleUnchecked();
                  } while (n != null);
              }
          }
      }
    

是否有 Message

在 Handler 对象中调用的 hasMessageshasCallbacks 实际调用的也是 MessageQueue 方法,就是遍历整个队列,根据条件进行匹配:

      boolean hasMessages(Handler h, int what, Object object) {
          if (h == null) {
              return false;
          }

          synchronized (this) {
              Message p = mMessages;
              while (p != null) {
                  if (p.target == h && p.what == what && (object == null || p.obj == object)) {
                      return true;
                  }
                  p = p.next;
              }
              return false;
          }
      }

      boolean hasEqualMessages(Handler h, int what, Object object) {
          if (h == null) {
              return false;
          }

          synchronized (this) {
              Message p = mMessages;
              while (p != null) {
                  if (p.target == h && p.what == what && (object == null || object.equals(p.obj))) {
                      return true;
                  }
                  p = p.next;
              }
              return false;
          }
      }

      @UnsupportedAppUsage(maxTargetSdk = Build.VERSION_CODES.R, trackingBug = 170729553)
      boolean hasMessages(Handler h, Runnable r, Object object) {
          if (h == null) {
              return false;
          }

          synchronized (this) {
              Message p = mMessages;
              while (p != null) {
                  if (p.target == h && p.callback == r && (object == null || p.obj == object)) {
                      return true;
                  }
                  p = p.next;
              }
              return false;
          }
      }

      boolean hasMessages(Handler h) {
          if (h == null) {
              return false;
          }

          synchronized (this) {
              Message p = mMessages;
              while (p != null) {
                  if (p.target == h) {
                      return true;
                  }
                  p = p.next;
              }
              return false;
          }
      }

同步屏障

在消息出列的代码中有这样一段:

                  if (msg != null && msg.target == null) {
                      do {
                          prevMsg = msg;
                          msg = msg.next;
                      } while (msg != null && !msg.isAsynchronous());
                  }

可以看到显示在 if 条件中判断 Message 的 target 是否为 null,正常发送的消息都会在 Handler 的 enqueueMessage 方法中将 Message 的 target 设置为 Handler 对象本身,那么什么时候 Message 的 target 为 null 呢?

实际上,在 Handler 中,消息分为以下三种:

  1. 同步消息
  2. 异步消息
  3. 同步屏障消息

同步/异步消息可以通过 Message 的 setAsynchronous 方法来设置,也可以通过 Handler 的创建方式来设置。

但不管是同步还是异步,都会为 Message 的 target 设置一个 Handler 对象,同步屏障消息无须设置 target 。正常情况下,对于消息的入列和出列,同步异步之间没有区别,但是当 MessageQueue 中插入一个同步屏障消息的时候,同步消息就会被阻拦,而异步消息却可以正常出列,就像刚才那段代码中展示的那样,在消息出列的过程中,遇到一个同步屏障时,会先判断屏障之后的消息是否是同步(msg != null && !msg.isAsynchronous()),如果是同步,则略过,如果是异步,才会继续往下走,将消息出列返回。

在上面的代码中还可以看出一个问题,当同步屏障插入之后,如果没有移除,则会进行判断,后续的同步消息都会无法执行,所以要在适当的时机移除消息屏障。

系统提供了插入和移除同步屏障消息的方法:

    @UnsupportedAppUsage
    @TestApi
    public int postSyncBarrier() {
        return postSyncBarrier(SystemClock.uptimeMillis());
    }

    private int postSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();
            msg.markInUse();
            msg.when = when;
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }

    @UnsupportedAppUsage
    @TestApi
    public void removeSyncBarrier(int token) {
        // Remove a sync barrier token from the queue.
        // If the queue is no longer stalled by a barrier then wake it.
        synchronized (this) {
            Message prev = null;
            Message p = mMessages;
            while (p != null && (p.target != null || p.arg1 != token)) {
                prev = p;
                p = p.next;
            }
            if (p == null) {
                throw new IllegalStateException("The specified message queue synchronization "
                        + " barrier token has not been posted or has already been removed.");
            }
            final boolean needWake;
            if (prev != null) {
                prev.next = p.next;
                needWake = false;
            } else {
                mMessages = p.next;
                needWake = mMessages == null || mMessages.target != null;
            }
            p.recycleUnchecked();

            // If the loop is quitting then it is already awake.
            // We can assume mPtr != 0 when mQuitting is false.
            if (needWake && !mQuitting) {
                nativeWake(mPtr);
            }
        }
    }

但它们是无法直接调用的,只能通过反射的方式来调用,下面简单演示一下:

发送同步屏障:
                    MessageQueue queue = handler.getLooper().getQueue();
                    Method method = queue.getClass().getDeclaredMethod("postSyncBarrier");
                    method.setAccessible(true);
                    token = (int) method.invoke(queue);
移除同步屏障:
                    MessageQueue queue = handler.getLooper().getQueue();
                    Method method = queue.getClass().getDeclaredMethod("removeSyncBarrier", int.class);
                    method.setAccessible(true);
                    method.invoke(queue, token);

可以通过同步屏障消息,来进行一些 Message 优先级的操作。但是需要注意的是,不要往主线程的 MessageQueue 中发送同步屏障消息,否则会引起正常的系统同步消息的执行。

退出

void quit(boolean safe) {
        if (!mQuitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }

        synchronized (this) {
            if (mQuitting) {
                return;
            }
            mQuitting = true;

            if (safe) {
                removeAllFutureMessagesLocked();
            } else {
                removeAllMessagesLocked();
            }

            // We can assume mPtr != 0 because mQuitting was previously false.
            nativeWake(mPtr);
        }
    }

调用的是移除消息的方法

Copyright© 2020-2022 li-xyz 冀ICP备2022001112号-1