HDFS常用API操作 和 HDFS的I/O流操作
前置操作
创建maven工程,修改pom.xml文件:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.mcq</groupId>
    <artifactId>HDFS-001</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- JUnit 4 supplies org.junit.Test used by the example classes.
             Pinned to a fixed version: the floating RELEASE meta-version makes builds non-reproducible. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <!-- Hadoop artifacts: all three are kept on the same version (2.7.2). -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
        <!-- tools.jar taken from the local JDK via a system-scoped path. -->
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
        </dependency>
    </dependencies>
</project>
在 resources 目录下新建一个文件 log4j.properties:
# Root logger: INFO level, attached to the "stdout" appender only.
log4j.rootLogger=INFO, stdout

# Console appender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

# File appender. It is defined here but not listed in log4j.rootLogger above,
# so it stays inactive unless it is added there.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
API操作
HDFS 的命令和 Linux 命令极其相似,可以类比记忆。下面列出一些常用的 Java API 操作:
package com.mcq;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

/**
 * Examples of common HDFS operations through the {@link FileSystem} API:
 * mkdir, upload, download, delete, rename and listing.
 *
 * <p>Every example opens its own {@link FileSystem} handle and releases it with
 * try-with-resources, so the handle is closed even when the operation throws.
 */
public class HDFSClient {

    /** NameNode address used by every example. */
    private static final String HDFS_URI = "hdfs://hadoop103:9000";

    /** User name passed to {@code FileSystem.get} when connecting. */
    private static final String HDFS_USER = "mcq";

    /**
     * Opens a {@link FileSystem} for {@link #HDFS_URI} as {@link #HDFS_USER} with a default
     * {@link Configuration}.
     *
     * <p>An alternative is to set {@code fs.defaultFS} on the configuration and call
     * {@code FileSystem.get(conf)}; that form does not take a user name argument.
     */
    private static FileSystem openFileSystem()
            throws IOException, InterruptedException, URISyntaxException {
        return FileSystem.get(new URI(HDFS_URI), new Configuration(), HDFS_USER);
    }

    /** Creates the directory {@code /ppqq} on HDFS. */
    public static void main(String[] args)
            throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            fs.mkdirs(new Path("/ppqq"));
        }
        System.out.println("over");
    }

    /** Upload: copies the local file {@code d:/banzhang.txt} to {@code /banzhang.txt}. */
    @Test
    public void testCopyFromLocalFile()
            throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
        }
        System.out.println("over");
    }

    /** Download: copies {@code /banzhang.txt} to a local file. */
    @Test
    public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            // First argument false: do not remove the source (copy, not move).
            // Last argument true: use the raw local file system, so no .crc file is produced.
            fs.copyToLocalFile(false, new Path("/banzhang.txt"),
                    new Path("d:/hadoop test/banhua.txt"), true);
        }
        System.out.println("over");
    }

    /** Delete: removes {@code /0811}. */
    @Test
    public void testDelete() throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            // Second argument: whether to delete recursively.
            fs.delete(new Path("/0811"), true);
        }
        System.out.println("over");
    }

    /** Rename: renames {@code /banzhang.txt} to {@code /lala.txt}. */
    @Test
    public void testRename() throws IOException, InterruptedException, URISyntaxException {
        try (FileSystem fs = openFileSystem()) {
            fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
        }
        System.out.println("over");
    }

    /**
     * Recursively lists the files under {@code /} and prints, for each file, its name, length,
     * permission, group and the hosts of each of its blocks.
     */
    @Test
    public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        try (FileSystem fs = openFileSystem()) {
            // 2. Fetch file details; the second argument requests a recursive listing.
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus status = listFiles.next();

                System.out.println(status.getPath().getName()); // file name
                System.out.println(status.getLen());            // length
                System.out.println(status.getPermission());     // permission
                System.out.println(status.getGroup());          // group

                // Block information: print the hosts of every block.
                BlockLocation[] blockLocations = status.getBlockLocations();
                for (BlockLocation blockLocation : blockLocations) {
                    String[] hosts = blockLocation.getHosts();
                    for (String host : hosts) {
                        System.out.println(host);
                    }
                }
                System.out.println("-----------分割线----------");
            }
        } // 3. The file system is closed here.
    }

    /**
     * Lists the direct children of {@code /}, printing {@code f:} before files and {@code d:}
     * before everything else.
     */
    @Test
    public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        try (FileSystem fs = openFileSystem()) {
            // 2. Decide for each entry whether it is a file or a directory.
            FileStatus[] listStatus = fs.listStatus(new Path("/"));
            for (FileStatus fileStatus : listStatus) {
                if (fileStatus.isFile()) {
                    System.out.println("f:" + fileStatus.getPath().getName());
                } else {
                    System.out.println("d:" + fileStatus.getPath().getName());
                }
            }
        } // 3. The file system is closed here.
    }
}
I/O流操作
上面的 API 操作都是 HDFS 框架封装好的。如果想自己实现上述操作,可以采用 I/O 流的方式实现数据的上传和下载。
package com.mcq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.yarn.api.records.URL;
import org.junit.Test;

/**
 * Examples of moving data to and from HDFS with plain streams instead of the
 * ready-made copy methods of {@link FileSystem}: upload, download, and a two-part
 * download that uses {@code seek} to start in the middle of a file.
 *
 * <p>All streams and the {@link FileSystem} handle are managed with try-with-resources,
 * so they are released even when a copy fails. Resources are closed in reverse order of
 * declaration, i.e. streams first and the file system last.
 */
public class HDFSIO {

    /** NameNode address used by every example. */
    private static final String HDFS_URI = "hdfs://hadoop103:9000";

    /** User name passed to {@code FileSystem.get} when connecting. */
    private static final String HDFS_USER = "mcq";

    /**
     * Size of the first part of the split download, 128 MB. The second part starts at this offset.
     */
    private static final long FIRST_PART_BYTES = 1024L * 1024 * 128;

    /** Upload: streams the local file {@code d:/banzhang.txt} into {@code /xiaocao.txt}. */
    @Test
    public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        Configuration configuration = new Configuration();
        try (FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
                // 2. Local input stream.
                FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));
                // 3. HDFS output stream.
                FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"))) {
            // 4. Copy; close=false because try-with-resources owns the streams.
            IOUtils.copyBytes(fis, fos, configuration, false);
        } // 5. Resources are closed here.
    }

    /** Download: streams {@code /banhua.txt} into the local file {@code d:/banhua.txt}. */
    @Test
    public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        Configuration configuration = new Configuration();
        try (FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
                // 2. HDFS input stream.
                FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
                // 3. Local output stream.
                FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"))) {
            // 4. Copy; close=false because try-with-resources owns the streams.
            IOUtils.copyBytes(fis, fos, configuration, false);
        } // 5. Resources are closed here.
    }

    /**
     * Positioned read, part 1: downloads at most the first {@link #FIRST_PART_BYTES} bytes of
     * {@code /hadoop-2.7.2.tar.gz}.
     *
     * <p>Only the bytes actually returned by each {@code read} call are written. Writing the
     * whole buffer regardless of the read count would pad the output with stale data whenever
     * a read comes back short or the file ends early.
     */
    @Test
    public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        Configuration configuration = new Configuration();
        try (FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
                // 2. HDFS input stream.
                FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
                // 3. Local output stream.
                FileOutputStream fos =
                        new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"))) {
            // 4. Copy up to FIRST_PART_BYTES bytes.
            byte[] buf = new byte[1024];
            long remaining = FIRST_PART_BYTES;
            while (remaining > 0) {
                int wanted = (int) Math.min(buf.length, remaining);
                int read = fis.read(buf, 0, wanted);
                if (read < 0) {
                    break; // end of file reached before the limit
                }
                fos.write(buf, 0, read);
                remaining -= read;
            }
        } // 5. Resources are closed here.
    }

    /**
     * Positioned read, part 2: downloads {@code /hadoop-2.7.2.tar.gz} from offset
     * {@link #FIRST_PART_BYTES} to the end of the file.
     */
    @Test
    public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
        // 1. Obtain the file system.
        Configuration configuration = new Configuration();
        try (FileSystem fs = FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
                // 2. HDFS input stream.
                FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"))) {
            // 3. Skip the part already covered by readFileSeek1.
            fis.seek(FIRST_PART_BYTES);
            // 4. Local output stream.
            try (FileOutputStream fos =
                    new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"))) {
                // 5. Copy the rest; close=false because try-with-resources owns the streams.
                IOUtils.copyBytes(fis, fos, configuration, false);
            }
        } // 6. Resources, including the file system, are closed here.
    }
}
赞 (0)