Locked old thread. Topic: Multithreaded upload over HTTP: share your thoughts
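Posted: 2009-08-18
Notice: Copyright of ITeye articles belongs to their authors and is protected by law. Reproduction without the author's written permission is prohibited.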
Posted: 2009-08-18
A strange requirement. Why do you want multithreaded upload?
Posted: 2009-08-18
The boss wants to improve upload speed.
Posted: 2009-08-19
Have you figured out where the bottleneck in your system actually is?
Will multithreading really solve your upload speed problem? Disk I/O, RAID 0, or something else...
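Posted: 2009-08-19
Saito wrote: Have you figured out where the bottleneck in your system actually is? Will multithreading really solve your upload speed problem? Disk I/O, RAID 0, or something else...
Let's first work out how to implement it from a purely technical angle, without debating whether it is worth doing.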
Posted: 2009-08-19
Write an ActiveX control. Plain HTML gives you no way to split a file into multiple chunks for upload.
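Posted: 2009-08-19
This is actually a time-consuming and pointless requirement. Upload speed is determined by your local uplink bandwidth and the server's bandwidth. You can never go faster than your local bandwidth, and a single submission should already be able to use all the bandwidth that is available. So multithreading will not make it faster.
It is the same with multithreaded download tools. If the network is clear and unthrottled, a single thread is already very fast. Multithreading helps mainly because the server limits the speed of a single user, so multiple threads and connections are used to speed things up. This requirement means developing an ActiveX control on the client, and the server side becomes very complex as well. In effect you turn the client machine into a download server and the server into a multithreaded download tool.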
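The bandwidth argument in this post can be put into numbers. The sketch below is only an illustration under stated assumptions: the function name `estimateUploadSeconds`, its parameters, and all the figures (a 100 MB file, a 1 MB/s uplink, a 0.25 MB/s per-connection limit) are made up for the example and do not come from the thread.

```javascript
// Estimate upload time in seconds. All names and numbers are illustrative
// assumptions; sizes are in bytes and rates in bytes per second.
function estimateUploadSeconds({ fileBytes, clientUplink, serverDownlink,
                                 perConnectionCap = Infinity, connections = 1 }) {
  // With a per-connection limit, n connections can carry at most n * cap.
  const aggregate = perConnectionCap * connections;
  // The end-to-end rate can never exceed the slowest part of the path.
  const rate = Math.min(clientUplink, serverDownlink, aggregate);
  return fileBytes / rate;
}

const MB = 1024 * 1024;
const base = { fileBytes: 100 * MB, clientUplink: 1 * MB, serverDownlink: 10 * MB };

// No throttling: one connection already fills the uplink, five gain nothing.
const single = estimateUploadSeconds({ ...base, connections: 1 });
const five = estimateUploadSeconds({ ...base, connections: 5 });

// Server limits each connection to 0.25 MB/s: more connections help,
// but only until the client uplink is full.
const throttled1 = estimateUploadSeconds({ ...base, perConnectionCap: 0.25 * MB, connections: 1 });
const throttled4 = estimateUploadSeconds({ ...base, perConnectionCap: 0.25 * MB, connections: 4 });
const throttled8 = estimateUploadSeconds({ ...base, perConnectionCap: 0.25 * MB, connections: 8 });

console.log({ single, five, throttled1, throttled4, throttled8 });
```

Under these assumptions, parallel connections only pay off in the throttled case, which matches the point made above about download tools.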
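Posted: 2009-08-19
I see that Dami Pan (大米盘) has already implemented this. It mainly uses JS to get multithreading, issuing several POST requests and uploading the file in chunks via the HTTP Content-Range header. The key questions are how to issue multiple POSTs at the same time while controlling a semaphore for the child threads, and how to finally trigger the merge of the uploaded chunks into one file.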
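The two questions raised here, limiting how many POSTs run at once and triggering the merge after the last chunk, can be sketched with a plain counter. This is a minimal sketch and not Dami Pan's implementation: `createUploader`, `sendChunk` and `onAllDone` are hypothetical names, and the fake sender below stands in for real HTTP requests. Page JavaScript runs its callbacks one at a time, so the counter needs no lock. Error handling and retries are left out.

```javascript
// Keep at most `maxParallel` chunk uploads in flight and call `onAllDone`
// exactly once after the last chunk finishes. `sendChunk(chunk, done)` is a
// hypothetical caller-supplied function that must call `done()` on success.
function createUploader(chunks, maxParallel, sendChunk, onAllDone) {
  let next = 0;      // index of the next chunk not yet started
  let inFlight = 0;  // uploads currently running (the "semaphore" count)
  let finished = 0;  // uploads completed so far

  function pump() {
    // Start new uploads while there is a free slot and work left.
    while (inFlight < maxParallel && next < chunks.length) {
      const chunk = chunks[next++];
      inFlight++;
      sendChunk(chunk, function onChunkDone() {
        inFlight--;
        finished++;
        if (finished === chunks.length) {
          onAllDone();   // every chunk is on the server: ask it to merge
        } else {
          pump();        // a slot is free again, start the next chunk
        }
      });
    }
  }

  return { start: pump };
}

// Demonstration with a fake sender that just queues the completion callbacks.
const pending = [];   // callbacks of fake requests that are "in flight"
let peak = 0;         // highest number of simultaneous requests observed
let mergeCalls = 0;   // how many times the merge step was triggered

const uploader = createUploader(
  [0, 1, 2, 3, 4, 5, 6],
  3,
  (chunk, done) => { pending.push(done); peak = Math.max(peak, pending.length); },
  () => { mergeCalls++; }
);
uploader.start();
while (pending.length > 0) {
  pending.shift()();  // complete the oldest fake request
}
console.log({ peak, mergeCalls });
```

In a real page, `sendChunk` would issue the POST and `onAllDone` would send the merge request to the server.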
Posted: 2009-08-19
Last edited: 2009-08-19
Dami Pan is built on Google Gears. Let's analyze its JS together:
Array.prototype.remove = function(b) {
  for (var a = 0; a < this.length; a++) {
    if (b == this[a]) {
      this.splice(a, 1);
    }
  }
};

// Simple ordered key/value container (shadows any built-in Map).
function Map() {
  this.keys = [];
  this.index = 0;
  this.data = {};
  this.length = 0;
  this.set = function(a, b) {
    if ("undefined" === typeof this.data[a]) {
      this.keys.push(a);
      this.length = this.keys.length;
    }
    this.data[a] = b;
  };
  this.get = function(a) {
    if ("undefined" === typeof this.data[a]) {
      return null;
    }
    return this.data[a];
  };
  this.del = function(a) {
    if ("undefined" !== typeof this.data[a]) {
      this.keys.remove(a);
      delete this.data[a];
      this.length--;
    }
  };
  this.sum = function(a) {
    var d = 0;
    for (var c = 0; c < this.keys.length; c++) {
      var b = this.keys[c];
      if ("undefined" !== typeof a) {
        d += this.data[b][a];
      } else {
        d += this.data[b];
      }
    }
    return d;
  };
  this.foreach = function(c) {
    if ("function" !== typeof c) {
      return;
    }
    for (var b = 0; b < this.keys.length; b++) {
      var a = this.keys[b];
      var d = c(a, this.data[a], b, this);
      if ("break" === d) {
        break;
      }
    }
  };
  this.each = function() {
    var a = this.current();
    if (false !== a) {
      this.index++;
    }
    return a;
  };
  this.current = function() {
    if (this.index >= this.length) {
      return false;
    }
    var a = this.data[this.keys[this.index]];
    return a;
  };
  this.reset = function() {
    this.index = 0;
  };
  this.toString = function() {
    var c = "Object Map {\n";
    for (var b = 0; b < this.keys.length; b++) {
      var a = this.keys[b];
      c += " " + a + ":" + this.data[a] + ",\n";
    }
    c += "}";
    return c;
  };
}

function DmpGearsUpload(handlers) {
  // "Please select a file to upload!"
  this.text_has_not_selected = "请选择要上传的文件!";
  // "The selected file is too large; a single file must not exceed %s!"
  this.text_file_to_large = "您所选的文件太大,单个文件请不要超过%s!";
  this.maxChunks = 2000;
  this.minChunks = 3;
  this.progressFrequency = 1000;
  this.checkTimeout = 5000;
  this.chunkSize = 8388608;
  this.maxFileSize = 5368709120;
  this.maxThreads = 5;
  this.workerUrl = "http://vdami/js/worker.js";
  this.tablesVersion = 903231727;
  this.maxProcess = 1;
  this.multiFileSupport = 0;
  this.geoAddress = null;
  this.locationLoaded = false;
  this.files = new Map;
  this.checkoutLock = false;
  this.runningProcess = new Map;
  this.crossFuncs = new Map;
  this.wp = google.gears.factory.create("beta.workerpool");
  this.cwp = false;
  this.uploadStarted = false;

  // UI hooks; callers override them through `handlers`.
  this.showOpenFile = function() {};
  this.showSubmit = function() {};
  this.showProgress = function() {};
  this.showComplete = function() {};
  this.showInit = function() {};
  this.showInitError = function() {};
  this.showInitFail = function() {};
  this.showNetError = function() {};
  var _private_allow_handlers = {};
  if ("undefined" !== typeof handlers) {
    for (n in handlers) {
      console.debug("bind handler: " + n);
      this[n] = handlers[n];
    }
  }

  // Let the user pick files and build the chunk map (cmaps) for each one.
  this.openFile = function() {
    var this_ = this;
    var desktop = google.gears.factory.create("beta.desktop");
    var singleFile = !this.multiFileSupport;
    desktop.openFiles(function(files) {
      if (1 > files.length) {
        return;
      }
      if (singleFile) {
        this_.files = new Map;
      }
      while (file = files.pop()) {
        var fileid = Math.random();
        if (this_.maxFileSize < file.blob.length) {
          alert(this_.text_file_to_large.replace("%s", genFileSize(this_.maxFileSize)));
          return;
        }
        if (1073741824 < file.blob.length && file.name.match(/\.(rar|zip|7z)$/)) {
          // "Tip: if the RAR you are uploading bundles several video files, we
          // suggest uploading them separately. Each video file very likely already
          // has many sources on the eMule and Xunlei P2P networks, so downloading
          // them individually is much faster."
          alert("温馨提示:如果您上传的RAR是为了打包多个视频文件,我们建议您还是分开上传。因为每个视频文件极有可能在电驴和迅雷的P2P网络里已经有很多源,单独下载时会大大增加下载速度。");
        }
        this_.files.set(fileid, {
          fileid: fileid,
          name: file.name,
          hash: "",
          size: file.blob.length,
          blob: file.blob,
          loaded: 0,
          cmaps: [],
          threads: new Map,
          status: "init"
        });
        var tmp_chunks = Math.ceil(file.blob.length / this_.chunkSize);
        var chunk_size = this_.chunkSize;
        if (tmp_chunks > this_.maxChunks) {
          chunk_size = Math.ceil(file.blob.length / this_.maxChunks);
        } else {
          if (tmp_chunks < this_.minChunks) {
            chunk_size = Math.ceil(file.blob.length / this_.minChunks);
          }
        }
        console.debug("chunksize front:" + this_.chunkSize + " final:" + chunk_size);
        console.debug("chunks front:" + tmp_chunks + " final:" + Math.ceil(file.blob.length / chunk_size));
        // Note: this overrides the chunk size computed above.
        var chunk_size = 9728000;
        var i = 0;
        var w = true;
        do {
          var start = i * chunk_size;
          var chunk_no = start / chunk_size;
          if ((start + chunk_size) >= file.blob.length) {
            offset = file.blob.length - start;
            w = false;
          } else {
            offset = chunk_size;
          }
          // `offset` holds the chunk length, not an end position.
          this_.files.get(fileid).cmaps[i] = {
            no: i,
            start: start,
            offset: offset,
            status: "init"
          };
          i++;
        } while (w);
        this_.showOpenFile(file);
        console.debug(file);
      }
    }, {
      singleFile: singleFile
    });
  };

  this.upload = function() {
    if (0 === this.files.length) {
      alert(this.text_has_not_selected);
      return false;
    }
    var ps = this.files.length > this.maxProcess ? this.maxProcess : this.files.length;
    for (var i = 0; i < ps; i++) {
      console.debug("require process " + i);
      this.showSubmit(this.newProcess());
    }
    this.uploadStarted = true;
  };

  // Start uploading the first file that has no hash yet.
  this.newProcess = function() {
    if (this.maxProcess < this.runningProcess.length) {
      console.debug("new process needless.");
      return;
    }
    var this_ = this;
    this.files.foreach(function(key, val, i, files) {
      if ("" == val.hash) {
        console.debug("start new process. " + val.name);
        try {
          clearInterval(this_.runningProcess.get(val.fileid));
        } catch (e) {}
        this_.runningProcess.set(val.fileid, setInterval(function() {
          this_.checkProcess(val.fileid);
        }, this_.progressFrequency));
        val.status = "uploading";
        var threads = val.cmaps.length > this_.maxThreads ? this_.maxThreads : val.cmaps.length;
        for (var i = 0; i < threads; i++) {
          this_.newThread(val);
        }
        return "break";
      } else {
        console.debug("new process skip. " + val.name + "/" + i + "/" + val.hash);
      }
    });
  };

  this.setThreads = function(threads) {
    var file = this.files.current();
    this.maxThreads = threads;
    if (0 == file.threads.length) {
      return;
    }
    var neednew = this.maxThreads - file.threads.length;
    if (neednew > 0) {
      for (var i = 0; i < neednew; i++, this.newThread(file)) {}
    }
  };

  // Pick the next chunk in state "init" or "error" and POST it through the worker.
  this.newThread = function(file) {
    if (this.maxThreads <= file.threads.length) {
      console.debug("new thread needless.");
      return;
    }
    for (var i = 0; i < file.cmaps.length; i++) {
      var cmap = file.cmaps[i];
      if ("init" == cmap.status || "error" == cmap.status) {
        console.debug(cmap);
        cmap.status = "uploading";
        var range = cmap.start + "-" + cmap.offset;
        var req = {
          method: "POST",
          url: "/?app=dami_upload&act=chunk&todo=upload&pid=" + file.fileid + "&total_size=" + file.size,
          headers: [
            ["Content-Type", "application/octet-stream"],
            ["Content-Range", "bytes " + range]
          ],
          blob: file.blob.slice(cmap.start, cmap.offset)
        };
        var this_ = this;
        var funcs = {
          onprogress: function(obj) {
            obj.cmapno = cmap.no;
            obj.fileid = file.fileid;
            this_.onProgress(obj);
            file.threads.set(obj.cmapno, {
              reqid: reqid,
              loaded: obj.loaded
            });
          },
          oncomplete: function(obj) {
            obj.cmapno = cmap.no;
            obj.fileid = file.fileid;
            console.debug("funcs_fileid:" + obj.fileid);
            this_.onComplete(obj);
            file.threads.del(cmap.no);
          }
        };
        this.fileid = file.fileid;
        var reqid = this.crossPost(req, funcs);
        file.threads.set(cmap.no, {
          reqid: reqid,
          loaded: 0
        });
        console.debug("new thread reqid: " + reqid);
        break;
      }
    }
  };

  this.onProgress = function(obj) {
    var file = this.files.get(obj.fileid);
    var cmap = file.cmaps[obj.cmapno];
  };

  // Called on a timer: report progress, or arm a network-error timeout when stalled.
  this.checkProcess = function(fileid) {
    var file = this.files.get(fileid);
    var loaded = file.loaded + file.threads.sum("loaded");
    var speed = genSpeed({
      size: file.size,
      received: loaded
    });
    if (0 == parseInt(speed.speed)) {
      if ("undefined" === typeof this._checkProcessTimeout) {
        var this_ = this;
        this._checkProcessTimeout = setTimeout(function() {
          this_.showNetError({ fileid: fileid });
        }, this.checkTimeout);
      }
    } else {
      try {
        clearTimeout(this._checkProcessTimeout);
        delete this._checkProcessTimeout;
      } catch (e) {}
      this.showProgress({
        fileid: file.fileid,
        total: file.size,
        loaded: loaded,
        speed: speed.speed,
        timeLeft: speed.timeLeft,
        running: file.threads.length
      });
    }
  };

  // A chunk finished: record the result, then start another chunk or merge.
  this.onComplete = function(obj) {
    console.debug(obj.responseText);
    var file = this.files.get(obj.fileid);
    var cmap = file.cmaps[obj.cmapno];
    do {
      if (200 != obj.status) {
        console.debug(obj.status);
        cmap.status = "error";
        file.threads.del(obj.cmapno);
        this_.showNetError({ fileid: file.fileid });
        break;
      }
      var data = false;
      try {
        data = eval("(" + obj.responseText + ")");
      } catch (e) {}
      if (false === data) {
        break;
      }
      if (0 < parseInt(data.error) && this_.onError(data)) {
        console.debug("chunk upload error.");
        file.threads.del(obj.cmapno);
        break;
      }
      cmap.status = "complete";
      file.loaded += cmap.offset;
      file.threads.del(obj.cmapno);
    } while (false);
    if (file.size > file.loaded) {
      console.debug("new upload thread.");
      return this.newThread(file);
    }
    this.showProgress({
      fileid: file.fileid,
      total: file.size,
      loaded: file.size,
      speed: 0,
      timeLeft: "...",
      running: 1
    });
    this.checkout(obj);
  };

  // All chunks are uploaded: ask the server to merge them (todo=checkout).
  this.checkout = function(obj) {
    try {
      clearTimeout(this.runningProcess.get(obj.fileid));
    } catch (e) {}
    if (this.checkoutLock) {
      return;
    }
    this.checkoutLock = true;
    var file = this.files.get(obj.fileid);
    var cmap = file.cmaps[obj.cmapno];
    var url = "/?app=dami_upload&act=chunk&todo=checkout&pid=" + file.fileid + "&total_size=" + file.size + "&merge_thread=1";
    var filename = encodeURIComponent(file.name);
    var req_timeout = (Math.ceil(file.size / 11534336) + 10) * 1000;
    var req = {
      timeout: req_timeout,
      method: "POST",
      url: url,
      headers: [
        ["Content-Disposition", 'attachment; filename="' + filename + '"']
      ]
    };
    var this_ = this;
    try {
      clearTimeout(this._checkProcessTimeout);
      delete this._checkProcessTimeout;
    } catch (e) {}
    var funcs = {
      oncomplete: function(req_obj) {
        if (200 != req_obj.status) {
          this_.checkoutLock = false;
          console.debug(req_obj);
          console.debug("retry checkout.");
          return this_.checkout(obj);
        }
        var data = eval("(" + req_obj.responseText + ")");
        if (0 < parseInt(data.error) && this_.onError(data)) {
          return;
        }
        this_.cleanup(obj);
        console.debug("chekcout");
        setTimeout(function() {
          console.debug("chekcout");
          var file = this_.files.get(obj.fileid);
          file.hash = data.fileinfo.filehash;
          file.status = "complete";
          console.debug("<---------------");
          console.debug(file);
          console.debug(this_.files);
          console.debug("--------------->");
          this_.runningProcess.del(obj.fileid);
          data.fileid = obj.fileid;
          this_.showComplete(data);
          this_.checkoutLock = false;
        }, 1000);
      }
    };
    this.crossPost(req, funcs);
  };

  this.cleanup = function(obj) {
    var file = this.files.get(obj.fileid);
    var url = "/?app=dami_upload&act=chunk&todo=cleanup&pid=" + file.fileid + "&total_size=" + file.size;
    var req = {
      method: "POST",
      url: url
    };
    console.debug(url);
    var funcs = {};
    this.crossPost(req, funcs);
  };

  this.onError = function(obj) {
    // "A serious error occurred and the upload cannot continue; please try again later."
    alert("出现严重错误,上传不能继续,请稍后重试。#" + obj.error);
    window.location.href = window.location.href;
    return true;
  };

  this.initRetries = 1;
  this.initCrossWorker = function() {
    if (!this.cwp) {
      if (10 < this.initRetries) {
        this.showInitFail();
        return;
      }
      this.initRetries++;
      this.cwp = this.wp.createWorkerFromUrl(this.workerUrl + "?" + Math.random());
    }
    console.debug("cross worker, try to initialize");
    this.showInit();
  };

  var this_ = this;
  window.onerror = function(message, url, line) {
    if (0 == line && message.match(/worker\.js/)) {
      this_.showInitError(message);
      console.debug("initialize fail");
      setTimeout(function() {
        this_.cwp = false;
        this_.initCrossWorker();
      }, 1000);
      return true;
    }
    console.debug(message);
    return false;
  };
  this.initCrossWorker();

  // Dispatch messages coming back from the worker to the registered callbacks.
  this.wp.onmessage = function(a, b, message) {
    var obj = message.body;
    var funcs = this_.crossFuncs.get(obj.reqid);
    if ("debug" === obj.act) {
      console.debug("worker:" + obj.data);
      return;
    }
    if (null === funcs) {
      console.debug("nofuncs " + message.body);
      return;
    }
    switch (obj.act) {
      case "onprogress":
        if ("undefined" !== typeof funcs.onprogress) {
          funcs.onprogress(obj.data);
        }
        break;
      case "oncomplete":
        if ("undefined" !== typeof funcs.oncomplete) {
          funcs.oncomplete(obj.data);
        }
        break;
      default:
        console.debug(obj);
        break;
    }
  };

  this.crossPost = function(req, funcs) {
    var reqid = Math.random();
    this.crossFuncs.set(reqid, funcs);
    req.reqid = reqid;
    this.wp.sendMessage(req, this.cwp);
    this.reqid = reqid;
    return reqid;
  };

  this.crossAbort = function(reqid) {
    var req = {
      abort: reqid
    };
    this.wp.sendMessage(req, this.cwp);
    this.crossFuncs.del(reqid);
  };

  this.cancel = function(fileid) {
    var file = this.files.get(fileid);
    try {
      clearInterval(this.runningProcess.get(fileid));
    } catch (e) {}
    var this_ = this;
    file.threads.foreach(function(key, val, i) {
      console.debug("abort:" + val.reqid);
      this_.crossAbort(val.reqid);
      file.threads.del(key);
    });
    this.files.del(fileid);
    this.runningProcess.del(fileid);
    if (this.files.length > 0 && this.uploadStarted) {
      this.upload();
    }
  };
}

// Moving average of the upload speed over up to 300 samples, at most one per second.
function genSpeed(e) {
  var h = 300;
  if ("undefined" === typeof g_dmp_upload_lastime) {
    g_dmp_upload_lastime = new Date().getTime() - 1;
    g_dmp_upload_lastreceived = 0;
    g_dmp_upload_speed = [];
    g_dmp_cache_speed = {
      speed: 0,
      timeLeft: "Infinity"
    };
  }
  var a = g_dmp_upload_speed;
  var c = g_dmp_upload_lastime;
  var f = g_dmp_upload_lastreceived;
  var k = new Date().getTime();
  var j = k - c;
  var g;
  var b = 0;
  if (j < 1000) {
    return g_dmp_cache_speed;
  }
  var l = Math.abs(parseInt((((e.received - f) / (j)) * 1000) * 1.1));
  a.push(l);
  if (h < a.length) {
    a.shift();
  }
  for (var d = 0; d < a.length; d++) {
    b += a[d];
  }
  b = b / a.length;
  g = (0 === b) ? "Infinity" : sec2time((e.size - e.received) / b);
  g_dmp_upload_lastime = k;
  g_dmp_upload_lastreceived = e.received;
  g_dmp_cache_speed = {
    speed: genFileSize(b),
    timeLeft: g
  };
  return g_dmp_cache_speed;
};
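To make the chunk map from `openFile` and the range header from `newThread` easier to follow, here is a small standalone sketch. It is not the Dami Pan code: `planChunks` and `contentRange` are hypothetical helper names, and the 25 MB file size is made up. One difference is deliberate. The pasted code sends `start` followed by the chunk length (its `offset` field), which its own server presumably understands, whereas the sketch formats the standard HTTP form `bytes first-last/total` with an inclusive last byte position.

```javascript
// Split a file of `totalSize` bytes into chunks of at most `chunkSize` bytes.
// Each entry records its number, start position and length.
function planChunks(totalSize, chunkSize) {
  const chunks = [];
  for (let start = 0, no = 0; start < totalSize; start += chunkSize, no++) {
    const length = Math.min(chunkSize, totalSize - start);
    chunks.push({ no, start, length });
  }
  return chunks;
}

// Standard HTTP Content-Range value: "bytes first-last/total",
// where `last` is the inclusive position of the chunk's final byte.
function contentRange(chunk, totalSize) {
  const last = chunk.start + chunk.length - 1;
  return `bytes ${chunk.start}-${last}/${totalSize}`;
}

const total = 25 * 1024 * 1024;            // an assumed 25 MB file
const plan = planChunks(total, 9728000);   // chunk size taken from the pasted code

for (const chunk of plan) {
  console.log(chunk.no, contentRange(chunk, total));
}
// prints:
// 0 bytes 0-9727999/26214400
// 1 bytes 9728000-19455999/26214400
// 2 bytes 19456000-26214399/26214400
```

With the start position and total size on every request, the server can write each chunk at the right place regardless of the order in which the parallel POSTs arrive.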
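Posted: 2009-08-19
Multithreaded upload doesn't seem to have much of an advantage. The problem is bandwidth.
If you do implement it, I recommend writing the client as an ActiveX control in C#; the server side can be implemented in Java. That is how we currently handle it. Single-threaded upload is fast too, and on a LAN it can make full use of the uplink bandwidth. If the OP plans to use this over a WAN or the Internet, I still advise against multithreading: the error rate will be very high and it is hard to control. Put another way, if bandwidth is insufficient, multithreading will probably bring nothing but trouble.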