本站分享:AI、大数据、数据分析师培训认证考试,包括:Python培训Excel培训Matlab培训SPSS培训SAS培训R语言培训Hadoop培训Amos培训Stata培训Eviews培训

基于Hadoop的Map reduce编程(一)_Hadoop编程

hadoop培训 cdadata 3330℃

基于Hadoop的Map reduce编程(一)

关键词:hadoop map reduce,hadoop 多个mapreduce,hadoop 两个mapreduce
Hadoop是apache的一个开源的map-reduce框架,MapReduce是一个并行计算模型,用来处理海量数据。模型思想来源于google的Jeffrey Dean 和 Sanjay Ghemawat,包括map() reduce()两个主要的功能。
这是一个很简单的类似于Hadoop的MapReduce应用例子,应用了mapreduce的基本思想,可以帮助理解hadoop的处理思想和技术,但注意,它没有使用hadoop框架。
例子的功能是创建一些字符串,然后统计这些字符串里面每个字符出现的次数,最后汇总得到总的字符出现次数。
Listing 1. 主程序
public class Main
{
    public static void main(String[] args)
{
        MyMapReduce my = new MyMapReduce();
my.init();
    }
}
Listing 2. MyMapReduce.java
import java.util.*;
public class MyMapReduce
{
List buckets = new ArrayList();
List intermediateresults = new ArrayList();
List values = new ArrayList();
public void init()
{
for(int i = 1; i<=30; i++)
{
values.add(“http://pconline900.javaeye.com” + new Integer(i).toString());
}
System.out.println(“**STEP 1 START**-> Running Conversion into Buckets**”);
System.out.println();
List b = step1ConvertIntoBuckets(values,5);
System.out.println(“************STEP 1 COMPLETE*************”);
System.out.println();
System.out.println();
   System.out.println(“**STEP 2 START**->Running **Map Function** concurrently for all        Buckets”);
System.out.println();
List res = step2RunMapFunctionForAllBuckets(b);
System.out.println(“************STEP 2 COMPLETE*************”);
        System.out.println();
System.out.println();
System.out.println(“**STEP 3 START**->Running **Reduce Function** for collating Intermediate Results and Printing Results”);
System.out.println();
step3RunReduceFunctionForAllBuckets(res);
System.out.println(“************STEP 3 COMPLETE*************”);
System.out.println(“************pconline900 翻译*************”);
System.out.println(“***********博客:http://pconline900.javaeye.com*************”);
}
public List step1ConvertIntoBuckets(List list,int numberofbuckets)
{
int n = list.size();
int m = n / numberofbuckets;
int rem = n% numberofbuckets;
int count = 0;
System.out.println(“BUCKETS”);
for(int j =1; j<= numberofbuckets; j++)
{
List temp = new ArrayList();
for(int i=1; i<= m; i++)
{
temp.add((String)values.get(count));
count++;
}
buckets.add(temp);
temp = new ArrayList();
}
if(rem != 0)
{
List temp = new ArrayList();
for(int i =1; i<=rem;i++)
{
temp.add((String)values.get(count));
count++;
}
buckets.add(temp);
}
System.out.println();
System.out.println(buckets);
System.out.println();
return buckets;
}
public List step2RunMapFunctionForAllBuckets(List list)
{
for(int i=0; i< list.size(); i++)
{
List elementList = (ArrayList)list.get(i);
new StartThread(elementList).start();
}
        try
{
Thread.currentThread().sleep(1000);
}catch(Exception e)
{
}
return intermediateresults;
}
public void step3RunReduceFunctionForAllBuckets(List list)
{
int sum =0;
for(int i=0; i< list.size(); i++)
{
//you can do some processing here, like finding max of all results etc
int t = Integer.parseInt((String)list.get(i));
sum += t;
}
System.out.println();
System.out.println(“Total Count is “+ sum);
System.out.println();
}
class StartThread extends Thread
{
private List tempList = new ArrayList();
public StartThread(List list)
{
tempList = list;
}
public void run()
{
for(int i=0; i< tempList.size();i++)
{
String str = (String)tempList.get(i);
synchronized(this)
{
intermediateresults.add(new Integer(str.length()).toString());
}
}
}
}
}
? init()方法创建了一些测试数据,作为测试数据。实际应用中会是海量数据处理。
? step1ConvertIntoBuckets()方法将测试数据拆分到5个 bucket中,每个bucket是一个ArrayList(包含6个String数据)。bucket可以保存在内存,磁盘,或者集群中的其他节点;
? step2RunMapFunctionForAllBuckets()方法创建了5个线程(每个bucket一个),每个线程StartThread处理每个bucket并把处理结果放在intermediateresults这个arraylist中。
? 如果bucket分配给不同的节点处理,必须有一个master主控节点监控各个节点的计算,汇总各个节点的处理结果,若有节点失败,master必须能够分配计算任务给其他节点计算。
? step3RunReduceFunctionForAllBuckets()方法加载intermediateresults中间处理结果,并进行汇总处理,最后得到最终的计算结果。

转载请注明:数据分析 » 基于Hadoop的Map reduce编程(一)_Hadoop编程

喜欢 (0)or分享 (0)