`
qiezi
  • 浏览: 492041 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

项目快完成了

阅读更多
原打算分布式平台先使用异步编程方式来完成的,轻量级线程的实现以后再做个协议兼容的改造,后来发现现有项目的同步逻辑的代码要改成异步回调方式,改的东西太多,所以最近几星期把轻量级线程方式先实现了。测试结果还算理想,ucontext的切换效率在超过200万/秒,erlang在我测试的相同机器上非smp版本720万/秒,smp版本不到200万/秒,切换性能的确有差距,不过目前看来是足够用了。还没有去实现Lock-free的SMP版本,目前用的是线程池来跑多线程逻辑,需要多线程跑的部分只需要主动把当前轻量级线程切到线程池中,运行完那部分再切回来就可以了。IO部分把aio和event系统结合起来了,所以虽然是在单线程程序中执行,但文件读写部分并不是阻塞的。

使用erlang会有不少阻力,虽然过去曾经宣传过不少,项目组基本上也都知道这个东西,但用它做项目特别是改造现有项目,工作量还是大了些。用C++实现的分布式平台对现有代码兼容较好,对轻量级线程不友好的逻辑(占用栈过大,有阻塞等),可以简单地加一个线程池切换,和Eurasia的做法相似。

目前完成了actor模型(轻量级线程、send/receive及超时语义,进程注册等),socket模块,分布式模型(和erlang的node相似),aio整合,Future(参见IoLanguage,好玩的东东),由于使用C++开发,不用忍受Erlang的FP语法,还可以使用C++的语法特性。原以为这个平台做完以后现有代码修改较多,目前看来仅有少量修改即可,代码编写比异步回调方式容易多了。
分享到:
评论
7 楼 qiezi 2009-03-03  
hurd 写道

GetValue()时挂起轻量级线程是不是没用到信号量?能介绍下怎么实现的吗?

因为是轻量级线程,运行在同一个本地线程里,不需要用信号量。

调用者调用GetValue()时如果实际的操作已经返回,那么直接返回结果;如果没有返回,就把自己加到Future对象的等待队列里,并挂起;
执行的线程完成后,会把等待队列里的线程都唤醒,它们醒来后直接返回结果就行了。

这里的挂起就是交出执行权,并且不把自己放到调度器里。唤醒就是把该线程放到调度器的队列里。
6 楼 hurd 2009-03-03  
GetValue()时挂起轻量级线程是不是没用到信号量?能介绍下怎么实现的吗?
5 楼 inshua 2009-03-03  
qiezi 写道

上面调用GetValue()时,如果调用还没有返回,就会“挂起”当前轻量级线程,直到取得结果。IoLanguage那里学来的。


java 也有这种 future 的 feature
4 楼 qiezi 2009-03-02  
Future类功能是这样的,拿前面的CacheService为例,一次网络请求总会消耗时间的,如果并发发出2次请求,很可能只会用到一次的时间(考虑并行的map),客户端写这种代码不是很方便,可以用Future来实现。

假设已经实现了一个CacheClient:
class Cache
{
public:
  int Query(const std::string key, std::string& value);
};


假定它内部使用前面的Send/Receive来实现。Future是这么用的:
Cache cache;
std::string value1;
std::string value2;
Future<int(const std::string&, std::string&)> future1(&Cache::Query, cache, "key1", value1);
Future<int(const std::string&, std::string&)> future2(&Cache::Query, cache, "key2", value2);
std::cout << "result1: " << future1.GetValue() << ", value1: " << value1 << std::endl;
std::cout << "result2: " << future2.GetValue() << ", value2: " << value2 << std::endl;

上面调用GetValue()时,如果调用还没有返回,就会“挂起”当前轻量级线程,直到取得结果。IoLanguage那里学来的。
3 楼 qiezi 2009-03-02  
呵呵,这么快就看到啦。

语法还是c++的,融入了一些erlang的语法特点:
// 消息类
// 手写或idl自动生成元信息、序列化代码,可以分布式发送,否则只能在进程内发送
class QueryMessage : public Message
{
  std::string key;
};

class QueryResultMessage : public Message
{
  int result;
  std::string value;
};



// Cache服务
void CacheService(Pid pid) {
  typedef std::map<std::string, std::string> Cache;
  Cache cache;
  while(true) {
    Message* m = Receive();
    ON_EXIT_BLOCK(&Delete<Message>, m);
    QueryMessage* queryMessage = dynamic_cast<QueryMessage*>(m);
    if (!m) {
      printf("unknown message: %s", m->GetTypeName().c_str());
      continue;
    }
    QueryResultMessage* resultMessage = new QueryResultMessage;
    Cache::const_iterator iter = cache.find(queryMessage->key);
    if (iter == cache.end()) {
      resultMessage->result = -1;
    }
    else {
      resultMessage->result = 0;
      resultMessage->value = iter->second;
    }
    Send(resultMessage);
  }
}

void co_main()
{
  REGISTER("CacheService", Self());
  CacheService();
}


// Cache客户端
void CacheClient()
{
  while(true) {
    Sleep(500); // sleep 0.5 second
    QueryMessage* queryMessage = new QueryMessage;
    queryMessage->key = "hello";
    // 请求分布式服务
    Send("CacheService", "127.0.0.1:9999", queryMessage);
    Message* m = Receive(3000); // wait 3 seconds
    ON_BLOCK_EXIT(&Delete<Message>, m);
    QueryResultMessage* resultMessage = dynamic_cast<QueryResultMessage*>(m);
    if (!resultMessage) {
      printf("receive fail");
      continue;
    }
    printf("query result: %d, value: %s", resultMessage->result, resultMessage->value.c_str());
  }
}

void co_main()
{
  // 11 clients
  for (int i=0; i<10; i++) {
    SPAWN(&CacheClient);
  }
  CacheClient();
}
2 楼 dogstar 2009-03-01  
就说qiezi最近怎么不见发blog了呢。原来在闭关修炼中 :)
1 楼 inshua 2009-03-01  
  秀一段 sample 性的代碼看看? 比如 1 + 1 + 2

相关推荐

Global site tag (gtag.js) - Google Analytics