工作队列:
内核中所有的工作队列都是由helper_wq工作队列创建的,那么helper_wq是谁创建的呢?答案是直接执行的。看一下代码:
static __init int helper_init(void)
{
helper_wq = create_singlethread_workqueue("kthread");
BUG_ON(!helper_wq);
return 0;
}
core_initcall(helper_init);
看看create_workqueue函数是怎么实现的:
struct workqueue_struct *__create_workqueue(const char *name, int singlethread)
{
int cpu, destroy = 0;
struct workqueue_struct *wq;
struct task_struct *p;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
lock_cpu_hotplug();
if (singlethread) {
INIT_LIST_HEAD(&wq->list);
p = create_workqueue_thread(wq, singlethread_cpu);
if (!p)
destroy = 1;
else
wake_up_process(p);
} else {
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
for_each_online_cpu(cpu) {
p = create_workqueue_thread(wq, cpu);
if (p) {
kthread_bind(p, cpu);
wake_up_process(p);
} else
destroy = 1;
}
}
unlock_cpu_hotplug();
if (destroy) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
其中wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct)这一句创建了
struct cpu_workqueue_struct {
spinlock_t lock;
long remove_sequence;
long insert_sequence;
struct list_head worklist;
wait_queue_head_t more_work;
wait_queue_head_t work_done;
struct workqueue_struct *wq;
task_t *thread;
int run_depth;
} ____cacheline_aligned;
以上的这个结构是每个cpu结构,它是内嵌在
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
const char *name;
struct list_head list; /* Empty if single thread */
};
的cpu_wq数组字段里面的。作为消遣,看看__alloc_percpu函数:
void *__alloc_percpu(size_t size)
{
int i;
struct percpu_data *pdata = kmalloc(sizeof(*pdata), GFP_KERNEL);
if (!pdata)
return NULL;
for_each_possible_cpu(i) {
int node = cpu_to_node(i);
if (node_online(node))
pdata->ptrs[i] = kmalloc_node(size, GFP_KERNEL, node);
else
pdata->ptrs[i] = kmalloc(size, GFP_KERNEL);
if (!pdata->ptrs[i])
goto unwind_oom;
memset(pdata->ptrs[i], 0, size);
}
return (void *)(~(unsigned long)pdata);
unwind_oom:
while (--i >= 0) {
if (!cpu_possible(i))
continue;
kfree(pdata->ptrs[i]);
}
kfree(pdata);
return NULL;
}
下面开始真正的重要过程:
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu)
{
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
struct task_struct *p;
spin_lock_init(&cwq->lock);
cwq->wq = wq;
cwq->thread = NULL;
cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);
if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);
else
p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
if (IS_ERR(p))
return NULL;
cwq->thread = p;
return p;
}
struct task_struct *kthread_create(int (*threadfn)(void *data),
void *data,
const char namefmt[],
...)
{
struct kthread_create_info create;
DECLARE_WORK(work, keventd_create_kthread, &create);
create.threadfn = threadfn;
create.data = data;
init_completion(&create.started);
init_completion(&create.done);
if (!helper_wq)//注意这一句,如果没有helper_wq,那么就直接执行回调函数,这里就是keventd_create_kthread
work.func(work.data);
else {
queue_work(helper_wq, &work);
wait_for_completion(&create.done);
}
if (!IS_ERR(create.result)) {
va_list args;
va_start(args, namefmt);
vsnprintf(create.result->comm, sizeof(create.result->comm),
namefmt, args);
va_end(args);
}
return create.result;
}
上面的过程确实有点绕,实际上work.func和queue_work调用执行的是同一个过程,就是执行keventd_create_kthread函数,而前面传入的threadfn也就是worker_thread则是封装进了kthread_create_info结构里面了,要想知道往下该做什么了,就要看keventd_create_kthread函数:
static void keventd_create_kthread(void *_create)
{
struct kthread_create_info *create = _create;
int pid;
pid = kernel_thread(kthread, create, CLONE_FS | CLONE_FILES | SIGCHLD);
if (pid create->result = ERR_PTR(pid);
} else {
wait_for_completion(&create->started);
read_lock(&tasklist_lock);
create->result = find_task_by_pid(pid);
read_unlock(&tasklist_lock);
}
complete(&create->done);
}
到这里才真正启动一个内核线程kthread,然后这个内核线程函数以create为参数,到这儿,几乎可以猜出接下来要做什么了
static int kthread(void *_create)
{
struct kthread_create_info *create = _create;
int (*threadfn)(void *data);
void *data;
sigset_t blocked;
int ret = -EINTR;
kthread_exit_files();
threadfn = create->threadfn;
data = create->data;
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
set_cpus_allowed(current, CPU_MASK_ALL);
__set_current_state(TASK_INTERRUPTIBLE);
complete(&create->started);
schedule();
if (!kthread_should_stop())
ret = threadfn(data);
if (kthread_should_stop()) {
kthread_stop_info.err = ret;
complete(&kthread_stop_info.done);
}
return 0;
}
唉,最终threadfn(data)搞定了一切,这个threadfn(data)实际上就是
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
sigset_t blocked;
current->flags |= PF_NOFREEZE;
set_user_nice(current, -5);
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
sa.sa.sa_handler = SIG_IGN;
sa.sa.sa_flags = 0;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
set_current_state(TASK_INTERRUPTIBLE);
while (!kthread_should_stop()) {
add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))
schedule();
else
__set_current_state(TASK_RUNNING);
remove_wait_queue(&cwq->more_work, &wait);
if (!list_empty(&cwq->worklist))
run_workqueue(cwq);
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
}
再来一层调用就完事了:run_workqueue(cwq)
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
cwq->run_depth++;
if (cwq->run_depth > 3) {
printk("%s: recursion depth exceeded: %d/n",
__FUNCTION__, cwq->run_depth);
dump_stack();
}
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
void (*f) (void *) = work->func;
void *data = work->data;
list_del_init(cwq->worklist.next);
spin_unlock_irqrestore(&cwq->lock, flags);
BUG_ON(work->wq_data != cwq);
clear_bit(0, &work->pending);
f(data);
spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done);
}
cwq->run_depth--;
spin_unlock_irqrestore(&cwq->lock, flags);
}
以上就是工作队列的全过程,这是一个完美的层次嵌套结构,很完美!如果任何时间想用工作队列解决一些问题那么你只需要做两件事情,一是建立一个工作队列,二是往里面排入任务。
看完了上述代码,下面看一个应用,就是linux的异步io。
static int __init aio_setup(void)
{
kiocb_cachep = kmem_cache_create("kiocb", sizeof(struct kiocb),
0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
kioctx_cachep = kmem_cache_create("kioctx", sizeof(struct kioctx),
0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
aio_wq = create_workqueue("aio");
pr_debug("aio_setup: sizeof(struct page) = %d/n", (int)sizeof(struct page));
return 0;
}
在新内核里,睡眠唤醒和2.4的不同了:为了更加灵活,用回调函数来实现。
#define wake_up(x) __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL)
void fastcall __wake_up(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0, key);
spin_unlock_irqrestore(&q->lock, flags);
}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int sync, void *key)
{
struct list_head *tmp, *next;
list_for_each_safe(tmp, next, &q->task_list) {
wait_queue_t *curr;
unsigned flags;
curr = list_entry(tmp, wait_queue_t, task_list);
flags = curr->flags;
if (curr->func(curr, mode, sync, key) &&
(flags & WQ_FLAG_EXCLUSIVE) &&!--nr_exclusive)
break;
}
}
看看curr->func(curr, mode, sync, key) 这一句,wait_queue_t结构体里面加入了一个函数指针。对于一般的唤醒,就是default_wake_function函数,而对于异步io,就是aio_wake_function:
static int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
list_del_init(&wait->task_list);
kick_iocb(iocb);
return 1;
}
kiocb结构是与异步io相关的结构,暂且现在不用深究。接着看kick_iocb
void fastcall kick_iocb(struct kiocb *iocb)
{
if (is_sync_kiocb(iocb)) {
kiocbSetKicked(iocb);
wake_up_process(iocb->ki_obj.tsk);
return;
}
try_queue_kicked_iocb(iocb);
}
重点就是try_queue_kicked_iocb:
static void try_queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
unsigned long flags;
int run = 0;
BUG_ON((!list_empty(&iocb->ki_wait.task_list)));
spin_lock_irqsave(&ctx->ctx_lock, flags);
if (!kiocbTryKick(iocb))
run = __queue_kicked_iocb(iocb);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
if (run)
aio_queue_work(ctx);
}
__queue_kicked_iocb只是和队列相关的操作,而重点是aio_queue_work函数:
static void aio_queue_work(struct kioctx * ctx)
{
unsigned long timeout;
smp_mb();
if (waitqueue_active(&ctx->wait))
timeout = 1;
else
timeout = HZ/10;
queue_delayed_work(aio_wq, &ctx->wq, timeout);
}
queue_delayed_work应该很熟悉了,就是往前面系统初始化的时候建立的工作队列aio_wq里排入具体的任务ctx->wq那么,真正调度到该工作队列的时候该怎么办呢?答案是调用aio_kick_handler,这需要在某个地方初始化这个工作队列元素:
INIT_WORK(&ctx->wq, aio_kick_handler, ctx);
下面看一下这个函数:
static void aio_kick_handler(void *data)
{
struct kioctx *ctx = data;
mm_segment_t oldfs = get_fs();
int requeue;
set_fs(USER_DS);
use_mm(ctx->mm); //切换到提交aio进程的地址空间,因为O_DIRECT的io不需要内核缓存
spin_lock_irq(&ctx->ctx_lock);
requeue =__aio_run_iocbs(ctx);
unuse_mm(ctx->mm);
spin_unlock_irq(&ctx->ctx_lock);
set_fs(oldfs);
if (requeue)
queue_work(aio_wq, &ctx->wq);
}
__aio_run_iocbs(ctx)是真正完成任务的操作
static int __aio_run_iocbs(struct kioctx *ctx)
{
struct kiocb *iocb;
LIST_HEAD(run_list);
assert_spin_locked(&ctx->ctx_lock);
list_splice_init(&ctx->run_list, &run_list);
while (!list_empty(&run_list)) {
iocb = list_entry(run_list.next, struct kiocb,
ki_run_list);
list_del(&iocb->ki_run_list);
iocb->ki_users++; /* grab extra reference */
aio_run_iocb(iocb);
if (__aio_put_req(ctx, iocb)) /* drop extra ref */
put_ioctx(ctx);
}
if (!list_empty(&ctx->run_list))
return 1;
return 0;
}
进一步 aio_run_iocb:
static ssize_t aio_run_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
ssize_t (*retry)(struct kiocb *);
ssize_t ret;
if (iocb->ki_retried++ > 1024*1024) {
…
}
if (!(iocb->ki_retried & 0xff))
…
if (!(retry = iocb->ki_retry))
…
kiocbClearKicked(iocb);
iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
spin_unlock_irq(&ctx->ctx_lock);
if (kiocbIsCancelled(iocb)) {
ret = -EINTR;
aio_complete(iocb, ret, 0);
goto out;
}
BUG_ON(current->io_wait != NULL);
current->io_wait = &iocb->ki_wait;
ret = retry(iocb);
current->io_wait = NULL;
if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
BUG_ON(!list_empty(&iocb->ki_wait.task_list));
aio_complete(iocb, ret, 0);
}
out:
spin_lock_irq(&ctx->ctx_lock);
if (-EIOCBRETRY == ret) {
INIT_LIST_HEAD(&iocb->ki_run_list);
if (kiocbIsKicked(iocb)) {
__queue_kicked_iocb(iocb);
aio_queue_work(ctx);
}
}
return ret;
}
这个函数本质上就是执行retry回调函数,完成没有完成的工作。异步io和普通io的区别就是在于发生组赛的时候并不睡眠或发生调度,而是直接返回一个错误代码,(在AIO下,进程并不会sleep,而是简单地将一个含有特殊func的wait_queue_t挂入等待队列就返回了)。这里应用工作队列的方式令人赞叹。 那么还是看看普通的唤醒函数比较心安:
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync,
void *key)
{
task_t *p = curr->private;
return try_to_wake_up(p, mode, sync);
}
就是这么简单,简简单单唤醒进程。在信号的传递结束以后,调用
int fastcall wake_up_process(task_t *p)
{
return try_to_wake_up(p, TASK_STOPPED | TASK_TRACED |
TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE, 0);
}
唤醒接受信号的进程。下面看一下关于等待队列的事情:
static inline void init_waitqueue_head(wait_queue_head_t *q)
{
spin_lock_init(&q->lock);
INIT_LIST_HEAD(&q->task_list);
}
static inline void INIT_LIST_HEAD(struct list_head *list)
{
list->next = list;
list->prev = list;
}
********************
void fastcall __sched sleep_on(wait_queue_head_t *q)
{
SLEEP_ON_VAR //初始化一个wait_queue_t
current->state = TASK_UNINTERRUPTIBLE;
SLEEP_ON_HEAD //将这个wait_queue_t加入睡眠队列
schedule();
SLEEP_ON_TAIL //睡眠完毕,从睡眠队列清除
}
就是这么简单。
分享到:
相关推荐
9.1 阻塞和非阻塞、同步和异步与 IO 操作 9.2 阻塞 IO 9.2.1 等待队列 9.3 实验 9.3.1 原理图 9.3.2 设备树 9.3.3 驱动
概念说明 ...针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0×00000000到0xBFFFFFFF),供各个进程使用,称为
聊天室程序 9.6.1 客户端 9.6.2 服务器 9.7 IO复用的高级应用三:同时处理TCP和UDP服务 9.8 超级服务xinetd 9.8.1 xinetd配置文件 9.8.2 xinetd工作流程 第10章 信号 10.1 Linux信号概述 10.1.1 发送信号 ...
该程序在那里说明了同步I / O和异步io_uring API之间的区别。 02_cat_uring :本示例使用io_uring提供的原始接口构建cat版本。 这样做是为了使用户了解io_uring界面在低层如何工作。 03_cat_liburing :此示例在...
第11章Linux设备驱动中的异步访问 11.1 Linux 2.6中的异步访问 11.1.1异步访问概念与GNU C库函数 11.1.2使用信号作为异步访问的通知 11.1.3使用回调函数作为异步访问的通知 11.1.4异步访问与设备驱动 11.2异步Fifo...
第9章 数据的IO和复用 237 9.1 IO函数 237 9.1.1 使用recv()函数接收数据 237 9.1.2 使用send()函数发送数据 239 9.1.3 使用readv()函数接收数据 240 9.1.4 使用writev()函数发送数据 240 9.1.5 使用...
kangle采用基于事件驱动(epoll等)加非阻塞socket及异步IO的方式构架,使其比传统web服务器性能更高;静态网页处理能力达到Apache的8-10倍左右。 kangle支持isapi,fastcgi,cgi,ajp,uwsgi,fcgi,hmux,http等多种扩展...
[14本经典Android开发教程] 8 Linux内核阅读心得体会 ...2 在工作队列中执行AIO 66 3 负责AIO执行的核心函数aio run iocb 67 4 AIO操作的完成 67 读核感悟 文件读写 内核态是否支持非direct I O方式的AIO ...
主要讲了协程、进程、异步IO多路复用。 协程和IO多路复用都是单线程的。 epoll 在linux下通过这个模块libevent.so实现 gevent 在底层也是用了libevent.so gevent可以理解为一个更上层的封装。 使用select...
预先建立的下载和上传队列 :check_mark: 自动下载和更新 :check_mark: 电子邮件/ Steam / GOG身份验证 :check_mark: 浏览/搜索/标记mod :check_mark: Mod依赖/评论/统计 :check_mark: 多语言C介面 :...
kikilib网络库blog:1、定位:kikilib网络库是轻量,高性能,纯c++11,更符合OOP语言特点且易用的一个Linux服务器网络库,它没有繁琐的回调函数设置和上下文指针机制,这让它的使用和对象的生命期管理都变得更加简单...
2.在工作队列中执行AIO....................................................66 3.负责AIO执行的核心函数aio_run_iocb.....................................67 4 AIO操作的完成.....................................
异步IO 类加载机制 双亲委派 OSGI 算法 搜索 二分 排序 选择 冒泡 插入 快速 归并 堆 桶 基数 常用算法 贪婪 回溯 剪枝 动态规划 数据挖掘算法 KMP算法 GZZ算法 HASH分桶 关联规则算法 ...
epoll学习实例,epoll是Linux内核为处理大批量文件描述符而作了改进的poll,...另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
构造函数实现线程池创建并开始运行,enqueue函数实现消息队列,通过future实现异步工作的lambda函数的传递;\ **②同步机制实现**:基于unique_lock以及condition_variable实现同步和互斥,符合RAII原则;\ **5)...
| libworkq: 工作队列 | | ## I/O | | | |--|--| | libbase64: Base64/32 编解码 | libconfig: 配置文件库 | liblog: 日志库 | libfile: 文件操作库 | | libstrex: | libsubmask: 网络地址翻译 | ## 多媒体 | | | |...
01 异步IO 02 selectors模块介绍 03 selectors模块应用 04 作业介绍 第37章 01 selctors实现文件上传与下载 02 html的介绍 03 html文档树的概念 04 meta标签以及一些基本标签 05 img标签和列表标签 06 form表单之...
1.1.1 Linux的诞生和发展.............................................. 2 1.1.2 Linux名称的由来........................................ ........ 3 1.2 Linux的发展要素.......................................
Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...
Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...