`
totoxian
  • 浏览: 1033597 次
  • 性别: Icon_minigender_2
  • 来自: 西安
文章分类
社区版块
存档分类
最新评论

linux工作队列和异步io

 
阅读更多

工作队列:
内核中所有的工作队列都是由helper_wq工作队列创建的,那么helper_wq是谁创建的呢?答案是直接执行的。看一下代码:
static __init int helper_init(void)
{
helper_wq = create_singlethread_workqueue("kthread");
BUG_ON(!helper_wq);
return 0;
}
core_initcall(helper_init);
看看create_workqueue函数是怎么实现的:
struct workqueue_struct *__create_workqueue(const char *name, int singlethread)
{
int cpu, destroy = 0;
struct workqueue_struct *wq;
struct task_struct *p;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
lock_cpu_hotplug();
if (singlethread) {
INIT_LIST_HEAD(&wq->list);
p = create_workqueue_thread(wq, singlethread_cpu);
if (!p)
destroy = 1;
else
wake_up_process(p);
} else {
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
for_each_online_cpu(cpu) {
p = create_workqueue_thread(wq, cpu);
if (p) {
kthread_bind(p, cpu);
wake_up_process(p);
} else
destroy = 1;
}
}
unlock_cpu_hotplug();
if (destroy) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}
其中wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct)这一句创建了
struct cpu_workqueue_struct {
spinlock_t lock;
long remove_sequence;
long insert_sequence;
struct list_head worklist;
wait_queue_head_t more_work;
wait_queue_head_t work_done;
struct workqueue_struct *wq;
task_t *thread;
int run_depth;
} ____cacheline_aligned;
以上的这个结构是每个cpu结构,它是内嵌在
struct workqueue_struct {
struct cpu_workqueue_struct *cpu_wq;
const char *name;
struct list_head list; /* Empty if single thread */
};
的cpu_wq数组字段里面的。作为消遣,看看__alloc_percpu函数:
void *__alloc_percpu(size_t size)
{
int i;
struct percpu_data *pdata = kmalloc(sizeof(*pdata), GFP_KERNEL);
if (!pdata)
return NULL;
for_each_possible_cpu(i) {
int node = cpu_to_node(i);
if (node_online(node))
pdata->ptrs[i] = kmalloc_node(size, GFP_KERNEL, node);
else
pdata->ptrs[i] = kmalloc(size, GFP_KERNEL);
if (!pdata->ptrs[i])
goto unwind_oom;
memset(pdata->ptrs[i], 0, size);
}
return (void *)(~(unsigned long)pdata);
unwind_oom:
while (--i >= 0) {
if (!cpu_possible(i))
continue;
kfree(pdata->ptrs[i]);
}
kfree(pdata);
return NULL;
}
下面开始真正的重要过程:
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu)
{
struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
struct task_struct *p;
spin_lock_init(&cwq->lock);
cwq->wq = wq;
cwq->thread = NULL;
cwq->insert_sequence = 0;
cwq->remove_sequence = 0;
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);
if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);
else
p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
if (IS_ERR(p))
return NULL;
cwq->thread = p;
return p;
}
struct task_struct *kthread_create(int (*threadfn)(void *data),
void *data,
const char namefmt[],
...)
{
struct kthread_create_info create;
DECLARE_WORK(work, keventd_create_kthread, &create);
create.threadfn = threadfn;
create.data = data;
init_completion(&create.started);
init_completion(&create.done);
if (!helper_wq)//注意这一句,如果没有helper_wq,那么就直接执行回调函数,这里就是keventd_create_kthread
work.func(work.data);
else {
queue_work(helper_wq, &work);
wait_for_completion(&create.done);
}
if (!IS_ERR(create.result)) {
va_list args;
va_start(args, namefmt);
vsnprintf(create.result->comm, sizeof(create.result->comm),
namefmt, args);
va_end(args);
}
return create.result;
}
上面的过程确实有点绕,实际上work.func和queue_work调用执行的是同一个过程,就是执行keventd_create_kthread函数,而前面传入的threadfn也就是worker_thread则是封装进了kthread_create_info结构里面了,要想知道往下该做什么了,就要看keventd_create_kthread函数:
static void keventd_create_kthread(void *_create)
{
struct kthread_create_info *create = _create;
int pid;
pid = kernel_thread(kthread, create, CLONE_FS | CLONE_FILES | SIGCHLD);
if (pid create->result = ERR_PTR(pid);
} else {
wait_for_completion(&create->started);
read_lock(&tasklist_lock);
create->result = find_task_by_pid(pid);
read_unlock(&tasklist_lock);
}
complete(&create->done);
}
到这里才真正启动一个内核线程kthread,然后这个内核线程函数以create为参数,到这儿,几乎可以猜出接下来要做什么了
static int kthread(void *_create)
{
struct kthread_create_info *create = _create;
int (*threadfn)(void *data);
void *data;
sigset_t blocked;
int ret = -EINTR;
kthread_exit_files();
threadfn = create->threadfn;
data = create->data;
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
set_cpus_allowed(current, CPU_MASK_ALL);
__set_current_state(TASK_INTERRUPTIBLE);
complete(&create->started);
schedule();
if (!kthread_should_stop())
ret = threadfn(data);
if (kthread_should_stop()) {
kthread_stop_info.err = ret;
complete(&kthread_stop_info.done);
}
return 0;
}
唉,最终threadfn(data)搞定了一切,这个threadfn(data)实际上就是
static int worker_thread(void *__cwq)
{
struct cpu_workqueue_struct *cwq = __cwq;
DECLARE_WAITQUEUE(wait, current);
struct k_sigaction sa;
sigset_t blocked;
current->flags |= PF_NOFREEZE;
set_user_nice(current, -5);
sigfillset(&blocked);
sigprocmask(SIG_BLOCK, &blocked, NULL);
flush_signals(current);
sa.sa.sa_handler = SIG_IGN;
sa.sa.sa_flags = 0;
siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);
set_current_state(TASK_INTERRUPTIBLE);
while (!kthread_should_stop()) {
add_wait_queue(&cwq->more_work, &wait);
if (list_empty(&cwq->worklist))
schedule();
else
__set_current_state(TASK_RUNNING);
remove_wait_queue(&cwq->more_work, &wait);
if (!list_empty(&cwq->worklist))
run_workqueue(cwq);
set_current_state(TASK_INTERRUPTIBLE);
}
__set_current_state(TASK_RUNNING);
return 0;
}
再来一层调用就完事了:run_workqueue(cwq)
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
cwq->run_depth++;
if (cwq->run_depth > 3) {
printk("%s: recursion depth exceeded: %d/n",
__FUNCTION__, cwq->run_depth);
dump_stack();
}
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
void (*f) (void *) = work->func;
void *data = work->data;
list_del_init(cwq->worklist.next);
spin_unlock_irqrestore(&cwq->lock, flags);
BUG_ON(work->wq_data != cwq);
clear_bit(0, &work->pending);
f(data);
spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done);
}
cwq->run_depth--;
spin_unlock_irqrestore(&cwq->lock, flags);
}
以上就是工作队列的全过程,这是一个完美的层次嵌套结构,很完美!如果任何时间想用工作队列解决一些问题那么你只需要做两件事情,一是建立一个工作队列,二是往里面排入任务。
看完了上述代码,下面看一个应用,就是linux的异步io。
static int __init aio_setup(void)
{
kiocb_cachep = kmem_cache_create("kiocb", sizeof(struct kiocb),
0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
kioctx_cachep = kmem_cache_create("kioctx", sizeof(struct kioctx),
0, SLAB_HWCACHE_ALIGN|SLAB_PANIC, NULL, NULL);
aio_wq = create_workqueue("aio");
pr_debug("aio_setup: sizeof(struct page) = %d/n", (int)sizeof(struct page));
return 0;
}
在新内核里,睡眠唤醒和2.4的不同了:为了更加灵活,用回调函数来实现。
#define wake_up(x) __wake_up(x, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE, 1, NULL)
void fastcall __wake_up(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0, key);
spin_unlock_irqrestore(&q->lock, flags);
}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int sync, void *key)
{
struct list_head *tmp, *next;
list_for_each_safe(tmp, next, &q->task_list) {
wait_queue_t *curr;
unsigned flags;
curr = list_entry(tmp, wait_queue_t, task_list);
flags = curr->flags;
if (curr->func(curr, mode, sync, key) &&
(flags & WQ_FLAG_EXCLUSIVE) &&!--nr_exclusive)
break;
}
}
看看curr->func(curr, mode, sync, key) 这一句,wait_queue_t结构体里面加入了一个函数指针。对于一般的唤醒,就是default_wake_function函数,而对于异步io,就是aio_wake_function:
static int aio_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
struct kiocb *iocb = container_of(wait, struct kiocb, ki_wait);
list_del_init(&wait->task_list);
kick_iocb(iocb);
return 1;
}
kiocb结构是与异步io相关的结构,暂且现在不用深究。接着看kick_iocb
void fastcall kick_iocb(struct kiocb *iocb)
{
if (is_sync_kiocb(iocb)) {
kiocbSetKicked(iocb);
wake_up_process(iocb->ki_obj.tsk);
return;
}
try_queue_kicked_iocb(iocb);
}
重点就是try_queue_kicked_iocb:
static void try_queue_kicked_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
unsigned long flags;
int run = 0;
BUG_ON((!list_empty(&iocb->ki_wait.task_list)));
spin_lock_irqsave(&ctx->ctx_lock, flags);
if (!kiocbTryKick(iocb))
run = __queue_kicked_iocb(iocb);
spin_unlock_irqrestore(&ctx->ctx_lock, flags);
if (run)
aio_queue_work(ctx);
}
__queue_kicked_iocb只是和队列相关的操作,而重点是aio_queue_work函数:
static void aio_queue_work(struct kioctx * ctx)
{
unsigned long timeout;
smp_mb();
if (waitqueue_active(&ctx->wait))
timeout = 1;
else
timeout = HZ/10;
queue_delayed_work(aio_wq, &ctx->wq, timeout);
}
queue_delayed_work应该很熟悉了,就是往前面系统初始化的时候建立的工作队列aio_wq里排入具体的任务ctx->wq那么,真正调度到该工作队列的时候该怎么办呢?答案是调用aio_kick_handler,这需要在某个地方初始化这个工作队列元素:
INIT_WORK(&ctx->wq, aio_kick_handler, ctx);
下面看一下这个函数:
static void aio_kick_handler(void *data)
{
struct kioctx *ctx = data;
mm_segment_t oldfs = get_fs();
int requeue;
set_fs(USER_DS);
use_mm(ctx->mm); //切换到提交aio进程的地址空间,因为O_DIRECT的io不需要内核缓存
spin_lock_irq(&ctx->ctx_lock);
requeue =__aio_run_iocbs(ctx);
unuse_mm(ctx->mm);
spin_unlock_irq(&ctx->ctx_lock);
set_fs(oldfs);
if (requeue)
queue_work(aio_wq, &ctx->wq);
}
__aio_run_iocbs(ctx)是真正完成任务的操作
static int __aio_run_iocbs(struct kioctx *ctx)
{
struct kiocb *iocb;
LIST_HEAD(run_list);
assert_spin_locked(&ctx->ctx_lock);
list_splice_init(&ctx->run_list, &run_list);
while (!list_empty(&run_list)) {
iocb = list_entry(run_list.next, struct kiocb,
ki_run_list);
list_del(&iocb->ki_run_list);
iocb->ki_users++; /* grab extra reference */
aio_run_iocb(iocb);
if (__aio_put_req(ctx, iocb)) /* drop extra ref */
put_ioctx(ctx);
}
if (!list_empty(&ctx->run_list))
return 1;
return 0;
}
进一步 aio_run_iocb:
static ssize_t aio_run_iocb(struct kiocb *iocb)
{
struct kioctx *ctx = iocb->ki_ctx;
ssize_t (*retry)(struct kiocb *);
ssize_t ret;
if (iocb->ki_retried++ > 1024*1024) {

}
if (!(iocb->ki_retried & 0xff))

if (!(retry = iocb->ki_retry))

kiocbClearKicked(iocb);
iocb->ki_run_list.next = iocb->ki_run_list.prev = NULL;
spin_unlock_irq(&ctx->ctx_lock);
if (kiocbIsCancelled(iocb)) {
ret = -EINTR;
aio_complete(iocb, ret, 0);
goto out;
}
BUG_ON(current->io_wait != NULL);
current->io_wait = &iocb->ki_wait;
ret = retry(iocb);
current->io_wait = NULL;
if (ret != -EIOCBRETRY && ret != -EIOCBQUEUED) {
BUG_ON(!list_empty(&iocb->ki_wait.task_list));
aio_complete(iocb, ret, 0);
}
out:
spin_lock_irq(&ctx->ctx_lock);
if (-EIOCBRETRY == ret) {
INIT_LIST_HEAD(&iocb->ki_run_list);
if (kiocbIsKicked(iocb)) {
__queue_kicked_iocb(iocb);
aio_queue_work(ctx);
}
}
return ret;
}
这个函数本质上就是执行retry回调函数,完成没有完成的工作。异步io和普通io的区别就是在于发生组赛的时候并不睡眠或发生调度,而是直接返回一个错误代码,(在AIO下,进程并不会sleep,而是简单地将一个含有特殊func的wait_queue_t挂入等待队列就返回了)。这里应用工作队列的方式令人赞叹。 那么还是看看普通的唤醒函数比较心安:
int default_wake_function(wait_queue_t *curr, unsigned mode, int sync,
void *key)
{
task_t *p = curr->private;
return try_to_wake_up(p, mode, sync);
}
就是这么简单,简简单单唤醒进程。在信号的传递结束以后,调用
int fastcall wake_up_process(task_t *p)
{
return try_to_wake_up(p, TASK_STOPPED | TASK_TRACED |
TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE, 0);
}
唤醒接受信号的进程。下面看一下关于等待队列的事情:
static inline void init_waitqueue_head(wait_queue_head_t *q)
{
spin_lock_init(&q->lock);
INIT_LIST_HEAD(&q->task_list);
}
static inline void INIT_LIST_HEAD(struct list_head *list)
{
list->next = list;
list->prev = list;
}
********************
void fastcall __sched sleep_on(wait_queue_head_t *q)
{
SLEEP_ON_VAR //初始化一个wait_queue_t
current->state = TASK_UNINTERRUPTIBLE;
SLEEP_ON_HEAD //将这个wait_queue_t加入睡眠队列
schedule();
SLEEP_ON_TAIL //睡眠完毕,从睡眠队列清除
}
就是这么简单。

分享到:
评论

相关推荐

    9.阻塞IO1

    9.1 阻塞和非阻塞、同步和异步与 IO 操作 9.2 阻塞 IO 9.2.1 等待队列 9.3 实验 9.3.1 原理图 9.3.2 设备树 9.3.3 驱动

    Linux 下的五种 IO 模型详细介绍

    概念说明 ...针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0×00000000到0xBFFFFFFF),供各个进程使用,称为

    Linux高性能服务器编程

    聊天室程序 9.6.1 客户端 9.6.2 服务器 9.7 IO复用的高级应用三:同时处理TCP和UDP服务 9.8 超级服务xinetd 9.8.1 xinetd配置文件 9.8.2 xinetd工作流程 第10章 信号 10.1 Linux信号概述 10.1.1 发送信号 ...

    io_uring-by-example:io_uring by Example文章系列的配套存储库

    该程序在那里说明了同步I / O和异步io_uring API之间的区别。 02_cat_uring :本示例使用io_uring提供的原始接口构建cat版本。 这样做是为了使用户了解io_uring界面在低层如何工作。 03_cat_liburing :此示例在...

    Android驱动开发权威指南

    第11章Linux设备驱动中的异步访问 11.1 Linux 2.6中的异步访问 11.1.1异步访问概念与GNU C库函数 11.1.2使用信号作为异步访问的通知 11.1.3使用回调函数作为异步访问的通知 11.1.4异步访问与设备驱动 11.2异步Fifo...

    linux网络编程-宋敬彬-part1

    第9章 数据的IO和复用 237 9.1 IO函数 237 9.1.1 使用recv()函数接收数据 237 9.1.2 使用send()函数发送数据 239 9.1.3 使用readv()函数接收数据 240 9.1.4 使用writev()函数发送数据 240 9.1.5 使用...

    kangle最新版linux安装包

    kangle采用基于事件驱动(epoll等)加非阻塞socket及异步IO的方式构架,使其比传统web服务器性能更高;静态网页处理能力达到Apache的8-10倍左右。 kangle支持isapi,fastcgi,cgi,ajp,uwsgi,fcgi,hmux,http等多种扩展...

    [14本经典Android开发教程]-8-Linux内核阅读心得体会

    [14本经典Android开发教程] 8 Linux内核阅读心得体会 ...2 在工作队列中执行AIO 66 3 负责AIO执行的核心函数aio run iocb 67 4 AIO操作的完成 67 读核感悟 文件读写 内核态是否支持非direct I O方式的AIO ...

    python RabbitMQ 使用详细介绍(小结)

    主要讲了协程、进程、异步IO多路复用。 协程和IO多路复用都是单线程的。 epoll 在linux下通过这个模块libevent.so实现 gevent 在底层也是用了libevent.so gevent可以理解为一个更上层的封装。 使用select...

    modio-sdk-legacy:用于将mod.io集成到游戏中的SDK-游戏开发人员的改装API

    预先建立的下载和上传队列 :check_mark: 自动下载和更新 :check_mark: 电子邮件/ Steam / GOG身份验证 :check_mark: 浏览/搜索/标记mod :check_mark: Mod依赖/评论/统计 :check_mark: 多语言C介面 :...

    kikilib:一个面向对象的轻量级高性能C++网络Linux系统库

    kikilib网络库blog:1、定位:kikilib网络库是轻量,高性能,纯c++11,更符合OOP语言特点且易用的一个Linux服务器网络库,它没有繁琐的回调函数设置和上下文指针机制,这让它的使用和对象的生命期管理都变得更加简单...

    Linux内核阅读

    2.在工作队列中执行AIO....................................................66 3.负责AIO执行的核心函数aio_run_iocb.....................................67 4 AIO操作的完成.....................................

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    异步IO 类加载机制 双亲委派 OSGI 算法 搜索 二分 排序 选择 冒泡 插入 快速 归并 堆 桶 基数 常用算法 贪婪 回溯 剪枝 动态规划 数据挖掘算法 KMP算法 GZZ算法 HASH分桶 关联规则算法 ...

    epoll学习实例

    epoll学习实例,epoll是Linux内核为处理大批量文件描述符而作了改进的poll,...另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

    基于C++实现的HTTP服务器改进版源码+项目使用说明+详细注释.zip

    构造函数实现线程池创建并开始运行,enqueue函数实现消息队列,通过future实现异步工作的lambda函数的传递;\ **②同步机制实现**:基于unique_lock以及condition_variable实现同步和互斥,符合RAII原则;\ **5)...

    gear-lib通用的C基础库.rar

    | libworkq: 工作队列 | | ## I/O | | | |--|--| | libbase64: Base64/32 编解码 | libconfig: 配置文件库 | liblog: 日志库 | libfile: 文件操作库 | | libstrex: | libsubmask: 网络地址翻译 | ## 多媒体 | | | |...

    python入门到高级全栈工程师培训 第3期 附课件代码

    01 异步IO 02 selectors模块介绍 03 selectors模块应用 04 作业介绍 第37章 01 selctors实现文件上传与下载 02 html的介绍 03 html文档树的概念 04 meta标签以及一些基本标签 05 img标签和列表标签 06 form表单之...

    linux网路编程 中文 23M 版

    1.1.1 Linux的诞生和发展.............................................. 2 1.1.2 Linux名称的由来........................................ ........ 3 1.2 Linux的发展要素.......................................

    java开源包1

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

    java开源包11

    Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。 预输入搜索 Cleo Cleo 是一个灵活的软件库用于处理一些预输入和自动完成的搜索功能,该项目是 LinkedIn 公司...

Global site tag (gtag.js) - Google Analytics