`
cloudtech
  • 浏览: 4642929 次
  • 性别: Icon_minigender_1
  • 来自: 武汉
文章分类
社区版块
存档分类
最新评论

MPI 实现并行直接插入排序

 
阅读更多

话说 直插排序串行只需要十几行 并行就大不一样。。。

基本思路是

1/主线程控制数据读取 分发和汇总

2/从线程中数据按线程号非降有序 每次有新数据 主线程广播 从线程按照数据大小判断是否在自己处理范围内 如果在的话 在自己保存的数组中进行直接插入

3/最后 主线程按序收集显示所有数据

#include "mpi.h"
#include <unistd.h>
#include <fcntl.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>

#define END 0
#define INIT 1
#define DATA 2
#define ROOT 0

int readValue(char* file,int* values)
{
	int fd,size,count=0;
	char buffer[80],num[11];
	fd=open(file,O_RDONLY);
	do
	{
		size=read(fd,buffer,sizeof(buffer));
		int j=0;
		for(int i=0;i<size;++i)
		{
			if(buffer[i]<'9'&&buffer[i]>'0')
			{
				num[j++]=buffer[i];
			}else
			{
				num[j]='\0';
				values[count++]=atoi(num);
				j=0;	
			}
		}
	}while(size!=0);
	close(fd);
	return count;
}
void insert(int* arr,int pos,int temp)
{
	while(pos>=0&&temp<arr[pos])
	{
		arr[pos+1]=arr[pos];
		--pos;
	}
	arr[pos+1]=temp;
}
void serial(int* arr,int size)
{
	for(int i=1;i<size;++i)
	{
		insert(arr,i-1,arr[i]);
	}
}

int main(int argc,char *argv[])
{
	int* values;
	int self,size,length,temp;
	MPI_Init(&argc,&argv);
	MPI_Comm_size(MPI_COMM_WORLD,&size);
	MPI_Comm_rank(MPI_COMM_WORLD,&self);
	MPI_Status status;
	if(self==0)
	{
		//read value from file
		values=(int*)malloc(100*sizeof(int));
		length=readValue("/home/gt/parellel/sort/data.in",values);
		//initial root value of each process
		serial(values,size);
		//broadcast the max length of each process
		MPI_Bcast(&length,1,MPI_INT,ROOT,MPI_COMM_WORLD);
		//the boundary
		for(int i=1;i<size;++i)
		{
			MPI_Ssend(&values[i-1],1,MPI_INT,i,INIT,MPI_COMM_WORLD);
		}
		MPI_Recv(&temp,1,MPI_INT,self+1,DATA,MPI_COMM_WORLD,&status);
		//broadcast each value and do insert in each process
		for(int i=size-1;i<length;++i)
		{
			temp=values[i];
			MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
			printf("broadcast %d \n",values[i]);
			//MPI_Barrier(MPI_COMM_WORLD);
		}
		
		//ends
		printf("ends \n");
		temp=END;
		MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
		//recieve from each process in order
		int pos=0;
		int len=0;
		for(int i=1;i<size;++i)
		{
			MPI_Recv(&len,1,MPI_INT,i,DATA,MPI_COMM_WORLD,&status);
			for(int j=0;j<len;++j)
			{
				MPI_Recv(&temp,1,MPI_INT,i,DATA,MPI_COMM_WORLD,&status);
				values[pos++]=temp;
			}
		}
		for(int i=0;i<length;++i)
		{
			printf("%d \n",values[i]);
		}
		free(values);
	}
	else
	{	
		MPI_Request request;
		//recieve max length
		MPI_Bcast(&length,1,MPI_INT,ROOT,MPI_COMM_WORLD);
		//position
		int pos=0;
		//post irecv
		MPI_Irecv(&temp,1,MPI_INT,ROOT,INIT,MPI_COMM_WORLD,&request);
		//data on this process
		int* local=(int*) malloc(length*sizeof(int)+1);
		//wait for the recieve
		MPI_Wait(&request,&status);
		//exchange upper limit
		int upper=999999;//max of int
		if(self!=size-1)MPI_Irecv(&upper,1,MPI_INT,self+1,DATA,MPI_COMM_WORLD,&request);
		MPI_Ssend(&temp,1,MPI_INT,self-1,DATA,MPI_COMM_WORLD);
		if(self!=size-1)MPI_Wait(&request,&status);
		local[pos++]=temp;
		MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
		//MPI_Barrier(MPI_COMM_WORLD);
		while(temp!=END)
		{
			printf("%d recieve %d \n",self,temp);
			if(temp<=upper&&(self==1||local[0]<temp))
			{
				printf("%d insert %d \n",self,temp);
				local[pos++]=temp;
				insert(local,pos-2,temp);
			}
			MPI_Bcast(&temp,1,MPI_INT,ROOT,MPI_COMM_WORLD);
			//MPI_Barrier(MPI_COMM_WORLD);
		}
		//ends
		//sends the data in order to root process
		MPI_Ssend(&pos,1,MPI_INT,ROOT,DATA,MPI_COMM_WORLD);
		for(int i=0;i<pos;++i)	
		{
			MPI_Ssend(&local[i],1,MPI_INT,ROOT,DATA,MPI_COMM_WORLD);
			printf("% d send back %d \n",self,local[i]);
		}
		free(local);
	}
	MPI_Finalize();
	return 0;
}


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics