Browsed by
Tag: 大数据

使用CDH示例程序进行字数统计

使用CDH示例程序进行字数统计

Wordcount程序是Hadoop上的经典“HelloWorld”程序。CDH系统自带了wordcount程序来检测部署的成功与否。

# 解压提前准备好的莎士比亚全集
[sujx@elephant ~]gzip -d shakespeare.txt.gz

# 上传至hadoop文件系统
[sujx@elephant ~] hdfs dfs -mkdir /user/sujx/input
[sujx@elephant ~]hdfs dfs -put shakespeare.txt /user/sujx/input

# 查看有哪些测试程序可用
[sujx@elephant ~] hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
An example program must be given as the first argument.
Valid program names are:
  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.
  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.
  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.  dbcount: An example job that count the pageview counts from a database.
  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.    grep: A map/reduce program that counts the matches of a regex in the input.
  join: A job that effects a join over sorted, equally partitioned datasets
  multifilewc: A job that counts words from several files.
  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.       pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.
  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.   randomwriter: A map/reduce program that writes 10GB of random data per node.
  secondarysort: An example defining a secondary sort to the reduce.
  sort: A map/reduce program that sorts the data written by the random writer.
  sudoku: A sudoku solver.
  teragen: Generate data for the terasort
  terasort: Run the terasort
  teravalidate: Checking results of terasort
  wordcount: A map/reduce program that counts the words in the input files.
  wordmean: A map/reduce program that counts the average length of the words in the input files.
  wordmedian: A map/reduce program that counts the median length of the words in the input files.
  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

# 执行mapreduce运算,output文件夹会自动建立
[sujx@elephant ~]hadoop jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount /user/sujx/input/shakespeare.txt /user/sujx/output/

# 查看输出结果
[sujx@elephant ~] hdfs dfs -ls /user/sujx/output
Found 4 items
-rw-r--r--   3 sujx supergroup          0 2020-03-09 02:47 /user/sujx/output/_SUCCESS      -rw-r--r--   3 sujx supergroup     238211 2020-03-09 02:47 /user/sujx/output/part-r-00000  -rw-r--r--   3 sujx supergroup     236617 2020-03-09 02:47 /user/sujx/output/part-r-00001  -rw-r--r--   3 sujx supergroup     238668 2020-03-09 02:47 /user/sujx/output/part-r-00002 

# 查看输出内容
[sujx@elephant ~]$ hdfs dfs -tail /user/sujx/output/part-r-00000
.       3
writhled        1
writing,        4
writings.       1
writs   1
written,        3
wrong   112
wrong'd-        1
wrong-should    1
wrong.  39
wrong:  1
wronged 11
wronged.        3
wronger,        1
wronger;        1
wrongfully?     1
wrongs  40
wrongs, 9
wrongs; 9
wrote?  1
wrought,        4
…………
wHAT is HDFS?

wHAT is HDFS?

Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。

概念

  1. HDFS集群分为两大角色:NameNode、DataNode;
  2. NameNode负责管理整个文件系统的元数据,second namenode是namenode的冷备;
  3. DataNode 负责管理用户的文件数据块;
  4. 文件会按照固定的大小(blocksize)(128M)切成若干块后分布式存储在若干台datanode上;
  5. 每一个文件块可以有多个副本,并存放在不同的datanode上;
  6. Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量;
  7. HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行;

写过程

HDFS_write

  1. Client调用DistributedFileSystem对象的create方法,创建一个文件输出流(FSDataOutputStream)对象
  2. 通过DistributedFileSystem对象与Hadoop集群的NameNode进行一次RPC远程调用,在HDFS的Namespace中创建一个文件条目(Entry),该条目没有任何的Block
  3. 通过FSDataOutputStream对象,向DataNode写入数据,数据首先被写入FSDataOutputStream对象内部的Buffer中,然后数据被分割成一个个Packet数据包
  4. 以Packet最小单位,基于Socket连接发送到按特定算法选择的HDFS集群中一组DataNode(正常是3个,可能大于等于1)中的一个节点上,在这组DataNode组成的Pipeline上依次传输Packet
  5. 这组DataNode组成的Pipeline反方向上,发送ack,最终由Pipeline中第一个DataNode节点将Pipeline ack发送给Client
  6. 完成向文件写入数据,Client在文件输出流(FSDataOutputStream)对象上调用close方法,关闭流
  7. 调用DistributedFileSystem对象的complete方法,通知NameNode文件写入成功

读过程

HDFS_read

  1. Client向NameNode发起RPC请求,来确定请求文件block所在的位置
  2. NameNode会视情况返回文件的部分或者全部block列表,对于每个block,NameNode都会返回含有该block副本的DataNode地址
  3. 这些返回的DN地址,会按照集群拓扑结构得出DataNode与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离Client的排在前;心跳机制中超时汇报的DN状态为STALE,这样的排在后
  4. Clietn选取排序靠前的DataNode来读取block,如果客户端本身就是DataNode,那么将从本地直接获取数据
  5. 底层本质是建立Socket Stream(FSDataInputStream) ,重复调用父类DataInputStream的read方法,知道这个块上的数据读取完毕
  6. 当读完列表的block后,若文件读取还没有结束,客户端会继续想NameNode获取下一批的block列表
  7. 读取完一个Block都会进行checksum验证,如果读取DataNode时出现错误,客户端会通知NameNode,然后再从下一个拥有该block副本的DataNode继续读取。注: 如果在读取过程中DFSInputStream检测到block错误,DFSInputStream也会检查从datanode读取来的数据的校验和,如果发现有数据损坏,它会把坏掉的block报告给namenode同时重新读取其他datanode上的其他block备份
  8. read方法是并行的读取block信息,不是一块一块的读取,NameNode只是返回Client请求包含块的DataNode地址,并不是返回请求块的数据
  9. 最终读取哎所有的block会合并成一个完整的最终文件

副本放置策略

  1. 第一个副本:放置在上传文件的DN;如果是集群外提交,则随机挑选一台磁盘不太满,CPU不太忙的节点
  2. 第二个副本:放置在于第一个副本不同的机架的节点上
  3. 第三个副本:与第二个副本相同机架的节点
  4. 更多副本:随机节点

NameNode 内存使用

NameNode内存数据主要对整个文件系统元数据的管理。Namenode目前元数据管理可以分成两个层次,一个是Namespace的管理层,这一层负责管理HDFS分布式文件系统中的树状目录和文件结构;另一层则为Block管理层,这一层负责管理HDFS分布式文件系统中存储文件到物理块之间的映射关系BlocksMap元数据。其中对Namespace的管理数据除在内存常驻外,会定期Flush到持久化设备中;对BlocksMap元数据的管理只存在内存;当NameNode发生重启,需要从持久化设备中读取Namespace管理数据,并重新构造BlocksMap。

除了对文件系统本身元数据的管理外,NameNode还需要维护DataNode本身的元数据,这部分空间相对固定,且占用空间较小。

从实际Hadoop集群环境历史数据看,当Namespace中包含INode(目录和文件总量)~140M,数据块数量~160M,常驻内存使用量达在~50G。随着数据规模的持续增长,内存占用接近同步线性增长。在整个HDFS服务中,NameNode的核心作用及内存数据结构的重要地位,所以分析内存使用情况对维护HDFS服务稳定性至关重要。

漫画HDFS

HDFS_001
HDFS_002
HDFS_003