代码
import org.apache.commons.lang.ObjectUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;/** * Created with IntelliJ IDEA. * User: @别慌 * Date: 2019-06-16 * Time: 0:44 * Description: */public class PartitionerMain extends Configured implements Tool { public int run(String[] args) throws Exception { //job类,组装MR Job job=Job.getInstance(super.getConf(),PartitionerMain.class.getSimpleName()); //打包运行 job.setJarByClass(PartitionerMain.class); //第一步:读取文件,解析kv对 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job,new Path(args[0])); //自定义map逻辑 job.setMapperClass(partitionMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class); //第三部 分区 job.setPartitionerClass(partitionerOwn.class); //自定义reduce逻辑 job.setReducerClass(partitionReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(2); //设置输出类 job.setOutputFormatClass(TextOutputFormat.class); //输出路径不写死,通过参数传进来 TextOutputFormat.setOutputPath(job,new Path(args[1])); //提交任务 boolean b =job.waitForCompletion(true); return b?0:1; } public static void main(String args[]) throws Exception { int run= ToolRunner.run(new Configuration(),new PartitionerMain(),args); System.exit(run); }}import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Partitioner;/** * Created with IntelliJ IDEA. 
* User: @别慌 * Date: 2019-06-16 * Time: 0:56 * Description: */public class partitionerOwn extends Partitioner{ public int getPartition(Text text, NullWritable nullWritable, int i) { String[] spilt=text.toString().split("\t"); String gameResult=spilt[5]; if (null != gameResult && ""!=gameResult){ if (Integer.parseInt(gameResult)>15){ return 0; }else { return 1; } } return 0; }}import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * Created with IntelliJ IDEA. * User: @别慌 * Date: 2019-06-16 * Time: 0:47 * Description: */public class partitionMap extends Mapper { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(value, NullWritable.get()); }}import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * Created with IntelliJ IDEA. * User: @别慌 * Date: 2019-06-16 * Time: 1:14 * Description: */public class partitionReduce extends Reducer { protected void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException { context.write(key,NullWritable.get()); }}
pom.xml
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.ali</groupId>
    <artifactId>hdfs</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
java项目架构
- PartitionerMain
- partitionerOwn
- partitionMap
- partitionReduce
hadoop jar mouse.jar PartitionerMain /partition /outtest
jar包 main函数 hdfs上的目标文件 hdfs的输出文件(不能事先存在)
源文件(partition)
1	0	1	2017-07-31	23:10:12	834232	6	sadasaa
2	9	6	2019-06-76	78:97:12	687907	45	dsdskd