redis源码笔记-ae.c

系统 2424 0

ae.c是redis事件框架的具体实现,这篇blog对这份源码进行简单说明。其中谈到了作者已经标记的一些未来可能做的改进。

ae.c

      
          1
      
       #include <stdio.h>

      
          2
      
       #include <sys/time.h>

      
          3
      
       #include <sys/types.h>

      
          4
      
       #include <unistd.h>

      
          5
      
       #include <stdlib.h>

      
          6
      
      
          7
      
       #include 
      
        "
      
      
        ae.h
      
      
        "
      
      
          8
      
       #include 
      
        "
      
      
        zmalloc.h
      
      
        "
      
      
          9
      
       #include 
      
        "
      
      
        config.h
      
      
        "
      
      
         10
      
      
         11
      
      
        /*
      
      
         Include the best multiplexing layer supported by this system.

      
      
         12
      
      
         * The following should be ordered by performances, descending. 
      
      
        */
        
//为了支持不同的平台,redis用相同的接口封装了系统提供的多路复用层代码。接口共提供如下函数:aeApiCreate\aeApiFree\aeApiAddEvent\aeApiDelEvent\aeApiPoll\aeApiName函数,以及一个struct aeApiState
//通过包含不同的头文件,选择不同的底层实现
//按如下顺序,效率递减: epoll > kqueue > select 13 #ifdef HAVE_EPOLL 14 #include " ae_epoll.c " 15 #else 16 #ifdef HAVE_KQUEUE 17 #include " ae_kqueue.c " 18 #else 19 #include " ae_select.c " 20 #endif 21 #endif 22
//初始化函数,创建事件循环,函数内部alloc一个结构,用于表示事件状态,供后续其他函数作为参数使用 23 aeEventLoop *aeCreateEventLoop( void ) { 24 aeEventLoop * eventLoop; 25 int i; 26 27 eventLoop = zmalloc( sizeof (* eventLoop)); 28 if (!eventLoop) return NULL;
//时间event用链表存储
29 eventLoop->timeEventHead = NULL; 30 eventLoop->timeEventNextId = 0 ;
//表示是否停止事件循环
31 eventLoop->stop = 0 ;
//maxfd只由ae_select.c使用,后续有些相关的处理,如果使用epoll的话,其实可以进行简化
32 eventLoop->maxfd = - 1 ;
//每次调用epoll\select前调用的函数,由框架使用者注册
33 eventLoop->beforesleep = NULL; 34 if (aeApiCreate(eventLoop) == - 1 ) { 35 zfree(eventLoop); 36 return NULL; 37 } 38 /* Events with mask == AE_NONE are not set. So let's initialize the 39 * vector with it. */

//将所有的文件描述符的mask设置为无效值,作为初始化 40 for (i = 0 ; i < AE_SETSIZE; i++ ) 41 eventLoop->events[i].mask = AE_NONE; 42 return eventLoop; 43 } 44
//底层实现执行释放操作后,释放state的内存 45 void aeDeleteEventLoop(aeEventLoop * eventLoop) { 46 aeApiFree(eventLoop); 47 zfree(eventLoop); 48 } 49
//停止事件循环,redis作为一个无限循环的server,在redis.c中并没有任何一处调用此函数 50 void aeStop(aeEventLoop * eventLoop) { 51 eventLoop->stop = 1 ; 52 } 53
//创建文件fd事件,加入事件循环监控列表,使得后续epoll\select时将会测试这个文件描述符的可读性(可写性) 54 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, 55 aeFileProc *proc, void * clientData) 56 { 57 if (fd >= AE_SETSIZE) return AE_ERR; 58 aeFileEvent *fe = &eventLoop-> events[fd]; 59 60 if (aeApiAddEvent(eventLoop, fd, mask) == - 1 ) 61 return AE_ERR; 62 fe->mask |= mask;
//注册函数
63 if (mask & AE_READABLE) fe->rfileProc = proc; 64 if (mask & AE_WRITABLE) fe->wfileProc = proc; 65 fe->clientData = clientData;
//更新maxfd
66 if (fd > eventLoop-> maxfd) 67 eventLoop->maxfd = fd; 68 return AE_OK; 69 } 70 71 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) 72 { 73 if (fd >= AE_SETSIZE) return ; 74 aeFileEvent *fe = &eventLoop-> events[fd]; 75 76 if (fe->mask == AE_NONE) return ; 77 fe->mask = fe->mask & (~ mask); 78 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { 79 /* Update the max fd */ 80 int j; 81 82 for (j = eventLoop->maxfd- 1 ; j >= 0 ; j-- ) 83 if (eventLoop->events[j].mask != AE_NONE) break ; 84 eventLoop->maxfd = j; 85 } 86 aeApiDelEvent(eventLoop, fd, mask); 87 } 88
//返回值为该文件描述符关注的事件类型(可读、可写) 89 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { 90 if (fd >= AE_SETSIZE) return 0 ; 91 aeFileEvent *fe = &eventLoop-> events[fd]; 92 93 return fe-> mask; 94 } 95 96 static void aeGetTime( long *seconds, long * milliseconds) 97 { 98 struct timeval tv; 99 100 gettimeofday(& tv, NULL); 101 *seconds = tv.tv_sec; 102 *milliseconds = tv.tv_usec/ 1000 ; 103 } 104 105 static void aeAddMillisecondsToNow( long long milliseconds, long *sec, long * ms) { 106 long cur_sec, cur_ms, when_sec, when_ms; 107 108 aeGetTime(&cur_sec, & cur_ms); 109 when_sec = cur_sec + milliseconds/ 1000 ; 110 when_ms = cur_ms + milliseconds% 1000 ; 111 if (when_ms >= 1000 ) { 112 when_sec ++ ; 113 when_ms -= 1000 ; 114 } 115 *sec = when_sec; 116 *ms = when_ms; 117 } 118 119 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, 120 aeTimeProc *proc, void * clientData, 121 aeEventFinalizerProc * finalizerProc) 122 { 123 long long id = eventLoop->timeEventNextId++ ; 124 aeTimeEvent * te; 125 126 te = zmalloc( sizeof (* te)); 127 if (te == NULL) return AE_ERR; 128 te->id = id;
//time event执行时间为绝对时间点
129 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te-> when_ms); 130 te->timeProc = proc; 131 te->finalizerProc = finalizerProc; 132 te->clientData = clientData;
//time event链表为无序表,直接插入到链表头
133 te->next = eventLoop-> timeEventHead; 134 eventLoop->timeEventHead = te; 135 return id; 136 } 137
//从链表中删除,是以id作为key查找的(顺序遍历) 138 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) 139 { 140 aeTimeEvent *te, *prev = NULL; 141 142 te = eventLoop-> timeEventHead; 143 while (te) { 144 if (te->id == id) { 145 if (prev == NULL) 146 eventLoop->timeEventHead = te-> next; 147 else 148 prev->next = te-> next; 149 if (te-> finalizerProc) 150 te->finalizerProc(eventLoop, te-> clientData); 151 zfree(te); 152 return AE_OK; 153 } 154 prev = te; 155 te = te-> next; 156 } 157 return AE_ERR; /* NO event with the specified ID found */ 158 } 159 160 /* Search the first timer to fire. 161 * This operation is useful to know how many time the select can be 162 * put in sleep without to delay any event. (没大看懂),也许是给一个阻塞时间的上限值 163 * If there are no timers NULL is returned. 164 * 165 * Note that's O(N) since time events are unsorted. 166 * Possible optimizations (not needed by Redis so far, but...): 167 * 1) Insert the event in order, so that the nearest is just the head. 168 * Much better but still insertion or deletion of timers is O(N). 169 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). 170 */ 171 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop * eventLoop) 172 { 173 aeTimeEvent *te = eventLoop-> timeEventHead; 174 aeTimeEvent *nearest = NULL; 175 176 while (te) { 177 if (!nearest || te->when_sec < nearest->when_sec || 178 (te->when_sec == nearest->when_sec && 179 te->when_ms < nearest-> when_ms)) 180 nearest = te; 181 te = te-> next; 182 } 183 return nearest; 184 } 185
//对time event进行处理 186 /* Process time events */ 187 static int processTimeEvents(aeEventLoop * eventLoop) {
//处理的事件数
188 int processed = 0 ; 189 aeTimeEvent * te; 190 long long maxId; 191 192 te = eventLoop-> timeEventHead; 193 maxId = eventLoop->timeEventNextId- 1 ; 194 while (te) { 195 long now_sec, now_ms; 196 long long id; 197 198 if (te->id > maxId) { 199 te = te-> next; 200 continue ; 201 } 202 aeGetTime(&now_sec, & now_ms); 203 if (now_sec > te->when_sec || 204 (now_sec == te->when_sec && now_ms >= te-> when_ms)) 205 { 206 int retval; 207 208 id = te-> id;
//如果执行时间到或已超出,则执行对应的时间处理函数
209 retval = te->timeProc(eventLoop, id, te-> clientData); 210 processed++ ; 211 /* After an event is processed our time event list may 212 * no longer be the same, so we restart from head. //因为时间处理函数timeProc可能改变此链表 213 * Still we make sure to don't process events registered 214 * by event handlers itself in order to don't loop forever.? 215 * To do so we saved the max ID we want to handle. 216 * 217 * FUTURE OPTIMIZATIONS: 218 * Note that this is NOT great algorithmically. Redis uses 219 * a single time event so it's not a problem but the right 220 * way to do this is to add the new elements on head, and 221 * to flag deleted elements in a special way for later 222 * deletion (putting references to the nodes to delete into 223 * another linked list). */

//如果这个时间处理函数不再继续执行,则从time event的链表中删除事件;否则,retval为继续执行的时间间隔(单位为ms),在当前的timeevent struct的时间值上进行增加 224 if (retval != AE_NOMORE) { 225 aeAddMillisecondsToNow(retval,&te->when_sec,&te-> when_ms); 226 } else { 227 aeDeleteTimeEvent(eventLoop, id); 228 } 229 te = eventLoop-> timeEventHead; 230 } else { 231 te = te-> next; 232 } 233 } 234 return processed; 235 } 236 237 /* Process every pending(悬而未决) time event, then every pending file event 238 * (that may be registered by time event callbacks just processed). 239 * Without special flags the function sleeps until some file event 240 * fires, or when the next time event occurrs (if any). 241 * 242 * If flags is 0, the function does nothing and returns. 243 * if flags has AE_ALL_EVENTS set, all the kind of events are processed. 244 * if flags has AE_FILE_EVENTS set, file events are processed. 245 * if flags has AE_TIME_EVENTS set, time events are processed. 246 * if flags has AE_DONT_WAIT set the function returns ASAP until all 247 * the events that's possible to process without to wait are processed. 248 * 249 * The function returns the number of events processed. */ 250 int aeProcessEvents(aeEventLoop *eventLoop, int flags) 251 { 252 int processed = 0 , numevents; 253 254 /* Nothing to do? return ASAP */ 255 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0 ; 256 257 /* Note that we want call select() even if there are no 258 * file events to process as long as we want to process time 259 * events, in order to sleep until the next time event is ready 260 * to fire. */ 261 if (eventLoop->maxfd != - 1 || 262 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 263 int j; 264 aeTimeEvent *shortest = NULL; 265 struct timeval tv, * tvp; 266 267 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) 268 shortest = aeSearchNearestTimer(eventLoop); 269 if (shortest) { 270 long now_sec, now_ms; 271 272 /* Calculate the time missing for the nearest 273 * timer to fire. */ 274 aeGetTime(&now_sec, & now_ms); 275 tvp = & tv; 276 tvp->tv_sec = shortest->when_sec - now_sec; 277 if (shortest->when_ms < now_ms) { 278 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; 279 tvp->tv_sec --; 280 } else { 281 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; 282 } 283 if (tvp->tv_sec < 0) tvp->tv_sec = 0; 284 if (tvp->tv_usec < 0) tvp->tv_usec = 0; 疑似有bug,比如当前时间为 1s +2ms ,shortest时间为0s+1ms的case 285 } else { 286 /* If we have to check for events but need to return 287 * ASAP because of AE_DONT_WAIT we need to se the timeout 288 * to zero */ 289 if (flags & AE_DONT_WAIT) { 290 tv.tv_sec = tv.tv_usec = 0 ; 291 tvp = & tv; 292 } else { 293 /* Otherwise we can block */ 294 tvp = NULL; /* wait forever */ 295 } 296 } 297
//指定这个tvp是说这个tvp到期时,至少由time event可以执行 298 numevents = aeApiPoll(eventLoop, tvp); 299 for (j = 0 ; j < numevents; j++ ) { 300 aeFileEvent *fe = &eventLoop->events[eventLoop-> fired[j].fd]; 301 int mask = eventLoop-> fired[j].mask; 302 int fd = eventLoop-> fired[j].fd; 303 int rfired = 0 ; 304 305 /* note the fe->mask & mask & ... code: maybe an already processed 306 * event removed an element that fired and we still didn't 307 * processed, so we check if the event is still valid. */ 308 if (fe->mask & mask & AE_READABLE) { 309 rfired = 1 ; 310 fe->rfileProc(eventLoop,fd,fe-> clientData,mask); 311 }
//避免同一个注册函数被调用两次
312 if (fe->mask & mask & AE_WRITABLE) { 313 if (!rfired || fe->wfileProc != fe-> rfileProc) 314 fe->wfileProc(eventLoop,fd,fe-> clientData,mask); 315 } 316 processed++ ; 317 } 318 } 319 /* Check time events */ 320 if (flags & AE_TIME_EVENTS) 321 processed += processTimeEvents(eventLoop); 322 323 return processed; /* return the number of processed file/time events */ 324 } 325
//对select的一个简单封装,没有太大意义 326 /* Wait for millseconds until the given file descriptor becomes 327 * writable/readable/exception */ 328 int aeWait( int fd, int mask, long long milliseconds) { 329 struct timeval tv; 330 fd_set rfds, wfds, efds; 331 int retmask = 0 , retval; 332 333 tv.tv_sec = milliseconds/ 1000 ; 334 tv.tv_usec = (milliseconds% 1000 )* 1000 ; 335 FD_ZERO(& rfds); 336 FD_ZERO(& wfds); 337 FD_ZERO(& efds); 338 339 if (mask & AE_READABLE) FD_SET(fd,& rfds); 340 if (mask & AE_WRITABLE) FD_SET(fd,& wfds); 341 if ((retval = select (fd+ 1 , &rfds, &wfds, &efds, &tv)) > 0 ) { 342 if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE; 343 if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE; 344 return retmask; 345 } else { 346 return retval; 347 } 348 } 349 350 void aeMain(aeEventLoop * eventLoop) { 351 eventLoop->stop = 0 ; 352 while (!eventLoop-> stop) { 353 if (eventLoop->beforesleep != NULL) 354 eventLoop-> beforesleep(eventLoop); 355 aeProcessEvents(eventLoop, AE_ALL_EVENTS); 356 } 357 } 358 359 char *aeGetApiName( void ) { 360 return aeApiName(); 361 } 362 363 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc * beforesleep) { 364 eventLoop->beforesleep = beforesleep; 365 }

redis源码笔记-ae.c


更多文章、技术交流、商务合作、联系博主

微信扫码或搜索:z360901061

微信扫一扫加我为好友

QQ号联系: 360901061

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描下面二维码支持博主2元、5元、10元、20元等您想捐的金额吧,狠狠点击下面给点支持吧,站长非常感激您!手机微信长按不能支付解决办法:请将微信支付二维码保存到相册,切换到微信,然后点击微信右上角扫一扫功能,选择支付二维码完成支付。

【本文对您有帮助就好】

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请用微信扫描上面二维码支持博主2元、5元、10元、自定义金额等您想捐的金额吧,站长会非常 感谢您的哦!!!

发表我的评论
最新评论 总共0条评论