anet 库是 Redis 对 TCP 网络层以及 Unix 域套接字的一层封装。Redis 的客户端和服务端之间通常使用 TCP 协议通信(同一台主机上也可以使用 Unix 域套接字)。原作者对它的描述是:Basic TCP socket stuff made a bit less boring.
anet.h
1
#ifndef ANET_H
#define ANET_H

#include <sys/types.h> /* mode_t, used by anetUnixServer() */

/* Status codes returned by the anet* functions. Functions that hand back a
 * file descriptor return the descriptor on success and ANET_ERR on failure. */
#define ANET_OK 0
#define ANET_ERR -1

/* Size of the error buffer the caller passes as 'err'. Error messages are
 * written through that pointer (truncated to this size); 'err' may be NULL,
 * in which case no message is stored. */
#define ANET_ERR_LEN 256

#if defined(__sun)
#define AF_LOCAL AF_UNIX
#endif

/* Outgoing connections (blocking and non-blocking variants). */
int anetTcpConnect(char *err, char *addr, int port);
int anetTcpNonBlockConnect(char *err, char *addr, int port);
int anetUnixConnect(char *err, char *path);
int anetUnixNonBlockConnect(char *err, char *path);

/* read(2)/write(2) wrappers that loop until 'count' bytes were transferred. */
int anetRead(int fd, char *buf, int count);
int anetWrite(int fd, char *buf, int count);

/* Resolve 'host' to a dotted-quad IPv4 string stored in 'ipbuf'. */
int anetResolve(char *err, char *host, char *ipbuf);

/* Listening sockets and accepting connections. */
int anetTcpServer(char *err, int port, char *bindaddr);
int anetUnixServer(char *err, char *path, mode_t perm);
int anetTcpAccept(char *err, int serversock, char *ip, int *port);
int anetUnixAccept(char *err, int serversock);

/* Socket options. */
int anetNonBlock(char *err, int fd);
int anetTcpNoDelay(char *err, int fd);
int anetTcpKeepAlive(char *err, int fd);
int anetSetSendBuffer(char *err, int fd, int buffsize);

/* Store the peer's IPv4 address and port of a connected socket. */
int anetPeerToString(int fd, char *ip, int *port);

#endif
anet.c
1
#include
"
fmacros.h
"
2
3
#include <sys/types.h>
4
#include <sys/socket.h>
5
#include <sys/stat.h>
6
#include <sys/un.h>
7
#include <netinet/
in
.h>
8
#include <netinet/tcp.h>
9
#include <arpa/inet.h>
10
#include <unistd.h>
11
#include <fcntl.h>
12
#include <
string
.h>
13
#include <netdb.h>
14
#include <errno.h>
15
#include <stdarg.h>
16
#include <stdio.h>
17
18
#include
"
anet.h
"
19
20
static
void
anetSetError(
char
*err,
const
char
*
fmt, ...)
21
{
22
va_list ap;
23
24
if
(!err)
return
;
25
va_start(ap, fmt);
26
vsnprintf(err, ANET_ERR_LEN, fmt, ap);
27
va_end(ap);
28
}
29
30
int
anetNonBlock(
char
*err,
int
fd) //将一个描述符设成非阻塞
31
{
32
int
flags;
33
34
/*
Set the socket nonblocking.
35
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
36
* interrupted by a signal.
*/
37
if
((flags = fcntl(fd, F_GETFL)) == -
1
) {
38
anetSetError(err,
"
fcntl(F_GETFL): %s
"
, strerror(errno));
39
return
ANET_ERR;
40
}
41
if
(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -
1
) {
42
anetSetError(err,
"
fcntl(F_SETFL,O_NONBLOCK): %s
"
, strerror(errno));
43
return
ANET_ERR;
44
}
45
return
ANET_OK;
46
}
47
//将一个tcp描述符设成NoDelay,客户端可能会设置这个选项,禁止NODELAY算法
//如下摘抄自《UNPv1》7.9.2关于TCP_NODELAY套接字选项的讨论
"开启本选项将会禁止TCP的Nagle算法。默认情况下本算法是开启的。
Nagle算法的目的在于减少广域网上小分组的数目。
"
48
int
anetTcpNoDelay(
char
*err,
int
fd)
49
{
50
int
yes =
1
;
51
if
(setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes,
sizeof
(yes)) == -
1
)
52
{
53
anetSetError(err,
"
setsockopt TCP_NODELAY: %s
"
, strerror(errno));
54
return
ANET_ERR;
55
}
56
return
ANET_OK;
57
}
58
59
int
anetSetSendBuffer(
char
*err,
int
fd,
int
buffsize) //设置发送缓冲区大小
60
{
61
if
(setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffsize,
sizeof
(buffsize)) == -
1
)
62
{
63
anetSetError(err,
"
setsockopt SO_SNDBUF: %s
"
, strerror(errno));
64
return
ANET_ERR;
65
}
66
return
ANET_OK;
67
}
68
69
int
anetTcpKeepAlive(
char
*err,
int
fd) //设置keep-alive选项
70
{
71
int
yes =
1
;
72
if
(setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes,
sizeof
(yes)) == -
1
) {
73
anetSetError(err,
"
setsockopt SO_KEEPALIVE: %s
"
, strerror(errno));
74
return
ANET_ERR;
75
}
76
return
ANET_OK;
77
}
78
79
int
anetResolve(
char
*err,
char
*host,
char
*
ipbuf)
80
{
81
struct
sockaddr_in sa;
82
83
sa.sin_family =
AF_INET;
84
if
(inet_aton(host, &sa.sin_addr) ==
0
) {
85
struct
hostent *
he;
86
87
he =
gethostbyname(host);
88
if
(he ==
NULL) {
89
anetSetError(err,
"
can't resolve: %s
"
, host);
90
return
ANET_ERR;
91
}
92
memcpy(&sa.sin_addr, he->h_addr,
sizeof
(
struct
in_addr));
93
}
94
strcpy(ipbuf,inet_ntoa(sa.sin_addr));
95
return
ANET_OK;
96
}
97
98
static
int
anetCreateSocket(
char
*err,
int
domain) {
99
int
s, on =
1
;
100
if
((s = socket(domain, SOCK_STREAM,
0
)) == -
1
) {
101
anetSetError(err,
"
creating socket: %s
"
, strerror(errno));
102
return
ANET_ERR;
103
}
104
105
/*
Make sure connection-intensive things like the redis benckmark
106
* will be able to close/open sockets a zillion of times
*/
107
if
(setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on,
sizeof
(on)) == -
1
) {
108
anetSetError(err,
"
setsockopt SO_REUSEADDR: %s
"
, strerror(errno));
109
return
ANET_ERR;
110
}
111
return
s;
112
}
113
114
#define
ANET_CONNECT_NONE 0
115
#define
ANET_CONNECT_NONBLOCK 1
116
static
int
anetTcpGenericConnect(
char
*err,
char
*addr,
int
port,
int
flags)
117
{
118
int
s;
119
struct
sockaddr_in sa;
120
121
if
((s = anetCreateSocket(err,AF_INET)) ==
ANET_ERR)
122
return
ANET_ERR;
123
124
sa.sin_family =
AF_INET;
125
sa.sin_port =
htons(port);
126
if
(inet_aton(addr, &sa.sin_addr) ==
0
) {
127
struct
hostent *
he;
128
129
he =
gethostbyname(addr);
130
if
(he ==
NULL) {
131
anetSetError(err,
"
can't resolve: %s
"
, addr);
132
close(s);
133
return
ANET_ERR;
134
}
135
memcpy(&sa.sin_addr, he->h_addr,
sizeof
(
struct
in_addr));
136
}
137
if
(flags &
ANET_CONNECT_NONBLOCK) {
138
if
(anetNonBlock(err,s) !=
ANET_OK)
139
return
ANET_ERR;
140
}
141
if
(connect(s, (
struct
sockaddr*)&sa,
sizeof
(sa)) == -
1
) {
142
if
(errno == EINPROGRESS &&
143
flags &
ANET_CONNECT_NONBLOCK)
144
return
s;
145
146
anetSetError(err,
"
connect: %s
"
, strerror(errno));
147
close(s);
148
return
ANET_ERR;
149
}
150
return
s;
151
}
152
153
int
anetTcpConnect(
char
*err,
char
*addr,
int
port)
154
{
155
return
anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE);
156
}
157
158
int
anetTcpNonBlockConnect(
char
*err,
char
*addr,
int
port)
159
{
160
return
anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK);
161
}
162
163
int
anetUnixGenericConnect(
char
*err,
char
*path,
int
flags)
164
{
165
int
s;
166
struct
sockaddr_un sa;
167
168
if
((s = anetCreateSocket(err,AF_LOCAL)) ==
ANET_ERR)
169
return
ANET_ERR;
170
171
sa.sun_family =
AF_LOCAL;
172
strncpy(sa.sun_path,path,
sizeof
(sa.sun_path)-
1
);
173
if
(flags &
ANET_CONNECT_NONBLOCK) {
174
if
(anetNonBlock(err,s) !=
ANET_OK)
175
return
ANET_ERR;
176
}
177
if
(connect(s,(
struct
sockaddr*)&sa,
sizeof
(sa)) == -
1
) {
178
if
(errno == EINPROGRESS &&
179
flags &
ANET_CONNECT_NONBLOCK)
180
return
s;
181
182
anetSetError(err,
"
connect: %s
"
, strerror(errno));
183
close(s);
184
return
ANET_ERR;
185
}
186
return
s;
187
}
188
189
int
anetUnixConnect(
char
*err,
char
*
path)
190
{
191
return
anetUnixGenericConnect(err,path,ANET_CONNECT_NONE);
192
}
193
194
int
anetUnixNonBlockConnect(
char
*err,
char
*
path)
195
{
196
return
anetUnixGenericConnect(err,path,ANET_CONNECT_NONBLOCK);
197
}
198
199
/* Like read(2), but keeps reading until exactly 'count' bytes have been
 * stored in 'buf' (unless an error or EOF condition is encountered).
 *
 * Returns the number of bytes read, which is smaller than 'count' only when
 * EOF was reached first, or -1 as soon as read(2) fails. */
int anetRead(int fd, char *buf, int count)
{
    int done = 0;

    while (done != count) {
        int got = read(fd, buf + done, count - done);

        if (got == -1)
            return -1;
        if (got == 0)
            break; /* EOF: report what was read so far */
        done += got;
    }
    return done;
}
213
214
/* Like write(2), but keeps writing until exactly 'count' bytes from 'buf'
 * have been written (unless an error is encountered).
 *
 * Returns the number of bytes written, which is smaller than 'count' only
 * when write(2) returned 0, or -1 as soon as write(2) fails. */
int anetWrite(int fd, char *buf, int count)
{
    int done = 0;

    while (done != count) {
        int put = write(fd, buf + done, count - done);

        if (put == -1)
            return -1;
        if (put == 0)
            break; /* nothing accepted: report what was written so far */
        done += put;
    }
    return done;
}
228
229
static
int
anetListen(
char
*err,
int
s,
struct
sockaddr *
sa, socklen_t len) {
230
if
(bind(s,sa,len) == -
1
) {
231
anetSetError(err,
"
bind: %s
"
, strerror(errno));
232
close(s);
233
return
ANET_ERR;
234
}
235
if
(listen(s,
511
) == -
1
) {
/*
the magic 511 constant is from nginx
*/
236
anetSetError(err,
"
listen: %s
"
, strerror(errno));
237
close(s);
238
return
ANET_ERR;
239
}
240
return
ANET_OK;
241
}
242
243
int
anetTcpServer(
char
*err,
int
port,
char
*
bindaddr)
244
{
245
int
s;
246
struct
sockaddr_in sa;
247
248
if
((s = anetCreateSocket(err,AF_INET)) ==
ANET_ERR)
249
return
ANET_ERR;
250
251
memset(&sa,
0
,
sizeof
(sa));
252
sa.sin_family =
AF_INET;
253
sa.sin_port =
htons(port);
254
sa.sin_addr.s_addr =
htonl(INADDR_ANY);
255
if
(bindaddr && inet_aton(bindaddr, &sa.sin_addr) ==
0
) {
256
anetSetError(err,
"
invalid bind address
"
);
257
close(s);
258
return
ANET_ERR;
259
}
260
if
(anetListen(err,s,(
struct
sockaddr*)&sa,
sizeof
(sa)) ==
ANET_ERR)
261
return
ANET_ERR;
262
return
s;
263
}
264
265
int
anetUnixServer(
char
*err,
char
*
path, mode_t perm)
266
{
267
int
s;
268
struct
sockaddr_un sa;
269
270
if
((s = anetCreateSocket(err,AF_LOCAL)) ==
ANET_ERR)
271
return
ANET_ERR;
272
273
memset(&sa,
0
,
sizeof
(sa));
274
sa.sun_family =
AF_LOCAL;
275
strncpy(sa.sun_path,path,
sizeof
(sa.sun_path)-
1
);
276
if
(anetListen(err,s,(
struct
sockaddr*)&sa,
sizeof
(sa)) ==
ANET_ERR)
277
return
ANET_ERR;
278
if
(perm)
279
chmod(sa.sun_path, perm);
280
return
s;
281
}
282
283
static
int
anetGenericAccept(
char
*err,
int
s,
struct
sockaddr *sa, socklen_t *
len) {
284
int
fd;
285
while
(
1
) {
286
fd =
accept(s,sa,len);
287
if
(fd == -
1
) {
288
if
(errno ==
EINTR)
289
continue
;
290
else
{
291
anetSetError(err,
"
accept: %s
"
, strerror(errno));
292
return
ANET_ERR;
293
}
294
}
295
break
;
296
}
297
return
fd;
298
}
299
300
int
anetTcpAccept(
char
*err,
int
s,
char
*ip,
int
*
port) {
301
int
fd;
302
struct
sockaddr_in sa;
303
socklen_t salen =
sizeof
(sa);
304
if
((fd = anetGenericAccept(err,s,(
struct
sockaddr*)&sa,&salen)) ==
ANET_ERR)
305
return
ANET_ERR;
306
307
if
(ip) strcpy(ip,inet_ntoa(sa.sin_addr));
308
if
(port) *port =
ntohs(sa.sin_port);
309
return
fd;
310
}
311
312
int
anetUnixAccept(
char
*err,
int
s) {
313
int
fd;
314
struct
sockaddr_un sa;
315
socklen_t salen =
sizeof
(sa);
316
if
((fd = anetGenericAccept(err,s,(
struct
sockaddr*)&sa,&salen)) ==
ANET_ERR)
317
return
ANET_ERR;
318
319
return
fd;
320
}
321
322
/* Report the IPv4 address and port of the peer connected to 'fd'.
 *
 * ip:   when not NULL, receives the peer address as a dotted-quad string;
 *       copied with an unbounded strcpy(), so the buffer must be large
 *       enough for the string plus the terminating NUL.
 * port: when not NULL, receives the peer port in host byte order.
 *
 * The address is read back as a struct sockaddr_in without checking the
 * socket's address family.
 *
 * Returns 0 on success, -1 when getpeername(2) fails (no error message is
 * produced by this function). */
int anetPeerToString(int fd, char *ip, int *port)
{
    struct sockaddr_in peer;
    socklen_t peerlen = sizeof(peer);
    int rc = getpeername(fd, (struct sockaddr *)&peer, &peerlen);

    if (rc == -1)
        return -1;

    if (ip != NULL)
        strcpy(ip, inet_ntoa(peer.sin_addr));
    if (port != NULL)
        *port = ntohs(peer.sin_port);

    return 0;
}

