本文共 6475 字,大约阅读时间需要 21 分钟。
原始数据:
话不多说,直接上代码!
老样子先pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.henu</groupId>
    <artifactId>ETL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-nodemanager</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>
</project>
ETLUtil
package com.wc; /** * @author George * @description etl工具类 **/public class ETLUtil { public static String oriString2ETLString(String ori){ StringBuilder etlString = new StringBuilder(); if (ori.startsWith("0")) { String[] splits = ori.split("\t"); for (String split : splits) { if (!"25".equals(split) && !"".equals(split)) { etlString.append(split + "#"); } } } return etlString.toString(); }/* public static void main(String[] args) throws IOException { BufferedReader br = new BufferedReader(new FileReader("./data/test")); String str = ""; while ((str = br.readLine())!=null){ String string = oriString2ETLString(str); System.out.println(string); } br.close(); }*/}
BSETLMapper
package com.wc; import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author George * @description map阶段 **/public class BSETLMapper extends Mapper
BSETLRunner
package com.wc;import com.AccountRegisterETL.AccountRegisterETLMapper;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @author George * @description **/public class BSETLRunner implements Tool { private Configuration conf = null; public void setConf(Configuration conf) { this.conf = conf; } public Configuration getConf() { return this.conf; } public int run(String[] args) throws Exception { conf = this.getConf(); conf.set("inpath", args[0]); conf.set("outpath", args[1]); Job job = Job.getInstance(conf); job.setJarByClass(BSETLRunner.class); job.setMapperClass(BSETLMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(0); this.initJobInputPath(job); this.initJobOutputPath(job); return job.waitForCompletion(true) ? 
0 : 1; } private void initJobOutputPath(Job job) throws IOException { Configuration conf = job.getConfiguration(); String outPathString = conf.get("outpath"); FileSystem fs = FileSystem.get(conf); Path outPath = new Path(outPathString); if(fs.exists(outPath)){ fs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath); } private void initJobInputPath(Job job) throws IOException { Configuration conf = job.getConfiguration(); String inPathString = conf.get("inpath"); FileSystem fs = FileSystem.get(conf); Path inPath = new Path(inPathString); if(fs.exists(inPath)){ FileInputFormat.addInputPath(job, inPath); }else{ throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString); } } public static void main(String[] args) { try { int resultCode = ToolRunner.run(new BSETLRunner(), args); if(resultCode == 0){ System.out.println("Success!"); }else{ System.out.println("Fail!"); } System.exit(resultCode); } catch (Exception e) { e.printStackTrace(); System.exit(1); } }}
启动集群!!!
上传jar包,
上传数据:
[root@henu2 ~]# hdfs dfs -put data.txt /
运行jar包:
[root@henu2 ~]# hdfs dfs -mkdir /out
[root@henu2 ~]# yarn jar ETL-1.0-SNAPSHOT.jar com.wc.BSETLRunner /data.txt /out/
得到结果数据:
结果展示:
转载自 乔治大哥(我的臭弟弟)