论坛首页 Java企业应用论坛

基于Http的多线程上传大家谈谈想法

浏览 28279 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (9) :: 隐藏帖 (0)
作者 正文
   发表时间:2009-08-18  
最近因为工作需要,需要在web界面上实现多线程上传,各位兄弟提供些思路。
   发表时间:2009-08-18  
古怪的要求。为什么要多线程上传呢?
0 请登录后投票
   发表时间:2009-08-18  
头想提高上传速度
0 请登录后投票
   发表时间:2009-08-19  
你有没有发觉你东西的整个瓶颈到底在哪里?

             多线程就能解决你上传速度问题了? .. 


             i/o....raid0....or anything ..
0 请登录后投票
   发表时间:2009-08-19  
Saito 写道
你有没有发觉你东西的整个瓶颈到底在哪里?

             多线程就能解决你上传速度问题了? .. 


             i/o....raid0....or anything ..


先从纯技术角度分析如何实现,不用讨论到底值不值得做
0 请登录后投票
   发表时间:2009-08-19  
写activex,
html上也没办法把文件分成多块上传
0 请登录后投票
   发表时间:2009-08-19  
实际上这是个费时且无用的需求。上传的速度取决于你本地出口带宽和服务器带宽。无论如何你是不能高于本地带宽的。而提交的时候应该是可以使用能用到的全部带宽的。所以是否多线程并不能提高速度。
就像多线程下载软件。如果网络畅通没有限制,那么你就是单线程也非常快。多线程主要是因为服务器端对单用户进行了限速,才用多线程多连接来提速。
这个要求要求本地使用ActiveX开发,服务器端也要变得非常复杂。等于你把客户机作为下载服务器,服务器作为多线程下载软件用。
2 请登录后投票
   发表时间:2009-08-19  
我看大米盘已经实现了,里面主要用js实现了多线程技术去发起多个post请求,通过http协议的Content-Range头来分块上传。关键问题是如何同时请求多个post,并控制子线程的信号量,以及最后如何触发把多块文件合并。
0 请登录后投票
   发表时间:2009-08-19   最后修改:2009-08-19
大米盘基于google的gears。一起分析一下他的js

/**
 * Removes, in place, every element of this array that is loosely equal
 * (==) to the given value.
 *
 * The array is walked from the last index to the first so that splicing an
 * element out never shifts a not-yet-visited element past the cursor; a
 * forward walk would skip the second of two adjacent matches.
 *
 * @param {*} b Value to remove.
 * @returns {undefined}
 */
Array.prototype.remove = function(b) {
	for ( var a = this.length - 1; a >= 0; a--) {
		if (b == this[a]) {
			this.splice(a, 1)
		}
	}
};
/**
 * Minimal insertion-ordered key/value container with a built-in cursor.
 *
 * Values live in the plain object `data`; `keys` records insertion order and
 * `length` mirrors `keys.length`. Keys are used as object property names, so
 * they are compared by their string form.
 *
 * NOTE: declaring this function shadows the built-in ES `Map` in the scope
 * where it is defined.
 */
function Map() {
	// Own-property test, so that names inherited from Object.prototype
	// ("toString", "constructor", ...) are not mistaken for stored entries.
	var owns = function(obj, key) {
		return Object.prototype.hasOwnProperty.call(obj, key)
	};
	this.keys = [];
	this.index = 0;
	this.data = {};
	this.length = 0;
	/**
	 * Stores value `b` under key `a`, registering the key on first use.
	 */
	this.set = function(a, b) {
		if (!owns(this.data, a)) {
			this.keys.push(a);
			this.length = this.keys.length
		}
		this.data[a] = b
	};
	/**
	 * Returns the value stored under `a`, or null when the key is absent or
	 * its stored value is undefined.
	 */
	this.get = function(a) {
		if (!owns(this.data, a) || "undefined" === typeof this.data[a]) {
			return null
		}
		return this.data[a]
	};
	/**
	 * Removes key `a` and its value; does nothing for an unknown key.
	 */
	this.del = function(a) {
		if (!owns(this.data, a)) {
			return
		}
		// Walk backwards so splicing cannot skip an entry.
		for ( var i = this.keys.length - 1; i >= 0; i--) {
			if (String(this.keys[i]) === String(a)) {
				this.keys.splice(i, 1)
			}
		}
		delete this.data[a];
		// Re-derive instead of decrementing so length can never drift.
		this.length = this.keys.length
	};
	/**
	 * Adds up all values, or property `a` of every value when `a` is given.
	 */
	this.sum = function(a) {
		var d = 0;
		for ( var c = 0; c < this.keys.length; c++) {
			var b = this.keys[c];
			if ("undefined" !== typeof a) {
				d += this.data[b][a]
			} else {
				d += this.data[b]
			}
		}
		return d
	};
	/**
	 * Calls c(key, value, index, map) for each entry in insertion order.
	 * Returning the string "break" from the callback stops the iteration.
	 *
	 * Iterates over a snapshot of the keys, so the callback may delete
	 * entries (including the current one) without others being skipped.
	 * Entries deleted before they are reached are not visited; entries added
	 * during the iteration are not visited either.
	 */
	this.foreach = function(c) {
		if ("function" !== typeof c) {
			return
		}
		var snapshot = this.keys.slice(0);
		for ( var b = 0; b < snapshot.length; b++) {
			var a = snapshot[b];
			if (!owns(this.data, a)) {
				continue
			}
			var d = c(a, this.data[a], b, this);
			if ("break" === d) {
				break
			}
		}
	};
	/**
	 * Returns the value at the cursor and advances the cursor, or returns
	 * false (without advancing) when the cursor is past the end.
	 */
	this.each = function() {
		var a = this.current();
		if (false !== a) {
			this.index++
		}
		return a
	};
	/**
	 * Returns the value at the cursor, or false when the cursor is past the
	 * end.
	 */
	this.current = function() {
		if (this.index >= this.length) {
			return false
		}
		var a = this.data[this.keys[this.index]];
		return a
	};
	/**
	 * Moves the cursor back to the first entry.
	 */
	this.reset = function() {
		this.index = 0
	};
	/**
	 * Renders the entries as a multi-line debug string.
	 */
	this.toString = function() {
		var c = "Object Map {\n";
		for ( var b = 0; b < this.keys.length; b++) {
			var a = this.keys[b];
			c += "    " + a + ":" + this.data[a] + ",\n"
		}
		c += "}";
		return c
	}
}
/**
 * Chunked file uploader built on Google Gears.
 *
 * A selected file is split into a list of chunk descriptors ("cmaps"). Up to
 * `maxThreads` chunks per file are POSTed concurrently through a Gears
 * cross-origin worker; each POST carries a Content-Range header. When every
 * chunk is loaded, a "checkout" request is sent, followed by a "cleanup"
 * request.
 *
 * Relies on names defined outside this function: the custom `Map` container
 * (set/get/del/sum/foreach/current/length), `genFileSize`, `genSpeed`,
 * `google.gears`, `window`, `alert` and `console.debug`.
 *
 * NOTE: the constructor replaces `window.onerror` and immediately tries to
 * create the cross worker.
 *
 * @param {Object} [handlers] Optional map of property name -> value copied
 *        onto the instance; intended for overriding the show* UI hooks.
 */
function DmpGearsUpload(handlers) {
	// Instance reference for callbacks that run with a different `this`.
	var this_ = this;
	this.text_has_not_selected = "请选择要上传的文件!";
	this.text_file_to_large = "您所选的文件太大,单个文件请不要超过%s!";
	this.maxChunks = 2000;
	this.minChunks = 3;
	this.progressFrequency = 1000;
	this.checkTimeout = 5000;
	this.chunkSize = 8388608;
	this.maxFileSize = 5368709120;
	this.maxThreads = 5;
	this.workerUrl = "http://vdami/js/worker.js";
	this.tablesVersion = 903231727;
	this.maxProcess = 1;
	this.multiFileSupport = 0;
	this.geoAddress = null;
	this.locationLoaded = false;
	// fileid -> file record (see openFile for the record's fields).
	this.files = new Map;
	this.checkoutLock = false;
	// fileid -> id of the setInterval that polls progress for that file.
	this.runningProcess = new Map;
	// reqid -> {onprogress, oncomplete} callbacks for a pending crossPost.
	this.crossFuncs = new Map;
	this.wp = google.gears.factory.create("beta.workerpool");
	this.cwp = false;
	this.uploadStarted = false;
	// UI hooks: no-ops by default, meant to be replaced through `handlers`.
	this.showOpenFile = function() {
	};
	this.showSubmit = function() {
	};
	this.showProgress = function() {
	};
	this.showComplete = function() {
	};
	this.showInit = function() {
	};
	this.showInitError = function() {
	};
	this.showInitFail = function() {
	};
	this.showNetError = function() {
	};
	if ("undefined" !== typeof handlers) {
		// `n` is declared locally: an undeclared loop variable leaks a global
		// and throws a ReferenceError in strict mode.
		for ( var n in handlers) {
			console.debug("bind handler: " + n);
			this[n] = handlers[n];
		}
	}
	/**
	 * Opens the Gears file picker and registers each chosen file together
	 * with its chunk map. Stops processing (after an alert) at the first
	 * file larger than `maxFileSize`.
	 */
	this.openFile = function() {
		var this_ = this;
		var desktop = google.gears.factory.create("beta.desktop");
		var singleFile = !this.multiFileSupport;
		desktop.openFiles(function(files) {
			if (1 > files.length) {
				return;
			}
			if (singleFile) {
				this_.files = new Map;
			}
			var file;
			while (file = files.pop()) {
				var fileid = Math.random();
				if (this_.maxFileSize < file.blob.length) {
					alert(this_.text_file_to_large.replace("%s",
							genFileSize(this_.maxFileSize)));
					return;
				}
				if (1073741824 < file.blob.length
						&& file.name.match(/\.(rar|zip|7z)$/)) {
					alert("温馨提示:如果您上传的RAR是为了打包多个视频文件,我们建议您还是分开上传。因为每个视频文件极有可能在电驴和迅雷的P2P网络里已经有很多源,单独下载时会大大增加下载速度。");
				}
				this_.files.set(fileid, {
					fileid : fileid,
					name : file.name,
					hash : "",
					size : file.blob.length,
					blob : file.blob,
					loaded : 0,
					cmaps : [],
					threads : new Map,
					status : "init"
				});
				var tmp_chunks = Math.ceil(file.blob.length
						/ this_.chunkSize);
				var chunk_size = this_.chunkSize;
				if (tmp_chunks > this_.maxChunks) {
					chunk_size = Math.ceil(file.blob.length
							/ this_.maxChunks);
				} else {
					if (tmp_chunks < this_.minChunks) {
						chunk_size = Math.ceil(file.blob.length
								/ this_.minChunks);
					}
				}
				console.debug("chunksize front:" + this_.chunkSize
						+ " final:" + chunk_size);
				console.debug("chunks front:" + tmp_chunks + " final:"
						+ Math.ceil(file.blob.length / chunk_size));
				// NOTE(review): this fixed value discards the chunk size
				// computed above, so maxChunks/minChunks only affect the
				// debug output. Kept as-is because the server side may depend
				// on it -- confirm before removing.
				chunk_size = 9728000;
				var i = 0;
				var w = true;
				// Byte length of the current chunk (not an end position).
				var offset;
				do {
					var start = i * chunk_size;
					if ((start + chunk_size) >= file.blob.length) {
						offset = file.blob.length - start;
						w = false;
					} else {
						offset = chunk_size;
					}
					this_.files.get(fileid).cmaps[i] = {
						no : i,
						start : start,
						offset : offset,
						status : "init"
					};
					i++;
				} while (w);
				this_.showOpenFile(file);
				console.debug(file);
			}
		}, {
			singleFile : singleFile
		});
	};
	/**
	 * Starts uploading: requests up to `maxProcess` file processes.
	 *
	 * @returns {boolean|undefined} false when no file has been selected.
	 */
	this.upload = function() {
		if (0 === this.files.length) {
			alert(this.text_has_not_selected);
			return false;
		}
		var ps = this.files.length > this.maxProcess ? this.maxProcess
				: this.files.length;
		for ( var i = 0; i < ps; i++) {
			console.debug("require process " + i);
			this.showSubmit(this.newProcess());
		}
		this.uploadStarted = true;
	};
	/**
	 * Starts the first file whose hash is still empty: installs its progress
	 * poller and launches its initial upload threads.
	 */
	this.newProcess = function() {
		if (this.maxProcess < this.runningProcess.length) {
			console.debug("new process needless.");
			return;
		}
		var this_ = this;
		this.files.foreach(function(key, val, i, files) {
			if ("" == val.hash) {
				console.debug("start new process. " + val.name);
				try {
					clearInterval(this_.runningProcess.get(val.fileid));
				} catch (e) {
				}
				this_.runningProcess.set(val.fileid, setInterval(function() {
					this_.checkProcess(val.fileid);
				}, this_.progressFrequency));
				val.status = "uploading";
				var threads = val.cmaps.length > this_.maxThreads ? this_.maxThreads
						: val.cmaps.length;
				for ( var t = 0; t < threads; t++) {
					this_.newThread(val);
				}
				return "break";
			} else {
				console.debug("new process skip. " + val.name + "/" + i
						+ "/" + val.hash);
			}
		});
	};
	/**
	 * Changes `maxThreads` and, if the file at the files cursor already has
	 * running threads, starts additional ones up to the new limit.
	 */
	this.setThreads = function(threads) {
		var file = this.files.current();
		this.maxThreads = threads;
		// current() returns false when there is no file at the cursor.
		if (!file || 0 == file.threads.length) {
			return;
		}
		var neednew = this.maxThreads - file.threads.length;
		for ( var i = 0; i < neednew; i++) {
			this.newThread(file);
		}
	};
	/**
	 * Posts the first chunk of `file` whose status is "init" or "error",
	 * unless the file already has `maxThreads` chunks in flight.
	 */
	this.newThread = function(file) {
		if (this.maxThreads <= file.threads.length) {
			console.debug("new thread needless.");
			return;
		}
		for ( var i = 0; i < file.cmaps.length; i++) {
			var cmap = file.cmaps[i];
			if ("init" == cmap.status || "error" == cmap.status) {
				console.debug(cmap);
				cmap.status = "uploading";
				// Sent as "<start>-<length>", not "<start>-<end>".
				// NOTE(review): this differs from the standard HTTP
				// Content-Range form; presumably the server expects it.
				var range = cmap.start + "-" + cmap.offset;
				var req = {
					method : "POST",
					url : "/?app=dami_upload&act=chunk&todo=upload&pid="
							+ file.fileid + "&total_size=" + file.size,
					headers : [ [ "Content-Type", "application/octet-stream" ],
							[ "Content-Range", "bytes " + range ] ],
					// Second argument is the chunk length; assumes Gears
					// Blob.slice(offset, length) -- TODO confirm.
					blob : file.blob.slice(cmap.start, cmap.offset)
				};
				var this_ = this;
				// Assigned after crossPost(); read later by onprogress.
				var reqid;
				var funcs = {
					onprogress : function(obj) {
						obj.cmapno = cmap.no;
						obj.fileid = file.fileid;
						this_.onProgress(obj);
						file.threads.set(obj.cmapno, {
							reqid : reqid,
							loaded : obj.loaded
						});
					},
					oncomplete : function(obj) {
						obj.cmapno = cmap.no;
						obj.fileid = file.fileid;
						console.debug("funcs_fileid:" + obj.fileid);
						this_.onComplete(obj);
						file.threads.del(cmap.no);
					}
				};
				this.fileid = file.fileid;
				reqid = this.crossPost(req, funcs);
				file.threads.set(cmap.no, {
					reqid : reqid,
					loaded : 0
				});
				console.debug("new thread reqid: " + reqid);
				break;
			}
		}
	};
	/**
	 * Per-chunk progress hook. Intentionally does nothing: the bytes loaded
	 * are recorded by the onprogress callback created in newThread.
	 */
	this.onProgress = function(obj) {
	};
	/**
	 * Progress poller for one file: reports progress through showProgress,
	 * or arms a one-shot showNetError timer while the measured speed is 0.
	 */
	this.checkProcess = function(fileid) {
		var file = this.files.get(fileid);
		var loaded = file.loaded + file.threads.sum("loaded");
		var speed = genSpeed( {
			size : file.size,
			received : loaded
		});
		if (0 == parseInt(speed.speed, 10)) {
			if ("undefined" === typeof this._checkProcessTimeout) {
				var this_ = this;
				this._checkProcessTimeout = setTimeout(function() {
					this_.showNetError( {
						fileid : fileid
					});
				}, this.checkTimeout);
			}
		} else {
			try {
				clearTimeout(this._checkProcessTimeout);
				delete this._checkProcessTimeout;
			} catch (e) {
			}
			this.showProgress( {
				fileid : file.fileid,
				total : file.size,
				loaded : loaded,
				speed : speed.speed,
				timeLeft : speed.timeLeft,
				running : file.threads.length
			});
		}
	};
	/**
	 * Handles the response to one chunk POST. A failed chunk is marked
	 * "error" so newThread picks it up again; when all bytes are loaded the
	 * file is checked out.
	 */
	this.onComplete = function(obj) {
		console.debug(obj.responseText);
		var file = this.files.get(obj.fileid);
		var cmap = file.cmaps[obj.cmapno];
		do {
			if (200 != obj.status) {
				console.debug(obj.status);
				cmap.status = "error";
				file.threads.del(obj.cmapno);
				this_.showNetError( {
					fileid : file.fileid
				});
				break;
			}
			var data = false;
			try {
				// SECURITY NOTE(review): eval() executes whatever the server
				// returned. Switch to JSON.parse once the endpoint is
				// confirmed to emit strict JSON.
				data = eval("(" + obj.responseText + ")");
			} catch (e) {
				console.debug("chunk response unparsable.");
			}
			if (!data) {
				// Treat an unusable response like a transport error. Leaving
				// the chunk in "uploading" with its thread slot occupied
				// would stall the upload, because the chunk would never be
				// retried.
				cmap.status = "error";
				file.threads.del(obj.cmapno);
				this_.showNetError( {
					fileid : file.fileid
				});
				break;
			}
			if (0 < parseInt(data.error, 10) && this_.onError(data)) {
				console.debug("chunk upload error.");
				file.threads.del(obj.cmapno);
				break;
			}
			cmap.status = "complete";
			file.loaded += cmap.offset;
			file.threads.del(obj.cmapno);
		} while (false);
		if (file.size > file.loaded) {
			console.debug("new upload thread.");
			return this.newThread(file);
		}
		this.showProgress( {
			fileid : file.fileid,
			total : file.size,
			loaded : file.size,
			speed : 0,
			timeLeft : "...",
			running : 1
		});
		this.checkout(obj);
	};
	/**
	 * Sends the "checkout" request for a fully uploaded file, then triggers
	 * cleanup and, one second later, showComplete. `checkoutLock` allows
	 * only one checkout at a time. A non-200 response retries immediately.
	 */
	this.checkout = function(obj) {
		try {
			// The stored id comes from setInterval (see newProcess).
			clearInterval(this.runningProcess.get(obj.fileid));
		} catch (e) {
		}
		if (this.checkoutLock) {
			return;
		}
		this.checkoutLock = true;
		var file = this.files.get(obj.fileid);
		var url = "/?app=dami_upload&act=chunk&todo=checkout&pid="
				+ file.fileid + "&total_size=" + file.size + "&merge_thread=1";
		var filename = encodeURIComponent(file.name);
		var req_timeout = (Math.ceil(file.size / 11534336) + 10) * 1000;
		var req = {
			timeout : req_timeout,
			method : "POST",
			url : url,
			headers : [ [ "Content-Disposition",
					'attachment; filename="' + filename + '"' ] ]
		};
		var this_ = this;
		try {
			clearTimeout(this._checkProcessTimeout);
			delete this._checkProcessTimeout;
		} catch (e) {
		}
		var funcs = {
			oncomplete : function(req_obj) {
				if (200 != req_obj.status) {
					this_.checkoutLock = false;
					console.debug(req_obj);
					console.debug("retry checkout.");
					return this_.checkout(obj);
				}
				var data;
				try {
					// SECURITY NOTE(review): eval() on server output; see
					// the note in onComplete.
					data = eval("(" + req_obj.responseText + ")");
				} catch (e) {
					// Release the lock so a later checkout is not blocked
					// forever by one malformed response.
					console.debug("checkout response unparsable.");
					this_.checkoutLock = false;
					this_.showNetError( {
						fileid : obj.fileid
					});
					return;
				}
				if (0 < parseInt(data.error, 10) && this_.onError(data)) {
					return;
				}
				this_.cleanup(obj);
				console.debug("chekcout");
				setTimeout(function() {
					console.debug("chekcout");
					var file = this_.files.get(obj.fileid);
					if (null === file) {
						// The file was removed while waiting; just unlock.
						this_.checkoutLock = false;
						return;
					}
					file.hash = data.fileinfo.filehash;
					file.status = "complete";
					console.debug("<---------------");
					console.debug(file);
					console.debug(this_.files);
					console.debug("--------------->");
					this_.runningProcess.del(obj.fileid);
					data.fileid = obj.fileid;
					this_.showComplete(data);
					this_.checkoutLock = false;
				}, 1000);
			}
		};
		this.crossPost(req, funcs);
	};
	/**
	 * Sends the fire-and-forget "cleanup" request for a file.
	 */
	this.cleanup = function(obj) {
		var file = this.files.get(obj.fileid);
		var url = "/?app=dami_upload&act=chunk&todo=cleanup&pid=" + file.fileid
				+ "&total_size=" + file.size;
		var req = {
			method : "POST",
			url : url
		};
		console.debug(url);
		var funcs = {};
		this.crossPost(req, funcs);
	};
	/**
	 * Reports a fatal server-side error and reloads the page.
	 *
	 * @returns {boolean} always true.
	 */
	this.onError = function(obj) {
		alert("出现严重错误,上传不能继续,请稍后重试。#" + obj.error);
		window.location.href = window.location.href;
		return true;
	};
	this.initRetries = 1;
	/**
	 * Creates the cross-origin worker if there is none yet, giving up via
	 * showInitFail once `initRetries` exceeds 10.
	 */
	this.initCrossWorker = function() {
		if (!this.cwp) {
			if (10 < this.initRetries) {
				this.showInitFail();
				return;
			}
			this.initRetries++;
			// The random query string makes every attempt a distinct URL.
			this.cwp = this.wp.createWorkerFromUrl(this.workerUrl + "?"
					+ Math.random());
		}
		console.debug("cross worker, try to initialize");
		this.showInit();
	};
	// An error reported at line 0 that mentions worker.js is treated as a
	// failed worker load and retried after one second.
	window.onerror = function(message, url, line) {
		if (0 == line && message.match(/worker\.js/)) {
			this_.showInitError(message);
			console.debug("initialize fail");
			setTimeout(function() {
				this_.cwp = false;
				this_.initCrossWorker();
			}, 1000);
			return true;
		}
		console.debug(message);
		return false;
	};
	this.initCrossWorker();
	// Routes worker messages to the callbacks registered by crossPost.
	this.wp.onmessage = function(a, b, message) {
		var obj = message.body;
		var funcs = this_.crossFuncs.get(obj.reqid);
		if ("debug" === obj.act) {
			console.debug("worker:" + obj.data);
			return;
		}
		if (null === funcs) {
			console.debug("nofuncs " + message.body);
			return;
		}
		switch (obj.act) {
		case "onprogress":
			if ("undefined" !== typeof funcs.onprogress) {
				funcs.onprogress(obj.data);
			}
			break;
		case "oncomplete":
			if ("undefined" !== typeof funcs.oncomplete) {
				funcs.oncomplete(obj.data);
			}
			break;
		default:
			console.debug(obj);
			break;
		}
	};
	/**
	 * Sends `req` to the cross worker and registers `funcs` for its replies.
	 *
	 * @returns {number} the generated request id.
	 */
	this.crossPost = function(req, funcs) {
		var reqid = Math.random();
		this.crossFuncs.set(reqid, funcs);
		req.reqid = reqid;
		this.wp.sendMessage(req, this.cwp);
		this.reqid = reqid;
		return reqid;
	};
	/**
	 * Asks the cross worker to abort a request and drops its callbacks.
	 */
	this.crossAbort = function(reqid) {
		var req = {
			abort : reqid
		};
		this.wp.sendMessage(req, this.cwp);
		this.crossFuncs.del(reqid);
	};
	/**
	 * Cancels a file: stops its poller, aborts every in-flight chunk,
	 * forgets the file, and resumes uploading the remaining files if an
	 * upload had been started. Unknown file ids are ignored.
	 */
	this.cancel = function(fileid) {
		var file = this.files.get(fileid);
		try {
			clearInterval(this.runningProcess.get(fileid));
		} catch (e) {
		}
		if (null === file) {
			return;
		}
		var this_ = this;
		// Collect the keys first and delete afterwards: deleting inside
		// foreach shifts the key list under the iteration, which left every
		// other request un-aborted.
		var pending = [];
		file.threads.foreach(function(key, val, i) {
			console.debug("abort:" + val.reqid);
			this_.crossAbort(val.reqid);
			pending.push(key);
		});
		for ( var p = 0; p < pending.length; p++) {
			file.threads.del(pending[p]);
		}
		this.files.del(fileid);
		this.runningProcess.del(fileid);
		if (this.files.length > 0 && this.uploadStarted) {
			this.upload();
		}
	};
}
// Rolling state shared by all genSpeed() calls. Declared explicitly so the
// function no longer creates implicit globals, which throw a ReferenceError
// under strict mode / ES modules. In a classic script these remain ordinary
// global variables with the same names as before.
var g_dmp_upload_lastime;
var g_dmp_upload_lastreceived;
var g_dmp_upload_speed;
var g_dmp_cache_speed;

/**
 * Estimates the current upload speed and remaining time.
 *
 * A new sample is taken at most once per second; calls made sooner return
 * the previously computed result object. The reported speed is the average
 * of up to the last 300 samples. Depends on `genFileSize` and `sec2time`,
 * which are defined outside this block.
 *
 * @param {{size: number, received: number}} e Total byte count and bytes
 *        transferred so far.
 * @returns {{speed: *, timeLeft: *}} `speed` is the result of
 *          genFileSize(average) (0 before the first sample); `timeLeft` is
 *          the result of sec2time(...) or the string "Infinity".
 */
function genSpeed(e) {
	var maxSamples = 300;
	if ("undefined" === typeof g_dmp_upload_lastime) {
		g_dmp_upload_lastime = new Date().getTime() - 1;
		g_dmp_upload_lastreceived = 0;
		g_dmp_upload_speed = [];
		g_dmp_cache_speed = {
			speed : 0,
			timeLeft : "Infinity"
		};
	}
	var samples = g_dmp_upload_speed;
	var now = new Date().getTime();
	var elapsed = now - g_dmp_upload_lastime;
	if (elapsed < 1000) {
		return g_dmp_cache_speed;
	}
	// Bytes per second since the previous sample, scaled by 1.1.
	// NOTE(review): the purpose of the 1.1 factor is not visible here.
	// Math.floor(Math.abs(x)) truncates the magnitude without the
	// number -> string -> number round trip of parseInt().
	var rate = Math
			.floor(Math
					.abs((((e.received - g_dmp_upload_lastreceived) / elapsed) * 1000) * 1.1));
	samples.push(rate);
	if (maxSamples < samples.length) {
		samples.shift();
	}
	var average = 0;
	for ( var i = 0; i < samples.length; i++) {
		average += samples[i];
	}
	average = average / samples.length;
	var timeLeft = (0 === average) ? "Infinity" : sec2time((e.size - e.received)
			/ average);
	g_dmp_upload_lastime = now;
	g_dmp_upload_lastreceived = e.received;
	g_dmp_cache_speed = {
		speed : genFileSize(average),
		timeLeft : timeLeft
	};
	return g_dmp_cache_speed;
}

0 请登录后投票
   发表时间:2009-08-19  
多线程上传好像没什么优势吧。问题在于带宽。
   如果一定要实现,推荐客户端用C#实现Activex,服务器端可以用JAVA实现,目前我们的处理是这样的。单线程处理也很快,局域网中完全可以利用上行带宽,如果楼主是用在广域网/Internet中那还是建议不要使用多线程,出错率会非常高。而且不好控制。
   换一种想法,如果带宽不够,多线程带来的恐怕是麻烦。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics