作者在bio.c的头注释中对设计进行了详细的介绍
/*
Background I/O service for Redis.
这个文件是redis后台IO服务的实现
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
这个文件负责我们需要在后台执行的操作。现在redis的版本中只有一类的操作,后台的close 系统调用。
为了避免一个文件最后的owner在执行close操作带来的unlink使得阻塞server,将这类操作用单独的后台线程来执行
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
*
* DESIGN
* ------
*
* The design is trivial, we have a structure representing a job to perform
* and a different thread and job queue for every job type.
* Every thread wait for new jobs in its queue, and process every job
* sequentially.
每种作业类型一个queue。每个线程在它的queue里等待新的job到来。并且按照FIFO的顺序处理作业。
*
* Jobs of the same type are guaranteed to be processed from the least
* recently inserted to the most recently inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
作业完成后,其creator无法得到通知。
*/
现在的两类作业类型:1.close 2.aof_fsync
/*
Background job opcodes
*/
#define
REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define
REDIS_BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
1
#include
"
redis.h
"
2
#include
"
bio.h
"
3
//使用互斥量+条件变量,作为线程的保护条件
4
static
pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS];
5
static
pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS];
//两类作业的队列
6
static
list *
bio_jobs[REDIS_BIO_NUM_OPS];
7
/*
The following array is used to hold the number of pending jobs for every
8
* OP type. This allows us to export the bioPendingJobsOfType() API that is
9
* useful when the main thread wants to perform some operation that may involve
10
* objects shared with the background thread. The main thread will just wait
11
* that there are no longer jobs of this type to be executed before performing
12
* the sensible operation. This data is also useful for reporting.
*/
13
static
unsigned
long
long
bio_pending[REDIS_BIO_NUM_OPS];
14
15
/*
This structure represents a background Job. It is only used locally to this
16
* file as the API deos not expose the internals at all.
*/
17
struct
bio_job {
18
time_t time;
/*
Time at which the job was created.
*/
19
/*
Job specific arguments pointers. If we need to pass more than three
20
* arguments we can just pass a pointer to a structure or alike.
*/
21
void
*arg1, *arg2, *
arg3;
22
};
23
24
void
*bioProcessBackgroundJobs(
void
*
arg);
25
26
/*
Make sure we have enough stack to perform all the things we do in the
27
* main thread.
*/
28
#define
REDIS_THREAD_STACK_SIZE (1024*1024*4)
29
30
/*
Initialize the background system, spawning the thread.
*/
31
void
bioInit(
void
) {
32
pthread_attr_t attr;
33
pthread_t thread;
34
size_t stacksize;
35
int
j;
36
37
/*
Initialization of state vars and objects
*/
38
for
(j =
0
; j < REDIS_BIO_NUM_OPS; j++
) {
39
pthread_mutex_init(&
bio_mutex[j],NULL);
40
pthread_cond_init(&
bio_condvar[j],NULL);
41
bio_jobs[j] =
listCreate();
42
bio_pending[j] =
0
;
43
}
44
45
/*
Set the stack size as by default it may be small in some system
*/
46
pthread_attr_init(&
attr);
47
pthread_attr_getstacksize(&attr,&
stacksize);
48
if
(!stacksize) stacksize =
1
;
/*
The world is full of Solaris Fixes
*/
49
while
(stacksize < REDIS_THREAD_STACK_SIZE) stacksize *=
2
;
50
pthread_attr_setstacksize(&
attr, stacksize);
51
52
/*
Ready to spawn our threads. We use the single argument the thread
53
* function accepts in order to pass the job ID the thread is
54
* responsible of.
*/
55
for
(j =
0
; j < REDIS_BIO_NUM_OPS; j++
) {
56
void
*arg = (
void
*)(unsigned
long
) j;
57
if
(pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) !=
0
) {
58
redisLog(REDIS_WARNING,
"
Fatal: Can't initialize Background Jobs.
"
);
59
exit(
1
);
60
}
61
}
62
}
63
64
void
bioCreateBackgroundJob(
int
type,
void
*arg1,
void
*arg2,
void
*
arg3) {
65
struct
bio_job *job = zmalloc(
sizeof
(*
job));
66
67
job->time =
time(NULL);
68
job->arg1 =
arg1;
69
job->arg2 =
arg2;
70
job->arg3 =
arg3;
71
pthread_mutex_lock(&
bio_mutex[type]);
72
listAddNodeTail(bio_jobs[type],job);
73
bio_pending[type]++
;
74
pthread_cond_signal(&
bio_condvar[type]);
75
pthread_mutex_unlock(&
bio_mutex[type]);
76
}
77
78
void
*bioProcessBackgroundJobs(
void
*
arg) {
79
struct
bio_job *
job;
80
unsigned
long
type = (unsigned
long
) arg;
81
82
pthread_detach(pthread_self());
83
pthread_mutex_lock(&
bio_mutex[type]);
84
while
(
1
) {
85
listNode *
ln;
86
87
/*
The loop always starts with the lock hold.
*/
88
if
(listLength(bio_jobs[type]) ==
0
) {
89
pthread_cond_wait(&bio_condvar[type],&
bio_mutex[type]);
90
continue
;
91
}
92
/*
Pop the job from the queue.
*/
93
ln =
listFirst(bio_jobs[type]);
94
job = ln->
value;
95
/*
It is now possible to unlock the background system as we know have
96
* a stand alone job structure to process.
*/
97
pthread_mutex_unlock(&
bio_mutex[type]);
98
99
/*
Process the job accordingly to its type.
*/
100
if
(type ==
REDIS_BIO_CLOSE_FILE) {
101
close((
long
)job->
arg1);
102
}
else
if
(type ==
REDIS_BIO_AOF_FSYNC) {
103
aof_fsync((
long
)job->
arg1);
104
}
else
{
105
redisPanic(
"
Wrong job type in bioProcessBackgroundJobs().
"
);
106
}
107
zfree(job);
108
109
/*
Lock again before reiterating the loop, if there are no longer
110
* jobs to process we'll block again in pthread_cond_wait().
*/
111
pthread_mutex_lock(&
bio_mutex[type]);
112
listDelNode(bio_jobs[type],ln);
113
bio_pending[type]--
;
114
}
115
}
116
117
/*
Return the number of pending jobs of the specified type.
*/
118
unsigned
long
long
bioPendingJobsOfType(
int
type) {
119
unsigned
long
long
val;
120
pthread_mutex_lock(&
bio_mutex[type]);
121
val =
bio_pending[type];
122
pthread_mutex_unlock(&
bio_mutex[type]);
123
return
val;
124
}

