作者在bio.c的头注释中对设计进行了详细的介绍
/* Background I/O service for Redis.
这个文件是redis后台IO服务的实现
 *
 * This file implements operations that we need to perform in the background.
 * Currently there is only a single operation, that is a background close(2)
 * system call. This is needed as when the process is the last owner of a
 * reference to a file closing it means unlinking it, and the deletion of the
 * file is slow, blocking the server.
这个文件负责我们需要在后台执行的操作。现在redis的版本中只有一类的操作,后台的close 系统调用。
为了避免一个文件最后的owner在执行close操作带来的unlink使得阻塞server,将这类操作用单独的后台线程来执行
 *
 * In the future we'll either continue implementing new things we need or
 * we'll switch to libeio. However there are probably long term uses for this
 * file as we may want to put here Redis specific background tasks (for instance
 * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
 * implementation).
 *
 * DESIGN
 * ------
 *
 * The design is trivial, we have a structure representing a job to perform
 * and a different thread and job queue for every job type.
 * Every thread waits for new jobs in its queue, and processes every job
 * sequentially.
每种作业类型一个queue。每个线程在它的queue里等待新的job到来。并且按照FIFO的顺序处理作业。
 *
 * Jobs of the same type are guaranteed to be processed from the least
 * recently inserted to the most recently inserted (older jobs processed
 * first).
 *
 * Currently there is no way for the creator of the job to be notified about
 * the completion of the operation, this will only be added when/if needed.
作业完成后,其creator无法得到通知。 */
现在的两类作业类型:1.close 2.aof_fsync
/* Background job opcodes. The opcode is used as the index into the per-type
 * mutex, condition variable, queue and pending-counter arrays.
 *
 * Each directive must start its own line: a '#define' that follows other
 * tokens on the same line is not processed as a directive. */
#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
1 #include " redis.h " 2 #include " bio.h " 3
//使用互斥量+条件变量,作为线程的保护条件 4 static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; 5 static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
//两类作业的队列 6 static list * bio_jobs[REDIS_BIO_NUM_OPS]; 7 /* The following array is used to hold the number of pending jobs for every 8 * OP type. This allows us to export the bioPendingJobsOfType() API that is 9 * useful when the main thread wants to perform some operation that may involve 10 * objects shared with the background thread. The main thread will just wait 11 * that there are no longer jobs of this type to be executed before performing 12 * the sensible operation. This data is also useful for reporting. */ 13 static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; 14 15 /* This structure represents a background Job. It is only used locally to this 16 * file as the API deos not expose the internals at all. */ 17 struct bio_job { 18 time_t time; /* Time at which the job was created. */ 19 /* Job specific arguments pointers. If we need to pass more than three 20 * arguments we can just pass a pointer to a structure or alike. */ 21 void *arg1, *arg2, * arg3; 22 }; 23 24 void *bioProcessBackgroundJobs( void * arg); 25 26 /* Make sure we have enough stack to perform all the things we do in the 27 * main thread. */ 28 #define REDIS_THREAD_STACK_SIZE (1024*1024*4) 29 30 /* Initialize the background system, spawning the thread. */ 31 void bioInit( void ) { 32 pthread_attr_t attr; 33 pthread_t thread; 34 size_t stacksize; 35 int j; 36 37 /* Initialization of state vars and objects */ 38 for (j = 0 ; j < REDIS_BIO_NUM_OPS; j++ ) { 39 pthread_mutex_init(& bio_mutex[j],NULL); 40 pthread_cond_init(& bio_condvar[j],NULL); 41 bio_jobs[j] = listCreate(); 42 bio_pending[j] = 0 ; 43 } 44 45 /* Set the stack size as by default it may be small in some system */ 46 pthread_attr_init(& attr); 47 pthread_attr_getstacksize(&attr,& stacksize); 48 if (!stacksize) stacksize = 1 ; /* The world is full of Solaris Fixes */ 49 while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2 ; 50 pthread_attr_setstacksize(& attr, stacksize); 51 52 /* Ready to spawn our threads. 
We use the single argument the thread 53 * function accepts in order to pass the job ID the thread is 54 * responsible of. */ 55 for (j = 0 ; j < REDIS_BIO_NUM_OPS; j++ ) { 56 void *arg = ( void *)(unsigned long ) j; 57 if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0 ) { 58 redisLog(REDIS_WARNING, " Fatal: Can't initialize Background Jobs. " ); 59 exit( 1 ); 60 } 61 } 62 } 63 64 void bioCreateBackgroundJob( int type, void *arg1, void *arg2, void * arg3) { 65 struct bio_job *job = zmalloc( sizeof (* job)); 66 67 job->time = time(NULL); 68 job->arg1 = arg1; 69 job->arg2 = arg2; 70 job->arg3 = arg3; 71 pthread_mutex_lock(& bio_mutex[type]); 72 listAddNodeTail(bio_jobs[type],job); 73 bio_pending[type]++ ; 74 pthread_cond_signal(& bio_condvar[type]); 75 pthread_mutex_unlock(& bio_mutex[type]); 76 } 77 78 void *bioProcessBackgroundJobs( void * arg) { 79 struct bio_job * job; 80 unsigned long type = (unsigned long ) arg; 81 82 pthread_detach(pthread_self()); 83 pthread_mutex_lock(& bio_mutex[type]); 84 while ( 1 ) { 85 listNode * ln; 86 87 /* The loop always starts with the lock hold. */ 88 if (listLength(bio_jobs[type]) == 0 ) { 89 pthread_cond_wait(&bio_condvar[type],& bio_mutex[type]); 90 continue ; 91 } 92 /* Pop the job from the queue. */ 93 ln = listFirst(bio_jobs[type]); 94 job = ln-> value; 95 /* It is now possible to unlock the background system as we know have 96 * a stand alone job structure to process. */ 97 pthread_mutex_unlock(& bio_mutex[type]); 98 99 /* Process the job accordingly to its type. */ 100 if (type == REDIS_BIO_CLOSE_FILE) { 101 close(( long )job-> arg1); 102 } else if (type == REDIS_BIO_AOF_FSYNC) { 103 aof_fsync(( long )job-> arg1); 104 } else { 105 redisPanic( " Wrong job type in bioProcessBackgroundJobs(). " ); 106 } 107 zfree(job); 108 109 /* Lock again before reiterating the loop, if there are no longer 110 * jobs to process we'll block again in pthread_cond_wait(). 
*/ 111 pthread_mutex_lock(& bio_mutex[type]); 112 listDelNode(bio_jobs[type],ln); 113 bio_pending[type]-- ; 114 } 115 } 116 117 /* Return the number of pending jobs of the specified type. */ 118 unsigned long long bioPendingJobsOfType( int type) { 119 unsigned long long val; 120 pthread_mutex_lock(& bio_mutex[type]); 121 val = bio_pending[type]; 122 pthread_mutex_unlock(& bio_mutex[type]); 123 return val; 124 }