BigData 复习笔记 01:HDFS1.0 与 MapReduce

HDFS1.0 与 MapReduce

troy-t-9sQgt_cR50c-unsplash

Chapter01. MapReduce & HDFS1.0

海量数据分流处理技术

分而治之

  • 大数据量
    • 早期搜索引擎的网页存储系统,单机存储数千万网页,几十亿的网页需要通过几百台单机服务器存储,url 为 Key
    • 分布式文件系统,按 Block (64M-256M) 来划分组织文件
      • 稳定性
      • 容错能力
      • 数据一致性
  • 大流量
    • 覆盖的大流量互联网服务
    • 南方流量分到电信机房,北方流量分到联通机房
    • 搜索引擎将 query 作为 Key 来分流
  • 大计算
    • 根据输入数据划分计算任务
    • MapReduce 按输入数据来划分

传统 Hash 方法

如何将大数据流量均分到 N 台服务器,做到负载均衡?

思路:

  • 找到合理的 Key,Hash(Key) 尽量分布均匀

    • Hash (Key) mod N == 0 分到第 0 台

    • Hash (Key) mod N == 1 分到第 1 台

    • ……

    • Hash (Key) mod N == i 分到第 i 台

    • ……

    • Hash (Key) mod N == N - 1 分到第 N - 1 台

  • 一般以时间戳为 Key

随机划分

一致性 Hash

支持动态增长, 更高级的划分方法,解决热点 (Hot spot) 问题

案例:

  • 服务器 A 承压 50%
  • 服务器 B 承压 30%
  • 服务器 C 承压 20%

如图,用户按 Hash (Key) 顺时针访问不同服务器。

  • 若服务器 B 挂掉,则

    • 服务器 B 承压 30% * 5/7 -> 交付服务器 A

    • 服务器 B 承压 30% * 2/7 -> 交付服务器 C

一致性Hash

MapReduce

用于处理海量数据的分布式计算框架

前提:分布式存储架构

角色:

  • Master
  • Slave
  • Client
GFS存储

MapReduce 基本思想

分而治之

分解 >> 求解 >> 合并

案例 Demo:分面值数钞票

  • 方式 1: 单点策略
    • 一个人数出所有的钞票,数出各面值各有多少张
  • 方式 2: 分治策略
    • 每个人分得一部分钞票,数出各面值有多少张
    • 汇总,每个人负责统计一种面值

MapReduce 计算流程

步骤

  1. 将数据输入到 HDFS 上
  2. 对输入数据进行处理
  3. 对处理的数据进行切片
  4. 根据就近原则,对切片数据进行对应节点的 Map 操作,结果暂存在内存缓冲区
  5. 当缓冲区数据大小到达阈值时
    1. 锁住缓冲区
    2. 对切片结果按 partition 和 key 进行排序【默认快速排序,第一关键字为分区号,第二关键字为 key】,写入磁盘
    3. 将磁盘上的切片结果进行归并排序 {partition, key, value}
  6. 将 Map 结果按 partition 传输到对应 Reduce 节点
  7. Reduce 节点将不同 Map 节点传输的数据按 partition 分区信息合并,进行 Reduce 操作
  8. 结果处理后输出到 HDFS

详解

  • File
    • 文件存储在 HDFS 中,每个文件切分成多个一定大小(默认 64M)的 Block,存储在多个 DataNode 节点上(默认 3 备份)
    • TextFile(明文标准输出)
      • hadoop fs -cat /xxx 查看
    • SequenceFile(二进制输出)
      • hadoop fs -text /xxx 查看
  • InputFormat
    • MR 框架基础类之一(Java 接口)
      • 数据分割(Data Splits)
        • 每个 Split 包含后一个 Block 的开头部分的数据(解决记录跨 Block 问题)
        • 如记录跨跃存储在两个 Block 中,这条记录属于前一个 Block 对应的 Split
      • 记录读取器(Record Reader)
        • 将读取到 Split 导入 Map
        • 每读取一条记录,将记录作为参数,调用一次 Map 函数
        • 继续这个过程,读取下一条记录直到 Split 尾部
  • Map
  • Shuffle
    • Partition, Sort, Spill, Merge, Combiner, Copy, Memory, Disk……
    • 性能优化的重点
      • Partition
        • 决定数据由哪个 Reducer 处理,从而分区(如 Hash 法)
      • MemoryBuffer
        • 内存缓冲区,每个 Map 的结果和 Partition 处理的 Key Value 结果都保存在缓存中
        • 缓冲区大小:默认 100M
        • 溢写阈值:100M * 0.8 = 80M
        • 缓冲区中的数据:{partition, key, value} 三元组
      • Spill
        • 内存缓冲区达到阈值时,溢写 Spill 线程锁住这 80M 的缓冲区,开始将数据写到本地磁盘中,然后释放内存
        • 每次溢写都生成一个数据文件
        • 溢出的数据到磁盘前会对数据进行 Key 排序 Sort,以及合并 Combiner
        • 发送相同 Reduce 的 Key 数量,会拼接到一起,减少 Partition 的索引数量
          • Sort
            • 缓冲区数据按照 Key 进行排序
          • Combiner
            • 数据合并,相同 Key 的数据,Value 值合并,减少输出传输量
            • Combiner 函数事实上是 Reducer 函数,满足 Combiner 处理不影响 {sum, max 等} 最终 Reduce 等结果时,可以极大提升性能
  • Reduce
    • 多个 Reduce 任务输入的数据都属于不同的 Partition,因此结果数据的 Key 不重复
    • 合并 Reduce 输出文件即可得到最终的结果

配置注意事项

  • 文件句柄个数
    • ulimit 命令
    • 报错 “当前打开文件超出最大个数” 时使用
  • 合适的 slot
    • 单机 map、reduce 个数【相互隔离】
    • mapred.tasktracker.map.tasks.maximum 配置 Map 的 slot 数(默认 2)
    • mapreduce.tasktracker.tasks.reduce.maximum 配置 Reduce 的 slot 数(默认 2)
    • 内存限制
    • Slot 数 = CPU 核数 - 1
    • 多机集群分离
  • 磁盘情况
    • 适合单机多磁盘(Raid 阵列)
    • mapred.local.dirMap 中间结果存储路径
    • dfs.data.dirHDFS 数据存储路径
  • 配置加载
    • 简单配置通过提交作业时 - file 分发
    • 复杂较大配置
      • 传入 HDFS
      • Map 中打开文件读取
      • 建立内存结构
  • 确定 Map 任务数依次优先参考如下几个原则
    • 每个 Map 任务使用的内存不超过 800M, 尽量在 500M 以下
    • 每个 Map 任务运行时间控制在大约 20 分钟,最好 1-3 分钟
    • 每个 Map 任务处理的最大数据量为一个 HDFS 块大小,一个 Map 任务处理的输入不能跨文件
    • Map 任务总数不能超过平台可用的任务槽位
  • Map 要点
    • Map 个数为 Split 份数
    • 压缩文件不可切分【通常压缩文件用于控制 Map 个数】
    • 非压缩文件和 Sequence 文件可以切分
    • dfs.block.size 决定 block 大小
  • 确定 Reduce 任务数依次优先参考如下几个原则
    • 每个 Reduce 任务使用的内存不超过 800M, 尽量在 500M 以下
    • 每个 Reduce 任务运行时间控制在大约 20 分钟,最好 1-3 分钟
    • 整个 Reduce 阶段的输入数据总量
    • 每个 Reduce 任务处理的数据量控制在 500M 以内
    • Map 任务数与 Reduce 任务数乘积
    • 输出数据要求
  • Reduce 个数设置
    • mapred.reduce.tasks 默认为 1
    • Reduce 个数太少
      • 单次执行慢
      • 出错再试成本高
    • Reduce 个数太大
      • Shuffle 开销大
      • 输出大量小文件
MapReduce

MapReduce 重要进程(HDFS1.0)

  • JobTracker
    • 主进程,负责接收客户作业提交,调度任务到作业节点上运行,并提供诸如监控工作节点状态及任务进度等管理功能,1 个 MapReduce 集群有 1 个 JobTracker,一般运行在可靠的硬件上
    • TaskTracker 是通过周期性的心跳来通知 JobTracker 其当前的健康状态,每一次心跳包含了可用的 map 和 reduce 任务数目,占用的数目及运行中任务的详细信息。JobTracker 利用一个线程池同时处理心跳和客户请求。
    • 等待 JobClient 提交作业
  • TaskTracker
    • 由 JobTracker 指派任务,实例化用户程序,在本地执行任务并周期性地向 JobTracker 汇报状态。在每一个工作节点上永远只会有 1 个 TaskTracker。
    • 每 3s 主动向 JobTracker 发送心跳询问有没有任务,如果有,让其派发任务给它执行

MapReduce 采用多进程并发

  • 优点:
    • 方便任务资源控制和调配
    • 运行稳定
  • 缺点:
    • 消耗更多的启动时间【不适合低延时作业】

MapReduce 作业提交流程

  1. 客户端 Client 提交作业请求
  2. Master 的 JobTracker 接收请求分配 Job ID
  3. 客户端在 HDFS 对应 Job ID 目录上传资源
  4. Client 向 JobTracker 正式提交任务
  5. JobTracker 对任务进行初始化
  6. JobTracker 将 HDFS 对应 Job ID 目录文件分发到各个 TaskTracker 节点
  7. TaskTracker 向 JobTracker 发送心跳
  8. TaskTracker 向 HDFS 分发 Job 资源
  9. TaskTracker 执行任务

MapReduce 作业调度

  • 默认先进先出(FIFO)队列调度模式
    • 优先级:very_high, high, normal, low, very low

Streaming

MapReduce 和 HDFS 采用 Java 实现,默认提供 Java 编程接口

Streaming 框架允许任何程序语言实现的程序在 Hadoop MapReduce 中使用

Streaming 方便已有程序向 Hadoop 平台移植

优点

  • 开发效率高
    • 方便移植 Hadoop 平台,仅需按照一定的格式从标准输入读取数据,向标准输出写数据
    • 原有单机程序稍加改动即可在 Hadoop 平台进行分布式处理
    • 容易单机调试
      • cat input | mapper | sort | reducer > output
  • 便于平台进行资源控制
    • Streaming 框架中通过 limit 等方式可以灵活地限制应用程序使用的内存等资源

缺点

  • Streaming 默认仅能处理文本数据,如要对二进制数据进行处理,比较好的方法是将二进制的 key 和 value 进行 base64 的编码转换成文本
  • 两次数据拷贝和解析(分割),带来一定开销

命令行要点

  • input
    • 指定作业的输入文件 HDFS 路径,支持使用 * 通配符,支持指定多个文件或目录,可多次使用
  • output
    • 指定作业的输出文件 HDFS 路径,路径必须不存在,并且执行作业用户需要具备创建该目录的权限,只能使用一次
  • mapper
    • 用户自己写的 Map 程序
  • reducer
    • 用户自己写的 Reduce 程序
  • file
    • 打包本地文件到提交的 Job 中
      • map 和 reduce 的执行文件
      • map 和 reduce 要用输入的文件,如配置文件
    • 类似的配置还有
      • cacheFile 提交 HDFS 文件到提交的 Job 中
      • cacheArchive 提交 HDFS 压缩文件到提交的 Job 中
  • jobconf
    • 提交作业的一些配置属性
    • 常见配置
      • mapred.map.tasksmap task 数目
      • mapred.reduce.tasksreduce task 数目
      • stream.num.map.output.key.field 指定 map task 输出记录中 key 所占的域数目
      • num.key.dields.for.partition 指定对 key 分出来的前几部分做 partition 而不是整个 key

HDFS1.0

HDFS1.0 基础

  • HDFS1.0 系统架构
    • Master
      • NameNode
        • 管理着文件系统命名空间
          • 维护着文件系统树及树中的所有文件和目录
        • 存储元数据
          • NameNode 保存元信息的种类
            • 文件名目录名及它们之间的层级关系
            • 文件目录的所有者及其权限
            • 每个文件块的名及文件有哪些块组成
        • 元数据保存在内存中
          • NameNode 元信息并不包含每个块的位置信息
        • 保存文件 / Block/DataNode 之间的映射关系
          • 文件名 -> Block
          • Block -> DataNode
        • 运行 NameNode 会占用大量内存和 I/O 资源,一般 NameNode 不会存储用户数据或执行 MapReduce 任务
        • 全 Hadoop 系统仅一个 NameNode
          • 单点问题
            • 方案 1:将 Hadoop 元数据写入到本地文件系统时同步到远程挂载的网络文件系统 NFS
            • 方案 2:运行 SecondaryNameNode 进程,持久化到磁盘
      • SecondaryNameNode
        • 并不是 NameNode,不取代 NameNode 也不是 NameNode 的备份
        • 作用是与 NameNode 交互,定期通过编辑日志文件合并命名空间镜像
        • 当 NameNode 发生故障,NameNode 会通过自己合并的命名空间镜像副本来恢复数据
        • SecondaryNameNode 保存的 NameNode 元信息总是滞后于 NameNode 的,会导致部分数据丢失
    • Worker
      • DataNode
        • 保存 Block -> Path 的映射关系
        • 负责存储数据块,负责为系统客户端提供数据块的读写服务
        • 根据 NameNode 的指示进行创建 / 删除和复制等操作
        • 心跳机制,定期报告文件块列表信息
        • DataNode 间通信,进行块的副本处理
          • 数据块
            • HDFS 默认数据块大小为 64M
            • 磁盘块一般为 512B
            • 块增大可以减少寻址时间,降低寻址时间 / 文件传输时间
            • 数据块过大导致整体任务量过小,降低作业处理速度
  • Hadoop 更倾向存储大文件
    • 一般来说,一条元信息记录会占用 200byte 内存空间。
    • 假设块大小为 64M,备份数量是 3,那么一个 1G 大小的文件将占用 16 * 3 = 48 个文件块
    • 如现在有 1000 个 1M 大小的文件,则会占用 1000 * 3 = 3000 个文件块(多个文件不能放到一个块中)
    • 如果文件越小,存储同等大小的文件所需要的元信息就越多
  • 元信息持久化
    • 在 NameNode 中存放元信息的文件是 fsimage
    • 在系统运行期间所有对元信息的操作都保存在内存中并被持久化到另一个文件 edits 中
    • fsimage 文件与 edits 文件会被 SecondaryNameNode 进程周期性合并
SecondaryNameNode

机架感知策略

默认 3 副本

  • 第一个副本,放在与客户端相同的节点(如客户端是集群外的一台机器,就随机算节点,但是系统会避免挑选太满或太忙的节点)
  • 第二个副本,放在不同机架(随机选择)的节点
  • 第三个副本,放在与第二个副本同机架但是不同节点上
  • distance
    • distance = 0, 相同 DataNode
    • distance = 2, 相同 Rack 下的不同 DataNode
    • distance = 4, 相同 IDC 下的不同 DataNode
    • distance = 6, 不同 IDC 下的 DataNode

数据完整性校验

  • 不希望在存储和处理数据时丢失或损坏任何数据

  • HDFS 会对写入的数据计算校验和,并在读取时验证校验和

  • 两种校验方法

    • 校验和
      • 检测损坏数据的常用方法时在第一次写入系统时计算数据的校验和,在通道传输过程中,如果新生成的校验和不完全匹配原始的校验和,那么数据就会被认定为是被损坏的(默认 512 字节创建 1 个校验码
    • 数据块检测程序 DataBlockScanner
      • 在 DataNode 节点上开启一个后台进程,来定期验证存储在它上的所有块,这个是防止物理介质出现损减情况而造成的数据损坏(损坏数据从其他 DataNode 拷贝)

可靠性措施

  • 一个名字节点和多个数据节点
  • 数据复制(冗余机制)
    • 存放位置(机架感知策略)
    • 并不是 3 副本写完才返回 ack,三副本中有 1 个写成功就返回 ack
  • 故障检测
    • 数据节点
      • 心跳包(检测是否宕机)
      • 块报告(安全模式下检测)
      • 数据完整性检测(校验和比较)
    • 名字节点
      • 日志文件
      • 镜像文件
  • 空间回收机制
    • Trash 目录(修改 core-site.xml)

HDFS&MapReduce 本地模式

  • Master(小集群)
    • NameNode
    • JobTracker
  • Slave
    • DataNode
    • TaskTracker

案例代码

常见实践有:

数据统计

WordCount

数据过滤(清洗)

从日志查找某一个条件等数据

除去非法数据,保留合法数据

数据格式整理

同类汇聚

多份日志,相同时间点、用户行为日志 Join

类表格文件存储中,相同主键拼接相关属性

历史的主数据与新增,修改数据合并

全局排序

混合日志,按时间排列好顺序

按某个或多个字段有序

容错框架

测试集群状态:在集群上运行一个错误代码 Job,进行观察

使用易出错的服务,365 * 24 运行

计算规模经常变化调整的服务

单进程程序,迅速提升执行计算效率

WordCount

Python3

map.py

import sys


for line in sys.stdin:
ss = line.strip().split(' ')
for word in ss:
print(word.strip() + '\t' + '1')

reduce.py

import sys


current_word = None
cnt_sum = 0

for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss

if current_word is None:
current_word = word
if current_word != word:
print(current_word.strip() + '\t' + str(cnt_sum))
current_word = word
cnt_sum = 0

cnt_sum += int(cnt)

print(current_word.strip() + '\t' + str(cnt_sum))

The_Man_of_Property.txt

“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “familyand the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”

run.sh

#!/usr/bin/env bash

# hadoop命令地址
HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"

# hadoop streaming jar包地址
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

# HDFS 输入文件路径
INPUT_FILE_PATH="/week01/01_mr_wordcount/The_Man_of_Property.txt"

# HDFS 输出文件路径
OUTPUT_PATH="/week01/01_mr_wordcount/output/python3"

# 输入文件本地路径
LOCAL_FILE_PATH="/mnt/hgfs/Code/week01/01_mr_wordcount/The_Man_of_Property.txt"
# 输入文件 HDFS上传路径
UPLOAD_PATH="/week01/01_mr_wordcount"

# 删除HDFS存在目录
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}

# 创建HDFS 上传目录
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
# 将本地输入文件上传到HDFS目录
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH} ${UPLOAD_PATH}

# 命令行
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-files map.py,red.py \
-input ${INPUT_FILE_PATH} \
-output ${OUTPUT_PATH} \
-mapper "python map.py" \
-reducer "python red.py" \

# -file 过时了,2.8.5用-files代替,作为可选参数需放在-input等参数前面

AllSort_1Reduce

Version 1

Python3

map_sort.py
import sys


base_count = 10000

for line in sys.stdin:
ss = line.strip().split('\t')
key, val = ss

new_key = base_count + int(key)
print(str(new_key) + '\t' + val)

red_sort.py
import sys


base_count = 10000

for line in sys.stdin:
new_key, val = line.strip().split('\t')

key = int(new_key) - base_count
print('%s\t%s' % (key, val))

a.txt
1	hadoop
3 hadoop
5 hadoop
7 hadoop
9 hadoop
b.txt
0	java
2 java
4 java
6 java
8 java
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/02_mr_allsort_1reduce/a.txt"
INPUT_FILE_PATH_B="/week01/02_mr_allsort_1reduce/b.txt"

OUTPUT_PATH="/week01/02_mr_allsort_1reduce/version1/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/b.txt"
UPLOAD_PATH="/week01/02_mr_allsort_1reduce/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A}
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_B}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.reduces=1 \
-files map_sort.py,red_sort.py \
-input ${INPUT_FILE_PATH_A},${INPUT_FILE_PATH_B} \
-output ${OUTPUT_PATH} \
-mapper "python map_sort.py" \
-reducer "python red_sort.py" \

# 依赖MapReduce框架自身的sort功能:
# -D mapreduce.job.reduces=1
# -jobconf 可用-D 替代,作为可选参数放在-input等前面

Version 2

Python3

map_sort.py
import sys


for line in sys.stdin:
print(line.strip())

red_sort.py
import sys


for line in sys.stdin:
print(line.strip())

a.txt
1	hadoop
3 hadoop
5 hadoop
7 hadoop
9 hadoop
b.txt
0	java
2 java
4 java
6 java
8 java
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/02_mr_allsort_1reduce/a.txt"
INPUT_FILE_PATH_B="/week01/02_mr_allsort_1reduce/b.txt"

OUTPUT_PATH="/week01/02_mr_allsort_1reduce/version2/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/02_mr_allsort_1reduce/b.txt"
UPLOAD_PATH="/week01/02_mr_allsort_1reduce/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A}
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_B}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D stream.num.map.output.key.fields=1 \
-D mapreduce.partition.keypartitioner.options="-k1,1" \
-D mapreduce.partition.keycomparator.options="-k1,1n" \
-D mapreduce.job.reduces=1 \
-files map_sort.py,red_sort.py \
-input ${INPUT_FILE_PATH_A},${INPUT_FILE_PATH_B} \
-output ${OUTPUT_PATH} \
-mapper "python map_sort.py" \
-reducer "python red_sort.py" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \

# 设置单reduce:
# -D mapreduce.job.reduces=1 \
# 控制分发,完成二次排序:
# -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
# 完成key排序:
# -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
# 设置分隔符位置(默认\t),分隔符之前为key,之后为value
# -D stream.num.map.output.key.fields=1 \
# 选择哪一部分做partition,-k1,1表示partition的key范围是(1,1),即第1列
# -D mapreduce.partition.keypartitioner.options="-k1,1" \
# 设置key中需要比较的字段或字节范围,-k1,1表示sort的key范围是(1,1),即第1列,n表示按数字number类型排序
# -D mapreduce.partition.keycomparator.options="-k1,1n" \

AllSort

Python3

map_sort.py

import sys


base_count = 10000

for line in sys.stdin:
ss = line.strip().split('\t')
key, val = ss

new_key = base_count + int(key)

partition_index = 1
if new_key < (10100 + 10000) / 2:
partition_index = 0

print("%s\t%s\t%s" % (partition_index, new_key, val))

red_sort.py

import sys


base_count = 10000

for line in sys.stdin:
partition_index, new_key, val = line.strip().split('\t')

key = int(new_key) - base_count
print('\t'.join([str(key), val]))

a.txt

1	hadoop
3 hadoop
5 hadoop
7 hadoop
9 hadoop

b.txt

0	java
2 java
4 java
6 java
8 java

run.sh

#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/03_mr_allsort/a.txt"
INPUT_FILE_PATH_B="/week01/03_mr_allsort/b.txt"

OUTPUT_PATH="/week01/03_mr_allsort/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/03_mr_allsort/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/03_mr_allsort/b.txt"
UPLOAD_PATH="/week01/03_mr_allsort/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A}
${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_B}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.reduces=2 \
-D stream.num.map.output.key.fields=2 \
-D num.key.fields.for.partition=1 \
-files map_sort.py,red_sort.py \
-input ${INPUT_FILE_PATH_A},${INPUT_FILE_PATH_B} \
-output ${OUTPUT_PATH} \
-mapper "python map_sort.py" \
-reducer "python red_sort.py" \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

# 设置分隔符位置(默认\t),分隔符之前为key,之后为value,此处以前2列为key
# -D stream.num.map.output.key.fields=2 \
# 设置partition key(仅能从头顺序选取范围)用来做分发,此处以第1列做partition
# -D num.key.fields.for.partition=1 \
# 设置partition key(可选择中间范围)用来做分发
# -D mapreduce.partition.keypartitioner.options="-k1,1" \

File_Broadcast

File

Python3

map.py
import sys


def read_local_file_func(f):
word_set = set()
file_in = open(f, 'r')
for line in file_in:
word = line.strip()
word_set.add(word)
return word_set


def mapper_func(white_list_fd):
word_set = read_local_file_func(white_list_fd)

for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print("%s\t%s" % (s, 1))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

red.py
import sys


def reducer_func():
current_word = None
count_pool = []
sum = 0

for line in sys.stdin:
word, val = line.strip().split('\t')

if current_word is None:
current_word = word

if current_word != word:
for count in count_pool:
sum += count
print("%s\t%s" % (current_word, sum))
current_word = word
count_pool = []
sum = 0

count_pool.append(int(val))

for count in count_pool:
sum += count
print("%s\t%s" % (current_word, str(sum)))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

The_Man_of_Property.txt
“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “familyand the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”
white_list
the
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/04_mr_file_broadcast/file/output/python3"

LOCAL_FILE_PATH="/mnt/hgfs/Code/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
UPLOAD_PATH="/week01/04_mr_file_broadcast/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH} ${UPLOAD_PATH}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.reduces=2 \
-files map.py,red.py,white_list \
-input ${INPUT_FILE_PATH} \
-output ${OUTPUT_PATH} \
-mapper "python map.py mapper_func white_list" \
-reducer "python red.py reducer_func" \

cacheFile

Python3

map.py
import sys


def read_local_file_func(file):
word_set = set()
file_in = open(file, 'r')
for line in file_in:
word = line.strip()
word_set.add(word)
return word_set


def mapper_func(white_list):
word_set = read_local_file_func(white_list)

for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print("%s\t%s" % (s, 1))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

red.py
import sys


def reducer_func():
current_word = None
cnt_sum = 0

for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss

if current_word is None:
current_word = word
if current_word != word:
print(current_word.strip() + '\t' + str(cnt_sum))
current_word = word
cnt_sum = 0

cnt_sum += int(cnt)

print(current_word.strip() + '\t' + str(cnt_sum))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

The_Man_of_Property.txt
“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “familyand the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”
white_list
the
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/04_mr_file_broadcast/cachefile/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/04_mr_file_broadcast/white_list"
UPLOAD_PATH_A="/week01/04_mr_file_broadcast/"
UPLOAD_PATH_B="/week01/04_mr_file_broadcast/cachefile/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH_B}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${UPLOAD_PATH_A}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH_B}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.reduces=2 \
-D mapreduce.job.name=cachefile_demo \
-files map.py,red.py,"hdfs://master:9000/week01/04_mr_file_broadcast/cachefile/white_list#WH" \
-input ${INPUT_FILE_PATH} \
-output ${OUTPUT_PATH} \
-mapper "python map.py mapper_func WH" \
-reducer "python red.py reducer_func" \

# 过时了
# -cacheFile "hdfs://master:9000/week01/04_mr_file_broadcast/cachefile
# /white_list#WWWHHH" \

#-cacheFile "${HDFS_FILE_PATH}#WH" \

cacheArchive

Python3

map.py
import os
import sys
import gzip


def get_file_handler(f):
file_in = open(f, 'r')
return file_in


def get_cachefile_handlers(f):
f_handlers_list = []
if os.path.isdir(f):
for fd in os.listdir(f):
f_handlers_list.append(get_file_handler(f + '/' + fd))
return f_handlers_list


def read_local_file_func(f):
word_set = set()
for cachefile in get_cachefile_handlers(f):
for line in cachefile:
word = line.strip()
word_set.add(word)
return word_set


def mapper_func(white_list_fd):
word_set = read_local_file_func(white_list_fd)

for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print("%s\t%s" % (s, 1))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

red.py
import sys


def reducer_func():
current_word = None
count_pool = []
sum = 0

for line in sys.stdin:
word, val = line.strip().split('\t')

if current_word is None:
current_word = word

if current_word != word:
for count in count_pool:
sum += count
print("%s\t%s" % (current_word, sum))
current_word = word
count_pool = []
sum = 0

count_pool.append(int(val))

for count in count_pool:
sum += count
print("%s\t%s" % (current_word, str(sum)))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

The_Man_of_Property.txt
“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “familyand the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”
w.tar.gz
white_list_1
the
white_list_2
a
run.sh
#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/04_mr_file_broadcast/cachearchive/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/04_mr_file_broadcast/The_Man_of_Property.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/04_mr_file_broadcast/w.tar.gz"
UPLOAD_PATH_A="/week01/04_mr_file_broadcast/"
UPLOAD_PATH_B="/week01/04_mr_file_broadcast/cachearchive/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH_B}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${UPLOAD_PATH_A}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH_B}

${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.reduces=2 \
-D mapreduce.job.name=cachearchive_demo \
-files map.py,red.py \
-archives "hdfs://master:9000/week01/04_mr_file_broadcast/cachearchive/w.tar.gz#WH.gz" \
-input ${INPUT_FILE_PATH} \
-output ${OUTPUT_PATH} \
-mapper "python map.py mapper_func WH.gz" \
-reducer "python red.py reducer_func" \

Compression

Python3

map.py

import os
import sys
import gzip


def get_file_handler(f):
file_in = open(f, 'r')
return file_in


def get_cachefile_handlers(f):
f_handlers_list = []
if os.path.isdir(f):
for fd in os.listdir(f):
f_handlers_list.append(get_file_handler(f + '/' + fd))
return f_handlers_list


def read_local_file_func(f):
word_set = set()
for cachefile in get_cachefile_handlers(f):
for line in cachefile:
word = line.strip()
word_set.add(word)
return word_set


def mapper_func(white_list_fd):
word_set = read_local_file_func(white_list_fd)

for line in sys.stdin:
ss = line.strip().split(' ')
for s in ss:
word = s.strip()
if word != "" and (word in word_set):
print("%s\t%s" % (s, 1))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

red.py

import sys


def reducer_func():
current_word = None
count_pool = []
sum = 0

for line in sys.stdin:
word, val = line.strip().split('\t')

if current_word is None:
current_word = word

if current_word != word:
for count in count_pool:
sum += count
print("%s\t%s" % (current_word, sum))
current_word = word
count_pool = []
sum = 0

count_pool.append(int(val))

for count in count_pool:
sum += count
print("%s\t%s" % (current_word, str(sum)))


if __name__ == "__main__":
module = sys.modules[__name__]
func = getattr(module, sys.argv[1])
args = None
if len(sys.argv) > 1:
args = sys.argv[2:]
func(*args)

The_Man_of_Property.txt

“The Forsyte Saga” was the title originally destined for that part of it which is called “The Man of Property”; and to adopt it for the collected chronicles of the Forsyte family has indulged the Forsytean tenacity that is in all of us. The word Saga might be objected to on the ground that it connotes the heroic and that there is little heroism in these pages. But it is used with a suitable irony; and, after all, this long tale, though it may deal with folk in frock coats, furbelows, and a gilt-edged period, is not devoid of the essential heat of conflict. Discounting for the gigantic stature and blood-thirstiness of old days, as they have come down to us in fairy-tale and legend, the folk of the old Sagas were Forsytes, assuredly, in their possessive instincts, and as little proof against the inroads of beauty and passion as Swithin, Soames, or even Young Jolyon. And if heroic figures, in days that never were, seem to startle out from their surroundings in fashion unbecoming to a Forsyte of the Victorian era, we may be sure that tribal instinct was even then the prime force, and that “familyand the sense of home and property counted as they do to this day, for all the recent efforts to “talk them out.”

white_list_dir.tar.gz

white_list_1
the
white_list_2
a

run.sh

#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH="/week01/05_mr_compression/The_Man_of_Property.txt"
OUTPUT_PATH="/week01/05_mr_compression/run_1/output/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/05_mr_compression/The_Man_of_Property.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/05_mr_compression/white_list_dir.tar.gz"
UPLOAD_PATH="/week01/05_mr_compression/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH}
${HADOOP_CMD} fs -rm -r -skipTrash ${OUTPUT_PATH}
${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

# Step 1.
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D mapreduce.job.reduces=2 \
-D mapreduce.job.name=compression_run_1_demo \
-D mapreduce.map.output.compress=true \
-D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec \
-D mapreduce.output.fileoutputformat.compress=true \
-D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec \
-files map.py,red.py \
-archives "hdfs://master:9000/week01/05_mr_compression/white_list_dir.tar.gz#WH.gz" \
-input ${INPUT_FILE_PATH} \
-output ${OUTPUT_PATH} \
-mapper "python map.py mapper_func WH.gz" \
-reducer "python red.py reducer_func" \

Join

Python3

map_a.py

import sys


for line in sys.stdin:
key, val = line.strip().split(' ')

print("%s\t1\t%s" % (key, val))

a.txt

aaa1	123
aaa2 123
aaa3 123
aaa4 123
aaa5 123

map_b.py

import sys


for line in sys.stdin:
key, val = line.strip().split(' ')

print("%s\t2\t%s" % (key, val))

b.txt

aaa1	hadoop
aaa2 hadoop
aaa3 hadoop
aaa4 hadoop
aaa5 hadoop

red_join.py

import sys


val_1 = ""

for line in sys.stdin:
key, flag, val = line.strip().split('\t')

if flag == '1':
val_1 = val
elif flag == '2':
val_2 = val
print("%s\t%s\t%s" % (key, val_1, val_2))
val_1 = ""

run.sh

#!/usr/bin/env bash

HADOOP_CMD="/usr/local/src/hadoop-2.8.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.8.5/share/hadoop/tools/lib/hadoop-streaming-2.8.5.jar"

INPUT_FILE_PATH_A="/week01/06_mr_join/a.txt"
INPUT_FILE_PATH_B="/week01/06_mr_join/b.txt"

OUTPUT_PATH_A="/week01/06_mr_join/output/a/python3"
OUTPUT_PATH_B="/week01/06_mr_join/output/b/python3"

OUTPUT_PATH_JOIN="/week01/06_mr_join/output/join/python3"

LOCAL_FILE_PATH_A="/mnt/hgfs/Code/week01/06_mr_join/a.txt"
LOCAL_FILE_PATH_B="/mnt/hgfs/Code/week01/06_mr_join/b.txt"
UPLOAD_PATH="/week01/06_mr_join/"

${HADOOP_CMD} fs -rm -r -skipTrash ${INPUT_FILE_PATH_A} ${INPUT_FILE_PATH_B} ${OUTPUT_PATH_A} ${OUTPUT_PATH_B} ${OUTPUT_PATH_JOIN}

${HADOOP_CMD} fs -mkdir -p ${UPLOAD_PATH}
${HADOOP_CMD} fs -put ${LOCAL_FILE_PATH_A} ${LOCAL_FILE_PATH_B} ${UPLOAD_PATH}

# Step 1.
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-files map_a.py \
-input ${INPUT_FILE_PATH_A} \
-output ${OUTPUT_PATH_A} \
-mapper "python map_a.py" \

# Step 2.
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-files map_b.py \
-input ${INPUT_FILE_PATH_B} \
-output ${OUTPUT_PATH_B} \
-mapper "python map_b.py" \

# Step 3.
${HADOOP_CMD} jar ${STREAM_JAR_PATH} \
-D stream.num.map.output.key.fields=2 \
-D num.key.fields.for.partition=1 \
-files red_join.py \
-input ${OUTPUT_PATH_A},${OUTPUT_PATH_B} \
-output ${OUTPUT_PATH_JOIN} \
-mapper "cat" \
-reducer "python red_join.py" \

附加项目:pyweb

Python3

首先请确保使用 pip install web.py==0.40-dev1

main.py

import web
import sys

urls = (
'/', 'index',
'/test', 'test',
)

app = web.application(urls, globals())

userid_rec_dict = {}
with open('file.test', 'r') as fd:
for line in fd:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
userid = ss[0].strip()
items = ss[1].strip()
userid_rec_dict[userid] = items


class index:
def GET(self):
params = web.input()
userid = params.get('userid', '')
if userid not in userid_rec_dict:
return 'no rec!'
else:
return '\n'.join(userid_rec_dict[userid].strip().split(''))


class test:
def GET(self):
print(web.input())
return '222'


if __name__ == "__main__":
app.run()

file.test

zhangsan	1
lisi 2
wangwu 3

终端输入 python main.py 12345 启动 web 服务器

  • 如遇到报错信息

  • ``` Traceback (most recent call last): File "D:Files-packages.py", line 526, in take yield next(seq) StopIteration The above exception was the direct cause of the following exception: Traceback (most recent call last): File "D:.py", line 6, in app = web.application(urls, globals(),True) File "D:Files-packages.py", line 62, in init self.init_mapping(mapping) File "D:Files-packages.py", line 130, in init_mapping self.mapping = list(utils.group(mapping, 2)) File "D:Files-packages.py", line 531, in group x = list(take(seq, size)) RuntimeError: generator raised StopIteration


    修改Lib\site-packages\web 下的utils.py文件

    ```diff
    + try:
    yield next(seq) # 526行
    + except StopIteration:
    + return

网页打开 http://0.0.0.0:12345/ 即可访问页面

输入网址http://0.0.0.0:12345/?userid=zhangsan 页面显示 1

未来可拓展推荐系统 远程分词服务等