redis6.0.5之BIO阅读笔记-后台IO操作

更新时间:2023-07-17 13:51:47 阅读: 评论:0

redis6.0.5之BIO阅读笔记-后台IO操作#ifndef __BIO_H
#define __BIO_H
/* Exported API */供调⽤的API
void bioInit(void); 后台IO初始化
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); 创建后台IO任务
unsigned long long bioPendingJobsOfType(int type); 根据类型挂起后台任务
unsigned long long bioWaitStepOfType(int type); 根据类型等待任务
time_t bioOlderJobOfType(int type);
void bioKillThreads(void);停⽌IO线程
/* Background job opcodes */后台作业操作码
#define BIO_CLOSE_FILE    0 /* Deferred clo(2) syscall. */ 延迟进⾏关闭的系统调⽤
#define BIO_AOF_FSYNC    1 /* Deferred AOF fsync. */ 延迟将内存中的已修改的数据保存到存储设备
#define BIO_LAZY_FREE    2 /* Deferred objects freeing. */ 延迟对象释放
#define BIO_NUM_OPS      3
#endif
******************************************************************************************
/* Background I/O rvice for Redis. Redis的后台I/O服务
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background clo(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the rver.日本投降书
这个⽂件实现了那些我们需要在后台执⾏的操作。当前只实现了⼀个操作,就是后台关闭的系统调⽤。
当进程是最后⼀个该⽂件的拥有者,当关闭该进程时意味着失去该⽂件的联系,直接删除该⽂件⽐较慢,会阻塞服务,
这⾏情况下就需要这个后台关闭系统调⽤。
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term us for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
将来我们或者会继续实现我们需要的新接⼝,或者转⽽使⽤libeio库。然⽽很长⼀个时期我们将使⽤这
个⽂件
因为我们可能希望在这⾥放置特定于Redis的后台任务(例如,我们不可能需要⾮阻塞FLUSHDB/FLUSHALL实现)
* DESIGN 设计
香芋焖排骨* ------
*
* The design is trivial, we have a structure reprenting a job to perform
* and a different thread and job queue for every job type.
全家福配文字经典句子
这个设计很平凡,我们有⼀个表⽰要执⾏的作业的结构,每个作业类型有⼀个不同的线程和作业队列。
* Every thread waits for new jobs in its queue, and process every job
* quentially.
每个线程在其队列中等待新作业,并按顺序处理每个作业
* Jobs of the same type are guaranteed to be procesd from the least
* recently inrted to the most recently inrted (older jobs procesd
* first).
同⼀类型的作业保证先进先出(先处理较旧的作业)
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
⽬前⽆法通知任务创建者操作已完成,如果有有需要可以添加
#include "rver.h"白居易代表作
#include "bio.h"
static pthread_t bio_threads[BIO_NUM_OPS];  io线程数组
static pthread_mutex_t bio_mutex[BIO_NUM_OPS]; 线程互斥锁数组
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; 新任务条件变量数组
static pthread_cond_t bio_step_cond[BIO_NUM_OPS]; 步骤的条件变量数组
static list *bio_jobs[BIO_NUM_OPS]; 任务列表数组
/* The following array is ud to hold the number of pending jobs for every
* OP type. This allows us to export the bioPendingJobsOfType() API that is
* uful when the main thread wants to perform some operation that may involve
* objects shared with the background thread. The main thread will just wait
* that there are no longer jobs of this type to be executed before performing
* the nsible operation. This data is also uful for reporting. */
下⾯这个数组是⽤来保持每种不同类型等待任务的数量。
这允许我们使⽤API bioPendingJobsOfType,当主线程想要执⾏⼀些可能涉及与后台线程共享的对象的操作时,该API⾮常有⽤。static unsigned long long bio_pending[BIO_NUM_OPS]; 保存每种类型等待任务数量的数组
/* This structure reprents a background Job. It is only ud locally to this
* file as the API does not expo the internals at all. */
这个结构表⽰⼀个后台任务。它只是⽤在这个⽂件api中,不对外暴露
struct bio_job {
time_t time; /* Time at which the job was created. */任务创建的时间
/* Job specific arguments pointers. If we need to pass more than three
* arguments we can just pass a pointer to a structure or alike. */
任务特定的参数指针。如果我们需要传⼊超过三个参数,我们可以传⼊⼀个指向结构或者类似结构的指针
void *arg1, *arg2, *arg3;
};
void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl);
/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
确保我们有⾜够的堆栈来执⾏我们在主线程中所做的所有事情。
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* Initialize the background system, spawning the thread. */
初始化后台系统,⽣成线程
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */初始化状态变量和对象
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL); 初始化锁
pthread_cond_init(&bio_newjob_cond[j],NULL); 新任务条件变量
三月三广西pthread_cond_init(&bio_step_cond[j],NULL); 步骤条件变量
马小媛
bio_jobs[j] = listCreate();  任务列表
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
设置栈的默认⼤⼩(4M),因为在⼀些系统中栈可能太⼩
pthread_attr_init(&attr); 初始化线程属性
pthread_attr_getstacksize(&attr,&stacksize); 获取线程栈的⼤⼩
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */吐槽,这个世界到处都是Solaris系统的补丁
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; 如果线程的栈空间⼩于4M,那么⼀直翻倍,直到⼤于等于4M为⽌    pthread_attr_tstacksize(&attr, stacksize); 重置线程栈空间
/* Ready to spawn our threads. We u the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
准备⽣产我们的线程,我们使⽤接受单个参数的线程函数,⽬的是为了传递任务线程负责的任务ID.
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { 创建新线程
rverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread; 获取新线程ID
}
}
创建单个后台任务
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]); 针对这个类型加线程互斥锁
listAddNodeTail(bio_jobs[type],job); 将创建的新任务挂到对应类型的任务列表上
bio_pending[type]++; 该类型挂起任务+1
pthread_cond_signal(&bio_newjob_cond[type]);给该种类型的发送信号,解除阻塞
pthread_mutex_unlock(&bio_mutex[type]); 解锁,这样可以继续执⾏
}
处理后台任务
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigt_t sigt;
/* Check that the type is within the right interval. */
我与传统文化作文检查类型是否在规定的范围内
if (type >= BIO_NUM_OPS) { 只有3种类型,超过就是有问题
rverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
switch (type) {
ca BIO_CLOSE_FILE:
redis_t_thread_title("bio_clo_file");
break;
ca BIO_AOF_FSYNC:
redis_t_thread_title("bio_aof_fsync");
break;
ca BIO_LAZY_FREE:
redis_t_thread_title("bio_lazy_free");
break;
}
redisSetCpuAffinity(rver.bio_cpulist); 设置亲和CPU
/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */确认线程可以在任何时候被停⽌,这样函数bioKillThreads就能可靠的⼯作
pthread_tcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
提出请求。线程在取消请求(pthread_cancel)发出后会继续运⾏,直到到达某个取消点(CancellationPoint)
pthread_tcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 设置本线程取消动作的执⾏时机,⽴即执⾏取消动作(退出)    pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
阻⽌SIGALRM,这样我们就可以确保只有主线程才会收到看门狗信号
sigemptyt(&sigt);清空信号集
sigaddt(&sigt, SIGALRM);添加警告信号
if (pthread_sigmask(SIG_BLOCK, &sigt, NULL))
rverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */循环开始的时候需要锁住
if (listLength(bio_jobs[type]) == 0) { 如果没有挂起的任务,就开始等待
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); 设置等待条件(这⾥函数pthread_cond_wait会释放锁)continue; 继续循环,直到等待条件满⾜
}
/* Pop the job from the queue. */从挂起队列中弹出任务
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
现在可以解锁后台系统,因为我们知道有⼀个独⽴的作业结构要处理
pthread_mutex_unlock(&bio_mutex[type]);解锁
/* Process the job accordingly to its type. */根据任务类型处理任务
if (type == BIO_CLOSE_FILE) {
clo((long)job->arg1);
} el if (type == BIO_AOF_FSYNC) {
redis_fsync((long)job->arg1);
} el if (type == BIO_LAZY_FREE) {
/* What we free changes depending on what arguments are t:
我们释放的内容取决于设置的参数
* arg1 -> free the object at pointer. 参数1 在指针处释放对象
* arg2 & arg3 -> free two dictionaries (a Redis DB). 参数2和3 释放两个字典(⼀个redis数据库)
* only arg3 -> free the skiplist. */只有参数3  释放跳表
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
el if (job->arg2 && job->arg3)
lazyfreeFreeDatabaFromBioThread(job->arg2,job->arg3);
el if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} el {
rverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job); 处理了任务,就可以释放了
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
在进⼊循环迭代之前再次锁住,如果没有挂起的任务需要处理,我们将在函数pthread_cond_wait中再次阻塞
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln); 删除节点
bio_pending[type]--;挂起任务减少1个
/* Unblock threads blocked on bioWaitStepOfType() if any. */
解除在函数bioWaitStepOfType中阻塞的线程
pthread_cond_broadcast(&bio_step_cond[type]);
}
}
/* Return the number of pending jobs of the specified type. */
返回特定类型挂起任务的数量
unsigned long long bioPendingJobsOfType(int type) {
unsigned long long val;
pthread_mutex_lock(&bio_mutex[type]); 先锁住,不锁住值会变化
val = bio_pending[type]; 再获取值
pthread_mutex_unlock(&bio_mutex[type]); 解锁
return val;
}
/* If there are pending jobs for the specified type, the function blocks
* and waits that the next job was procesd. Otherwi the function
* does not block and returns ASAP.
如果存在指定类型的挂起任务,函数阻塞和等待下个任务的处理。否则不阻塞并且尽快返回。
家常鸡肉* The function returns the number of jobs still to process of the
* requested type.
函数返回仍要处理的请求类型的作业数
* This function is uful when from another thread, we want to wait
* a bio.c thread to do more work in a blocking way.
当我们从另⼀个线程等待bio.c线程以阻塞⽅式执⾏更多⼯作时,此函数⾮常有⽤
*/
unsigned long long bioWaitStepOfType(int type) {
unsigned long long val;
pthread_mutex_lock(&bio_mutex[type]); 锁定
val = bio_pending[type];
if (val != 0) {
pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]); 等待步骤的条件变量
val = bio_pending[type];
}
pthread_mutex_unlock(&bio_mutex[type]); 解锁
return val;
}
/* Kill the running bio threads in an unclean way. This function should be
* ud only when it's critical to stop the threads for some reason.
* Currently Redis does this only on crash (for instance on SIGSEGV) in order
* to perform a fast memory check without other threads messing with memory. */
⽤粗鲁的⽅式终⽌运⾏的bio线程。仅当出于某种原因停⽌线程⾮常迫切时,才应使⽤此函数(这个函数是迫不得已⽽⽤的)⽬前,Redis仅在崩溃时(例如在SIGSEGV上)执⾏此操作,以便执⾏快速内存检查,⽽不让其他线程⼲扰内存
void bioKillThreads(void) {
int err, j;
for (j = 0; j < BIO_NUM_OPS; j++) {
if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {  取消线程
if ((err = pthread_join(bio_threads[j],NULL)) != 0) { 等待线程结束
rverLog(LL_WARNING,
"Bio thread for job type #%d can be joined: %s",
j, strerror(err));
} el {
rverLog(LL_WARNING,
"Bio thread for job type #%d terminated",j);
}
}
}
}
***************************************************************************************************

本文发布于:2023-07-17 13:51:47,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/89/1085129.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:任务   线程   类型   等待   后台   需要
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图