论坛首页 编程语言技术论坛

MongoDB 内存解析 python

浏览 4754 次
精华帖 (0) :: 良好帖 (1) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-10-15  
以下文章是通过经验总结而得出的结论
在使用mongodb开发的工作中,mongodb内存使用非常之大,64G的机器使用了99%的内存
通过整理和查询资料,了解了mongodb的内存工作原理,特此跟大家分享

mongodb 使用MMAP 将文件映射到内存中

我自己写了一个mongodb python在内存实现方面的代码,代码如下仅供参考:

1.启动mongodb 服务
python _mongo_.py 


2.连接到mongo服务
telnet 0.0.0.0 8900


3.开始调试
ps x|grep python
#找到mongo对应的进程号
#接下来使用vmmap [mac] pmap[linux]查看内存使用情况
vmmap -resident 2546|grep wiyun

mapped file            0000000101000000-000000010d801000 [200.0M     8K] rw-/rwx SM=PRV  /Users/liuzheng/py.work.dir/wgit/wiyun0.db
mapped file            000000010d801000-000000011a002000 [200.0M     8K] rw-/rwx SM=PRV  /Users/liuzheng/py.work.dir/wgit/wiyun1.db

#这里大家看到文件初始大小为200M,常驻内存只使用了8K

#接下来我们操作数据库

telnet 0.0.0.0 8900
set a=saddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsaddsadd

vmmap -resident 2546|grep wiyun
mapped file            0000000101000000-000000010d801000 [200.0M  64.1M] rw-/rwx SM=PRV  /Users/liuzheng/py.work.dir/wgit/wiyun0.db
mapped file            000000010d801000-000000011a002000 [200.0M     8K] rw-/rwx SM=PRV  /Users/liuzheng/py.work.dir/wgit/wiyun1.db

#这里RSS内存使用变为了64M





#coding=utf-8

import os, sys

import errno
import functools
import socket
import time
import os
import traceback
import memcache
from tornado import ioloop, iostream
from threading import Thread
from Queue import Queue
import logging
import mmap
import random

streamDict = {} # maps str(stream) -> (IOStream, counter): all live client connections
streamRequests = {} # maps str(stream) -> Queue of pending request lines for that stream
io_loop = None # tornado IOLoop driving the listening socket; set in __main__
mmap_obj = None # MMapping instance backing the store; set in __main__

def printLog(log):
	tstr = time.strftime("%Y-%m-%d %X", time.localtime())
	print tstr, log


def stream_add(stream, count):
	"""Register *stream* in the global connection tables.

	count: initial per-stream counter stored with the stream.
	       (The original ignored this parameter and hard-coded 0;
	       every caller passes 0, so behavior is unchanged for them.)
	"""
	streamkey = str(stream)
	streamDict[streamkey] = (stream, count)
	streamRequests[streamkey] = Queue()


def stream_remove(streamkey, stream):
	print "stream closed", stream.__dict__
	stream.close()
	if streamDict.has_key(streamkey):
		del streamDict[streamkey]
	if streamRequests.has_key(streamkey):
		del streamRequests[streamkey]


class MMapping(object):
	data_map = []
	_hash = {}

	def __init__(self, data_path, db):
		self.read_ns_file(data_path,db)


	def read_ns_file(self, data_path, db):
		f = open(os.path.join(data_path, db + ".ns"), "w+b")
		for i in xrange(2):
			sz = (1024 * 1024 * 200 )
			db_name =  "%s%s.db" % (db, i)
			f = open(os.path.join(data_path,db_name), "w+b")
			f.write(db_name)
			f.seek(sz)
			if f.readline() != "\0":
				f.write('\0')
			f.flush()
			MMapping.data_map.append(mmap.mmap(f.fileno(), 0))

	def parse_data(self, data):
		command, values = data.split(" ")

		if command == "set":
			key, value = values.split("=")
			return command, key, value
		else:
			key = values.strip()
			self._hash.get(key, "no match\r\n")
			return command, key, None

	def flush_data(self, data):
		self.data_map[0].write(data)

	def get_data(self, data):
		command, key, value = self.parse_data(data)
		print "command,key,value", command, key, value
		if command == "set":
			d = "%s=%s" % (key, value)
			self.flush_data(d)
			self._hash[key] = value
			return "True"
		else:
			return self._hash.get(key, "on match")


class ClockResponse(Thread):
	def __init__(self):
		Thread.__init__(self)
		self.flag = True
		self.count = 0

	def run(self):
		while self.flag:
			#打印 LOG,N次/打印
			ct = self.count % 10000
			if ct == 0:
				print 'now connections:%s' % (len(streamDict))
			self.count = ct + 1

			us = 0
			for streamkey, (stream, num) in streamDict.items():
				queue = streamRequests[streamkey]
				try:
					data = queue.get(False)
				except Exception, e:
					continue

				if data:
					print "get data %s", data
				else:
					continue

				try:
					d = mmap_obj.get_data(data)
					stream.write(d + "\r\n")
				except Exception, e:
					logging.error(e)
					stream.write(str(e) + "\r\n")
			time.sleep(0.01)

	def stop(self):
		self.flag = False


class SocketRequest(object):
	delimiter = "\r\n"

	def __init__(self, stream, address):
		self.stream = stream
		self.streamkey = str(stream)
		self.address = address
		self.stream.read_until(SocketRequest.delimiter, self.on_body)

	def on_body(self, data):
		queue = streamRequests[self.streamkey]
		size = queue.qsize()

		if size <= 10000:
			print "put in queue %s " % data
			queue.put(data)

		try:
			self.stream.read_until(SocketRequest.delimiter, self.on_body)
		except Exception, e:
			print "in read", e
			stream_remove(self.streamkey, self.stream)


def connection_ready(sock, fd, events):
	"""IOLoop read handler for the listening socket: accept all pending
	connections, wrapping each in an IOStream + SocketRequest.

	Loops until accept() raises EWOULDBLOCK/EAGAIN, since the listening
	socket is non-blocking and several connections may be queued.
	"""
	while True:
		try:
			connection, address = sock.accept()
		except socket.error, e:
			# e[0] is the errno in Python 2's socket.error
			if e[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
				raise
			return
		print "connection on:", address
		# hand the connection over in non-blocking mode
		connection.setblocking(0)
		stream = iostream.IOStream(connection, io_loop)

		stream_add(stream, 0)
		skey = str(stream)
		# remove the stream from the global tables when the peer closes
		scallback = functools.partial(stream_remove, skey, stream)
		stream.set_close_callback(scallback)

		SocketRequest(stream, address)


def run_thread_worker(n=1):
	"""Start *n* ClockResponse worker threads and return them.

	Returning the thread list lets the caller stop()/join() the workers;
	the original discarded the references, leaving no way to shut the
	threads down. (Returning a value where None was returned before is
	backward-compatible.)
	"""
	threads = [ClockResponse() for _ in xrange(n)]
	for t in threads:
		t.start()
	return threads


if __name__ == '__main__':
	server_port = 8900
	#init data
	data_path = os.path.abspath(os.path.join(os.path.dirname(__file__)))
	mmap_obj = MMapping(data_path, "wiyun")

	run_thread_worker(1)

	sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP)
	sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
	sock.setblocking(0)
	sock.bind(("", server_port))
	sock.listen(9999)

	io_loop = ioloop.IOLoop.instance()
	callback = functools.partial(connection_ready, sock)
	io_loop.add_handler(sock.fileno(), callback, io_loop.READ)

	try:
		io_loop.start()
	except KeyboardInterrupt:
		io_loop.stop()
		clock.stop()
		print "exited cleanly"
		sys.exit(1)
	except Exception, e:
		print e
   发表时间:2011-10-20  
不错啊!很详细!
0 请登录后投票
   发表时间:2011-10-28  
这孩子人不错啊,例子详细!
0 请登录后投票
论坛首页 编程语言技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics