Typical MapReduce Programming Scenarios, Part 2
Published: 2019-06-20


1. Chaining Multiple MapReduce Jobs

    Introduction: a moderately complex processing pipeline often needs several MapReduce programs run in sequence. Such multi-job chaining can be built on the JobControl class provided by the MapReduce framework.

Requirement
There are two MapReduce jobs below, Flow's SumMR and SortMR, with a dependency between them: SumMR's output is SortMR's input, so SortMR must not start until SumMR has finished.
How do we express this dependency between the two jobs in code?
Code (only the multi-job chaining code is shown here)

public class JobDecy {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            // Job 1: FlowSum
            Job job1 = Job.getInstance(conf);
            job1.setJobName("FlowSum");
            // Set the job class
            job1.setJarByClass(FlowSum.class);
            // Set Mapper, Reducer and Combiner
            job1.setMapperClass(FlowSum.MyMapper.class);
            job1.setReducerClass(FlowSum.MyReducer.class);
            job1.setCombinerClass(FlowSum.FlowSumCombine.class);
            // Set the map and reduce output types
            job1.setMapOutputKeyClass(Text.class);
            job1.setMapOutputValueClass(Text.class);
            job1.setOutputKeyClass(Text.class);
            job1.setOutputValueClass(Text.class);
            // Input and output paths for this job
            Path input1 = new Path("/data/input");
            Path output1 = new Path("/data/output");
            // The output path must not already exist
            if (output1.getFileSystem(conf).exists(output1)) {
                output1.getFileSystem(conf).delete(output1, true);  // recursive delete
            }
            FileInputFormat.addInputPath(job1, input1);
            FileOutputFormat.setOutputPath(job1, output1);

            // Job 2: FlowSumSort
            Job job2 = Job.getInstance(conf);
            job2.setJarByClass(FlowSumSort.class);
            job2.setJobName("FlowSumSort");
            job2.setMapperClass(Mapper.class);
            job2.setReducerClass(Reducer.class);
            job2.setOutputKeyClass(FlowBean.class);
            job2.setOutputValueClass(NullWritable.class);
            // Input and output paths: job2 reads job1's output
            Path input2 = new Path("/data/output");
            Path output2 = new Path("/data/output1");
            // The output path must not already exist
            if (output2.getFileSystem(conf).exists(output2)) {
                output2.getFileSystem(conf).delete(output2, true);  // recursive delete
            }
            FileInputFormat.addInputPath(job2, input2);
            FileOutputFormat.setOutputPath(job2, output2);

            // Wrap each Job in a ControlledJob
            ControlledJob job1_cj = new ControlledJob(job1.getConfiguration());
            ControlledJob job2_cj = new ControlledJob(job2.getConfiguration());
            // Bind the jobs
            job1_cj.setJob(job1);
            job2_cj.setJob(job2);
            // Declare the dependency: job2 depends on job1
            job2_cj.addDependingJob(job1_cj);
            // Create the JobControl and run it on its own thread
            JobControl jc = new JobControl("sum and sort");
            jc.addJob(job1_cj);
            jc.addJob(job2_cj);
            Thread jobThread = new Thread(jc);
            jobThread.start();
            // Keep the main thread alive, polling every 0.5 s until all jobs finish
            while (!jc.allFinished()) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // All jobs done: stop the control thread and release resources
            jc.stop();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2. Top-N (Secondary Sort)

Requirement: find the top three students by total score in each class

Fields: class, name, math, chinese, english (tab-separated)
Analysis
  - Using "class + total score" as the key, the map phase can emit every student record sorted by class and, within a class, by total score descending, before it reaches reduce.
  - On the reduce side, a GroupingComparator gathers the key-value pairs of the same class into one group; the first three records of each group are that class's top three.
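Before the Hadoop code, the two steps above can be sketched in plain Java (the class names and scores below are invented for illustration): sort every record by (class ascending, total descending), then keep the first three of each class, which is exactly what the compareTo rule and the grouping comparator combine to do.

```java
import java.util.*;

public class TopNSketch {
    // Plain analogue of the Student writable: class, name, total score.
    static class Stu {
        final String clazz; final String name; final int total;
        Stu(String clazz, String name, int total) {
            this.clazz = clazz; this.name = name; this.total = total;
        }
    }

    // Same ordering as Student.compareTo: class ascending, then total descending.
    static final Comparator<Stu> ORDER = (a, b) -> {
        int c = a.clazz.compareTo(b.clazz);
        return c != 0 ? c : b.total - a.total;
    };

    // Walk the sorted records and keep the first n per class, mimicking
    // what the GroupingComparator plus the reducer's counter loop achieve.
    static List<Stu> topNPerClass(List<Stu> all, int n) {
        List<Stu> sorted = new ArrayList<>(all);
        sorted.sort(ORDER);
        List<Stu> out = new ArrayList<>();
        String current = null;
        int kept = 0;
        for (Stu s : sorted) {
            if (!s.clazz.equals(current)) { current = s.clazz; kept = 0; }
            if (kept < n) { out.add(s); kept++; }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Stu> data = Arrays.asList(
                new Stu("c1", "a", 270), new Stu("c1", "b", 250),
                new Stu("c1", "c", 240), new Stu("c1", "d", 230),
                new Stu("c2", "e", 280), new Stu("c2", "f", 260));
        // "d" is fourth in c1, so it is dropped; everyone else survives
        for (Stu s : topNPerClass(data, 3)) {
            System.out.println(s.clazz + "\t" + s.name + "\t" + s.total);
        }
    }
}
```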
Code
Custom Student class

public class Student implements WritableComparable<Student> {
    private String t_class;
    private String t_name;
    private int t_sumSource;

    public Student() {
    }

    public void set(String t_class, String t_name, int chinese, int math, int english) {
        this.t_class = t_class;
        this.t_name = t_name;
        this.t_sumSource = chinese + math + english;
    }

    public String getT_class() { return t_class; }
    public void setT_class(String t_class) { this.t_class = t_class; }
    public String getT_name() { return t_name; }
    public void setT_name(String t_name) { this.t_name = t_name; }
    public int getT_sumSource() { return t_sumSource; }
    public void setT_sumSource(int t_sumSource) { this.t_sumSource = t_sumSource; }

    // Sort rule
    @Override
    public int compareTo(Student stu) {
        // First compare by class
        int result1 = this.t_class.compareTo(stu.t_class);
        // Within the same class, compare by total score, descending
        if (result1 == 0) {
            return stu.t_sumSource - this.t_sumSource;
        }
        return result1;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.t_class);
        out.writeUTF(this.t_name);
        out.writeInt(this.t_sumSource);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        this.t_class = in.readUTF();
        this.t_name = in.readUTF();
        this.t_sumSource = in.readInt();
    }
}

Custom grouping comparator

// Custom grouping rule
private static class MyGroupComparator extends WritableComparator {
    // This constructor is required, and it must call the superclass constructor
    public MyGroupComparator() {
        super(Student.class, true);
    }

    /**
     * Decides how keys are grouped on their way into reduce:
     * group by class only.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Student stu1 = (Student) a;
        Student stu2 = (Student) b;
        return stu1.getT_class().compareTo(stu2.getT_class());
    }
}

The MR program

// Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
    Student bean = new Student();
    NullWritable mv = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // class  name  math  chinese  english (the three scores are only summed)
        String[] fields = value.toString().split("\\s+");
        String t_class = fields[0];
        String t_name = fields[1];
        int chinese = Integer.parseInt(fields[2]);
        int math = Integer.parseInt(fields[3]);
        int english = Integer.parseInt(fields[4]);
        bean.set(t_class, t_name, chinese, math, english);
        context.write(bean, mv);
    }
}

// Reducer
private static class MyReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
    @Override
    protected void reduce(Student key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // Each group arrives sorted by total score descending, so the
        // first three records are the top three
        int count = 0;
        for (NullWritable value : values) {
            if (count > 2) {
                break;
            }
            context.write(key, value);
            count++;
        }
    }
}

Job driver

public class ClazzScoreGroupComparator {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(ClazzScoreGroupComparator.class);
            job.setJobName("ClazzScoreGroupComparator");
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // Register the custom grouping comparator
            job.setGroupingComparatorClass(MyGroupComparator.class);
            job.setOutputKeyClass(Student.class);
            job.setOutputValueClass(NullWritable.class);
            // Input and output paths
            Path input = new Path("/data/student.txt");
            Path output = new Path("/data/output2");
            // The output path must not already exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);  // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. MapReduce Global Counters

    Introduction: counters record a job's progress and status, playing a role similar to logging. You can place a counter anywhere in your program to track changes in data or progress. MapReduce also ships with many built-in counters whose meanings are worth understanding, since they help you interpret a job's results: bytes read and written, and the byte and record counts of map-side and reduce-side input and output.
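The counter mechanics can be previewed without a cluster. This standalone sketch (the sample lines are invented) mimics what context.getCounter(...).increment(...) does, using an EnumMap in place of Hadoop's counter registry:

```java
import java.util.*;

public class CounterSketch {
    enum C { COUNT_LINES, COUNT_WORDS }

    // Mimics the per-record increments a counting Mapper performs:
    // one line counter bump per map() call, one word counter bump per token.
    static EnumMap<C, Long> count(List<String> lines) {
        EnumMap<C, Long> counters = new EnumMap<>(C.class);
        counters.put(C.COUNT_LINES, 0L);
        counters.put(C.COUNT_WORDS, 0L);
        for (String line : lines) {
            counters.merge(C.COUNT_LINES, 1L, Long::sum);
            counters.merge(C.COUNT_WORDS, (long) line.trim().split("\\s+").length, Long::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        EnumMap<C, Long> c = count(Arrays.asList("hello world", "foo bar baz"));
        System.out.println("lines=" + c.get(C.COUNT_LINES) + " words=" + c.get(C.COUNT_WORDS));
        // prints lines=2 words=5
    }
}
```

In the real job each map task keeps its own counters and the framework aggregates them globally; this single-process sketch skips that aggregation step.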

Requirement: use global counters to count the total number of words and lines across all files in a directory

Code

public class CounterWordCount {
    public static void main(String[] args) {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(CounterWordCount.class);
            job.setJobName("CounterWordCount");
            job.setMapperClass(MyMapper.class);
            // Map-only job: no reduce tasks
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // Input and output paths
            Path input = new Path("/data/");
            Path output = new Path("/data/output3");
            // The output path must not already exist
            if (output.getFileSystem(conf).exists(output)) {
                output.getFileSystem(conf).delete(output, true);  // recursive delete
            }
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, output);
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Enum that names the counters
    enum CounterWordCounts {COUNT_WORDS, COUNT_LINES}

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Text mk = new Text();
        LongWritable mv = new LongWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Text input is read line by line, so each map() call is one line
            context.getCounter(CounterWordCounts.COUNT_LINES).increment(1);
            String[] words = value.toString().split("\\s+");
            for (String word : words) {
                context.getCounter(CounterWordCounts.COUNT_WORDS).increment(1);
            }
        }

        // cleanup() runs once, after the last map() call of this task
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mk.set("lines:");
            mv.set(context.getCounter(CounterWordCounts.COUNT_LINES).getValue());
            context.write(mk, mv);
            mk.set("words:");
            mv.set(context.getCounter(CounterWordCounts.COUNT_WORDS).getValue());
            context.write(mk, mv);
        }
    }
}

4. MapReduce Joins

    Introduction: joining two datasets on some key is very common in real business scenarios. If both datasets are small, the join can be done entirely in memory; with large datasets, an in-memory join would run out of memory (OOM). MapReduce can join data at scale, and it comes in two flavors: map join and reduce join.

map join

    Introduction: a map join applies when one of the two datasets is small. That small dataset is loaded entirely into memory and indexed by the join key. The large dataset is then fed to the MapTasks as normal input, and every record passed to map() is matched directly against the in-memory index, with join results written out by key.
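The build-then-probe idea behind a map join can be sketched independently of Hadoop (the sample rows follow the movies/ratings format but are invented): index the small table in a HashMap, then stream the large table and look each record up by the join key.

```java
import java.util.*;

public class MapJoinSketch {
    // Build phase: index the small table (movies) by join key, as setup() does.
    // Probe phase: stream the big table (ratings) and look each key up, as map() does.
    static List<String> hashJoin(List<String> movies, List<String> ratings) {
        Map<String, String> movieMap = new HashMap<>();
        for (String m : movies) {                  // movieid::moviename::movietype
            String[] f = m.split("::");
            movieMap.put(f[0], f[1] + "\t" + f[2]);
        }
        List<String> out = new ArrayList<>();
        for (String r : ratings) {                 // userid::movieid::rate::timestamp
            String[] f = r.split("::");
            String movie = movieMap.get(f[1]);     // probe on movieid, not userid
            if (movie != null) {
                out.add(f[0] + "\t" + movie + "\t" + f[1] + "\t" + f[2] + "\t" + f[3]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> joined = hashJoin(
                Arrays.asList("1::Toy Story (1995)::Animation|Children's|Comedy"),
                Arrays.asList("1::1::5::978300760"));
        joined.forEach(System.out::println);
    }
}
```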


Data

movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
Fields: movieid, moviename, movietype
ratings.dat: 1::1193::5::978300760
Fields: userid, movieid, rate, timestamp


Code

public class MovieRatingMapJoinMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingMapJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(0);
            String minInput = args[0];   // small table (movies)
            String maxInput = args[1];   // large table (ratings)
            String output = args[2];
            FileInputFormat.setInputPaths(job, new Path(maxInput));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);
            // Ship the small table to every task via the distributed cache
            URI uri = new Path(minInput).toUri();
            job.addCacheFile(uri);
            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text mk = new Text();
        Text mv = new Text();
        // Holds every key-value pair parsed from the small table
        private static Map<String, String> movieMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Read the cached table and load its records into movieMap
            URI[] cacheFiles = context.getCacheFiles();
            // Get the file name
            String myfilePath = cacheFiles[0].toString();
            BufferedReader br = new BufferedReader(new FileReader(myfilePath));
            // Each line read here is one movie record
            String line;
            while ((line = br.readLine()) != null) {
                // movieid::moviename::movietype
                String[] fields = line.split("::");
                movieMap.put(fields[0], fields[1] + "\t" + fields[2]);
            }
            IOUtils.closeStream(br);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // userid::movieid::rate::timestamp
            String[] fields = value.toString().split("::");
            String userid = fields[0];
            String movieid = fields[1];
            String rate = fields[2];
            String timestamp = fields[3];
            // Probe the in-memory index on the join key, movieid
            if (movieMap.containsKey(movieid)) {
                String movieFields = movieMap.get(movieid);
                mk.set(userid);
                mv.set(movieFields + "\t" + movieid + "\t" + rate + "\t" + timestamp);
                context.write(mk, mv);
            }
        }
    }
}

reduce join

    Introduction

    - In the map phase, the two datasets data1 and data2 are each read in by map tasks and parsed into key-value pairs, with the join field as key and the queried fields as value, each record tagged with whether it came from data1 or data2.
    - In the reduce phase, each reduce task receives all records sharing a key from both data1 and data2 and performs a cross-product join on the reduce side. The most direct drawback is heavy memory consumption, which can lead to OOM.
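What a single reduce() call does in a reduce join can be sketched standalone (the tagged values below are invented): split the values arriving for one key into per-source lists, then emit their cross product.

```java
import java.util.*;

public class ReduceJoinSketch {
    // One reduce() call: separate the tagged values for a key into the two
    // source lists, then emit the cross product of the lists.
    static List<String> joinOneKey(String key, List<String> taggedValues) {
        List<String> movies = new ArrayList<>();
        List<String> ratings = new ArrayList<>();
        for (String v : taggedValues) {
            String[] f = v.split("_");
            if (f[1].equals("movies")) movies.add(f[0]);
            else if (f[1].equals("ratings")) ratings.add(f[0]);
        }
        List<String> out = new ArrayList<>();
        for (String m : movies) {
            for (String r : ratings) {
                out.add(key + "\t" + m + "\t" + r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // 1 movie x 2 ratings -> 2 joined rows
        List<String> rows = joinOneKey("1", Arrays.asList(
                "Toy Story (1995)\tAnimation_movies",
                "1\t5\t978300760_ratings",
                "2\t4\t978301000_ratings"));
        rows.forEach(System.out::println);
    }
}
```

Both lists for a key must fit in memory at once, which is exactly the OOM risk the bullet above describes.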


Data

movies.dat: 1::Toy Story (1995)::Animation|Children's|Comedy
Fields: movieid, moviename, movietype
ratings.dat: 1::1193::5::978300760
Fields: userid, movieid, rate, timestamp


Code

public class MovieRatingReduceJoinMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://zzy:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(MovieRatingReduceJoinMR.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            String input = args[0];
            String output = args[1];
            FileInputFormat.setInputPaths(job, new Path(input));
            Path outputPath = new Path(output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputPath)) {
                fs.delete(outputPath, true);
            }
            FileOutputFormat.setOutputPath(job, outputPath);
            boolean status = job.waitForCompletion(true);
            System.exit(status ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String name;
        Text mk = new Text();
        Text mv = new Text();

        // Find out which file the current split belongs to
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // InputSplit is abstract; use its concrete subclass FileSplit
            FileSplit is = (FileSplit) context.getInputSplit();
            name = is.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // movies.dat   movieid::moviename::movietype
            // ratings.dat  userid::movieid::rate::timestamp
            String outputKey = null;
            String outputValue = null;
            String[] fields = value.toString().split("::");
            if (name.endsWith("movies.dat")) {
                outputKey = fields[0];
                outputValue = fields[1] + "\t" + fields[2] + "_" + "movies";
            } else if (name.endsWith("ratings.dat")) {
                outputKey = fields[1];
                outputValue = fields[0] + "\t" + fields[2] + "\t" + fields[3] + "_" + "ratings";
            }
            mk.set(outputKey);
            mv.set(outputValue);
            context.write(mk, mv);
        }
    }

    // Reducer
    private static class MyReducer extends Reducer<Text, Text, Text, Text> {
        Text rv = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> movies = new ArrayList<>();
            List<String> ratings = new ArrayList<>();
            // Split the incoming values into one list per source table
            for (Text value : values) {
                String[] fields = value.toString().split("_");
                if (fields[1].equals("movies")) {
                    movies.add(fields[0]);
                } else if (fields[1].equals("ratings")) {
                    ratings.add(fields[0]);
                }
            }
            // Cross-join the two lists
            if (ratings.size() > 0 && movies.size() > 0) {
                for (String movie : movies) {
                    for (String rate : ratings) {
                        rv.set(movie + "\t" + rate);
                        context.write(key, rv);
                    }
                }
            }
        }
    }
}

Reposted from: https://blog.51cto.com/14048416/2342050
