Hadoop处理HDF文件

Linux大全评论522 views阅读模式

1、前言

HDF文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用Hadoop处理HDF文件这个问题困扰过相当长的一段时间。于是Google各种解决方案,但都没有找到一种理想的处理办法。也曾参考过HDFGroup官方发的一篇帖子(网址在这里),里面提供了使用Hadoop针对大、中、小HDF文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用Hadoop处理HDF文件这个问题,但个人感觉方法偏复杂且需要对HDF的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。

2、MapReduce主程序

这里主要使用到了netcdf的库进行hdf数据流的反序列化工作(netcdf库的下载地址)。与HDF官方提供的Java库不同,netcdf仅利用Java进行HDF文件的读写操作,且这个库支持多种科学数据,包括HDF4、HDF5等多种格式。而HDF的官方Java库中,底层实际仍是用C进行HDF文件的操作。

CentOS安装和配置Hadoop2.2.0  http://www.linuxidc.com/Linux/2014-01/94685.htm

Ubuntu 13.04上搭建Hadoop环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu上搭建Hadoop环境(单机模式+伪分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu下Hadoop环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建Hadoop环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建Hadoop环境(在Winodws环境下用虚拟机虚拟两个Ubuntu系统进行搭建) http://www.linuxidc.com/Linux/2011-12/48894.htm

下面贴出MapReduce的Mapper函数代码:

package example;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;

public class ReadMapper extends
  Mapper<Text, BytesWritable, Text, BytesWritable> {

 public void map(Text key, BytesWritable value, Context context)
   throws IOException, InterruptedException { 
  String fileName = key.toString();
  NetcdfFile file = NetcdfFile.openInMemory("hdf4", value.get());
  Group dataGroup = (file.findGroup("MOD_Grid_monthly_1km_VI")).findGroup("Data_Fields");
  //读取到1_km_monthly_red_reflectance的变量
  Variable redVar = dataGroup.findVariable("1_km_monthly_red_reflectance");
  short[][] data = new short[1200][1200];
  if(dataGroup != null){   
   ArrayShort.D2 dataArray;
   //读取redVar中的影像数据
   dataArray = (ArrayShort.D2) redVar.read();
   List<Dimension> dimList = file.getDimensions();
   //获取影像的y方向像元个数
   Dimension ydim = dimList.get(0);
   //获取影像的x方向像元个数
   Dimension xdim = dimList.get(1);
   //遍历整个影像,读取出像元的值
   for(int i=0;i<xdim.getLength();i++){
    for(int j=0;j<ydim.getLength();j++){
     data[i][j] = dataArray.get(i, j);     
    }   
   }         
  } 
  System.out.print(file.getDetailInfo());
 }
}

注意程序中的NetcdfFile.openInMemory方法,该静态方法支持从byte[]中构造HDF文件,从而实现了HDF文件的反序列化操作。下面贴出主程序的示例代码:

package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import example.WholeFileInputFormat;

public class ReadMain {
 public boolean runJob(String[] args) throws IOException,
   ClassNotFoundException, InterruptedException {
  Configuration conf = new Configuration();
  // conf.set("mapred.job.tracker", Utils.JOBTRACKER);
  String rootPath= "/opt/hadoop-2.3.0/etc/hadoop";
  //String rootPath="/opt/hadoop-2.3.0/etc/hadoop/";
  conf.addResource(new Path(rootPath+"yarn-site.xml"));
  conf.addResource(new Path(rootPath+"core-site.xml"));
  conf.addResource(new Path(rootPath+"hdfs-site.xml"));
  conf.addResource(new Path(rootPath+"mapred-site.xml"));
  Job job = new Job(conf);

  job.setJobName("Job name:" + args[0]);
  job.setJarByClass(ReadMain.class);

  job.setMapperClass(ReadMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(BytesWritable.class);
 
  job.setInputFormatClass(WholeFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(args[1]));
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  boolean flag = job.waitForCompletion(true);
  return flag;
 }

 public static void main(String[] args) throws ClassNotFoundException,
   IOException, InterruptedException {
  String[] inputPaths = new String[] { "normalizeJob",
    "hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf",
    "hdfs://192.168.168.101:9000/user/hduser/test/" };
  ReadMain test = new ReadMain();
  test.runJob(inputPaths);
 }

}

关于MapReduce主程序有几点值得说明一下:

1、MapReduce数据的输入格式为WholeFileInputFormat.class,即不对数据进行切分。关于该格式,可以参考另外一篇文章:如何通过Java程序提交Yarn的计算任务(http://www.linuxidc.com/Linux/2014-11/109360.htm),这里不再赘述。

2、本人用的是Yarn2.3.0来执行计算任务,如果用老版本的hadoop,如1.2.0,则把以上主程序中的conf.addResource部分的代码删掉即可。

3、以上MapReduce程序中,只用到了Map函数,未设置Reduce函数。

企鹅博客
  • 本文由 发表于 2020年10月2日 22:25:59
  • 转载请务必保留本文链接:https://www.qieseo.com/163071.html

发表评论