本文共 18270 字,大约阅读时间需要 60 分钟。
介绍:一个稍复杂点的处理逻辑往往需要多个 MapReduce 程序串联处理,多 job 的串联可以借助MapReduce 框架的 JobControl 实现。
需求:以下有两个 MapReduce 任务,分别是 Flow 的 SumMR 和 SortMR,其中有依赖关系:SumMR的输出是 SortMR 的输入,所以 SortMR 的启动得在 SumMR 完成之后这两个程序在:如何实现两个代码的依赖关系呢?代码实现(这里只给出多 Job 串联的代码)public class JobDecy { public static void main(String[] args) { Configuration conf = new Configuration(true); conf.set("fs.defaultFS", "hdfs://zzy:9000"); conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { //job1 FlowSum Job job1 = Job.getInstance(conf); job1.setJobName("FlowSum"); //设置任务类 job1.setJarByClass(FlowSum.class); //设置Mapper Reducer Combine job1.setMapperClass(FlowSum.MyMapper.class); job1.setReducerClass(FlowSum.MyReducer.class); job1.setCombinerClass(FlowSum.FlowSumCombine.class); //设置map 和reduce 的输入输出类型 job1.setMapOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); job1.setOutputKeyClass(Text.class); job1.setMapOutputValueClass(Text.class); // 指定该 mapreduce 程序数据的输入和输出路径 Path input1 = new Path("/data/input"); Path output1 = new Path("/data/output"); //一定要保证output不存在 if (output1.getFileSystem(conf).exists(output1)) { output1.getFileSystem(conf).delete(output1, true); //递归删除 } FileInputFormat.addInputPath(job1, input1); FileOutputFormat.setOutputPath(job1, output1); //Job2 FlowSumSort Job job2= Job.getInstance(conf); job2.setJarByClass(FlowSumSort.class); job2.setJobName("FlowSumSort"); job2.setMapperClass(Mapper.class); job2.setReducerClass(Reducer.class); job2.setOutputKeyClass(FlowBean.class); job2.setOutputValueClass(NullWritable.class); // 指定该 mapreduce 程序数据的输入和输出路径 Path input2=new Path("//data/output"); Path output2 =new Path("/data/output1"); //一定要保证output不存在 if(output2.getFileSystem(conf).exists(output2)){ output2.getFileSystem(conf).delete(output2,true); //递归删除 } FileInputFormat.addInputPath(job2,input2); FileOutputFormat.setOutputPath(job2,output2); //为每个任务创建ControlledJob ControlledJob job1_cj=new ControlledJob(job1.getConfiguration()); ControlledJob job2_cj=new ControlledJob(job2.getConfiguration()); //绑定 job1_cj.setJob(job1); job2_cj.setJob(job2); // 设置作业依赖关系 job2_cj.addDependingJob(job2_cj); //job2 依赖于job1 //创建jobControl JobControl jc=new JobControl("sum and sort"); jc.addJob(job1_cj); jc.addJob(job2_cj); //使用线程开启Job Thread jobThread=new Thread(jc); //开启任务 jobThread.start(); //为了保证主程序不终止,没0.5秒检查一次是否完成作业 while(!jc.allFinished()){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } //罪作业完成之后,终止线程,释放资源 jc.stop(); } catch (IOException e) { e.printStackTrace(); } }}
需求:求每个班级的总分最高的前三名
字段:班级 姓名 数学 语文 英语 (字段之间是制表符分割)分析: - 利用“班级和总分”作为 key,可以将 map 阶段读取到的所有学生成绩数据按照班级和成绩排倒序,发送到 reduce - 在 reduce 端利用 GroupingComparator 将班级相同的 kv 聚合成组,然后取前三个即是前三名代码实现:自定义学生类:public class Student implements WritableComparable{ private String t_class; private String t_name; private int t_sumSource; public Student(){ } public void set(String t_class,String t_name,int chinese,int math,int english){ this.t_class=t_class; this.t_name=t_name; this.t_sumSource=chinese+math+english; } public String getT_class() { return t_class; } public void setT_class(String t_class) { this.t_class = t_class; } public String getT_name() { return t_name; } public void setT_name(String t_name) { this.t_name = t_name; } public int getT_sumSource() { return t_sumSource; } public void setT_sumSource(int t_sumSource) { this.t_sumSource = t_sumSource; } //比较规则 @Override public int compareTo(Student stu) { //首先根据班级比较 int result1=this.t_class.compareTo(stu.t_class); //班级相同的在根据总分比较 if(result1==0){ return stu.t_sumSource-this.t_sumSource; } return result1; } //序列化 @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.t_class); out.writeUTF(this.t_name); out.writeInt(this.t_sumSource); } //反序列化 @Override public void readFields(DataInput in) throws IOException { this.t_class=in.readUTF(); this.t_name=in.readUTF(); this.t_sumSource=in.readInt(); }}
自定义分组:
//自定义分组规则 private static class MyGroupComparator extends WritableComparator{ //这句代码必须要加,并且要调用父类的构造 public MyGroupComparator(){ super(Student.class, true); } / 决定输入到 reduce 的数据的分组规则 根据班级进行分组 / @Override public int compare(WritableComparable a, WritableComparable b) { Student stu1=(Student)a; Student stu2=(Student)a; return stu1.getTclass().compareTo(stu2.getTclass()); } }*
MR程序:
//Mapperprivate static class MyMapper extends Mapper{ Student bean = new Student(); NullWritable mv = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("\\s+"); //班级 姓名 数学 语文 英语 String t_clas=fields[0]; String t_name=fields[1]; int chinese=Integer.parseInt(fields[2]); int math=Integer.parseInt(fields[3]); int english=Integer.parseInt(fields[4]); bean.set(t_clas,t_name,chinese,math,english); context.write(bean,mv); } } //Reducer private static class MyReducer extends Reducer { @Override protected void reduce(Student key, Iterable values, Context context) throws IOException, InterruptedException { int count =0; for(NullWritable value:values){ if(count>2){ break; } context.write(key,value); count++; } } }
job:
public class ClazzScoreGroupComparator { public static void main(String[] args) { Configuration conf=new Configuration(true); conf.set("fs.defaultFS","hdfs://zzy:9000"); conf.set("fs.defaultFS", "hdfs://zzy:9000"); conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { Job job= Job.getInstance(conf); job.setJarByClass(ClazzScoreGroupComparator.class); job.setJobName("ClazzScoreGroupComparator"); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //指定自定义分组 job.setGroupingComparatorClass(MyGroupComparator.class); job.setOutputKeyClass(Student.class); job.setOutputValueClass(NullWritable.class); // 指定该 mapreduce 程序数据的输入和输出路径 Path input=new Path("//data/student.txt"); Path output =new Path("/data/output2"); //一定要保证output不存在 if(output.getFileSystem(conf).exists(output)){ output.getFileSystem(conf).delete(output,true); //递归删除 } FileInputFormat.addInputPath(job,input); FileOutputFormat.setOutputPath(job,output); boolean success=job.waitForCompletion(true); System.exit(success?0:1); } catch (Exception e) { e.printStackTrace(); } }
介绍:计数器是用来记录 job 的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况,MapReduce 自带了许多默认 Counter,现在我们来分析这些默认 Counter 的含义,方便大家观察 Job 结果,如输入的字节数、输出的字节数、Map 端输入/输出的字节数和条数、Reduce 端的输入/输出的字节数和条数等。
需求:利用全局计数器来统计一个目录下所有文件出现的单词总数和总行数
代码实现:public class CounterWordCount { public static void main(String[] args) { Configuration conf=new Configuration(true); conf.set("fs.defaultFS","hdfs://zzy:9000"); conf.set("fs.defaultFS", "hdfs://zzy:9000"); conf.addResource("core-site.xml"); conf.addResource("hdfs-site.xml"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { Job job= Job.getInstance(conf); job.setJarByClass(CounterWordCount.class); job.setJobName("CounterWordCount"); job.setMapperClass(MyMapper.class); //设置reduceTask为0 job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 指定该 mapreduce 程序数据的输入和输出路径 Path input=new Path("//data/"); Path output =new Path("/data/output3"); //一定要保证output不存在 if(output.getFileSystem(conf).exists(output)){ output.getFileSystem(conf).delete(output,true); //递归删除 } FileInputFormat.addInputPath(job,input); FileOutputFormat.setOutputPath(job,output); boolean success=job.waitForCompletion(true); System.exit(success?0:1); } catch (Exception e) { e.printStackTrace(); } } //定义枚举 用于存放计数器 enum CouterWordsCounts{COUNT_WORDS, COUNT_LINES} //Mapper private static class MyMapper extends Mapper{ Text mk=new Text(); LongWritable mv=new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 统计行数,因为默认读取文本是逐行读取,所以 map 执行一次,行数+1 context.getCounter(CouterWordsCounts.COUNT_LINES).increment(1); String words[]=value.toString().split("\\s+"); for(String word:words){ context.getCounter(CouterWordsCounts.COUNT_WORDS).increment(1); } } //这个方法,在这个类的最后执行 @Override protected void cleanup(Context context) throws IOException, InterruptedException { mk.set("行数:"); mv.set(context.getCounter(CouterWordsCounts.COUNT_LINES).getValue()); context.write(mk,mv); mk.set("单词数:"); mv.set(context.getCounter(CouterWordsCounts.COUNT_WORDS).getValue()); context.write(mk,mv); } }}
介绍:在各种实际业务场景中,按照某个关键字对两份数据进行连接是非常常见的。如果两份数据都比较小,那么可以直接在内存中完成连接。如果是大数据量的呢?显然,在内存中进行连接会发生 OOM。MapReduce 可以用来解决大数据量的连接。在MapReduce join分两种,map join和reduce join
介绍:MapJoin 适用于有一份数据较小的连接情况。做法是直接把该小份数据直接全部加载到内存当中,按链接关键字建立索引。然后大份数据就作为 MapTask 的输入,对 map()方法的每次输入都去内存当中直接去匹配连接。然后把连接结果按 key 输出.。
数据介绍:
movies.dat:1::Toy Story (1995)::Animation|Children's|Comedy字段含义:movieid, moviename, movietypeRatings.dat:1::1193::5::978300760字段含义:userid, movieid, rate, timestamp代码实现:
public class MovieRatingMapJoinMR { public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hadoop02:9000"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { Job job = Job.getInstance(conf); job.setJarByClass(MovieRatingMapJoinMR.class); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(0); String minInput = args[0]; String maxInput = args[1]; String output = args[2]; FileInputFormat.setInputPaths(job, new Path(maxInput)); Path outputPath = new Path(output); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileOutputFormat.setOutputPath(job, outputPath); //将小表加载到内存 URI uri=new Path(minInput).toUri(); job.addCacheFile(uri); boolean status = job.waitForCompletion(true); System.exit(status?0:1); } catch (Exception e) { e.printStackTrace(); } } //Mapper private static class MyMapper extends Mapper{ Text mk = new Text(); Text mv = new Text(); // 用来存储小份数据的所有解析出来的 key-value private static Map movieMap = new HashMap (); @Override protected void setup(Context context) throws IOException, InterruptedException { //读取加载到内存的表数据,并将数据的封装到movieMap容器中 URI[] cacheFiles = context.getCacheFiles(); //获取文件名 String myfilePath = cacheFiles[0].toString(); BufferedReader br = new BufferedReader(new FileReader(myfilePath)); // 此处的 line 就是从文件当中逐行读到的 movie String line = ""; while ((line = br.readLine()) != null) { //movieid::moviename::movietype String fields[] = line.split("::"); movieMap.put(fields[0], fields[1] + "\\t" + fields[2]); } IOUtils.closeStream(br); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split("::"); //userid::movieid::rate::timestamp String userid = fields[0]; String movieid = fields[1]; String rate = fields[2]; String timestamp = fields[3]; if (movieMap.containsKey(userid)) { String movieFileds = movieMap.get(userid); mk.set(userid); mv.set(movieFileds + "\\t" + movieid + "\\t" + rate + "\\t" + timestamp); context.write(mk, mv); } } }}
介绍:
- map 阶段,两份数据 data1 和 data2 会被 map 分别读入,解析成以链接字段为 key 以查询字段为 value 的 key-value 对,并标明数据来源是 data1 还是 data2。 - reduce 阶段,reducetask 会接收来自 data1 和 data2 的相同 key 的数据,在 reduce 端进行乘积链接,最直接的影响是很消耗内存,导致 OOM数据介绍:
movies.dat:1::Toy Story (1995)::Animation|Children's|Comedy字段含义:movieid, moviename, movietypeRatings.dat:1::1193::5::978300760字段含义:userid, movieid, rate, timestamp代码实现:
public class MovieRatingReduceJoinMR { public static void main(String[] args) { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://zzy:9000"); System.setProperty("HADOOP_USER_NAME", "hadoop"); try { Job job = Job.getInstance(conf); job.setJarByClass(MovieRatingReduceJoinMR.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); String Input = args[0]; String output = args[1]; FileInputFormat.setInputPaths(job, new Path(Input)); Path outputPath = new Path(output); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileOutputFormat.setOutputPath(job, outputPath); boolean status = job.waitForCompletion(true); System.exit(status?0:1); } catch (Exception e) { e.printStackTrace(); } } //Mapper private static class MyMapper extends Mapper{ private String name; Text mk = new Text(); Text mv = new Text(); //获取文件名 @Override protected void setup(Context context) throws IOException, InterruptedException { //InputSplit是一个抽象类,使用它的实现类FileSplit FileSplit is=(FileSplit)context.getInputSplit(); name=is.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //movies.dat movieid::moviename::movietype //ratings.dat userid::movieid::rate::timestamp String OutputKey=null; String OutputValue=null; String fields[]=value.toString().split("::"); if(name.endsWith("movies.dat")){ OutputKey=fields[0]; OutputValue=fields[1]+"\t"+fields[2]+"_"+"movies"; }else if(name.endsWith("ratings.dat")){ OutputKey=fields[1]; OutputValue=fields[0]+"\t"+fields[2]+"\t"+fields[3]+"_"+"ratings"; } mk.set(OutputKey); mv.set(OutputValue); context.write(mk,mv); } } //Reducer private static class MyReducer extends Reducer< Text, Text, Text, Text>{ Text rv=new Text(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { List movies=new ArrayList<>(); List ratings=new ArrayList<>(); //将数据分别添加到存放两张表字段的容器中 for(Text value:values){ String fields[]= value.toString().split("_"); if(fields[1].equals("movies")){ movies.add(fields[0]); }else if(fields[1].equals("ratings")){ ratings.add(fields[0]); } } //连接两个表的数据 if(ratings.size()>0&&movies.size()>0){ for(String movie:movies){ for(String rate:ratings){ rv.set(movie+"\t"+rate); context.write(key,rv); } } } } }}
转载于:https://blog.51cto.com/14048416/2342050