`

面向连接的Socket Server的简单实现

 
阅读更多
转自
http://www.cnblogs.com/forfuture1978/archive/2010/09/12/1824443.html


一、基本原理
    有时候我们需要实现一个公共的模块,需要对多个其他的模块提供服务,最常用的方式就是实现一个Socket Server,接受客户的请求,并返回给客户结果。

    这经常涉及到如果管理多个连接及如何多线程的提供服务的问题,常用的方式就是连接池和线程池,基本流程如下:


    1。首先服务器端有一个监听线程,不断监听来自客户端的连接。
    2.当一个客户端连接到监听线程后,便建立了一个新的连接。
    3.监听线程将新建立的连接放入连接池进行管理,然后继续监听新来的连接。
    4.线程池中有多个服务线程,每个线程都监听一个任务队列,一个建立的连接对应一个服务任务,当服务线程发现有新的任务的时候,便用此连接向客户端提供服务。
    5.一个Socket Server所能够提供的连接数可配置,如果超过配置的个数则拒绝新的连接。
    6.当服务线程完成服务的时候,客户端关闭连接,服务线程关闭连接,空闲并等待处理新的任务。
    7.连接池的监控线程清除其中关闭的连接对象,从而可以建立新的连接。


二、对Socket的封装
    Socket的调用主要包含以下的步骤:

clip_image001

     调用比较复杂,我们首先区分两类Socket,一类是Listening Socket,一类是Connected Socket.

    Listening Socket由MySocketServer负责,一旦accept,则生成一个Connected Socket,又MySocket负责。

MySocket主要实现的方法如下:
int MySocket::write(const char * buf, int length)
{
        int ret = 0;
        int left = length;
        int index = 0;
        while(left > 0)
        {
                ret = send(m_socket, buf + index, left, 0);
                if(ret == 0)
                        break;
                else if(ret == -1)
                {
                        break;
                }
                left -= ret;
                index += ret;
        }
        if(left > 0)
                return -1;
        return 0;
}

int MySocket::read(char * buf, int length)
{
        int ret = 0;
        int left = length;
        int index = 0;
        while(left > 0)
        {
                ret = recv(m_socket, buf + index, left, 0);
                if(ret == 0)
                        break;
                else if(ret == -1)
                        return -1;
                left -= ret;
                index += ret;
        }

        return index;
}

int MySocket::status()
{
        int status;
        int ret;
        fd_set checkset;
        struct timeval timeout;

        FD_ZERO(&checkset);
        FD_SET(m_socket, &checkset);

        timeout.tv_sec = 10;
        timeout.tv_usec = 0;

        status = select((int)m_socket + 1, &checkset, 0, 0, &timeout);
        if(status < 0)
                ret = -1;
        else if(status == 0)
                ret = 0;
        else
                ret = 0;
        return ret;
}

int MySocket::close()
{
        struct linger lin;
        lin.l_onoff = 1;
        lin.l_linger = 0;
        setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
        ::close(m_socket);
        return 0;
}


    MySocketServer的主要方法实现如下:
int MySocketServer::init(int port)
{
        if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
        {
                return -1;
        }

        struct sockaddr_in serverAddr;
        memset(&serverAddr, 0, sizeof(struct sockaddr_in));
        serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
        serverAddr.sin_family = AF_INET;
        serverAddr.sin_port = htons(port);

        if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1)
        {
                ::close(m_socket);
                return -1;
        }

        if(listen(m_socket, SOMAXCONN) == -1)
        {
                ::close(m_socket);
                return -1;
        }

        struct linger lin;
        lin.l_onoff = 1;
        lin.l_linger = 0;

        setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
        m_port = port;
        m_inited = true;
        return 0;
}

MySocket * MySocketServer::accept()
{
        int sock;
        struct sockaddr_in clientAddr;
        socklen_t clientAddrSize = sizeof(clientAddr);
        if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1)
        {
                return NULL;
        }
        MySocket* socket = new MySocket(sock);
        return socket;
}

MySocket * MySocketServer::accept(int timeout)
{
        struct timeval timeout;
        timeout.tv_sec = timeout;
        timeout.tv_usec = 0;

        fd_set checkset;
        FD_ZERO(&checkset);
        FD_SET(m_socket, &checkset);

        int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout);
        if(status < 0)
                return NULL;
        else if(status == 0)
                return NULL;

        if(FD_ISSET(m_socket, &checkset))
        {
                return accept();
        }
}


三、线程池的实现
    一个线程池一般有一个任务队列,启动的各个线程从任务队列中竞争任务,得到的线程则进行处理:list<MyTask *>  m_taskQueue;

    任务队列由锁保护,使得线程安全:pthread_mutex_t m_queueMutex

    任务队列需要条件变量来支持生产者消费者模式:pthread_cond_t m_cond

    如果任务列表为空,则线程等待,等待中的线程个数为:m_numWaitThreads

    需要一个列表来维护线程池中的线程:vector<MyThread *> m_threads

    每个线程需要一个线程运行函数:

void * __thread_new_proc(void *p)
{
    ((MyThread *)p)->run();
    return 0;
}

每个线程由MyThread类负责,主要函数如下:


int MyThread::start()
{

    pthread_attr_t  attr;
    pthread_attr_init(&attr);
    pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

    int ret = pthread_create(&m_thread, &attr, thread_func, args);
    pthread_attr_destroy(&attr);

    if(ret != 0)
        return –1;

}

int MyThread::stop()
{

    int ret = pthread_kill(m_thread, SIGINT);

    if(ret != 0)
        return –1;
}

int MyThread::join()

{

    int ret = pthread_join(m_thread, NULL);

    if(ret != 0)

        return –1;

}

void MyThread::run()

{

    while (false == m_bStop)

    {

        MyTask *pTask = m_threadPool->getNextTask();

        if (NULL != pTask)

        {

            pTask->process();

        }

    }

}


    线程池由MyThreadPool负责,主要函数如下:
int MyThreadPool::init()
{

    pthread_condattr_t cond_attr;
    pthread_condattr_init(&cond_attr);
    pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
    int ret =  pthread_cond_init(&m_cond, &cond_attr);
    pthread_condattr_destroy(&cond_attr);

    if (ret_val != 0)
        return –1;

    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    ret = pthread_mutex_init(&m_queueMutex, &attr);
    pthread_mutexattr_destroy(&attr);

    if (ret_val != 0)
        return –1;

    for (int i = 0; i< m_poolSize; ++i)
    {
        MyThread *thread = new MyThread(i+1, this);        
        m_threads.push_back(thread);
    }

    return 0;
}

int MyThreadPool::start()
{
    int ret;
    for (int i = 0; i< m_poolSize; ++i)
    {        
       ret = m_threads[i]->start();
       if (ret != 0)
           break;       
    }

    ret = pthread_cond_broadcast(&m_cond);

    if(ret != 0)
        return –1;
    return 0;
}

void MyThreadPool::addTask(MyTask *ptask)
{
    if (NULL == ptask)
        return;

    pthread_mutex_lock(&m_queueMutex);

    m_taskQueue.push_back(ptask);       

    if (m_waitingThreadCount > 0)
        pthread_cond_signal(&m_cond);

    pthread_mutex_unlock(&m_queueMutex);
}

MyTask * MyThreadPool::getNextTask()
{
    MyTask *pTask = NULL; 

    pthread_mutex_lock(&m_queueMutex);

    while (m_taskQueue.begin() == m_taskQueue.end())
    {  
        ++m_waitingThreadCount;

        pthread_cond_wait(&n_cond, &m_queueMutex);

        --m_waitingThreadCount;       
    }   

    pTask = m_taskQueue.front();

    m_taskQueue.pop_front();

    pthread_mutex_unlock(&m_queueMutex);

    return pTask;   
}


其中每一个任务的执行由MyTask负责,其主要方法如下:

void MyTask::process()

{

    //用read从客户端读取指令

    //对指令进行处理

    //用write向客户端写入结果

}



四、连接池的实现
    每个连接池保存一个链表保存已经建立的连接:list<MyConnection *> * m_connections

    当然这个链表也需要锁来进行多线程保护:pthread_mutex_t m_connectionMutex;

    此处一个MyConnection也是一个MyTask,由一个线程来负责。

    线程池也作为连接池的成员变量:MyThreadPool * m_threadPool  

    连接池由类MyConnectionPool负责,其主要函数如下:

void MyConnectionPool::addConnection(MyConnection * pConn)
{

    pthread_mutex_lock(&m_connectionMutex);

    m_connections->push_back(pConn);

    pthread_mutex_unlock(&m_connectionMutex);

    m_threadPool->addTask(pConn);
}


    MyConnectionPool也要启动一个背后的线程,来管理这些连接,移除结束的连接和错误的连接。

void MyConnectionPool::managePool()
{

    pthread_mutex_lock(&m_connectionMutex);

    for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); )
    {
        MyConnection *conn = *itr;        
        if (conn->isFinish())
        {
            delete conn;
            conn = NULL;
            list<MyConnection *>::iterator pos = itr++;
            m_connections->erase(pos);                         
        }
        else if (conn->isError())
        {

            //处理错误的连接
            ++itr;
        }
        else
        {
            ++itr;
        }
    }

    pthread_mutex_unlock(&m_connectionMutex);

}



五、监听线程的实现
    监听线程需要有一个MySocketServer来监听客户端的连接,每当形成一个新的连接,查看是否超过设置的最大连接数,如果超过则关闭连接,如果未超过设置的最大连接数,则形成一个新的MyConnection,将其加入连接池和线程池。

MySocketServer *pServer = new MySocketServer(port);

MyConnectionPool *pPool = new MyConnectionPool();

while (!stopFlag)

{

    MySocket * sock = pServer->acceptConnection(5);

    if(sock != null)

    {

        if(m_connections.size > maxConnectionSize)

        {

            sock.close();

        }

        MyTask *pTask = new MyConnection();

        pPool->addConnection(pTask);      

    }

}
分享到:
评论

相关推荐

    socket编程,CS间相互通讯(含实验报告)

    1)设计程序,分别构建通信的两端:服务器端和客户端应用程序,套接字类型为面向连接的Socket,自己构建双方的应答模式,实现双方的数据的发送和接收(S发给C,C发给S)。 2)服务端程序能响应单个或任意多个客户端连接...

    java socket通信 一对多

    1)设计程序,分别构建通信的两端:服务器端和客户端应用程序,套接字类型为面向连接的Socket,自己构建双方的应答模式,实现双方的数据的发送和接收(S发给C,C发给S)。 2)服务端程序能响应单个或任意多个客户端...

    chatDemo 网络聊天程序的开发

    Socket用于实现程序间的双向的面向连接的通信。建立Socket连接必须先要有服务器监听 ServerSocket用来侦听进入的连接,它为每个新建的连接都创建一个Socket对象 怎样创建客户套接字 Socket socket=null; socket=new...

    LInux中的socket编程包含基本介绍

    面向连接(TCP)无连接(UDP) 互联网寻址 # include In_addr_t inet_addr(const char *ip_address); 例如 int_addr_t server ; server=inet_addr(“197.124.10.1”); 不正确则返回-1 将四个十进制数转换为IP地址。 ...

    TCPSocket示例

    Socket面向连接基础通信示例,里面包含客户端TCPClient,服务端TCPServer 开发环境:VS2013

    详解C# Socket简单例子(服务器与客户端通信)

    这个例子只是简单实现了如何使用 Socket 类实现面向连接的通信。 注意:此例子的目的只是为了说明用套接字写程序的大概思路,而不是实际项目中的使用程序。在这个例子中,实际上还有很多问题没有解决,如消息边界...

    android在原生代码中使用POSIX通信面向连接的TCP客户端和服务端。

    android在原生代码中使用POSIX通信面向连接的TCP客户端和服务端。 android jni下的c++开发,

    客户服务器程序设计及实现.doc

    4、监听连接——listen() 功能:用于面向连接服务器,表明它愿意接收连接。 格式:int PASCAL FAR listen(SOCKET s, int backlog); 5、数据传输——send()与recv() 功能:数据的发送与接收 格式:int PASCAL FAR ...

    python3.5实现socket通讯示例(TCP)

    tcp是面向连接的一个协议,意味着,客户端和服务器开发发送数据之前,需要先握手创建一个TCP连接。TCP连接的一端与客户端套接字相互联系,另一端与服务器套接字相联系。当创建该TCP连接的时,我们需要讲客户端与...

    Herm(一套快速开发高性能的网络应用的C++库)

    本节给出两个分别用Framework和Socket组件实现的简单TCP Server的例子。所有的例子可以参考examples frameworks和multiplexors目录。 1.2.1 用Framework实现TCP Server 首先,实现一个Listener, class ...

    Web服务器的java实现

     (1) 连接:Web浏览器与Web服务器建立连接,打开一个称为socket(套接字)的虚拟文件,此文件的建立标志着连接建立成功。  (2) 请求:Web浏览器通过socket向Web服务器提交请求。HTTP的请求一般是GET或POST命令...

    【完整版】基于JAVA的五子棋游戏系统设计与实现.doc

    本软件使用JAVA语户端之间的连接,利用多线程技术言实现,通过对图形界面,绘图, 布局管理器等去构造出游戏的单机功能,在此根底上,利用SCOKET的知识,建立起效劳 器与客来处理效劳器端与客户端之间的数据传输,...

    基于Java的坦克大战游戏的设计与实现设计软件程序源码+word毕业论文文档.zip

    游戏通过分析JAVA游戏开发和代码设计,用Eclipse软件开发了坦克大战游戏,运用接口技术,使一个类能够实现多个接口,使用套接字Socket来完成client端和server端的连接。玩家通过连接访问进入游戏,通过操纵坦克来...

    利用反射实现类的动态加载

    功能很简单,手机用户通过GPRS打开Socket与服务器连接,我则根据用户传过来的数据做出响应。做过类似项目的兄弟一定都知道,首先需要定义一个类似于MSNP的通讯协议,不过今天的话题是如何把这个系统设计得具有高度的...

    老男孩python 四期

    3、简单的socket 交互程序 4、Socket Server的开发与使用 5、多线程非阻塞并发自处理 6、练习程序:用SOCKET模拟实现SSH客户端功能 6、项目实战二、开发FTP服务器\客户端软件 ※分别开发服务器端和客户端,通过...

    服务器的设计.docx

    就其原理而言,面向连接的Socket类似于电话系统,无连接的Socket类似于电报系统。Socket实质上是为网络程序提供了通讯的端点号。对于每个网络程序的一个Socket,它首先有一个半相关的端点号的描述:{协议,本地地址...

    客户服务器实验报告.doc

    " "4、监听连接——listen() " "功能:用于面向连接效劳器,说明它愿意接收连接。 " "格式:int PASCAL FAR listen(SOCKET s, int backlog); " "5、数据传输——send()与recv() " "功能:数据的发送与接收 " "格式:...

    基于JAVA的网络通讯系统设计与实现(WORD论文文档+系统).zip

    一般来说,聊天工具大多数由客户端程序和服务器程序,外加服务器端用于存放客户数据的数据库组成,本系统采用客户机/服务器架构模式,通过Java提供的Socket类来连接客户机和服务器并使客户机和服务器之间相互通信,...

    一个面向局域网、互联网的即时聊天程序源代码

    其客户端和服务器端依靠Winsock进行通讯(使用TCP/IP 协议),服务器通过建立Winsock控件数组实现多客户端同时连接的支持。Messenger 的消息走向主要采用 客户端发送——服务器中转——客户端接收的模式(个别功能...

Global site tag (gtag.js) - Google Analytics