博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
清洗弹幕数据
阅读量:3958 次
发布时间:2019-05-24

本文共 6475 字,大约阅读时间需要 21 分钟。

原始数据:

话不多说,直接上代码!

老样子先pom.xml

   
4.0.0
    
com.henu
   
ETL
   
1.0-SNAPSHOT
    
       
           
org.apache.hadoop
           
hadoop-common
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-client
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-hdfs
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-yarn-common
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-yarn-client
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-yarn-server-common
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-yarn-server-resourcemanager
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-yarn-server-nodemanager
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-yarn-server-applicationhistoryservice
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-mapreduce-client-core
           
2.6.0
       
       
           
org.apache.hadoop
           
hadoop-mapreduce-client-shuffle
           
2.6.0
       
    
 

ETLUtil

package com.wc; /** * @author George * @description   etl工具类 **/public class ETLUtil {    public static String oriString2ETLString(String ori){        StringBuilder etlString = new StringBuilder();        if (ori.startsWith("0")) {            String[] splits = ori.split("\t");            for (String split : splits) {                if (!"25".equals(split) && !"".equals(split)) {                    etlString.append(split + "#");                }            }        }        return etlString.toString();    }/*    public static void main(String[] args) throws IOException {        BufferedReader br = new BufferedReader(new FileReader("./data/test"));        String str = "";        while ((str = br.readLine())!=null){            String string = oriString2ETLString(str);            System.out.println(string);        }        br.close();    }*/}

BSETLMapper

package com.wc; import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * @author George * @description   map阶段 **/public class BSETLMapper extends Mapper
{    Text text = new Text();     @Override    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {        String etlString = ETLUtil.oriString2ETLString(value.toString());        //检查字符串是否为空白、空("")或null        if (StringUtils.isBlank(etlString))return;        text.set(etlString);        context.write(NullWritable.get(),text);    }}

BSETLRunner

package com.wc;import com.AccountRegisterETL.AccountRegisterETLMapper;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner; import java.io.IOException; /** * @author George * @description **/public class BSETLRunner implements Tool {    private Configuration conf = null;     public void setConf(Configuration conf) {        this.conf = conf;    }     public Configuration getConf() {        return this.conf;    }     public int run(String[] args) throws Exception {        conf = this.getConf();        conf.set("inpath", args[0]);        conf.set("outpath", args[1]);         Job job = Job.getInstance(conf);         job.setJarByClass(BSETLRunner.class);         job.setMapperClass(BSETLMapper.class);        job.setMapOutputKeyClass(NullWritable.class);        job.setMapOutputValueClass(Text.class);        job.setNumReduceTasks(0);         this.initJobInputPath(job);        this.initJobOutputPath(job);         return job.waitForCompletion(true) ? 0 : 1;    }     private void initJobOutputPath(Job job) throws IOException {        Configuration conf = job.getConfiguration();        String outPathString = conf.get("outpath");         FileSystem fs = FileSystem.get(conf);         Path outPath = new Path(outPathString);        if(fs.exists(outPath)){            fs.delete(outPath, true);        }         FileOutputFormat.setOutputPath(job, outPath);     }     private void initJobInputPath(Job job) throws IOException {        Configuration conf = job.getConfiguration();        String inPathString = conf.get("inpath");         FileSystem fs = FileSystem.get(conf);         Path inPath = new Path(inPathString);        if(fs.exists(inPath)){            FileInputFormat.addInputPath(job, inPath);        }else{            throw new RuntimeException("HDFS中该文件目录不存在:" + inPathString);        }    }     public static void main(String[] args) {        try {            int resultCode = ToolRunner.run(new BSETLRunner(), args);            if(resultCode == 0){                System.out.println("Success!");            }else{                System.out.println("Fail!");            }            System.exit(resultCode);        } catch (Exception e) {            e.printStackTrace();            System.exit(1);        }    }}

启动集群!!! 

上传jar包,

上传数据:

[root@henu2 ~]# hdfs dfs -put data.txt /

运行jar包: 

[root@henu2 ~]# hdfs dfs -mkdir /out[root@henu2 ~]# yarn jar ETL-1.0-SNAPSHOT.jar com.wc.BSETLRunner /data.txt /out/

得到结果数据:

结果展示:

转载自 乔治大哥(我的臭弟弟) 

你可能感兴趣的文章
js类型转换
查看>>
spring实例化Bean理解
查看>>
Mac下配置JAVA_HOME
查看>>
fedora 安装mp3播放器插件
查看>>
赏心悦目的宏代码
查看>>
理解套接字recv(),send()
查看>>
发一个C++写的跨平台的BlockingQueue
查看>>
Linux TCP/IP协议栈剖析【体系结构篇】
查看>>
游戏开发中预防内存泄露的一些措施
查看>>
以前的文章全部移除了。
查看>>
几首歌
查看>>
蝴蝶泉边
查看>>
编码转换
查看>>
freerice
查看>>
Does your mother know
查看>>
《写出质量好软件的75条体会》暨答案ZT [转自monkyy的blog]
查看>>
关于详细设计
查看>>
POJ2838,Sliding Window(单调队列)
查看>>
牛客练习赛50,B tokitsukaze and Hash Table(STL+输入输出挂)
查看>>
POJ3728,The merchant(倍增LCA+分治)
查看>>