博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce入门(二)合并小文件
阅读量:5094 次
发布时间:2019-06-13

本文共 8575 字,大约阅读时间需要 28 分钟。

hadoop为什么要合并小文件?

        小文件是指文件size小于HDFS上block大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间(见参考资料[1][4][5])。这样namenode内存容量严重制约了集群的扩展。 其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

一、创建MergeSmallFileJob 类:用于实现合并小文件的任务(2M一下属于小文件) 

package cn.itxiaobai;import com.google.common.io.Resources;import com.utils.CDUPUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;import java.util.Iterator;/** * 合并小文件的任务(2M一下属于小文件) */public class MergeSmallFileJob {    public static class MergeSmallFileMapper extends Mapper
{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将文件名作为key,内容作为value输出 //1.获取文件名 FileSplit inputSplit = (FileSplit) context.getInputSplit(); String fileName = inputSplit.getPath().getName(); //打印文件名以及与之对应的内容 context.write(new Text(fileName),value); } } public static class MergeSmallFileReduce extends Reducer
{ /** * * @param key:文件名 * @param values:一个文件的所有内容 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { //将迭代器中的内容拼接 Iterator
iterator = values.iterator(); //使用StringBuffer StringBuffer stringBuffer = new StringBuffer(); while (iterator.hasNext()){ stringBuffer.append(iterator.next()).append(","); } //打印 context.write(key,new Text(stringBuffer.toString())); } } public static class MyJob{ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration coreSiteConf = new Configuration(); coreSiteConf.addResource(Resources.getResource("core-site-local.xml")); //设置一个任务 Job job = Job.getInstance(coreSiteConf, "my small merge big file"); //设置job的运行类 job.setJarByClass(MyJob.class); //设置Map和Reduce处理类 job.setMapperClass(MergeSmallFileMapper.class); job.setReducerClass(MergeSmallFileReduce.class); //map输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置job/reduce输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileSystem fileSystem = FileSystem.get(coreSiteConf); //listFiles:可以迭代便利文件 RemoteIterator
listFiles = fileSystem.listFiles(new Path("/"), true); while (listFiles.hasNext()) { LocatedFileStatus fileStatus = listFiles.next(); Path filesPath = fileStatus.getPath(); if (!fileStatus.isDirectory()) { //判断大小 及格式 if (fileStatus.getLen() < 2 * 1014 * 1024 && filesPath.getName().contains(".txt")) { //文件输入路径 FileInputFormat.addInputPath(job,filesPath); } } } //删除存在目录 CDUPUtils.deleteFileName("/mymergeout"); FileOutputFormat.setOutputPath(job, new Path("/mymergeout")); //运行任务 boolean flag = job.waitForCompletion(true); if (flag){ System.out.println("文件读取内容如下:"); CDUPUtils.readContent("/mymergeout/part-r-00000"); }else { System.out.println("文件加载失败...."); } } }}二、里面用到自己写的工具类CDUPUtils :用于删除已存在目录以及阅读文件内容
package com.utils;import com.google.common.io.Resources;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import java.io.IOException;import java.io.InputStreamReader;import java.io.LineNumberReader;import java.util.ArrayList;public class CDUPUtils {    //删除已经存在在hdfs上面的文件文件    public static void deleteFileName(String path) throws IOException {        //将要删除的文件        Path fileName = new Path(path);        Configuration entries = new Configuration();        //解析core-site-master2.xml文件        entries.addResource(Resources.getResource("core-site-local.xml"));        //coreSiteConf.set(,);--在这里可以添加配置文件        FileSystem fileSystem = FileSystem.get(entries);        if (fileSystem.exists(fileName)){            System.out.println(fileName+"已经存在,正在删除它...");            boolean flag = fileSystem.delete(fileName, true);            if (flag){                System.out.println(fileName+"删除成功");            }else {                System.out.println(fileName+"删除失败!");                return;            }        }        //关闭资源        fileSystem.close();    }    //读取文件内容    public static void readContent(String path) throws IOException {        //将要读取的文件路径        Path fileName = new Path(path);        ArrayList
returnValue = new ArrayList
(); Configuration configuration = new Configuration(); configuration.addResource(Resources.getResource("core-site-local.xml")); //获取客户端系统文件 FileSystem fileSystem = FileSystem.get(configuration); //open打开文件--获取文件的输入流用于读取数据 FSDataInputStream inputStream = fileSystem.open(fileName); InputStreamReader inputStreamReader = new InputStreamReader(inputStream); //一行一行的读取数据 LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader); //定义一个字符串变量用于接收每一行的数据 String str = null; //判断何时没有数据 while ((str=lineNumberReader.readLine())!=null){ returnValue.add(str); } //打印数据到控制台 System.out.println("MapReduce算法操作的文件内容如下:"); for (String read : returnValue) { System.out.println(read); } //关闭资源 lineNumberReader.close(); inputStream.close(); inputStreamReader.close(); }}

 配置文件:cort-site-local.xml--------注意里面的主机IP需要填写自己的

fs.defaultFS
hdfs://master2:9000
fs.hdfs.impl
org.apache.hadoop.hdfs.DistributedFileSystem

pom中添加的依赖 

4.0.0
com.zhiyou100
mrdemo
1.0-SNAPSHOT
2.7.5
org.apache.hadoop
hadoop-mapreduce-client-core
${org.apache.hadoop.version}
org.apache.hadoop
hadoop-mapreduce-client-common
${org.apache.hadoop.version}
org.apache.hadoop
hadoop-mapreduce-client-jobclient
${org.apache.hadoop.version}
org.apache.hadoop
hadoop-common
${org.apache.hadoop.version}
org.apache.hadoop
hadoop-hdfs
${org.apache.hadoop.version}

在本地直接运行(右击Run)测试

转载于:https://www.cnblogs.com/pigdata/p/10305600.html

你可能感兴趣的文章
像进度条的网页加载Loading JS代码
查看>>
C#有哪几种定时器
查看>>
javascript高级程序设计笔记-第八章(BOM)
查看>>
结构体和共用体字节对齐
查看>>
在一个环境中使用不同版本的rails
查看>>
openfire的搭建与运行(转)
查看>>
mybatis使用oracle自动生成主键
查看>>
实践中 XunSearch(讯搜)更新索引方案对比
查看>>
C#程序关闭时怎么关闭子线程
查看>>
Plants vs. Zombies(二分好题+思维)
查看>>
依赖包拼合方法
查看>>
professional email address collections
查看>>
jquery兼容IE和火狐下focus()事件
查看>>
玩转spring boot——开篇
查看>>
aix-裸设备文件大小查看
查看>>
ubantu的二三事
查看>>
动图展示16个Sublime Text快捷键用法 ---------------物化的sublime
查看>>
一个简单的 SQLite 的示例
查看>>
3732 Ahui Writes Word
查看>>
POJ 1363 Rails
查看>>