Hadoop的基本概念
-
Hadoop是Apache的一个开源的分布式计算平台,核心是以HDFS分布式文件系统和MapReduce分布式计算框架构成,为用户提供了一套底层透明的分布式基础设施
-
Hadoop框架中最核心设计就是:HDFS和MapReduce。HDFS提供了海量数据的存储,MapReduce提供了对数据的计算。
-
HDFS是Hadoop分布式文件系统,具有高容错性、高伸缩性,允许用户基于廉价硬件部署,构建分布式存储系统,为分布式计算存储提供了底层支持
-
MapReduce提供简单的API,允许用户在不了解底层细节的情况下,开发分布式并行程序,利用大规模集群资源,解决传统单机无法解决的大数据处理问题
-
设计思想起源于Google GFS、MapReduce Paper
-
Doug Cutting在Yahoo开发,2008年贡献给Apache基金会
GFS设计思想
HDFS的设计思想起源于GFS论文。所以我们先来了解一下GFS的核心设计思想。
2003年,Google发表了一篇 技术学术论文,公开介绍了自己的谷歌文件系统GFS( Google File System)。这是Google公司为了存储海量搜索数据而设计的专用文件系统。
其基本原理如下:
GFS中有四类角色,分别是
-
GFS chunkserver:
- GFS的设计思想就是将文件分成固定大小(chunk size默认大小是64MB)的chunk来存储的。
- 每个chunk通过全局唯一的64位的chunk handle来标识,chunk handle在chunk创建的时候由GFS master分配。
- GFS chunkserver把文件存储在本地磁盘中,读或写的时候需要指定文件名和字节范围,然后定位到对应的chunk。
- 为了保证数据的可靠性,一个chunk一般会在多台GFS chunkserver上存储,默认为3份,但用户也可以根据自己的需要修改这个值。
-
GFS master:
- 管理所有的元数据信息,包括namespaces,访问控制信息,文件到chunk的映射信息,以及chunk的地址信息(即chunk存放在哪台GFS chunkserver上)。
- 在GFS架构中只有单个master,这种架构的好处是设计和实现简单,但保证master节点的稳定性,就需要减轻master节点的负担。
-
GFS client:
- GFS应用端使用的API接口,client和GFS master交互来获取元数据信息,但是所有和数据相关的信息都是直接和GFS chunkserver来交互的。
-
Application:
- Application为使用GFS的应用,应用通过GFS client于GFS后端(GFS master和GFS chunkserver)打交道。
读取流程
写入流程
HDFS基本概念
- HDFS是hadoop自带的分布式文件系统,即Hadoop Distributed File System。
- HDFS具有高容错性,部署成本低等特性。
- HDFS提供了对应用数据的高吞吐量访问,适用于大量数据集的应用。
- HDFS放宽了对POXIS的要求,采用流式的数据访问方式。
- HDFS 最初是作为 Apache Nutch 网络搜索引擎项目的基础设施而构建的。
- HDFS的设计思想就是基于GFS论文而实现的。
HDFS的优点
- 高容错性:硬件故障是常态而不是例外。 一个 HDFS 实例可能由数百或数千台服务器组成,每台服务器都会存储一部分数据。 事实上,有大量组件并且每个组件都有很大的故障概率,这意味着 HDFS 的某些组件总是无法正常工作。 因此,故障检测和快速、自动恢复是 HDFS 的核心架构目标。由于 HDFS 采用数据的多副本方案,所以部分硬件的损坏不会导致全部数据的丢失。
- 流式数据访问:HDFS 更适合批处理而不是用户交互使用,适合一次写入,多次读取。HDFS 设计的重点是支持高吞吐量的数据访问, 而不是低延迟的数据访问。
- 大数据集:HDFS 适合于大文件的存储,文档的大小应该是是 GB 到 TB 级别的。
- 简单一致性模型:HDFS 应用程序需要对文件进行一次写入多次读取的访问模型。 文件一旦创建、写入和关闭,除了追加和截断外,无需更改。 支持将内容附加到文件的末尾,但不能在任意点更新。 这种假设简化了数据一致性问题并实现了高吞吐量数据访问。 MapReduce 应用程序或网络爬虫应用程序与此模型完美契合。
- 移动计算比移动数据更容易:如果应用程序请求的计算在它所操作的数据附近执行,它会更有效率。 当数据集的大小很大时尤其如此。 这最大限度地减少了网络拥塞并增加了系统的整体吞吐量。 假设通常将计算迁移到数据所在的位置,而不是将数据移动到应用程序运行的位置。 HDFS 为应用程序提供了接口,使它们更接近数据所在的位置。
- 跨异构硬件和软件平台的可移植性:HDFS是使用Java开发的,任何支持java的服务器上都可以部署HDFS,所以HDFS 具有良好的跨平台移植性,这使得其他大数据计算框架都将其作为数据持久化存储的首选方案。
HDFS的缺点
- 不适合低延时数据访问,比如毫秒级的存储数据,是做不到的。
- 无法高效的对大量小文件进行存储。存储大量小文件的话,它会占用NameNode大量的内存来存储文件目录和块信息。这样是不可取的,因为NameNode的内存总是有限的。小文件存储的寻址时间会超过读取时间,它违反了HDFS的设计目标。
- 不支持并发写入、文件随机修改。一个文件只能有一个写,不允许多个线程同时写;仅支持数据append,不支持文件的随机修改
HDFS架构
HDFS的总体架构基本与GFS的设计思路一致,其中HDFS Client对应着GFS 中的Client和Application。NameNode对应着GFS中的Master角色,DataNode对应着GFS中的chunkserver。
NameNode主要负责管理文件系统的namespace以及管理Client对文件系统的访问。NameNode 执行文件系统命名空间操作,如打开、关闭和重命名文件和目录。 它还确定块到 DataNode 的映射。
DataNode主要负责管理连接到它们运行的节点的存储,负责处理来自文件系统客户端的读写请求,根据NameNode的指令执行块的创建、删除和复制。
HDFS1.0架构
角色介绍:
- Client
-
- 与HDFS的NameNode和DataNode交互,进行读写、创建目录、创建文件、复制、删除等操作。
- HDFS提供了很多客户端,例如HDFS Shell,Java Client,HDFS Web Console等。
- NameNode
-
- 以FsImage的形式维护HDFS的命名空间,文件系统树以及整棵树内所有的文件和目录。
- 以EditLog形式,记录对每个文件的新增,修改,删除的操作。
- 记录每个文件中各个Block所在的DataNode信息。
- 处理客户端对文件的请求。
- 配置副本策略。
- SecondaryNameNode
-
- 并非NameNode的热备,当NameNode挂掉的时候,它并不能替换NameNode并提供服务。
- 其主要作用是定期合并Fsimage和Edits,并推送给NameNode。
- 不接受Client的请求,作为NameNode的冷备。
- DataNode
-
- 实际存储数据的单元。
- 以Block为单位。
- 数据以普通文件形式保存在本地文件系统。
- 定期向NameNode发送它们所存储的数据块的列表。
HDFS数据块
HDFS为了支持高效的处理数据,引入了数据块的概念。
- 1.X的版本默认是64MB(后面2.x的版本开始,默认为128MB)。
- HDFS上的文件也被划分为块大小的多个分块(Chunk),作为独立的存储单元。
- HDFS中小于一个块大小的文件,不会占据整个块的空间。
引入数据块带来好处:
为什么块的大小不能设置的过大也不能过小(为什么块的大小为128MB?):
机架感知
通常,大型Hadoop集群是以机架的形式来组织的,同一个机架上不同节点间的网络状况比不同机架之间的更为理想。另外,NameNode设法将数据块副本保存在不同的机架上以提高容错性。
拓扑距离算法
拓扑距离算法:两个节点到达最近的共同祖先的距离总和。
- 同一节点上的进程 = 0
- 同一机架上的不同节点 = 2
- 同一数据中心不同机架上的节点 = 4
- 不同数据中心的节点 = 6
副本放置策略
假设副本数为3。
- 第一个副本放置在Client所处的节点上,如果客户端在集群外(例如Java Client API),则随机选择一个。快速写入。
- 第二个副本写入位于不同rack的节点。主要应对交换机故障的情况,保证数据的可靠性。
- 第三个副本写入和第二份副本同一个rack内的节点。主要为了减少跨rack的网络流量,同时兼顾可靠性和效率。
HDFS读流程
HDFS写流程
如果写入DataNode发生故障:
dfs.replication.min
的副本数(默认为1),写操作就会成功,并且这个块可以在集群中异步复制,直到达到其目标副本数(dfs.replication
默认值为3)。故障恢复和容灾
HDFS的主要目标就是即使在出错的情况下也要保证数据存储的可靠性。常见的三种出错情况是:Namenode出错, Datanode出错和网络割裂(network partitions)。
DataNode容灾
磁盘数据错误,心跳检测和重新复制
- 每个Datanode节点周期性地向Namenode发送心跳信号。
- 网络割裂可能导致一部分Datanode跟Namenode失去联系。
- Namenode通过心跳信号的缺失来检测这一情况,并将这些近期不再发送心跳信号Datanode标记为宕机,不会再将新的请求发给它们,任何存储在宕机Datanode上的数据将不再有效。
- Datanode的宕机可能会引起一些数据块的副本系数低于指定值,Namenode不断地检测这些需要复制的数据块,一旦发现就启动复制操作。
-
- 在下列情况下,可能需要重新复制:
-
-
- 某个Datanode节点失效
- 某个副本遭到损坏
- Datanode上的硬盘错误
- 文件的副本系数增大
-
集群均衡
HDFS的架构支持数据均衡策略。如果某个Datanode节点上的空闲空间低于特定的临界点,按照均衡策略系统就会自动地将数据从这个Datanode移动到其他空闲的Datanode。当对某个文件的请求突然增加,那么也可能启动一个计划创建该文件新的副本,并且同时重新平衡集群中的其他数据。
数据完整性
从某个Datanode获取的数据块有可能是损坏的,损坏可能是由Datanode的存储设备错误、网络错误或者软件bug造成的。HDFS客户端软件实现了对HDFS文件内容的校验和(checksum)检查。当客户端创建一个新的HDFS文件,会计算这个文件每个数据块的校验和,并将校验和作为一个单独的隐藏文件保存在同一个HDFS名字空间下。当客户端获取文件内容后,它会检验从Datanode获取的数据跟相应的校验和文件中的校验和是否匹配,如果不匹配,客户端可以选择从其他Datanode获取该数据块的副本。
NameNode容灾
在HDFS1.0架构中,Namenode是HDFS集群中的单点故障(single point of failure)所在,内存中的元数据都将全部丢失,造成整个HDFS集群不可用。为了提供可用性,HDFS提供了SecondaryNameNode角色,主要用于定期合并Fsimage和Edits,并推送给NameNode。
FsImage和EditLog
在HDFS中,FsImage和EditLog是NameNode两个非常重要的文件。
NameNode的存储目录树的信息,而目录树的信息则存放在FsImage文件中,当NameNode启动的时候会首先读取整个FsImage文件,将信息装载到内存中。
EditLog存储日志信息,在NameNode上所有对目录的操作,增加,删除,修改等都会保存到EditLog中,并不会同步到FsImage中,当NameNode关闭的时候,也不会将FsImage和EditLog进行合并。
当NameNode启动的时候,首先装载FsImage文件,然后按照EditLog中的记录执行一遍所有记录的操作,最后把信息的目录树写入FsImage中,并删掉EditLog,重新启用新的EditLog。
Secondary NameNode
从上述对FsImage和EditLog的学习中可以发现
- FsImage是整个NameNode内存中元数据在某一时刻的快照(Snapshot)。
- FsImage不能频繁的构建,生成FsImage需要花费大量的内存。
- 目前FsImage只在NameNode重启时才构建。
- 而Editlog记录的是从这个快照开始到当前所有的元数据的改动。
- 如果Editlog太多,重放Editlog会消耗大量时间,这会导致启动NameNode花费数小时之久。
那么有什么方法可以定时来构建FsImage,又不影响NameNode的服务呢?答案就是Secondary NameNode。
(1)第一次启动NameNode格式化后,创建Fsimage和Edits文件。如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
(2)客户端对元数据进行增删改的请求。
(3)NameNode记录操作日志。
(4)NameNode在内存中对元数据进行增删改。
(1)Secondary NameNode询问NameNode是否需要CheckPoint。直接带回NameNode是否检查结果。
(2)Secondary NameNode请求执行CheckPoint。
(3)NameNode滚动正在写的Edits日志。
(4)将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode。
(5)Secondary NameNode加载编辑日志和镜像文件到内存,并合并。
(6)生成新的镜像文件fsimage.chkpoint。
(7)拷贝fsimage.chkpoint到NameNode。
(8)NameNode将fsimage.chkpoint重新命名成fsimage。
HDFS2.0架构
实际上,虽说有了Secondary NameNode,但它只负责定期合并FsImage和EditLog,不是NameNode的主备,所以NameNode的单点故障问题还依然存在。
另外FsImage和EditLog操作都是在内存中的,内存受限,整个集群的Size受限于这么一个单节点的NameNode。
所以在HDFS2.0架构优化,就是为了解决这两个问题,解决方案如下
Hadoop1.0架构问题 | Hadoop2.0的改进 |
---|---|
单一NameNode,存在单点故障问题 | HDFS HA,提供NameNode热备 |
单一NameNode单一NameSpace,无法实现资源隔离 | HDFS Federation管理多个命名空间 |
NameNode HA概述
从上图中,我们可以看出NameNode的高可用架构主要包含几个部分:
- Active NameNode(ANN):在HDFS集群中,对外提供读写服务的唯一Master节点。ANN将客户端请求过来的写操作通过EditLog写入共享存储系统(即JournalNode Cluster),为Standby NameNode及时同步数据提供支持;
- Standby NameNode(SBN):与ANN相互形成热备,SBN及时从共享存储系统中读取EditLog数据并更新内存,以保证当前状态尽可能与ANN同步。
- JournalNode Cluster(JNs):ANN与SBN之间共享Editlog的一致性存储系统,是HDFS NameNode高可用的核心组件。借助JournalNode集群ANN可以尽可能及时同步元数据到SBN。其中ANN采用Push模式将EditLog写入JN,SBN通过Pull模式周期性地从JN拉取数据,整个过程中JN不主动进行数据交换;
- ZKFailoverController(ZKFC):ZKFailoverController以独立进程运行,对NameNode主备切换进行控制,正常情况ANN和SBN分别对应各自ZKFC进程。ZKFC主要功能:NameNode健康状况检测;借助Zookeeper实现NameNode自动选主;操作NameNode进行主从切换;
- Zookeeper(ZK):为ZKFC实现自动选主功能提供统一协调服务。
- DataNode:DataNode 会同时向主NameNode和备NameNode上报数据块的位置信息。
接下来我们将从三个方面讨论NameNode HA的机制:
主从一致性
就是要保证主从NameNode关于整个HDFS的元数据要一致。
- Active NameNode启动后提供服务,并把Editlog写到本地和共享存储系统中。
- Standby NameNode周期性的从QJM中拉取Editlog,保持与active的状态同步。
- DataNode同时两个NameNode发送BlockReport。
脑裂
- QJM的fencing,确保只有一个NN的写入成功。
- DataNode的fencing,确保只有一个NN能够命令DN。
- Client的fencing,确保只有一个NN能提供服务。
QJM的fencing
QJM全称是Quorum Journal Manager, 由JournalNode(JN)组成,一般是奇数个结点组成。
- 高可用:
-
- 当存活的节点数为偶数个时,无法提供正常服务。
- 基于Paxos:
-
- NameNode会同时向所有JournalNode并行写文件,只要有N/2+1个结点写成功则认为此次写操作成功,遵循Paxos协议。
- 防止双写
-
- Epoch Number
-
-
- 当NN成为Active结点时,其会被赋予一个Epoch Number
- 每个Epoch Number是唯一的,不会有相同的出现
- Epoch Number有严格顺序保证,每次NN切换后其Epoch Number都会自增1
-
-
- ANN把自己的Epoch Number通过发送给所有JN结点
- ANN同步日志到JN的任何RPC请求都必须包含这个Epoch Number。
- JN会对比每次请求中的Epoch Number和保存在本地的Epoch Number,小于则拒绝该请求,反之则更新本地保存的Epoch Number
DataNode的fencing
- 每个NN改变状态的时候,向DN发送自己的状态和一个序列号(类似Epoch Numbers)。
- DN在运行过程中维护此序列号,当failover时,新的NN在返回DN心跳时会返回自己的active状态和一个更大的序列号。DN接收到这个返回则认为该NN为新的active。
- 如果这时原来的active NN恢复,返回给DN的心跳信息包含active状态和原来的序列号,这时DN就会拒绝这个NN的命令。
Client的fencing
让访问Standby NN的客户端直接失败
- 在RPC层封装了一层,通过FailoverProxyProvider以重试的方式连接NN。
- 通过若干次连接一个NN失败后尝试连接新的NN,对客户端的影响是重试的时候增加一定的延迟。
- 客户端可以设置重试次数和时间。
Failover
主备切换是通过ZKFC实现的。具体功能如下:
HDFS Federation
HDFS 1.0命名空间架构大致可以分为两层:
- Namespace:由目录、文件和数据块组成,支持常见的 文件系统操作,例如创建、删除、修改和列出文件和目 录。
- BlockStorageService:这个部分又由两部分组成
-
- 数据块管理(Block Management),由NameNode提供
-
-
- 通过处理DataNode的注册和定期心跳来提供集群中DataNode的基本关系;
- 维护数据到数据块的映射关系,以及数据块在DataNode的映射关系;
- 支持数据块相关操作,如创建、删除、修改和获取块位置;
- 管理副本的放置、副本的创建,以及删除多余的副本。
-
-
- 存储( Storage) ,由DataNode提供
-
-
- 主要在本地文件系统存储数据块,并提供读写访问。
-
之前的HDFS架构只允许整个集群有一个命名空间。在该配置中,单个Namenode管理命名空间。
可以通过HDFS Federation通过向HDFS添加对多个Namenode/命名空间的支持来解决此限制。
Federation概述和原理
设计特点:
- NameNode共享底层的数据节点存储资源
- DataNode向所有NameNode汇报
- 属于同一个Namespace的块构成一个block pool
- 可以存在多个相互独立的NameNode
- 水平扩展的命名服务
- 独立管理Namespace和block pool
- 联邦(Federation)关系不需要彼此协调
- 向后兼容
原理:
- 一个Namespace和一个BlockPool对应
- 一个BlockPool是属于某个namespace下的一系列block。
- DataNode是共享的,不同BlockPool的block在同一个DataNode上存储。
- 一个Namespace和它的blockpool一起被叫做Namespace Volume。
Federation的配置
dfs.nameservices
ns1,ns2
dfs.namenode.rpc-address.ns1
nn-host1:rpc-port
dfs.namenode.http-address.ns1
nn-host1:http-port
dfs.namenode.secondary.http-address.ns1
snn-host1:http-port
dfs.namenode.rpc-address.ns2
nn-host2:rpc-port
dfs.namenode.http-address.ns2
nn-host2:http-port
dfs.namenode.secondary.http-address.ns2
snn-host2:http-port
.... Other common configuration ...
访问地址如下:
hdfs://nn-host1:rpc-port/foo/bar
hdfs://nn-host2:rpc-port/tmp/data
ViewFs
Federation存在的问题
- 客户端都要更新配置文件,并维护多个Namespace
- 访问目录需要指定完整路径
- 当Namespace增多以后,管理和访问非常不方便。
基于上述问题,社区提出了基于客户端的ViewFs(视图文件系统)。ViewFs简单的可以理解为这是一个虚拟的,逻辑上的文件系统。因为这个文件系统实际并不真实存在,只是我们构建了这个文件系统,它的底层指向了实际意义上的多物理集群。ViewFs实际上是使用挂载表(Mount Table)做到的。
有点类似于Docker挂载宿主机的volume机制。
ViewFs配置
fs.defaultFS
viewfs://Cluster1
fs.viewfs.mounttable.Cluster1.link./data
hdfs://nn-host1:rpc-port/data
fs.viewfs.mounttable.Cluster1.link./project
hdfs://nn-host2:rpc-port/project
fs.viewfs.mounttable.Cluster1.link./user
hdfs://nn-host3:rpc-port/user
fs.viewfs.mounttable.Cluster1.link./tmp
hdfs://nn-host4:rpc-port/tmp
fs.viewfs.mounttable.Cluster1.linkFallback
hdfs://nn-host1:rpc-port/
访问地址:
/foo/bar => nn-host4
/tmp/data => nn-host1
->
viewfs://cluster1/foo/bar
viewfs://cluster1/tmp/data
ViewFS方案也存在一些问题:
- 对于已经发不出去的客户端,升级比较困难;
- 对于新增目录,需要添加挂在表与产品对接,维护起来比较困难。
HDFS3.0架构
HDFS1.0架构和HDFS2.0架构存在的问题:
- HDFS的Master/Slave架构,使得Master节点在元数据存储与提供服务上都 会存在瓶颈。
- 为了解决扩展性、性能、隔离等问题,社区提出了Federation方案(HDFS- 1052)。
- 使用该方案之后,带来的问题就是同一个集群出现了多个命名空间 (namespace)。客户需要知道读写的数据在哪个命名空间下才可以进行操作。为了解决统一命名空间的问题,社区提出了基于客户端(client-side) 的解决方案ViewFS(HADOOP-7257)。
- ViewFS同样也存在一些问题,例如对于已经发布出去客户端升级比较困难, 对于新增目录需要增加挂载配置,维护起来比较困难。
社区在2.9和3.0版本 中发布了一个新的解决统一命名空间问题的方案Router-Based Federation (HDFS-10467),该方案是基于服务端进行实现的。
HDFS RBF概述
基于路由的Federation方案是在服务端添加了一个Federation layer,这个额 外的层允许客户端透明地访问任何子集群。Federation layer将Block访问引导 至适当的子群集,维护namespaces的状态。
Federation layer包含多个组件。Router是一个与NameNode具有相同接口的 组件,根据State Store的元数据信息将客户端请求转发给正确的子集群。State Store组件包含了远程挂载表(和ViewFS方案里面的配置文件类似 ,但在客户 端之间共享)。
Router(无状态):一个系统中可以包含多个Router,每个Router包含两个作用:
State Store(分布式):在State Store里面主要维护以下几方面的信息:
RBF访问流程:
HDFS异构存储
对需要频繁访问的数据我们称之为“热”数据,反之我们称之为“冷”数据,而处于中间的数据我们称之为“温”数据。 Hadoop从2.6.0版本开始支持异构存储。
那么如何定义数据为冷热呢,参考如下:
age | 使用频率 | 温度 |
---|---|---|
age < 7天 | 1天20次 | HOT |
7天 >= age < 1月 | 1周5次 | WARM |
1月 >= age < 3月 | 1月5次 | COLD |
3月 >= age < 3年 | 1年2次 | FROZEN |
HDFS定义了Lazy_Persist、All_SSD、One_SSD、Hot、Warm和Cold六种存储策略:
Policy ID | Policy Name | Block Placement(n replicas) | Fallback storages for creation | Fallback storages for replication |
---|---|---|---|---|
15 | Lazy_Persist | RAM_DISK: 1, DISK: n-1 | DISK | DISK |
12 | All_SSD | SSD: n | DISK | DISK |
10 | One_SSD | SSD: 1, DISK: n-1 | SSD, DISK | SSD, DISK |
7 | Hot (default) | DISK: n | ARCHIVE | |
5 | Warm | DISK: 1, ARCHIVE: n-1 | ARCHIVE, DISK | ARCHIVE, DISK |
2 | Cold | ARCHIVE: n | ||
1 | Provided | PROVIDED: 1, DISK: n-1 | PROVIDED, DISK | PROVIDED, DISK |
HDFS API
Shell
hadoop.apache.org/docs/stable…
hadoop.apache.org/docs/stable…
Java API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
/**
* HDFS常用API
*/
public class HdfsTest {
private static final String HDFS_PATH = "hdfs://192.168.0.106:8020";
private static final String HDFS_USER = "root";
private static FileSystem fileSystem;
/**
* 获取fileSystem
*/
@Before
public void prepare() {
try {
Configuration configuration = new Configuration();
// 这里我启动的是单节点的Hadoop,副本系数可以设置为1,不设置的话默认值为3
configuration.set("dfs.replication", "1");
fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
}
}
/**
* 创建目录,支持递归创建
*/
@Test
public void mkDir() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test0/"));
}
/**
* 创建具有指定权限的目录
*/
@Test
public void mkDirWithPermission() throws Exception {
fileSystem.mkdirs(new Path("/hdfs-api/test1/"),
new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
}
/**
* 创建文件,并写入内容
*/
@Test
public void create() throws Exception {
// 如果文件存在,默认会覆盖, 可以通过第二个参数进行控制。第三个参数可以控制使用缓冲区的大小
FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/a.txt"),
true, 4096);
out.write("hello hadoop!".getBytes());
out.write("hello spark!".getBytes());
out.write("hello flink!".getBytes());
// 强制将缓冲区中内容刷出
out.flush();
out.close();
}
/**
* 判断文件是否存在
*/
@Test
public void exist() throws Exception {
boolean exists = fileSystem.exists(new Path("/hdfs-api/test/a.txt"));
System.out.println(exists);
}
/**
* 查看文件内容
*/
@Test
public void readToString() throws Exception {
FSDataInputStream inputStream = fileSystem.open(new Path("/hdfs-api/test/a.txt"));
String context = inputStreamToString(inputStream, "utf-8");
System.out.println(context);
}
/**
* 文件重命名
*/
@Test
public void rename() throws Exception {
Path oldPath = new Path("/hdfs-api/test/a.txt");
Path newPath = new Path("/hdfs-api/test/b.txt");
boolean result = fileSystem.rename(oldPath, newPath);
System.out.println(result);
}
/**
* 删除文件
*/
@Test
public void delete() throws Exception {
/*
* 第二个参数代表是否递归删除
* + 如果path是一个目录且递归删除为true, 则删除该目录及其中所有文件;
* + 如果path是一个目录但递归删除为false,则会则抛出异常。
*/
boolean result = fileSystem.delete(new Path("/hdfs-api/test/b.txt"), true);
System.out.println(result);
}
/**
* 上传文件到HDFS
*/
@Test
public void copyFromLocalFile() throws Exception {
// 如果指定的是目录,则会把目录及其中的文件都复制到指定目录下
Path src = new Path("D:BigData-Notesnotesinstallation");
Path dst = new Path("/hdfs-api/test/");
fileSystem.copyFromLocalFile(src, dst);
}
/**
* 上传文件到HDFS
*/
@Test
public void copyFromLocalBigFile() throws Exception {
File file = new File("D:kafka.tgz");
final float fileSize = file.length();
InputStream in = new BufferedInputStream(new FileInputStream(file));
FSDataOutputStream out = fileSystem.create(new Path("/hdfs-api/test/kafka5.tgz"),
new Progressable() {
long fileCount = 0;
public void progress() {
fileCount++;
// progress方法每上传大约64KB的数据后就会被调用一次
System.out.println("文件上传总进度:" + (fileCount * 64 * 1024 / fileSize) * 100 + " %");
}
});
IOUtils.copyBytes(in, out, 4096);
}
/**
* 从HDFS上下载文件
*/
@Test
public void copyToLocalFile() throws Exception {
Path src = new Path("/hdfs-api/test/kafka.tgz");
Path dst = new Path("D:app");
/*
* 第一个参数控制下载完成后是否删除源文件,默认是true,即删除;
* 最后一个参数表示是否将RawLocalFileSystem用作本地文件系统;
* RawLocalFileSystem默认为false,通常情况下可以不设置,
* 但如果你在执行时候抛出NullPointerException异常,则代表你的文件系统与程序可能存在不兼容的情况(window下常见),
* 此时可以将RawLocalFileSystem设置为true
*/
fileSystem.copyToLocalFile(false, src, dst, true);
}
/**
* 查看指定目录下所有文件的信息
*/
@Test
public void listFiles() throws Exception {
FileStatus[] statuses = fileSystem.listStatus(new Path("/hdfs-api"));
for (FileStatus fileStatus : statuses) {
//fileStatus的toString方法被重写过,直接打印可以看到所有信息
System.out.println(fileStatus.toString());
}
}
/**
* 递归查看指定目录下所有文件的信息
*/
@Test
public void listFilesRecursive() throws Exception {
RemoteIterator files = fileSystem.listFiles(new Path("/hbase"), true);
while (files.hasNext()) {
System.out.println(files.next());
}
}
/**
* 查看文件块信息
*/
@Test
public void getFileBlockLocations() throws Exception {
FileStatus fileStatus = fileSystem.getFileStatus(new Path("/hdfs-api/test/kafka.tgz"));
BlockLocation[] blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
for (BlockLocation block : blocks) {
System.out.println(block);
}
}
/**
* 测试结束后,释放fileSystem
*/
@After
public void destroy() {
fileSystem = null;
}
/**
* 把输入流转换为指定编码的字符
*
* @param inputStream 输入流
* @param encode 指定编码类型
*/
private static String inputStreamToString(InputStream inputStream, String encode) {
try {
if (encode == null || ("".equals(encode))) {
encode = "utf-8";
}
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, encode));
StringBuilder builder = new StringBuilder();
String str = "";
while ((str = reader.readLine()) != null) {
builder.append(str).append("n");
}
return builder.toString();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
参考文献
Hadoop权威指南-第三章
HDFS官网
BigData-Note
美团对HDFS的应用
尚硅谷大数据