Android Source Code Analysis: A Deep Dive into MessageQueue
Views: 6938
Published: 2019-06-27


Following on from the previous article: a Looper creates a MessageQueue as soon as it is constructed, and loop() pulls one Message from it on each iteration. Let's take a look at this MessageQueue:

```java
MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}
```

nativeInit inevitably takes us down into the native layer. The corresponding file is /frameworks/base/core/jni/android_os_MessageQueue.cpp:

```cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }

    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jlong>(nativeMessageQueue);
}
```

This creates a new NativeMessageQueue and returns its pointer. The class is defined in the same file; let's see what its constructor does:

```cpp
NativeMessageQueue::NativeMessageQueue() :
        mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}
```

It creates a Looper object, and this is clearly not the Java-layer Looper. The creation is bracketed by getForThread and setForThread; what are those doing? My understanding is that they manage thread-local storage (TLS), ensuring each thread has at most one native Looper. I won't go into the details here; that deserves a separate analysis.

Now let's see what this native Looper is. Its constructor looks like this:

```cpp
Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
        mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
                        strerror(errno));

    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
```

Apart from initializing member state, the real work is in rebuildEpollLocked:

```cpp
void Looper::rebuildEpollLocked() {
    // Close old epoll instance if we have one.
    if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
        ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
        close(mEpollFd);
    }

    // Allocate the new epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    memset(&eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, &eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
                        strerror(errno));

    for (size_t i = 0; i < mRequests.size(); i++) {
        const Request& request = mRequests.valueAt(i);
        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, &eventItem);
        if (epollResult < 0) {
            ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
                  request.fd, strerror(errno));
        }
    }
}
```

What do we see? epoll. Isn't that the Linux epoll, the mechanism born to multiplex read/write events across many file descriptors, usually seen in network programming (similar to I/O completion ports on Windows)? An epoll_event is set up to watch mWakeEventFd; in other words, the wake-up eventfd is placed into epoll's watch set, which is the basis of the wake mechanism. Then a loop takes every Request and registers it with epoll as well. On the first call this for loop does nothing, because mRequests has size 0. So what are these Requests? Here is the definition:

```cpp
struct Request {
    int fd;
    int ident;
    int events;
    int seq;
    sp<LooperCallback> callback;
    void* data;

    void initEventItem(struct epoll_event* eventItem) const;
};
```

What do these fields actually correspond to? Let's set that aside for now and keep going.
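To make the epoll + eventfd wake-up mechanism described above concrete, here is a minimal standalone sketch (Linux-only, plain C++, and not Android code; MiniWaker and its members are my own hypothetical names): register an eventfd with an epoll instance, wake a poll by writing to the eventfd, and drain the counter afterwards much like awoken() does.

```cpp
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cassert>

// Minimal sketch of a Looper-style wake mechanism (hypothetical, not Android code).
struct MiniWaker {
    int epollFd;
    int wakeFd;

    MiniWaker() {
        wakeFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);   // counter-based wake fd
        epollFd = epoll_create1(EPOLL_CLOEXEC);
        epoll_event item{};
        item.events = EPOLLIN;                             // readable when counter > 0
        item.data.fd = wakeFd;
        epoll_ctl(epollFd, EPOLL_CTL_ADD, wakeFd, &item);  // register, like rebuildEpollLocked
    }
    ~MiniWaker() { close(wakeFd); close(epollFd); }

    // Wake the poller: add 1 to the eventfd counter.
    void wake() { uint64_t one = 1; write(wakeFd, &one, sizeof(one)); }

    // Returns true if the poll was woken, false if it timed out.
    bool pollOnce(int timeoutMillis) {
        epoll_event events[8];
        int n = epoll_wait(epollFd, events, 8, timeoutMillis);
        if (n <= 0) return false;              // timed out (or error)
        uint64_t counter;
        read(wakeFd, &counter, sizeof(counter)); // drain the counter, like awoken()
        return true;
    }
};
```

Writing to the eventfd makes it readable, which makes epoll_wait return; draining it rearms the mechanism for the next wake.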

Back in the Java-layer loop(), each iteration calls next() to fetch a message. Let's look at MessageQueue's next() method:

```java
Message next() {
    // Return here if the message loop has already quit and been disposed.
    // This can happen if the application tries to restart a looper after quit
    // which is not supported.
    final long ptr = mPtr;
    if (ptr == 0) {
        return null;
    }

    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }

        nativePollOnce(ptr, nextPollTimeoutMillis);

        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }

            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }

            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }

            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }

        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler

            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf(TAG, "IdleHandler threw exception", t);
            }

            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }

        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;

        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}
```
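Notice the three-way meaning of the nextPollTimeoutMillis that next() hands down to nativePollOnce: -1 blocks indefinitely, 0 returns immediately, and a positive value waits at most until the head message is due. A small hypothetical helper (computePollTimeout is my own name, not framework code) capturing that selection logic:

```cpp
#include <algorithm>
#include <climits>
#include <cstdint>
#include <cassert>

// Hypothetical helper mirroring how next() picks the poll timeout:
// -1 = block until woken, 0 = message already due, N = wait at most N ms.
int computePollTimeout(bool hasMessage, int64_t when, int64_t now) {
    if (!hasMessage) return -1;          // queue empty: block until someone wakes us
    if (when <= now) return 0;           // head message is due: don't wait at all
    // Clamp to int range, like Math.min(msg.when - now, Integer.MAX_VALUE).
    return (int) std::min<int64_t>(when - now, INT_MAX);
}
```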

First it reads mPtr, which is the address of the native NativeMessageQueue. Then it enters an infinite loop whose first real step is nativePollOnce(ptr, nextPollTimeoutMillis), which internally calls android_os_MessageQueue_nativePollOnce:

```cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
```

This casts the address back into a NativeMessageQueue pointer and calls its pollOnce method:

```cpp
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}
```

It stashes the pollObj reference and delegates the actual polling to the native Looper's pollOnce. So let's look at pollOnce:

```cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}
```

Another infinite loop. First there is a while loop that gives priority to pending responses (one Response per Request), returning as soon as one carries a valid ident. When there are no responses left to deliver, it calls pollInner. pollInner is the heart of the matter; the code is long, so here is an excerpt:

```cpp
......
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
......
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            if (epollEvents & EPOLLIN) {
                awoken();
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
......
```

epoll_wait blocks on mEpollFd until an event arrives (or the timeout expires). The for loop then walks each epoll_event. If the event is the wake-up event (fd == mWakeEventFd), awoken() is called to handle the wake; otherwise the raw epoll event bits are translated into the Looper's own EVENT_* flags, presumably so the upper layers work with a stable event vocabulary, and pushResponse is called. At last, here is where a Response gets created. Reading on:

```cpp
void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}
```

As you can see, it simply fills in a Response and pushes it onto mResponses. Back in pollInner, continuing downward:

```cpp
......
Done: ;
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message);
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
......
```

It starts with a while loop that drains messages queued earlier. Note that these are the native layer's own messages, unrelated to the Java-layer queue. There is a time comparison against each MessageEnvelope's uptime. What is uptime? My understanding is that it is the scheduled delivery time of the message, since messages are allowed to be deferred. If uptime <= now, the message is due, so its handler's handleMessage is invoked; the first message whose uptime is still in the future sets mNextMessageUptime (the next wakeup time) and stops the loop. In short, this drains all native messages that have come due.
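The drain loop above can be sketched in isolation (Envelope and drainDue are hypothetical stand-ins of my own, with dispatch reduced to collecting `what` values): everything with uptime <= now is delivered in order, and the first still-future message determines the next wakeup time.

```cpp
#include <vector>
#include <cstdint>
#include <climits>
#include <cassert>

// Hypothetical sketch of the MessageEnvelope drain loop (not Android code).
// The queue is kept sorted by uptime; due messages are dispatched in order,
// and the first future message sets the next wakeup time.
struct Envelope { int64_t uptime; int what; };

int64_t drainDue(std::vector<Envelope>& queue, int64_t now,
                 std::vector<int>& dispatched) {
    int64_t nextUptime = LLONG_MAX;            // like mNextMessageUptime
    while (!queue.empty()) {
        const Envelope& head = queue.front();
        if (head.uptime <= now) {
            dispatched.push_back(head.what);   // stands in for handleMessage
            queue.erase(queue.begin());
        } else {
            nextUptime = head.uptime;          // head decides the next wakeup
            break;
        }
    }
    return nextUptime;
}
```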

After that comes another for loop:

```cpp
......
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Invoke the callback.  Note that the file descriptor may be closed by
            // the callback (and potentially even reused) before the function returns so
            // we need to be a little careful when removing the file descriptor afterwards.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }

            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
......
```

This is where responses are processed: each one with ident == POLL_CALLBACK has its response.request.callback->handleEvent invoked, and if the callback returns 0, the fd is unregistered via removeFd.
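The interesting detail is the callback's return value: handleEvent returning 0 causes the fd to be removed from the watch set, while a non-zero return keeps it registered. A tiny hypothetical registry (MiniRegistry is my own construction, not Android code) illustrating that contract:

```cpp
#include <functional>
#include <map>
#include <cassert>

// Hypothetical sketch of the handleEvent contract: a callback returning 0 is
// unregistered after the call; non-zero keeps it receiving future events.
struct MiniRegistry {
    std::map<int, std::function<int(int, int)>> callbacks;

    void dispatch(int fd, int events) {
        auto it = callbacks.find(fd);
        if (it == callbacks.end()) return;             // fd not registered
        if (it->second(fd, events) == 0) {
            callbacks.erase(it);                       // stands in for removeFd
        }
    }
};
```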

Let's keep following the clues. In the Looper constructor we saw mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC). An eventfd is a lightweight kernel object for signaling between threads or processes, similar in spirit to a pipe.
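Unlike a pipe, though, an eventfd carries no byte stream, just a 64-bit counter: writes add to it, and a single read returns the accumulated value and resets it to zero (in the default, non-semaphore mode). That is why one drain in awoken() can absorb several wake() calls. A small demonstration (writeThenRead is a hypothetical helper of mine; Linux-only):

```cpp
#include <sys/eventfd.h>
#include <unistd.h>
#include <cstdint>
#include <cassert>

// eventfd keeps a single 64-bit counter: writes add to it, and a read
// returns the accumulated value and resets it (in non-semaphore mode).
uint64_t writeThenRead(uint64_t a, uint64_t b) {
    int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    write(fd, &a, sizeof(a));          // counter += a
    write(fd, &b, sizeof(b));          // counter += b
    uint64_t value = 0;
    read(fd, &value, sizeof(value));   // returns a + b, counter resets to 0
    close(fd);
    return value;
}
```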

That basically wraps up the analysis: the native Looper keeps processing messages via epoll and invoking their callbacks. Still, several things remain unclear. 1. What exactly are the fds bound to this epoll instance? Pipes? Most articles online say pipes, but I found no evidence of that here, so I can't be sure. 2. The native Looper's sendMessage clearly sets the MessageEnvelope's handler from its arguments, but who calls it, and how does it tie back to the Java layer? Plenty of open questions.

