AOF(Append Only File)是 Redis 提供的一种数据持久化机制:它把每一条会修改数据的写命令追加记录到文件中,重启时通过重放这些命令,使恢复出的数据与内存中的数据保持一致。
1 #include " redis.h " 2 #include " bio.h " 3 4 #include <signal.h> 5 #include <fcntl.h> 6 #include <sys/stat.h> 7 #include <sys/types.h> 8 #include <sys/time.h> 9 #include <sys/resource.h> 10 #include <sys/wait.h> 11 12 void aofUpdateCurrentSize( void ); 13 14 void aof_background_fsync( int fd) { 15 bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,( void *)( long )fd,NULL,NULL); 16 } 17 18 /* Called when the user switches from "appendonly yes" to "appendonly no" 19 * at runtime using the CONFIG command. */ 20 void stopAppendOnly( void ) { 21 flushAppendOnlyFile( 1 ); 22 aof_fsync(server.appendfd); 23 close(server.appendfd); 24 25 server.appendfd = - 1 ; 26 server.appendseldb = - 1 ; 27 server.appendonly = 0 ; 28 /* rewrite operation in progress? kill it, wait child exit */ 29 if (server.bgrewritechildpid != - 1 ) { 30 int statloc; 31 32 if (kill(server.bgrewritechildpid,SIGKILL) != - 1 ) 33 wait3(&statloc, 0 ,NULL); 34 /* reset the buffer accumulating changes while the child saves */ 35 sdsfree(server.bgrewritebuf); 36 server.bgrewritebuf = sdsempty(); 37 server.bgrewritechildpid = - 1 ; 38 } 39 } 40 41 /* Called when the user switches from "appendonly no" to "appendonly yes" 42 * at runtime using the CONFIG command. */ 43 int startAppendOnly( void ) { 44 server.appendonly = 1 ; 45 server.lastfsync = time(NULL); 46 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT, 0644 ); 47 if (server.appendfd == - 1 ) { 48 redisLog(REDIS_WARNING, " Used tried to switch on AOF via CONFIG, but I can't open the AOF file: %s " ,strerror(errno)); 49 return REDIS_ERR; 50 } 51 if (rewriteAppendOnlyFileBackground() == REDIS_ERR) { 52 server.appendonly = 0 ; 53 close(server.appendfd); 54 redisLog(REDIS_WARNING, " Used tried to switch on AOF via CONFIG, I can't trigger a background AOF rewrite operation. Check the above logs for more info about the error. 
" ,strerror(errno)); 55 return REDIS_ERR; 56 } 57 return REDIS_OK; 58 } 59 60 /* Write the append only file buffer on disk. 61 * 62 * Since we are required to write the AOF before replying to the client, 63 * and the only way the client socket can get a write is entering when the 64 * the event loop, we accumulate all the AOF writes in a memory 65 * buffer and write it on disk using this function just before entering 66 * the event loop again. 67 * 68 * About the 'force' argument: 69 * 70 * When the fsync policy is set to 'everysec' we may delay the flush if there 71 * is still an fsync() going on in the background thread, since for instance 72 * on Linux write(2) will be blocked by the background fsync anyway. 73 * When this happens we remember that there is some aof buffer to be 74 * flushed ASAP, and will try to do that in the serverCron() function. 75 * 76 * However if force is set to 1 we'll write regardless of the background 77 * fsync. */ 78 void flushAppendOnlyFile( int force) { 79 ssize_t nwritten; 80 int sync_in_progress = 0 ; 81 82 if (sdslen(server.aofbuf) == 0 ) return ; 83 84 if (server.appendfsync == APPENDFSYNC_EVERYSEC) 85 sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0 ; 86 87 if (server.appendfsync == APPENDFSYNC_EVERYSEC && ! force) { 88 /* With this append fsync policy we do background fsyncing. 89 * If the fsync is still in progress we can try to delay 90 * the write for a couple of seconds. */ 91 if (sync_in_progress) { 92 if (server.aof_flush_postponed_start == 0 ) { 93 /* No previous write postponinig, remember that we are 94 * postponing the flush and return. */ 95 server.aof_flush_postponed_start = server.unixtime; 96 return ; 97 } else if (server.unixtime - server.aof_flush_postponed_start < 2 ) { 98 /* We were already waiting for fsync to finish, but for less 99 * than two seconds this is still ok. Postpone again. 
*/ 100 return ; 101 } 102 /* Otherwise fall trough, and go write since we can't wait 103 * over two seconds. */ 104 redisLog(REDIS_NOTICE, " Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis. " ); 105 } 106 } 107 /* If you are following this code path, then we are going to write so 108 * set reset the postponed flush sentinel to zero. */ 109 server.aof_flush_postponed_start = 0 ; 110 111 /* We want to perform a single write. This should be guaranteed atomic 112 * at least if the filesystem we are writing is a real physical one. 113 * While this will save us against the server being killed I don't think 114 * there is much to do about the whole server stopping for power problems 115 * or alike */ 116 nwritten = write(server.appendfd,server.aofbuf,sdslen(server.aofbuf)); 117 if (nwritten != (signed)sdslen(server.aofbuf)) { 118 /* Ooops, we are in troubles. The best thing to do for now is 119 * aborting instead of giving the illusion that everything is 120 * working as expected. */ 121 if (nwritten == - 1 ) { 122 redisLog(REDIS_WARNING, " Exiting on error writing to the append-only file: %s " ,strerror(errno)); 123 } else { 124 redisLog(REDIS_WARNING, " Exiting on short write while writing to the append-only file: %s " ,strerror(errno)); 125 } 126 exit( 1 ); 127 } 128 server.appendonly_current_size += nwritten; 129 130 /* Re-use AOF buffer when it is small enough. The maximum comes from the 131 * arena size of 4k minus some overhead (but is otherwise arbitrary). */ 132 if ((sdslen(server.aofbuf)+sdsavail(server.aofbuf)) < 4000 ) { 133 sdsclear(server.aofbuf); 134 } else { 135 sdsfree(server.aofbuf); 136 server.aofbuf = sdsempty(); 137 } 138 139 /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are 140 * children doing I/O in the background. 
*/ 141 if (server.no_appendfsync_on_rewrite && 142 (server.bgrewritechildpid != - 1 || server.bgsavechildpid != - 1 )) 143 return ; 144 145 /* Perform the fsync if needed. */ 146 if (server.appendfsync == APPENDFSYNC_ALWAYS) { 147 /* aof_fsync is defined as fdatasync() for Linux in order to avoid 148 * flushing metadata. */ 149 aof_fsync(server.appendfd); /* Let's try to get this data on the disk */ 150 server.lastfsync = server.unixtime; 151 } else if ((server.appendfsync == APPENDFSYNC_EVERYSEC && 152 server.unixtime > server.lastfsync)) { 153 if (! sync_in_progress) aof_background_fsync(server.appendfd); 154 server.lastfsync = server.unixtime; 155 } 156 } 157 158 sds catAppendOnlyGenericCommand(sds dst, int argc, robj ** argv) { 159 char buf[ 32 ]; 160 int len, j; 161 robj * o; 162 163 buf[ 0 ] = ' * ' ; 164 len = 1 +ll2string(buf+ 1 , sizeof (buf)- 1 ,argc); 165 buf[len++] = ' \r ' ; 166 buf[len++] = ' \n ' ; 167 dst = sdscatlen(dst,buf,len); 168 169 for (j = 0 ; j < argc; j++ ) { 170 o = getDecodedObject(argv[j]); 171 buf[ 0 ] = ' $ ' ; 172 len = 1 +ll2string(buf+ 1 , sizeof (buf)- 1 ,sdslen(o-> ptr)); 173 buf[len++] = ' \r ' ; 174 buf[len++] = ' \n ' ; 175 dst = sdscatlen(dst,buf,len); 176 dst = sdscatlen(dst,o->ptr,sdslen(o-> ptr)); 177 dst = sdscatlen(dst, " \r\n " , 2 ); 178 decrRefCount(o); 179 } 180 return dst; 181 } 182 183 sds catAppendOnlyExpireAtCommand(sds buf, robj *key, robj * seconds) { 184 int argc = 3 ; 185 long when; 186 robj *argv[ 3 ]; 187 188 /* Make sure we can use strtol */ 189 seconds = getDecodedObject(seconds); 190 when = time(NULL)+strtol(seconds->ptr,NULL, 10 ); 191 decrRefCount(seconds); 192 193 argv[ 0 ] = createStringObject( " EXPIREAT " , 8 ); 194 argv[ 1 ] = key; 195 argv[ 2 ] = createObject(REDIS_STRING, 196 sdscatprintf(sdsempty(), " %ld " ,when)); 197 buf = catAppendOnlyGenericCommand(buf, argc, argv); 198 decrRefCount(argv[ 0 ]); 199 decrRefCount(argv[ 2 ]); 200 return buf; 201 } 202 203 void feedAppendOnlyFile( struct 
redisCommand *cmd, int dictid, robj **argv, int argc) { 204 sds buf = sdsempty(); 205 robj *tmpargv[ 3 ]; 206 207 /* The DB this command was targetting is not the same as the last command 208 * we appendend. To issue a SELECT command is needed. */ 209 if (dictid != server.appendseldb) { 210 char seldb[ 64 ]; 211 212 snprintf(seldb, sizeof (seldb), " %d " ,dictid); 213 buf = sdscatprintf(buf, " *2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n " , 214 (unsigned long )strlen(seldb),seldb); 215 server.appendseldb = dictid; 216 } 217 218 if (cmd->proc == expireCommand) { 219 /* Translate EXPIRE into EXPIREAT */ 220 buf = catAppendOnlyExpireAtCommand(buf,argv[ 1 ],argv[ 2 ]); 221 } else if (cmd->proc == setexCommand) { 222 /* Translate SETEX to SET and EXPIREAT */ 223 tmpargv[ 0 ] = createStringObject( " SET " , 3 ); 224 tmpargv[ 1 ] = argv[ 1 ]; 225 tmpargv[ 2 ] = argv[ 3 ]; 226 buf = catAppendOnlyGenericCommand(buf, 3 ,tmpargv); 227 decrRefCount(tmpargv[ 0 ]); 228 buf = catAppendOnlyExpireAtCommand(buf,argv[ 1 ],argv[ 2 ]); 229 } else { 230 buf = catAppendOnlyGenericCommand(buf,argc,argv); 231 } 232 233 /* Append to the AOF buffer. This will be flushed on disk just before 234 * of re-entering the event loop, so before the client will get a 235 * positive reply about the operation performed. */ 236 server.aofbuf = sdscatlen(server.aofbuf,buf,sdslen(buf)); 237 238 /* If a background append only file rewriting is in progress we want to 239 * accumulate the differences between the child DB and the current one 240 * in a buffer, so that when the child process will do its work we 241 * can append the differences to the new append only file. */ 242 if (server.bgrewritechildpid != - 1 ) 243 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf)); 244 245 sdsfree(buf); 246 } 247 248 /* In Redis commands are always executed in the context of a client, so in 249 * order to load the append only file we need to create a fake client. 
*/ 250 struct redisClient *createFakeClient( void ) { 251 struct redisClient *c = zmalloc( sizeof (* c)); 252 253 selectDb(c, 0 ); 254 c->fd = - 1 ; 255 c->querybuf = sdsempty(); 256 c->argc = 0 ; 257 c->argv = NULL; 258 c->bufpos = 0 ; 259 c->flags = 0 ; 260 /* We set the fake client as a slave waiting for the synchronization 261 * so that Redis will not try to send replies to this client. */ 262 c->replstate = REDIS_REPL_WAIT_BGSAVE_START; 263 c->reply = listCreate(); 264 c->reply_bytes = 0 ; 265 c->watched_keys = listCreate(); 266 listSetFreeMethod(c-> reply,decrRefCount); 267 listSetDupMethod(c-> reply,dupClientReplyValue); 268 initClientMultiState(c); 269 return c; 270 } 271 272 void freeFakeClient( struct redisClient * c) { 273 sdsfree(c-> querybuf); 274 listRelease(c-> reply); 275 listRelease(c-> watched_keys); 276 freeClientMultiState(c); 277 zfree(c); 278 } 279 280 /* Replay the append log file. On error REDIS_OK is returned. On non fatal 281 * error (the append only file is zero-length) REDIS_ERR is returned. On 282 * fatal error an error message is logged and the program exists. */ 283 int loadAppendOnlyFile( char * filename) { 284 struct redisClient * fakeClient; 285 FILE *fp = fopen(filename, " r " ); 286 struct redis_stat sb; 287 int appendonly = server.appendonly; 288 long loops = 0 ; 289 290 if (fp && redis_fstat(fileno(fp),&sb) != - 1 && sb.st_size == 0 ) { 291 server.appendonly_current_size = 0 ; 292 fclose(fp); 293 return REDIS_ERR; 294 } 295 296 if (fp == NULL) { 297 redisLog(REDIS_WARNING, " Fatal error: can't open the append log file for reading: %s " ,strerror(errno)); 298 exit( 1 ); 299 } 300 301 /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI 302 * to the same file we're about to read. 
*/ 303 server.appendonly = 0 ; 304 305 fakeClient = createFakeClient(); 306 startLoading(fp); 307 308 while ( 1 ) { 309 int argc, j; 310 unsigned long len; 311 robj ** argv; 312 char buf[ 128 ]; 313 sds argsds; 314 struct redisCommand * cmd; 315 int force_swapout; 316 317 /* Serve the clients from time to time */ 318 if (!(loops++ % 1000 )) { 319 loadingProgress(ftello(fp)); 320 aeProcessEvents(server.el, AE_FILE_EVENTS| AE_DONT_WAIT); 321 } 322 323 if (fgets(buf, sizeof (buf),fp) == NULL) { 324 if (feof(fp)) 325 break ; 326 else 327 goto readerr; 328 } 329 if (buf[ 0 ] != ' * ' ) goto fmterr; 330 argc = atoi(buf+ 1 ); 331 if (argc < 1 ) goto fmterr; 332 333 argv = zmalloc( sizeof (robj*)* argc); 334 for (j = 0 ; j < argc; j++ ) { 335 if (fgets(buf, sizeof (buf),fp) == NULL) goto readerr; 336 if (buf[ 0 ] != ' $ ' ) goto fmterr; 337 len = strtol(buf+ 1 ,NULL, 10 ); 338 argsds = sdsnewlen(NULL,len); 339 if (len && fread(argsds,len, 1 ,fp) == 0 ) goto fmterr; 340 argv[j] = createObject(REDIS_STRING,argsds); 341 if (fread(buf, 2 , 1 ,fp) == 0 ) goto fmterr; /* discard CRLF */ 342 } 343 344 /* Command lookup */ 345 cmd = lookupCommand(argv[ 0 ]-> ptr); 346 if (! cmd) { 347 redisLog(REDIS_WARNING, " Unknown command '%s' reading the append only file " , argv[ 0 ]-> ptr); 348 exit( 1 ); 349 } 350 /* Run the command in the context of a fake client */ 351 fakeClient->argc = argc; 352 fakeClient->argv = argv; 353 cmd-> proc(fakeClient); 354 355 /* The fake client should not have a reply */ 356 redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0 ); 357 /* The fake client should never get blocked */ 358 redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0 ); 359 360 /* Clean up. Command code may have changed argv/argc so we use the 361 * argv/argc of the client instead of the local variables. 
*/ 362 for (j = 0 ; j < fakeClient->argc; j++ ) 363 decrRefCount(fakeClient-> argv[j]); 364 zfree(fakeClient-> argv); 365 366 /* Handle swapping while loading big datasets when VM is on */ 367 force_swapout = 0 ; 368 if ((zmalloc_used_memory() - server.vm_max_memory) > 1024 * 1024 * 32 ) 369 force_swapout = 1 ; 370 371 if (server.vm_enabled && force_swapout) { 372 while (zmalloc_used_memory() > server.vm_max_memory) { 373 if (vmSwapOneObjectBlocking() == REDIS_ERR) break ; 374 } 375 } 376 } 377 378 /* This point can only be reached when EOF is reached without errors. 379 * If the client is in the middle of a MULTI/EXEC, log error and quit. */ 380 if (fakeClient->flags & REDIS_MULTI) goto readerr; 381 382 fclose(fp); 383 freeFakeClient(fakeClient); 384 server.appendonly = appendonly; 385 stopLoading(); 386 aofUpdateCurrentSize(); 387 server.auto_aofrewrite_base_size = server.appendonly_current_size; 388 return REDIS_OK; 389 390 readerr: 391 if (feof(fp)) { 392 redisLog(REDIS_WARNING, " Unexpected end of file reading the append only file " ); 393 } else { 394 redisLog(REDIS_WARNING, " Unrecoverable error reading the append only file: %s " , strerror(errno)); 395 } 396 exit( 1 ); 397 fmterr: 398 redisLog(REDIS_WARNING, " Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename> " ); 399 exit( 1 ); 400 } 401 402 /* Write a sequence of commands able to fully rebuild the dataset into 403 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */ 404 int rewriteAppendOnlyFile( char * filename) { 405 dictIterator *di = NULL; 406 dictEntry * de; 407 FILE * fp; 408 char tmpfile[ 256 ]; 409 int j; 410 time_t now = time(NULL); 411 412 /* Note that we have to use a different temp name here compared to the 413 * one used by rewriteAppendOnlyFileBackground() function. */ 414 snprintf(tmpfile, 256 , " temp-rewriteaof-%d.aof " , ( int ) getpid()); 415 fp = fopen(tmpfile, " w " ); 416 if (! 
fp) { 417 redisLog(REDIS_WARNING, " Failed rewriting the append only file: %s " , strerror(errno)); 418 return REDIS_ERR; 419 } 420 for (j = 0 ; j < server.dbnum; j++ ) { 421 char selectcmd[] = " *2\r\n$6\r\nSELECT\r\n " ; 422 redisDb *db = server.db+ j; 423 dict *d = db-> dict; 424 if (dictSize(d) == 0 ) continue ; 425 di = dictGetSafeIterator(d); 426 if (! di) { 427 fclose(fp); 428 return REDIS_ERR; 429 } 430 431 /* SELECT the new DB */ 432 if (fwrite(selectcmd, sizeof (selectcmd)- 1 , 1 ,fp) == 0 ) goto werr; 433 if (fwriteBulkLongLong(fp,j) == 0 ) goto werr; 434 435 /* Iterate this DB writing every entry */ 436 while ((de = dictNext(di)) != NULL) { 437 sds keystr = dictGetEntryKey(de); 438 robj key, * o; 439 time_t expiretime; 440 int swapped; 441 442 keystr = dictGetEntryKey(de); 443 o = dictGetEntryVal(de); 444 initStaticStringObject(key,keystr); 445 /* If the value for this key is swapped, load a preview in memory. 446 * We use a "swapped" flag to remember if we need to free the 447 * value object instead to just increment the ref count anyway 448 * in order to avoid copy-on-write of pages if we are forked() */ 449 if (!server.vm_enabled || o->storage == REDIS_VM_MEMORY || 450 o->storage == REDIS_VM_SWAPPING) { 451 swapped = 0 ; 452 } else { 453 o = vmPreviewObject(o); 454 swapped = 1 ; 455 } 456 expiretime = getExpire(db,& key); 457 458 /* Save the key and associated value */ 459 if (o->type == REDIS_STRING) { 460 /* Emit a SET command */ 461 char cmd[]= " *3\r\n$3\r\nSET\r\n " ; 462 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 463 /* Key and value */ 464 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 465 if (fwriteBulkObject(fp,o) == 0 ) goto werr; 466 } else if (o->type == REDIS_LIST) { 467 /* Emit the RPUSHes needed to rebuild the list */ 468 char cmd[]= " *3\r\n$5\r\nRPUSH\r\n " ; 469 if (o->encoding == REDIS_ENCODING_ZIPLIST) { 470 unsigned char *zl = o-> ptr; 471 unsigned char *p = ziplistIndex(zl, 0 ); 472 unsigned char * vstr; 473 
unsigned int vlen; 474 long long vlong; 475 476 while (ziplistGet(p,&vstr,&vlen,& vlong)) { 477 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 478 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 479 if (vstr) { 480 if (fwriteBulkString(fp,( char *)vstr,vlen) == 0 ) 481 goto werr; 482 } else { 483 if (fwriteBulkLongLong(fp,vlong) == 0 ) 484 goto werr; 485 } 486 p = ziplistNext(zl,p); 487 } 488 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) { 489 list *list = o-> ptr; 490 listNode * ln; 491 listIter li; 492 493 listRewind(list,& li); 494 while ((ln = listNext(& li))) { 495 robj *eleobj = listNodeValue(ln); 496 497 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 498 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 499 if (fwriteBulkObject(fp,eleobj) == 0 ) goto werr; 500 } 501 } else { 502 redisPanic( " Unknown list encoding " ); 503 } 504 } else if (o->type == REDIS_SET) { 505 char cmd[]= " *3\r\n$4\r\nSADD\r\n " ; 506 507 /* Emit the SADDs needed to rebuild the set */ 508 if (o->encoding == REDIS_ENCODING_INTSET) { 509 int ii = 0 ; 510 int64_t llval; 511 while (intsetGet(o->ptr,ii++,& llval)) { 512 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 513 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 514 if (fwriteBulkLongLong(fp,llval) == 0 ) goto werr; 515 } 516 } else if (o->encoding == REDIS_ENCODING_HT) { 517 dictIterator *di = dictGetIterator(o-> ptr); 518 dictEntry * de; 519 while ((de = dictNext(di)) != NULL) { 520 robj *eleobj = dictGetEntryKey(de); 521 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 522 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 523 if (fwriteBulkObject(fp,eleobj) == 0 ) goto werr; 524 } 525 dictReleaseIterator(di); 526 } else { 527 redisPanic( " Unknown set encoding " ); 528 } 529 } else if (o->type == REDIS_ZSET) { 530 /* Emit the ZADDs needed to rebuild the sorted set */ 531 char cmd[]= " *4\r\n$4\r\nZADD\r\n " ; 532 533 if (o->encoding == REDIS_ENCODING_ZIPLIST) { 534 unsigned char *zl 
= o-> ptr; 535 unsigned char *eptr, * sptr; 536 unsigned char * vstr; 537 unsigned int vlen; 538 long long vll; 539 double score; 540 541 eptr = ziplistIndex(zl, 0 ); 542 redisAssert(eptr != NULL); 543 sptr = ziplistNext(zl,eptr); 544 redisAssert(sptr != NULL); 545 546 while (eptr != NULL) { 547 redisAssert(ziplistGet(eptr,&vstr,&vlen,& vll)); 548 score = zzlGetScore(sptr); 549 550 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 551 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 552 if (fwriteBulkDouble(fp,score) == 0 ) goto werr; 553 if (vstr != NULL) { 554 if (fwriteBulkString(fp,( char *)vstr,vlen) == 0 ) 555 goto werr; 556 } else { 557 if (fwriteBulkLongLong(fp,vll) == 0 ) 558 goto werr; 559 } 560 zzlNext(zl,&eptr,& sptr); 561 } 562 } else if (o->encoding == REDIS_ENCODING_SKIPLIST) { 563 zset *zs = o-> ptr; 564 dictIterator *di = dictGetIterator(zs-> dict); 565 dictEntry * de; 566 567 while ((de = dictNext(di)) != NULL) { 568 robj *eleobj = dictGetEntryKey(de); 569 double *score = dictGetEntryVal(de); 570 571 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 572 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 573 if (fwriteBulkDouble(fp,*score) == 0 ) goto werr; 574 if (fwriteBulkObject(fp,eleobj) == 0 ) goto werr; 575 } 576 dictReleaseIterator(di); 577 } else { 578 redisPanic( " Unknown sorted set encoding " ); 579 } 580 } else if (o->type == REDIS_HASH) { 581 char cmd[]= " *4\r\n$4\r\nHSET\r\n " ; 582 583 /* Emit the HSETs needed to rebuild the hash */ 584 if (o->encoding == REDIS_ENCODING_ZIPMAP) { 585 unsigned char *p = zipmapRewind(o-> ptr); 586 unsigned char *field, * val; 587 unsigned int flen, vlen; 588 589 while ((p = zipmapNext(p,&field,&flen,&val,&vlen)) != NULL) { 590 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 591 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 592 if (fwriteBulkString(fp,( char *)field,flen) == 0 ) 593 goto werr; 594 if (fwriteBulkString(fp,( char *)val,vlen) == 0 ) 595 goto werr; 596 } 597 } 
else { 598 dictIterator *di = dictGetIterator(o-> ptr); 599 dictEntry * de; 600 601 while ((de = dictNext(di)) != NULL) { 602 robj *field = dictGetEntryKey(de); 603 robj *val = dictGetEntryVal(de); 604 605 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 606 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 607 if (fwriteBulkObject(fp,field) == 0 ) goto werr; 608 if (fwriteBulkObject(fp,val) == 0 ) goto werr; 609 } 610 dictReleaseIterator(di); 611 } 612 } else { 613 redisPanic( " Unknown object type " ); 614 } 615 /* Save the expire time */ 616 if (expiretime != - 1 ) { 617 char cmd[]= " *3\r\n$8\r\nEXPIREAT\r\n " ; 618 /* If this key is already expired skip it */ 619 if (expiretime < now) continue ; 620 if (fwrite(cmd, sizeof (cmd)- 1 , 1 ,fp) == 0 ) goto werr; 621 if (fwriteBulkObject(fp,&key) == 0 ) goto werr; 622 if (fwriteBulkLongLong(fp,expiretime) == 0 ) goto werr; 623 } 624 if (swapped) decrRefCount(o); 625 } 626 dictReleaseIterator(di); 627 } 628 629 /* Make sure data will not remain on the OS's output buffers */ 630 fflush(fp); 631 aof_fsync(fileno(fp)); 632 fclose(fp); 633 634 /* Use RENAME to make sure the DB file is changed atomically only 635 * if the generate DB file is ok. */ 636 if (rename(tmpfile,filename) == - 1 ) { 637 redisLog(REDIS_WARNING, " Error moving temp append only file on the final destination: %s " , strerror(errno)); 638 unlink(tmpfile); 639 return REDIS_ERR; 640 } 641 redisLog(REDIS_NOTICE, " SYNC append only file rewrite performed " ); 642 return REDIS_OK; 643 644 werr: 645 fclose(fp); 646 unlink(tmpfile); 647 redisLog(REDIS_WARNING, " Write error writing append only file on disk: %s " , strerror(errno)); 648 if (di) dictReleaseIterator(di); 649 return REDIS_ERR; 650 } 651 652 /* This is how rewriting of the append only file in background works: 653 * 654 * 1) The user calls BGREWRITEAOF 655 * 2) Redis calls this function, that forks(): 656 * 2a) the child rewrite the append only file in a temp file. 
657 * 2b) the parent accumulates differences in server.bgrewritebuf. 658 * 3) When the child finished '2a' exists. 659 * 4) The parent will trap the exit code, if it's OK, will append the 660 * data accumulated into server.bgrewritebuf into the temp file, and 661 * finally will rename(2) the temp file in the actual file name. 662 * The the new file is reopened as the new append only file. Profit! 663 */ 664 int rewriteAppendOnlyFileBackground( void ) { 665 pid_t childpid; 666 long long start; 667 668 if (server.bgrewritechildpid != - 1 ) return REDIS_ERR; 669 if (server.vm_enabled) waitEmptyIOJobsQueue(); 670 start = ustime(); 671 if ((childpid = fork()) == 0 ) { 672 char tmpfile[ 256 ]; 673 674 /* Child */ 675 if (server.vm_enabled) vmReopenSwapFile(); 676 if (server.ipfd > 0 ) close(server.ipfd); 677 if (server.sofd > 0 ) close(server.sofd); 678 snprintf(tmpfile, 256 , " temp-rewriteaof-bg-%d.aof " , ( int ) getpid()); 679 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) { 680 _exit( 0 ); 681 } else { 682 _exit( 1 ); 683 } 684 } else { 685 /* Parent */ 686 server.stat_fork_time = ustime()- start; 687 if (childpid == - 1 ) { 688 redisLog(REDIS_WARNING, 689 " Can't rewrite append only file in background: fork: %s " , 690 strerror(errno)); 691 return REDIS_ERR; 692 } 693 redisLog(REDIS_NOTICE, 694 " Background append only file rewriting started by pid %d " ,childpid); 695 server.aofrewrite_scheduled = 0 ; 696 server.bgrewritechildpid = childpid; 697 updateDictResizePolicy(); 698 /* We set appendseldb to -1 in order to force the next call to the 699 * feedAppendOnlyFile() to issue a SELECT command, so the differences 700 * accumulated by the parent into server.bgrewritebuf will start 701 * with a SELECT statement and it will be safe to merge. 
*/ 702 server.appendseldb = - 1 ; 703 return REDIS_OK; 704 } 705 return REDIS_OK; /* unreached */ 706 } 707 708 void bgrewriteaofCommand(redisClient * c) { 709 if (server.bgrewritechildpid != - 1 ) { 710 addReplyError(c, " Background append only file rewriting already in progress " ); 711 } else if (server.bgsavechildpid != - 1 ) { 712 server.aofrewrite_scheduled = 1 ; 713 addReplyStatus(c, " Background append only file rewriting scheduled " ); 714 } else if (rewriteAppendOnlyFileBackground() == REDIS_OK) { 715 addReplyStatus(c, " Background append only file rewriting started " ); 716 } else { 717 addReply(c,shared.err); 718 } 719 } 720 721 void aofRemoveTempFile(pid_t childpid) { 722 char tmpfile[ 256 ]; 723 724 snprintf(tmpfile, 256 , " temp-rewriteaof-bg-%d.aof " , ( int ) childpid); 725 unlink(tmpfile); 726 } 727 728 /* Update the server.appendonly_current_size filed explicitly using stat(2) 729 * to check the size of the file. This is useful after a rewrite or after 730 * a restart, normally the size is updated just adding the write length 731 * to the current lenght, that is much faster. */ 732 void aofUpdateCurrentSize( void ) { 733 struct redis_stat sb; 734 735 if (redis_fstat(server.appendfd,&sb) == - 1 ) { 736 redisLog(REDIS_WARNING, " Unable to check the AOF length: %s " , 737 strerror(errno)); 738 } else { 739 server.appendonly_current_size = sb.st_size; 740 } 741 } 742 743 /* A background append only file rewriting (BGREWRITEAOF) terminated its work. 744 * Handle this. */ 745 void backgroundRewriteDoneHandler( int statloc) { 746 int exitcode = WEXITSTATUS(statloc); 747 int bysignal = WIFSIGNALED(statloc); 748 749 if (!bysignal && exitcode == 0 ) { 750 int newfd, oldfd; 751 int nwritten; 752 char tmpfile[ 256 ]; 753 long long now = ustime(); 754 755 redisLog(REDIS_NOTICE, 756 " Background AOF rewrite terminated with success " ); 757 758 /* Flush the differences accumulated by the parent to the 759 * rewritten AOF. 
*/ 760 snprintf(tmpfile, 256 , " temp-rewriteaof-bg-%d.aof " , 761 ( int )server.bgrewritechildpid); 762 newfd = open(tmpfile,O_WRONLY| O_APPEND); 763 if (newfd == - 1 ) { 764 redisLog(REDIS_WARNING, 765 " Unable to open the temporary AOF produced by the child: %s " , strerror(errno)); 766 goto cleanup; 767 } 768 769 nwritten = write(newfd,server.bgrewritebuf,sdslen(server.bgrewritebuf)); 770 if (nwritten != (signed)sdslen(server.bgrewritebuf)) { 771 if (nwritten == - 1 ) { 772 redisLog(REDIS_WARNING, 773 " Error trying to flush the parent diff to the rewritten AOF: %s " , strerror(errno)); 774 } else { 775 redisLog(REDIS_WARNING, 776 " Short write trying to flush the parent diff to the rewritten AOF: %s " , strerror(errno)); 777 } 778 close(newfd); 779 goto cleanup; 780 } 781 782 redisLog(REDIS_NOTICE, 783 " Parent diff successfully flushed to the rewritten AOF (%lu bytes) " , nwritten); 784 785 /* The only remaining thing to do is to rename the temporary file to 786 * the configured file and switch the file descriptor used to do AOF 787 * writes. We don't want close(2) or rename(2) calls to block the 788 * server on old file deletion. 789 * 790 * There are two possible scenarios: 791 * 792 * 1) AOF is DISABLED and this was a one time rewrite. The temporary 793 * file will be renamed to the configured file. When this file already 794 * exists, it will be unlinked, which may block the server. 795 * 796 * 2) AOF is ENABLED and the rewritten AOF will immediately start 797 * receiving writes. After the temporary file is renamed to the 798 * configured file, the original AOF file descriptor will be closed. 799 * Since this will be the last reference to that file, closing it 800 * causes the underlying file to be unlinked, which may block the 801 * server. 802 * 803 * To mitigate the blocking effect of the unlink operation (either 804 * caused by rename(2) in scenario 1, or by close(2) in scenario 2), we 805 * use a background thread to take care of this. 
First, we 806 * make scenario 1 identical to scenario 2 by opening the target file 807 * when it exists. The unlink operation after the rename(2) will then 808 * be executed upon calling close(2) for its descriptor. Everything to 809 * guarantee atomicity for this switch has already happened by then, so 810 * we don't care what the outcome or duration of that close operation 811 * is, as long as the file descriptor is released again. */ 812 if (server.appendfd == - 1 ) { 813 /* AOF disabled */ 814 815 /* Don't care if this fails: oldfd will be -1 and we handle that. 816 * One notable case of -1 return is if the old file does 817 * not exist. */ 818 oldfd = open(server.appendfilename,O_RDONLY| O_NONBLOCK); 819 } else { 820 /* AOF enabled */ 821 oldfd = - 1 ; /* We'll set this to the current AOF filedes later. */ 822 } 823 824 /* Rename the temporary file. This will not unlink the target file if 825 * it exists, because we reference it with "oldfd". */ 826 if (rename(tmpfile,server.appendfilename) == - 1 ) { 827 redisLog(REDIS_WARNING, 828 " Error trying to rename the temporary AOF: %s " , strerror(errno)); 829 close(newfd); 830 if (oldfd != - 1 ) close(oldfd); 831 goto cleanup; 832 } 833 834 if (server.appendfd == - 1 ) { 835 /* AOF disabled, we don't need to set the AOF file descriptor 836 * to this new file, so we can close it. */ 837 close(newfd); 838 } else { 839 /* AOF enabled, replace the old fd with the new one. */ 840 oldfd = server.appendfd; 841 server.appendfd = newfd; 842 if (server.appendfsync == APPENDFSYNC_ALWAYS) 843 aof_fsync(newfd); 844 else if (server.appendfsync == APPENDFSYNC_EVERYSEC) 845 aof_background_fsync(newfd); 846 server.appendseldb = - 1 ; /* Make sure SELECT is re-issued */ 847 aofUpdateCurrentSize(); 848 server.auto_aofrewrite_base_size = server.appendonly_current_size; 849 850 /* Clear regular AOF buffer since its contents was just written to 851 * the new AOF from the background rewrite buffer. 
*/ 852 sdsfree(server.aofbuf); 853 server.aofbuf = sdsempty(); 854 } 855 856 redisLog(REDIS_NOTICE, " Background AOF rewrite successful " ); 857 858 /* Asynchronously close the overwritten AOF. */ 859 if (oldfd != - 1 ) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,( void *)( long )oldfd,NULL,NULL); 860 861 redisLog(REDIS_VERBOSE, 862 " Background AOF rewrite signal handler took %lldus " , ustime()- now); 863 } else if (!bysignal && exitcode != 0 ) { 864 redisLog(REDIS_WARNING, 865 " Background AOF rewrite terminated with error " ); 866 } else { 867 redisLog(REDIS_WARNING, 868 " Background AOF rewrite terminated by signal %d " , 869 WTERMSIG(statloc)); 870 } 871 872 cleanup: 873 sdsfree(server.bgrewritebuf); 874 server.bgrewritebuf = sdsempty(); 875 aofRemoveTempFile(server.bgrewritechildpid); 876 server.bgrewritechildpid = - 1 ; 877 }