博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
服务器开发中的多进程,多线程及多协程
阅读量:6090 次
发布时间:2019-06-20

本文共 4293 字,大约阅读时间需要 14 分钟。

服务器开发中,为了充分利用多核甚至多个cpu,或者是简化逻辑编写的难度,会应用多进程(比如一个进程负责一种逻辑)多线程(将不同的用户分配到不同的进程)或者协程(不同的用户分配不同的协程,在需要时切换到其他协程),并且往往同时利用这些技术比如多进程多线程。

一个经典的服务器框架可以说如下的框架:

而这些服务器进程之间协同配合,为用户提供服务,其中中心服务器提供集群的调度工作,而逻辑服可以是逻辑服务器1提供登录服务、逻辑服务器2提供购买服务;也可以是2个服务器提供相同服务,然后视负载用户数将不同用户分配到不同服务器。

而这些服务器为了方便开发往往会采用到多线程的技术,一个经典的模式就是多个网络线程处理网络事件,然后一个线程处理用户逻辑;更进一步还可以是多线程处理用户逻辑,在多线程下比较麻烦的一点是不同线程访问同一个资源时需要加锁,不过众所周知锁上万恶之源,带来了难查的死锁和低下的问题。不过这些都有特殊的技巧可以解决,比如死锁,对同一线程重复获取同一把锁导致的死锁,我们可以采用可重入的计数锁,当一个线程获取了这把锁后,如果其他线程尝试lock这个mutex时会进入系统的调度队列,而本线程获取这把锁则会引用计数+1,本线程解锁则-1,如果引用计数为0则解锁,实现一个这样的锁,具体的算法的伪代码大致如下:

// int getthreadid() 获取线程ID// switchtothread() 切换线程// wakeup(int id) 唤醒线程// bool exchange(bool * m, bool v) 原子的修改一个类型为bool变量的值,并且返回原有的值// bool load(bool * m) 原子的获取一个bool变量的值struct mutex{    mutex(){        islock = false;        lockid = -1;        ref = 0;    }    bool islock;    volatile int lockid;    int ref;    std::que
waitth;};void lock(mutex & mu){ int id = getthreadid(); if (id != mu.lockid){ if (exchange(&mu.islock, true)){ waitth.push(id); switchtothread(); } else { lockid = id; } } else { ref++; }}void unlock(mutex & mu){ int id = getthreadid(); if (id == mu.lockid) { if (--ref == 0){ wakeup(mu.que.pop()); } }}

这样的一组伪代码,其中关于原子操作和线程调度的实现在windows大致如下:

int getthreadid(){    return (int)GetCurrentThread();}int switchtothread(){    return SwitchToThread();}int wakeup(int id){    return ResumeThread((HANDLE)id);}

boost中就提供了可重入的mutex: boost::recursive_mutex,windows中的Mutex和Critical Section就是可重入的,而linux的pthread_mutex_t就是不可重入的。

而因为同时访问2个资源导致的死锁,一个可以采用对资源的访问采用同一顺序访问,即可回避。

 

对于锁带来的性能问题,一个取巧的做法则每个用户数据独立,对公共资源的访问则采用一些高效的算法比如rcu、比如无锁算法

比如一个多读多写且读请求大于写请求的场合,我们就可以采用rcu算法。

rcu算法是在读的时候直接读数据,而写入则获取一份读拷贝在本地修改,并注册一个写入的回调函数,在读操作全部完成后调用回调函数完成写入。写操作之间则使用传统的同步机制。

而无锁算法,比较经典的则是一些无锁队列,比如ms-que,optimistic-que

ms-que的原理是利用原子操作,在写入的时候,先创建一个新的节点,完成数据的写入,然后在尾部利用cas操作,将尾节点的next指针指向新的节点,并将尾节点指向新节点。

在读的时候则是利用cas操作,将头节点指向头节点的next节点。

 

但是在利用这些,虽然充分利用了cpu的性能,但是编程也带来了极大的不便,比如多线程下不利于扩展以便充分利用集群提高性能的问题,多进程下则是复杂的异步调用。

一个理想的情况就是在跨进程通信的情况下,可以在发起一次远程的请求后,原地等待远端的相应,然后继续执行,比如在多线程的情况下,可以一个用户一个线程,然后在一次send之后,调用recv在请求返回后在继续执行,

但是多线程的调度会带来巨大的调度开销,这种情况下,更轻量级的协程成为了一个更好的选择,一个用户一个协程,然后在发起一次远程的请求后,切换到其他被唤醒的用户协程继续执行,在这个协程的请求返回后则唤醒这个协程继续执行逻辑代码。

协程带来的问题是协程是有用户控制调度的,所以用户需要自己实现调度算法,以及对应的锁算法(因为多协程虽然可以是单线程内执行,但是因为不同协程间的资源争用所以锁还是需要,而且是同步执行逻辑,如果使用spinlock则意味着整个系统阻塞,在异步网络id下即便是网络事件也不会被响应)。

协程的调度如下:

context::context* scheduling::onscheduling(){	context::context * _context = 0;		{		while (!time_wait_task_que.empty()){			uint64_t t = timer::clock();			time_wait_handle top = time_wait_task_que.top();			if(t > top.handle->waittime){				time_wait_task_que.pop();				uint32_t _state = top.handle->_state_queue.front();				top.handle->_state_queue.pop();				if (_state == time_running){					continue;				}				_context = top.handle->_context;				goto do_task;			}		}	}			{		while (!in_signal_context_list.empty()){			context::context * _context_ = in_signal_context_list.front();			in_signal_context_list.pop();			actuator * _actuator = context2actuator(_context_);			if (_actuator == 0){				continue;			}			task * _task = _actuator->current_task();			if (_task == 0){				continue;			}			if (_task->_state == time_wait_task){				_task->_state = running_task;				_task->_wait_context._state_queue.front() = time_running;			}			_context = _context_;			goto do_task;		}	}	{		if (!_fn_scheduling.empty()){			_context = _fn_scheduling();			goto do_task;		}	}	{		if (!low_priority_context_list.empty()){			_context = in_signal_context_list.front();			in_signal_context_list.pop();			goto do_task;		}	}do_task:	{		if(_context == 0){			actuator * _actuator = _abstract_factory_actuator.create_product();			_context = _actuator->context();			_list_actuator.push_back(_actuator);		}	}	return _context;}

锁算法原理recursive_mutex一样,当获取锁失败则将当前协程调度到其他用户协程,在解锁时唤醒等待协程

void mutex::lock(){	if (_mutex){		_mutex = true;	} else {		_service_handle->scheduler();	}}void mutex::unlock(){	if (_mutex){		if (!wait_context_list.empty()){			auto weak_up_ct = wait_context_list.back();			wait_context_list.pop_back();			_service_handle->wake_up_context(weak_up_ct);		}		_mutex = false;	}}

 

转载于:https://www.cnblogs.com/qianqians/p/4440507.html

你可能感兴趣的文章
java.io.Serializable浅析
查看>>
我的友情链接
查看>>
多线程之线程池任务管理通用模板
查看>>
CSS3让长单词与URL地址自动换行——word-wrap属性
查看>>
CodeForces 580B Kefa and Company
查看>>
开发规范浅谈
查看>>
Spark Streaming揭秘 Day29 深入理解Spark2.x中的Structured Streaming
查看>>
鼠标增强软件StrokeIt使用方法
查看>>
本地连接linux虚拟机的方法
查看>>
某公司面试java试题之【二】,看看吧,说不定就是你将要做的题
查看>>
BABOK - 企业分析(Enterprise Analysis)概要
查看>>
Linux 配置vnc,开启linux远程桌面
查看>>
CentOS6.4关闭触控板
查看>>
React Native 极光推送填坑(ios)
查看>>
Terratest:一个用于自动化基础设施测试的开源Go库
查看>>
修改Windows远程终端默认端口,让服务器更安全
查看>>
扩展器必须,SAS 2.0未必(SAS挺进中端存储系统之三)
查看>>
Eclipse遇到Initializing Java Tooling解决办法
查看>>
while((ch = getchar()) != '\n')
查看>>
好程序员web前端分享JS检查浏览器类型和版本
查看>>