`
gordonAtJava
  • 浏览: 7773 次
文章分类
社区版块
存档分类
最新评论

java中的自动并行运算小试

阅读更多
前天看了 T1 大大的“关于两个世界体系的对话”http://www.iteye.com/topic/231515,获益良多,随笔写下2个类把文章中的 (1+2)*(3+4) 并行问题用java解决掉。
基于这个解决方案的java代码是可以(理论上)自动被并行运算的(在方法调用这个层面上),并行部分就是对所有无相关参数并行计算

闲话少说,上代码

客户端
为了让效果明显,加法和乘法里面都sleep了1000ms
Test类有3个method, add(a,b) , mul(a,b,c), trace(starttime,result),在传统的顺序执行中是这样调用的
Test t=new Test();
int a=t.add(1,2);
int b=t.add(3,4);
int c=t.add(2,5);
int result=t.mul(a,b,c);
运行结果
main: + 1 2 => 3
main: + 3 4 => 7
main: + 2 5 => 7
main: * 3 7 7 => 147
147 | time cost : 4003 | freeM/totalM : 1816576/2031616

用了并行运算就要这样调用
Test t=new Test();
Func add=Func.define(t,"add");
Func mul=Func.define(t,"mul");
Var a=add.c(1,2);
Var b=add.c(3,4);
Var c=add.c(2,5);
Var result=mul.c(a,b,c);
result.run();

运行结果
Thread-1: + 1 2 => 3
Thread-2: + 3 4 => 7
Thread-3: + 2 5 => 7
Thread-6: * 3 7 7 => 147
147 | time cost : 2010 | freeM/totalM : 1716992/2031616


用了并发的写法以后,3个加法是同时由3个不同线程执行的,直到3个加法都执行完毕,乘法才被调用。在一个完全按照sequence方式写的并行程序的运行时间大大的减少了,4003ms => 2010ms

Test.java
/*
 * Created on 2008-9-9
 * Title:
 * Description: 
 * @author Gordon Hu
 * @version 1.0
 */
package gordon.concurrence;

public class Test {
    
    public static void main(String[] args) throws Exception {
        Test t=new Test();
        boolean parallel=true;
        if(parallel){
            Func add=Func.define(t,"add");
            Func mul=Func.define(t,"mul");
            Func println=Func.define(t,"trace");
            Var a=add.c(1,2);
            Var b=add.c(3,4);
            Var c=add.c(2,5);
            Var i3=mul.c(a,b,c);
            long start=System.currentTimeMillis();
            Var timeMe=println.c(start,i3);
            timeMe.run();
        }else{
            long START=System.currentTimeMillis();
            int A=t.add(1,2);
            int B=t.add(3,4);
            int C=t.add(2,5);
            int I3=t.mul(A,B,C);
            t.trace(START,I3);
        }
    }
    
    public int add(Integer a,Integer b){
    	int rst=a+b;
    	try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            // TODO
        }
        System.out.println(Thread.currentThread().getName()+ ": + "+a+" "+b+" => "+rst);
        return rst;
    }

    public int mul(Integer a,Integer b,Integer c){
        int rst=a*b*c;
        try {
            Thread.currentThread().sleep(1000);
        } catch (InterruptedException e) {
            // TODO
        }
        System.out.println(Thread.currentThread().getName()+ ": * "+a+" "+b+" "+c+" => "+rst);
        return rst;
    }

    public void trace(Long start,Object o){
        System.out.println(o+" | time cost : "+(System.currentTimeMillis()-start)+" | freeM/totalM : "+Runtime.getRuntime().freeMemory()+"/"+Runtime.getRuntime().totalMemory());
    }
}



帮助类
Var.java
/*
 * Created on 2008-9-9
 * Title:
 * Description: 
 * @author Gordon Hu
 * @version 1.0
 */
package gordon.concurrence;

import java.util.HashSet;
import java.util.Set;

public abstract class Var implements Runnable {
    public abstract Func getFunc();
    public abstract void run();
    public abstract void update(Var n,Object rst);
    private Set<Var> updater=new HashSet<Var>();

    public void setUpdater(Set<Var> updater) {
        this.updater.addAll(updater);
    }
    
    public void addUpdater(Var updater){
        this.updater.add(updater);
    }

    public Set<Var> getUpdater() {
        return updater;
    }
}

每个Var其实就是一个方法运行时候所需要的context,包括input parameters和return结果要调用的callback

Threading.java
/*
 * Created on 2008-9-9
 * Title:
 * Description: 
 * @author Gordon Hu
 * @version 1.0
 */
package gordon.concurrence;

public class Threading {
    public static void runThread(Runnable r){
        new Thread(r).start();
    }
}

这个类就是所有异步调用的底层设施了,我这里就直接new thread了,可以用threadpool之类的,也可以用grid computation,比如coherence之类的


接下来这个Func其实就是些反射动态调用并且暴露一个统一的类似function的接口。
动态调用里面我用了方法名字匹配(不包括参数类型,只要名字符合的第一个方法就被调用,这样主要是因为本人倾向可变长参数,不喜欢overloading,所以像System.out对象里面的println方法就不能直接用)。另外还负责了生成一个Var。
Func.java
/*
 * Created on 2008-9-9
 * Title:
 * Description: 
 * @author Gordon Hu
 * @version 1.0
 */
package gordon.concurrence;

import java.util.ArrayList;
import java.util.List;

public abstract class Func {
    public abstract Object apply(Object... args);
    public Var c(final Object... args){
        final Func self=this;
        Var n=new Var(){
                //....create a Var and bind args to it
                //....具体实现省略,详情看附件中的src
            };
        //set callbacks
        for(Object o:args){
            if(o instanceof Var)((Var)o).addUpdater(n);
        }
        return n;
    }
    
    
    public static Func define(final Object worker,final String method){
        //define a Func from a given object and its method name
        return new Func(){
                public Object apply(Object... args) {
                    return run(worker,method,args);
                }
            };
    }
    
    public static Runnable getUpdateThread(final Var updater,final Var node,final Object rst){
        return new Runnable(){
            public void run(){
                updater.update(node,rst);
            }
        };
    }
    public static Object run(Object worker, String action, 
                                Object... arguments) {
        //dynamically call run a method
        //execute worker.action(arguments) and return result
        //....具体实现省略,详情看附件中的src
    }
}
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics