BigData Review Notes 01: HDFS 1.0 and MapReduce

HDFS 1.0 and MapReduce



Chapter01. MapReduce & HDFS1.0

Techniques for splitting massive data streams

Divide and conquer

  • Large data volumes

    • Early search-engine page stores: a single machine holds tens of millions of pages, so billions of pages have to be spread over hundreds of servers, with the URL as the key
    • Distributed file systems split and organize files by block (64MB-256MB)
      • Stability
      • Fault tolerance
      • Data consistency
  • Heavy traffic

    • Internet services that serve heavy traffic
    • Traffic from the south is routed to China Telecom data centers, traffic from the north to China Unicom data centers
    • Search engines shard traffic using the query as the key
  • Large computations

    • Computing tasks are split according to the input data
    • MapReduce partitions work by input data

The traditional hash approach

How do we spread a large volume of traffic evenly across N servers, i.e. achieve load balancing?

Idea (a minimal sketch follows this list):

  • Choose a sensible key so that Hash(Key) is distributed as evenly as possible

    • Hash(Key) mod N == 0 goes to server 0

    • Hash(Key) mod N == 1 goes to server 1

    • ……

    • Hash(Key) mod N == i goes to server i

    • ……

    • Hash(Key) mod N == N - 1 goes to server N - 1

  • A timestamp is commonly used as the key
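A minimal sketch of this modulo-based split, assuming Python's built-in hash() as the hash function and N = 4 servers (both choices are illustrative, not from the notes):

N = 4  # number of servers (illustrative)

def pick_server(key):
    # Route a record to server Hash(Key) mod N
    return hash(key) % N

# Example: spread timestamped events across the N servers
events = [("2019-07-17 21:07:%02d" % s, "click") for s in range(10)]
buckets = {}
for ts, payload in events:
    buckets.setdefault(pick_server(ts), []).append(payload)
print({server: len(items) for server, items in buckets.items()})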

Random partitioning

Consistent Hashing

Supports dynamic growth; a more refined partitioning scheme that also addresses hot-spot problems (a minimal ring sketch follows this section)

Example:

  • Server A carries 50% of the load
  • Server B carries 30%
  • Server C carries 20%

As shown in the figure, a request walks clockwise around the ring from Hash(Key) to reach a server.

  • If server B goes down, then

    • 30% * 5/7 of B's load is handed over to server A

    • 30% * 2/7 of B's load is handed over to server C

Consistent hashing
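A minimal consistent-hash ring sketch, assuming md5 as the ring hash and 100 virtual nodes per server (both values are illustrative assumptions, not taken from the notes):

import bisect
import hashlib

def h(s):
    # Map a string onto the ring [0, 2^32)
    return int(hashlib.md5(s.encode()).hexdigest(), 16) % (2 ** 32)

class ConsistentHashRing:
    def __init__(self, servers, vnodes=100):
        # Each server contributes several virtual nodes, which smooths the load split
        self.ring = sorted((h("%s#%d" % (srv, i)), srv)
                           for srv in servers for i in range(vnodes))
        self.points = [p for p, _ in self.ring]

    def lookup(self, key):
        # Walk clockwise to the first virtual node at or after hash(key)
        idx = bisect.bisect(self.points, h(key)) % len(self.ring)
        return self.ring[idx][1]

ring = ConsistentHashRing(["A", "B", "C"])
print(ring.lookup("user_42"))            # e.g. 'B'
# If B goes down, only the keys that mapped to B move; they are split among A and C
ring_after_failure = ConsistentHashRing(["A", "C"])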

MapReduce

A distributed computing framework for processing massive amounts of data

Prerequisite: a distributed storage architecture

Roles:

  • Master
  • Slave
  • Client

GFS storage

The basic idea of MapReduce

Divide and conquer

Decompose >> solve >> merge

Demo: counting banknotes by denomination (a single-process simulation follows the list)

  • Approach 1: a single worker

    • One person counts the entire pile and tallies how many notes there are of each denomination
  • Approach 2: divide and conquer

    • Each person takes a share of the pile and tallies the notes of each denomination in it
    • The tallies are then aggregated, with each person responsible for totalling one denomination
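A minimal single-process simulation of the divide-and-conquer approach, using collections.Counter to stand in for the per-person tallies and the final aggregation (the denominations are illustrative):

from collections import Counter

banknotes = [100, 50, 100, 20, 10, 50, 100, 20, 10, 10]  # the pile to count (illustrative)

# "Map": split the pile between 2 people, each tallies their own share
shares = [banknotes[0::2], banknotes[1::2]]
partial_counts = [Counter(share) for share in shares]

# "Reduce": aggregate the per-person tallies into one total per denomination
total = Counter()
for c in partial_counts:
    total += c
print(total)  # e.g. Counter({100: 3, 10: 3, 50: 2, 20: 2})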

The MapReduce computation flow

Steps (a toy walk-through follows the list)

  1. Load the input data into HDFS
  2. Preprocess the input data
  3. Split the processed data into input splits
  4. Following data locality, run the Map task for each split on the corresponding node; results are staged in an in-memory buffer
  5. When the buffered data reaches the threshold
    1. Lock the buffer
    2. Sort the buffered records by partition and key (quicksort by default; the partition number is the primary sort key and the key the secondary) and write them to disk
    3. Merge-sort the spilled files on disk into {partition, key, value}
  6. Ship the Map output to the corresponding Reduce node according to its partition
  7. The Reduce node merges the data received from the different Map nodes by partition and runs the Reduce operation
  8. Write the final results to HDFS
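A toy in-memory walk-through of steps 4-7, assuming two reducers and a simple hash partitioner (the names here are illustrative Python, not Hadoop APIs):

from itertools import groupby

lines = ["a b a", "b c a"]          # pretend these are the input splits
num_reduces = 2

# Map: emit (key, 1) pairs
map_out = [(w, 1) for line in lines for w in line.split()]

# Partition + sort: first by partition number, then by key (what spill/merge does)
part = lambda kv: hash(kv[0]) % num_reduces
shuffled = sorted(map_out, key=lambda kv: (part(kv), kv[0]))

# Reduce: each reducer sums the values of its own keys
for pid, records in groupby(shuffled, key=part):
    for key, kvs in groupby(records, key=lambda kv: kv[0]):
        print(pid, key, sum(v for _, v in kvs))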

Details

  • File
    • Files live in HDFS; each file is split into blocks of a fixed size (64MB by default) stored on multiple DataNodes (3 replicas by default)
    • TextFile (plain-text output)
      • view with hadoop fs -cat /xxx
    • SequenceFile (binary output)
      • view with hadoop fs -text /xxx
  • InputFormat
    • One of the MR framework's base classes (a Java interface)
      • Data splits
        • Each split also includes the beginning of the next block (this handles records that span blocks)
        • If a record is stored across two blocks, it belongs to the split of the first block
      • Record reader
        • Feeds the records of a split into Map
        • Each time a record is read, the Map function is called once with that record as its argument
        • The process repeats, record by record, until the end of the split
  • Map
  • Shuffle
    • Partition, Sort, Spill, Merge, Combiner, Copy, Memory, Disk……
    • The main target of performance tuning
      • Partition
        • Decides which Reducer handles a record, i.e. which partition it falls into (e.g. by hashing)
      • MemoryBuffer
        • In-memory buffer; each Map's output and the partitioned key/value results are kept in this buffer
        • Buffer size: 100MB by default
        • Spill threshold: 100MB * 0.8 = 80MB
        • Buffer contents: {partition, key, value} triples
      • Spill
        • When the buffer reaches the threshold, the spill thread locks the 80MB region, writes its data to local disk, and then frees the memory
        • Every spill produces one data file
        • Before the spilled data reaches disk it is sorted by key (Sort) and merged (Combiner)
        • Records whose keys go to the same Reducer are concatenated together, reducing the number of partition index entries
          • Sort
            • The buffered data is sorted by key
          • Combiner
            • Merges data: the values of identical keys are combined, cutting the amount of output to transfer
            • The Combiner function is effectively a Reducer function; whenever applying it does not change the final Reduce result (e.g. sum, max), it can improve performance dramatically
  • Reduce
    • The inputs of the different Reduce tasks come from different partitions, so the keys in their output do not overlap
    • Concatenating the Reduce output files gives the final result

Configuration notes

  • Number of file handles
    • the ulimit command
    • use it when you hit "too many open files" errors
  • A suitable number of slots
    • per-machine map and reduce slot counts (isolated from each other)
    • mapred.tasktracker.map.tasks.maximum sets the number of map slots (default 2)
    • mapred.tasktracker.reduce.tasks.maximum sets the number of reduce slots (default 2)
    • memory limits
    • slots = number of CPU cores - 1
    • separate roles across machines in larger clusters
  • Disks
    • a single machine with multiple disks (RAID arrays) works well
    • mapred.local.dir: where intermediate Map output is stored
    • dfs.data.dir: where HDFS data is stored
  • Loading configuration
    • small configs can be shipped with -file at job submission
    • for large or complex configs
      • upload them to HDFS
      • open and read the file inside Map
      • build an in-memory structure from it
  • Rules for choosing the number of Map tasks, in order of priority
    • each Map task should use no more than 800MB of memory, preferably under 500MB
    • each Map task should run for roughly 20 minutes at most, ideally 1-3 minutes
    • each Map task should process at most one HDFS block of data, and a Map task's input must not span files
    • the total number of Map tasks must not exceed the task slots available on the platform
  • Map essentials
    • the number of Maps equals the number of splits
    • compressed files cannot be split (compressed files are often used precisely to control the number of Maps)
    • uncompressed files and sequence files can be split
    • dfs.block.size determines the block size
  • Rules for choosing the number of Reduce tasks, in order of priority
    • each Reduce task should use no more than 800MB of memory, preferably under 500MB
    • each Reduce task should run for roughly 20 minutes at most, ideally 1-3 minutes
    • the total input size of the Reduce stage
    • each Reduce task should process no more than about 500MB
    • the product of the number of Map tasks and Reduce tasks
    • the requirements on the output data
  • Setting the number of Reducers
    • mapred.reduce.tasks defaults to 1
    • too few Reducers
      • a single run is slow
      • retrying after a failure is expensive
    • too many Reducers
      • large shuffle overhead
      • a large number of small output files

MapReduce

Key MapReduce processes (HDFS 1.0)

  • JobTracker

    • The master process: it accepts job submissions from clients, schedules tasks onto worker nodes, and provides management functions such as monitoring worker-node health and task progress. A MapReduce cluster has exactly one JobTracker, usually run on reliable hardware.
    • TaskTrackers report their health to the JobTracker through periodic heartbeats; each heartbeat carries the number of available map and reduce slots, the number in use, and details of the running tasks. The JobTracker uses a thread pool to handle heartbeats and client requests concurrently.
    • Waits for the JobClient to submit jobs
  • TaskTracker

    • Receives tasks assigned by the JobTracker, instantiates the user program, runs tasks locally, and periodically reports status back to the JobTracker. There is always exactly one TaskTracker per worker node.
    • Every 3 seconds it sends a heartbeat to the JobTracker asking whether there is work; if there is, the JobTracker dispatches a task to it for execution

MapReduce uses multiple concurrent processes

  • Advantages:
    • easy to control and allocate resources per task
    • stable operation
  • Disadvantages:
    • more startup overhead (unsuitable for low-latency jobs)

The MapReduce job submission flow

  1. The Client submits a job request
  2. The JobTracker on the Master accepts the request and assigns a Job ID
  3. The Client uploads the job resources to the HDFS directory for that Job ID
  4. The Client formally submits the job to the JobTracker
  5. The JobTracker initializes the job
  6. The JobTracker distributes the files in the Job ID directory on HDFS to the TaskTracker nodes
  7. TaskTrackers send heartbeats to the JobTracker
  8. TaskTrackers fetch the job resources from HDFS
  9. TaskTrackers execute the tasks

MapReduce job scheduling

  • First-in, first-out (FIFO) queue scheduling by default
    • priorities: very_high, high, normal, low, very_low

Streaming

MapReduce and HDFS are implemented in Java, and Java programming interfaces are provided by default

The Streaming framework lets programs written in any language run as Hadoop MapReduce jobs

Streaming makes it easy to port existing programs to the Hadoop platform

Advantages

  • High development efficiency
    • easy to port to Hadoop: the program only has to read from standard input and write to standard output in the agreed format
    • existing single-machine programs can run distributed on Hadoop with minor changes
    • easy to debug on a single machine
      • cat input | mapper | sort | reducer > output
  • Easy for the platform to enforce resource limits
    • the Streaming framework can cap an application's memory and other resources through mechanisms such as ulimit

Disadvantages

  • By default Streaming only handles text. To process binary data, a good approach is to base64-encode the binary keys and values into text
  • The data is copied and parsed (split) twice, which adds some overhead

Command-line essentials

  • input
    • HDFS path(s) of the job input; supports the * wildcard and multiple files or directories; may be given multiple times
  • output
    • HDFS path of the job output; the path must not already exist, and the submitting user must have permission to create the directory; may only be given once
  • mapper
    • the user's own Map program
  • reducer
    • the user's own Reduce program
  • file
    • ships local files with the submitted job
      • the map and reduce executables
      • files that map and reduce read at run time, such as configuration files
    • related options
      • cacheFile ships an HDFS file with the submitted job
      • cacheArchive ships an HDFS archive with the submitted job
  • jobconf
    • configuration properties for the submitted job
    • common ones
      • mapred.map.tasks: number of map tasks
      • mapred.reduce.tasks: number of reduce tasks
      • stream.num.map.output.key.fields: how many fields of a map output record make up the key
      • num.key.fields.for.partition: partition on the leading fields of the key instead of the whole key

HDFS 1.0

HDFS 1.0 basics

  • HDFS 1.0 system architecture
    • Master
      • NameNode
        • Manages the file system namespace
          • maintains the file system tree and all the files and directories in it
        • Stores the metadata
          • the kinds of metadata the NameNode keeps
            • file and directory names and the hierarchy between them
            • the owner and permissions of every file and directory
            • the name of each file block and which blocks make up each file
        • Metadata is held in memory
          • the NameNode's metadata does not include the location of each block
        • Keeps the file/Block/DataNode mappings
          • file name -> Blocks
          • Block -> DataNodes
        • Running the NameNode takes a lot of memory and I/O, so a NameNode generally does not store user data or run MapReduce tasks
        • There is only one NameNode in the whole Hadoop system
          • single point of failure
            • Option 1: when writing the Hadoop metadata to the local file system, also write it synchronously to a remotely mounted NFS
            • Option 2: run a SecondaryNameNode process and persist to disk
      • SecondaryNameNode
        • It is not a NameNode: it neither replaces the NameNode nor acts as its backup
        • Its role is to interact with the NameNode and periodically merge the edit log into the namespace image
        • When the NameNode fails, data can be recovered from the namespace image copy merged by the SecondaryNameNode
        • The metadata held by the SecondaryNameNode always lags behind the NameNode, so some data loss is possible
    • Worker
      • DataNode
        • Keeps the Block -> path mapping
        • Stores the data blocks and serves block reads and writes for clients of the system
        • Creates, deletes, and replicates blocks as instructed by the NameNode
        • Heartbeat mechanism; periodically reports its list of blocks
        • DataNodes communicate with each other to replicate blocks
          • Data blocks
            • the default HDFS block size is 64MB
            • a disk sector is typically 512B
            • larger blocks reduce seek time and lower the ratio of seek time to transfer time
            • blocks that are too large yield too few tasks overall and slow the job down
  • Hadoop favors large files (a worked example follows this list)
    • As a rule of thumb, one metadata record takes about 200 bytes of memory.
    • With a 64MB block size and 3 replicas, a 1GB file occupies 16 * 3 = 48 blocks
    • 1000 files of 1MB each occupy 1000 * 3 = 3000 blocks (multiple files cannot share one block)
    • The smaller the files, the more metadata is needed to store the same total amount of data
  • Metadata persistence
    • The NameNode stores its metadata in the fsimage file
    • While the system is running, all metadata changes are kept in memory and persisted to a second file, edits
    • The fsimage and edits files are merged periodically by the SecondaryNameNode process
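A quick back-of-the-envelope check of the block and metadata counts above (the roughly 200 bytes per metadata record is the figure quoted in the notes):

BLOCK = 64 * 2 ** 20          # 64MB block size
REPLICAS = 3
META_BYTES = 200              # rough memory cost of one metadata record

# One 1GB file
blocks_big = (1 * 2 ** 30 // BLOCK) * REPLICAS
# 1000 files of 1MB each: every file needs at least one block per replica
blocks_small = 1000 * 1 * REPLICAS

print(blocks_big, blocks_small)                                    # 48 3000
print(blocks_small * META_BYTES / 1024, "KB of NameNode memory")   # ~585.9 KB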

SecondaryNameNode

Rack awareness policy

3 replicas by default

  • The first replica goes on the same node as the client (if the client is a machine outside the cluster, a node is picked at random, though the system avoids nodes that are too full or too busy)
  • The second replica goes on a node in a different rack (chosen at random)
  • The third replica goes on a different node in the same rack as the second replica
  • distance (a small sketch follows this list)
    • distance = 0: the same DataNode
    • distance = 2: different DataNodes in the same rack
    • distance = 4: DataNodes in different racks in the same IDC
    • distance = 6: DataNodes in different IDCs
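A small sketch of how those distance values can be derived, assuming each node is described by an (idc, rack, datanode) triple (the tuple layout is an assumption for illustration only):

def distance(a, b):
    # a and b are (idc, rack, datanode) triples; the distance grows by 2
    # for every level you must climb to find a common ancestor
    if a == b:
        return 0          # same DataNode
    if a[:2] == b[:2]:
        return 2          # same rack, different DataNode
    if a[0] == b[0]:
        return 4          # same IDC, different rack
    return 6              # different IDC

print(distance(("idc1", "rack1", "dn1"), ("idc1", "rack1", "dn2")))  # 2
print(distance(("idc1", "rack1", "dn1"), ("idc2", "rack9", "dn3")))  # 6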

Data integrity verification

  • We do not want to lose or corrupt any data while storing or processing it

  • HDFS computes checksums for the data it writes and verifies them when the data is read

  • Two verification mechanisms (a minimal checksum sketch follows this list)

    • Checksums

      • The usual way to detect corruption is to compute a checksum when the data first enters the system; if, during transfer along the pipeline, a freshly computed checksum does not exactly match the original one, the data is considered corrupted (by default one checksum is created per 512 bytes)
    • The block scanner, DataBlockScanner

      • A background thread on each DataNode that periodically verifies all the blocks stored on that node, guarding against corruption caused by deterioration of the physical media (corrupted data is re-copied from another DataNode)
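A minimal illustration of per-chunk checksumming, using zlib.crc32 and the 512-byte chunk size mentioned above (HDFS uses its own CRC implementation; this is only a sketch of the idea):

import zlib

CHUNK = 512  # bytes of data covered by one checksum, as in the note above

def checksums(data: bytes):
    return [zlib.crc32(data[i:i + CHUNK]) for i in range(0, len(data), CHUNK)]

original = b"some block contents " * 100
stored_sums = checksums(original)         # computed on first write

received = bytearray(original)
received[700] ^= 0xFF                     # simulate a corrupted byte in transit
bad_chunks = [i for i, (a, b) in enumerate(zip(stored_sums, checksums(bytes(received)))) if a != b]
print(bad_chunks)                         # [1] -> re-fetch that chunk from another replica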

Reliability measures

  • One name node and multiple data nodes
  • Data replication (redundancy)
    • placement (rack awareness policy)
    • the write does not wait for all 3 replicas: an ack is returned as soon as 1 of the 3 replicas has been written successfully
  • Failure detection
    • data nodes
      • heartbeats (detect crashed nodes)
      • block reports (checked while in safe mode)
      • data integrity checks (checksum comparison)
    • name node
      • edit log files
      • image files
  • Space reclamation
    • the Trash directory (configured in core-site.xml)

HDFS & MapReduce local mode

  • Master (small cluster)
    • NameNode
    • JobTracker
  • Slave
    • DataNode
    • TaskTracker

Example code

Common practices include:

Data statistics

WordCount

Data filtering (cleaning)

Finding records that match some condition in logs

Dropping invalid data and keeping valid data

Normalizing data formats

Grouping related records

Joining multiple logs on the same timestamp or on user-behavior records

In table-like file storage, concatenating the attributes that share a primary key

Merging historical master data with newly added and modified data

Global sorting

Ordering mixed logs by time

Sorting by one or more fields

Fault-tolerant framework

Testing cluster health: run a job with deliberately faulty code on the cluster and observe what happens

Services that must run 24 hours a day, 365 days a year on failure-prone machines

Services whose computation scale changes and has to be adjusted frequently

Single-process programs that need a quick boost in computing throughput

WordCount

Python3

map.py

import sys


# Read lines from stdin and emit "<word>\t1" for every word
for line in sys.stdin:
    ss = line.strip().split(' ')
    for word in ss:
        print(word.strip() + '\t' + '1')

reduce.py

import sys


current_word = None
cnt_sum = 0

# Input arrives sorted by key, so counts for the same word are adjacent
for line in sys.stdin:
    ss = line.strip().split('\t')
    if len(ss) != 2:
        continue
    word, cnt = ss

    if current_word is None:
        current_word = word
    if current_word != word:
        # A new word begins: emit the total for the previous word
        print(current_word.strip() + '\t' + str(cnt_sum))
        current_word = word
        cnt_sum = 0

    cnt_sum += int(cnt)

print(current_word.strip() + '\t' + str(cnt_sum))

The_Man_of_Property.txt

“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”

run.sh

#!/usr/bin/env bash

# Path to the hadoop command
HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"

# Path to the hadoop streaming jar
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

# HDFS input file path
INPUT_FILE_PATH="/week01/01_mr_wordcount/The_Man_of_Property.txt"

# HDFS output path
OUTPUT_PATH="/week01/01_mr_wordcount/output/python3"

# Local path of the input file
LOCAL_FILE_PATH="/mnt/hgfs/Code/week01/01_mr_wordcount/The_Man_of_Property.txt"
# HDFS upload directory for the input file
UPLOAD_PATH="/week01/01_mr_wordcount"

# Remove existing HDFS paths
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}

# Create the HDFS upload directory
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
# Upload the local input file to the HDFS directory
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH} ${UPLOAD_PATH}

# Streaming job
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -files map.py,red.py \
    -input ${INPUT_FILE_PATH} \
    -output ${OUTPUT_PATH} \
    -mapper "python map.py" \
    -reducer "python red.py" \

# -file is deprecated; in 2.8.5 use -files instead. As a generic option it must be placed before -input and the other job parameters.

AllSort_1Reduce

Version 1

Python3

map_sort.py
import sys


base_count = 10000

# Add a fixed offset so the numeric keys get equal width and sort correctly as strings
for line in sys.stdin:
    ss = line.strip().split('\t')
    key, val = ss

    new_key = base_count + int(key)
    print(str(new_key) + '\t' + val)
red_sort.py
import sys


base_count = 10000

# Strip the offset added by the mapper to restore the original key
for line in sys.stdin:
    new_key, val = line.strip().split('\t')

    key = int(new_key) - base_count
    print('%s\t%s' % (key, val))
a.txt
1	hadoop
3	hadoop
5	hadoop
7	hadoop
9	hadoop
b.txt
0	java
2	java
4	java
6	java
8	java
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/02_mr_allsort_1reduce/a.txt"
INPUT_FILE_PATH_B="/week01/02_mr_allsort_1reduce/b.txt"

OUTPUT_PATH="/week01/02_mr_allsort_1reduce/version1/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/b.txt"
UPLOAD_PATH="/week01/02_mr_allsort_1reduce/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A}
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_B}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.reduces=1 \
    -files map_sort.py,red_sort.py \
    -input ${INPUT_FILE_PATH_A},${INPUT_FILE_PATH_B} \
    -output ${OUTPUT_PATH} \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \

# Rely on the MapReduce framework's own sort:
# -D mapreduce.job.reduces=1
# -jobconf can be replaced with -D; as a generic option it goes before -input and the other parameters

Version 2

Python3

map_sort.py
import sys


for line in sys.stdin:
    print(line.strip())
red_sort.py
import sys


for line in sys.stdin:
    print(line.strip())
a.txt
1	hadoop
3	hadoop
5	hadoop
7	hadoop
9	hadoop
b.txt
0	java
2	java
4	java
6	java
8	java
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/02_mr_allsort_1reduce/a.txt"
INPUT_FILE_PATH_B="/week01/02_mr_allsort_1reduce/b.txt"

OUTPUT_PATH="/week01/02_mr_allsort_1reduce/version2/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/b.txt"
UPLOAD_PATH="/week01/02_mr_allsort_1reduce/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A}
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_B}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D stream.num.map.output.key.fields=1 \
    -D mapreduce.partition.keypartitioner.options="-k1,1" \
    -D mapreduce.partition.keycomparator.options="-k1,1n" \
    -D mapreduce.job.reduces=1 \
    -files map_sort.py,red_sort.py \
    -input ${INPUT_FILE_PATH_A},${INPUT_FILE_PATH_B} \
    -output ${OUTPUT_PATH} \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

# Use a single reducer:
# -D mapreduce.job.reduces=1 \
# Control how records are dispatched (secondary sort):
# -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
# Sort by key:
# -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
# Set the key/value boundary (default separator \t): fields before it form the key, the rest the value
# -D stream.num.map.output.key.fields=1 \
# Choose which part of the key to partition on; -k1,1 means the partition key covers field 1 only
# -D mapreduce.partition.keypartitioner.options="-k1,1" \
# Choose which key fields (or byte ranges) to compare; -k1,1 means sort on field 1, and n means numeric order
# -D mapreduce.partition.keycomparator.options="-k1,1n" \

AllSort

Python3

map_sort.py

import sys


base_count = 10000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key, val = ss

    new_key = base_count + int(key)

    # Route keys in the lower half of the range to partition 0, the rest to partition 1
    partition_index = 1
    if new_key < (10100 + 10000) / 2:
        partition_index = 0

    print("%s\t%s\t%s" % (partition_index, new_key, val))

red_sort.py

import sys


base_count = 10000

for line in sys.stdin:
    partition_index, new_key, val = line.strip().split('\t')

    key = int(new_key) - base_count
    print('\t'.join([str(key), val]))

a.txt

1	hadoop
3	hadoop
5	hadoop
7	hadoop
9	hadoop

b.txt

0	java
2	java
4	java
6	java
8	java

run.sh

#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/03_mr_allsort/a.txt"
INPUT_FILE_PATH_B="/week01/03_mr_allsort/b.txt"

OUTPUT_PATH="/week01/03_mr_allsort/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/03_mr_allsort/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/03_mr_allsort/b.txt"
UPLOAD_PATH="/week01/03_mr_allsort/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A}
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_B}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.reduces=2 \
    -D stream.num.map.output.key.fields=2 \
    -D num.key.fields.for.partition=1 \
    -files map_sort.py,red_sort.py \
    -input ${INPUT_FILE_PATH_A},${INPUT_FILE_PATH_B} \
    -output ${OUTPUT_PATH} \
    -mapper "python map_sort.py" \
    -reducer "python red_sort.py" \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

# Set the key/value boundary (default separator \t): fields before it form the key, the rest the value; here the first 2 fields form the key
# -D stream.num.map.output.key.fields=2 \
# Partition on a prefix of the key (only leading fields can be selected); here the 1st field is used for partitioning
# -D num.key.fields.for.partition=1 \
# To partition on an arbitrary range of key fields instead, use:
# -D mapreduce.partition.keypartitioner.options="-k1,1" \

File_Broadcast

File

Python3

map.py
import sys


def read_local_file_func(f):
    # Load the whitelist file into a set of words
    word_set = set()
    file_in = open(f, 'r')
    for line in file_in:
        word = line.strip()
        word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print("%s\t%s" % (s, 1))


if __name__ == "__main__":
    # Dispatch: argv[1] names the function to run, the remaining arguments are passed to it
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
red.py
import sys


def reducer_func():
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is None:
            current_word = word

        if current_word != word:
            for count in count_pool:
                sum += count
            print("%s\t%s" % (current_word, sum))
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    for count in count_pool:
        sum += count
    print("%s\t%s" % (current_word, str(sum)))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
The_Man_of_Property.txt
“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”
white_list
the
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/04_mr_file_broadcast/file/output/python3"

LOCAL_FILE_PATH="/mnt/hgfs/Code/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
UPLOAD_PATH="/week01/04_mr_file_broadcast/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.reduces=2 \
    -files map.py,red.py,white_list \
    -input ${INPUT_FILE_PATH} \
    -output ${OUTPUT_PATH} \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reducer_func" \

cacheFile

Python3

map.py
import sys


def read_local_file_func(file):
    word_set = set()
    file_in = open(file, 'r')
    for line in file_in:
        word = line.strip()
        word_set.add(word)
    return word_set


def mapper_func(white_list):
    word_set = read_local_file_func(white_list)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print("%s\t%s" % (s, 1))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
red.py
import sys


def reducer_func():
    current_word = None
    cnt_sum = 0

    for line in sys.stdin:
        ss = line.strip().split('\t')
        if len(ss) != 2:
            continue
        word, cnt = ss

        if current_word is None:
            current_word = word
        if current_word != word:
            print(current_word.strip() + '\t' + str(cnt_sum))
            current_word = word
            cnt_sum = 0

        cnt_sum += int(cnt)

    print(current_word.strip() + '\t' + str(cnt_sum))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
The_Man_of_Property.txt
“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”
white_list
the
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/04_mr_file_broadcast/cachefile/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/04_mr_file_broadcast/white_list"
UPLOAD_PATH_A="/week01/04_mr_file_broadcast/"
UPLOAD_PATH_B="/week01/04_mr_file_broadcast/cachefile/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH_B}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${UPLOAD_PATH_A}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH_B}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.reduces=2 \
    -D mapreduce.job.name=cachefile_demo \
    -files map.py,red.py,"hdfs://master:9000/week01/04_mr_file_broadcast/cachefile/white_list#WH" \
    -input ${INPUT_FILE_PATH} \
    -output ${OUTPUT_PATH} \
    -mapper "python map.py mapper_func WH" \
    -reducer "python red.py reducer_func" \

# Deprecated:
# -cacheFile "hdfs://master:9000/week01/04_mr_file_broadcast/cachefile
# /white_list#WWWHHH" \

# -cacheFile "${HDFS_FILE_PATH}#WH" \

cacheArchive

Python3

map.py
import os
import sys
import gzip


def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in


def get_cachefile_handlers(f):
    # The archive is unpacked into a directory; open every file inside it
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list


def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print("%s\t%s" % (s, 1))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
red.py
import sys


def reducer_func():
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is None:
            current_word = word

        if current_word != word:
            for count in count_pool:
                sum += count
            print("%s\t%s" % (current_word, sum))
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    for count in count_pool:
        sum += count
    print("%s\t%s" % (current_word, str(sum)))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)
The_Man_of_Property.txt
“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”
w.tar.gz
white_list_1
the
white_list_2
a
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/04_mr_file_broadcast/cachearchive/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/04_mr_file_broadcast/w.tar.gz"
UPLOAD_PATH_A="/week01/04_mr_file_broadcast/"
UPLOAD_PATH_B="/week01/04_mr_file_broadcast/cachearchive/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH_B}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${UPLOAD_PATH_A}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH_B}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.reduces=2 \
    -D mapreduce.job.name=cachearchive_demo \
    -files map.py,red.py \
    -archives "hdfs://master:9000/week01/04_mr_file_broadcast/cachearchive/w.tar.gz#WH.gz" \
    -input ${INPUT_FILE_PATH} \
    -output ${OUTPUT_PATH} \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reducer_func" \

Compression

Python3

map.py

import os
import sys
import gzip


def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in


def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list


def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set


def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)

    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            if word != "" and (word in word_set):
                print("%s\t%s" % (s, 1))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

red.py

import sys


def reducer_func():
    current_word = None
    count_pool = []
    sum = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if current_word is None:
            current_word = word

        if current_word != word:
            for count in count_pool:
                sum += count
            print("%s\t%s" % (current_word, sum))
            current_word = word
            count_pool = []
            sum = 0

        count_pool.append(int(val))

    for count in count_pool:
        sum += count
    print("%s\t%s" % (current_word, str(sum)))


if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

The_Man_of_Property.txt

“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “family” and the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”

white_list_dir.tar.gz

white_list_1
the
white_list_2
a

run.sh

#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/05_mr_compression/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/05_mr_compression/run_1/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/05_mr_compression/The_Man_of_Property.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/05_mr_compression/white_list_dir.tar.gz"
UPLOAD_PATH="/week01/05_mr_compression/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

# Step 1. Compress both the intermediate map output and the final job output with gzip
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D mapreduce.job.reduces=2 \
    -D mapreduce.job.name=compression_run_1_demo \
    -D mapreduce.map.output.compress=true \
    -D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec \
    -D mapreduce.output.fileoutputformat.compress=true \
    -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec \
    -files map.py,red.py \
    -archives "hdfs://master:9000/week01/05_mr_compression/white_list_dir.tar.gz#WH.gz" \
    -input ${INPUT_FILE_PATH} \
    -output ${OUTPUT_PATH} \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reducer_func" \

Join

Python3

map_a.py

import sys


# Tag records from table a with flag 1: key \t 1 \t value
for line in sys.stdin:
    key, val = line.strip().split(' ')

    print("%s\t1\t%s" % (key, val))

a.txt

aaa1 123
aaa2 123
aaa3 123
aaa4 123
aaa5 123

map_b.py

import sys


# Tag records from table b with flag 2: key \t 2 \t value
for line in sys.stdin:
    key, val = line.strip().split(' ')

    print("%s\t2\t%s" % (key, val))

b.txt

aaa1 hadoop
aaa2 hadoop
aaa3 hadoop
aaa4 hadoop
aaa5 hadoop

red_join.py

import sys


val_1 = ""

# Records arrive grouped by key and sorted by flag, so the row from table a
# (flag 1) precedes the matching row from table b (flag 2)
for line in sys.stdin:
    key, flag, val = line.strip().split('\t')

    if flag == '1':
        val_1 = val
    elif flag == '2':
        val_2 = val
        print("%s\t%s\t%s" % (key, val_1, val_2))
        val_1 = ""

run.sh

#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/06_mr_join/a.txt"
INPUT_FILE_PATH_B="/week01/06_mr_join/b.txt"

OUTPUT_PATH_A="/week01/06_mr_join/output/a/python3"
OUTPUT_PATH_B="/week01/06_mr_join/output/b/python3"

OUTPUT_PATH_JOIN="/week01/06_mr_join/output/join/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/06_mr_join/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/06_mr_join/b.txt"
UPLOAD_PATH="/week01/06_mr_join/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A} ${INPUT_FILE_PATH_B} ${OUTPUT_PATH_A} ${OUTPUT_PATH_B} ${OUTPUT_PATH_JOIN}

${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

# Step 1. Tag table a
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -files map_a.py \
    -input ${INPUT_FILE_PATH_A} \
    -output ${OUTPUT_PATH_A} \
    -mapper "python map_a.py" \

# Step 2. Tag table b
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -files map_b.py \
    -input ${INPUT_FILE_PATH_B} \
    -output ${OUTPUT_PATH_B} \
    -mapper "python map_b.py" \

# Step 3. Join the tagged outputs: the first 2 fields (key, flag) form the sort key,
# and only the 1st field (key) is used for partitioning
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
    -D stream.num.map.output.key.fields=2 \
    -D num.key.fields.for.partition=1 \
    -files red_join.py \
    -input ${OUTPUT_PATH_A},${OUTPUT_PATH_B} \
    -output ${OUTPUT_PATH_JOIN} \
    -mapper "cat" \
    -reducer "python red_join.py" \

Extra project: pyweb

Python3

First, make sure web.py is installed: pip install web.py==0.40-dev1

main.py

import web
import sys

urls = (
    '/', 'index',
    '/test', 'test',
)

app = web.application(urls, globals())

# Load the userid -> recommended items mapping once at startup
userid_rec_dict = {}
with open('file.test', 'r') as fd:
    for line in fd:
        ss = line.strip().split('\t')
        if len(ss) != 2:
            continue
        userid = ss[0].strip()
        items = ss[1].strip()
        userid_rec_dict[userid] = items


class index:
    def GET(self):
        params = web.input()
        userid = params.get('userid', '')
        if userid not in userid_rec_dict:
            return 'no rec!'
        else:
            # NOTE: the item separator was lost in the original post; ',' is assumed here
            return '\n'.join(userid_rec_dict[userid].strip().split(','))


class test:
    def GET(self):
        print(web.input())
        return '222'


if __name__ == "__main__":
    app.run()

file.test

zhangsan	1
lisi	2
wangwu	3

Start the web server by running python main.py 12345 in a terminal.

  • If you run into the following error:

    Traceback (most recent call last):
      File "D:\Program Files\Python\Python37\lib\site-packages\web\utils.py", line 526, in take
        yield next(seq)
    StopIteration

    The above exception was the direct cause of the following exception:

    Traceback (most recent call last):
      File "D:\Python\hello.py", line 6, in <module>
        app = web.application(urls, globals(),True)
      File "D:\Program Files\Python\Python37\lib\site-packages\web\application.py", line 62, in __init__
        self.init_mapping(mapping)
      File "D:\Program Files\Python\Python37\lib\site-packages\web\application.py", line 130, in init_mapping
        self.mapping = list(utils.group(mapping, 2))
      File "D:\Program Files\Python\Python37\lib\site-packages\web\utils.py", line 531, in group
        x = list(take(seq, size))
    RuntimeError: generator raised StopIteration

    edit utils.py under Lib\site-packages\web as follows:

    + try:
          yield next(seq)  # line 526
    + except StopIteration:
    +     return

Open http://0.0.0.0:12345/ in a browser to reach the page.

Visiting http://0.0.0.0:12345/?userid=zhangsan shows 1.

This can later be extended into a recommendation system, a remote word-segmentation service, and so on.




Title: BigData Review Notes 01: HDFS 1.0 and MapReduce

Author: Alessa0

Published: 2019-07-17 21:07

Last updated: 2019-08-05 15:08

Original link: https://alessa0.cn/posts/34c278f3/

License: CC BY-NC-ND 4.0. Please keep the original link and author when reposting.