今日内容:
1- Spark的入门案例
2- Spark on Yarn环境配置
3- Spark程序 与 pyspark交互流程
4- Spark-Submit相关参数配置
1. 基于Pycharm完成PySpark入门案例
1.1 从HDFS上读取文件并实现排序
1- 上传一个words.txt文件到HDFS中
vim words.txt
输入i 进入插入模式
添加以下相关内容:
hadoop hadoop hadoop hive hive
hive hadoop hive sqoop sqoop
hadoop hue hue sqoop hive
hive hive sqoop hue zookeeper
zookeeper hive sqoop oozie
oozie oozie hive hive
上传words.txt
hdfs dfs -put words.txt /
注意: 提前开启Hadoop集群, 开启后, 必须记得要进行校验工作(jps + 页面查看)
2- 从HDFS上读取文件, 完成WordCount案例实现
from pyspark import SparkContext, SparkConf
import os
# 锁定远端版本地址, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("spark的入门案例: 从HDFS上读取文件, 实现WordCount案例")
# 1. 创建SparkContext对象
conf = SparkConf().setAppName("wd_02").setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2. 执行相关的操作
# 2.1 读取外部文件的数据
rdd_init = sc.textFile(name='hdfs://node1:8020/words.txt')
"""
得到数据:
[
'hadoop hadoop hadoop hive hive',
'hive hadoop hive sqoop sqoop',
'hadoop hue hue sqoop hive',
'hive hive sqoop hue zookeeper',
'zookeeper hive sqoop oozie',
'oozie oozie hive hive'
]
执行Map函数后的返回的内容:
[
['hadoop', 'hadoop', 'hadoop', 'hive', 'hive'],
['hive', 'hadoop', 'hive', 'sqoop', 'sqoop'],
['hadoop', 'hue', 'hue', 'sqoop', 'hive'],
['hive', 'hive', 'sqoop', 'hue', 'zookeeper'],
['zookeeper', 'hive', 'sqoop', 'oozie'],
['oozie', 'oozie', 'hive', 'hive']
]
执行flatMap函数后的返回的内容: 比map 多了一个flatten(压扁)操作
[
'hadoop', 'hadoop', 'hadoop', 'hive', 'hive',
'hive', 'hadoop', 'hive', 'sqoop', 'sqoop',
'hadoop', 'hue', 'hue', 'sqoop', 'hive',
'hive', 'hive', 'sqoop', 'hue', 'zookeeper',
'zookeeper', 'hive', 'sqoop', 'oozie',
'oozie', 'oozie', 'hive', 'hive'
]
"""
# 2.2 对数据执行切割操作
# rdd_map = rdd_init.map(lambda line: line.split(' '))
rdd_flatMap = rdd_init.flatMap(lambda line: line.split(' '))
# 2.3 将每一个单词 转换为 (单词,1)
rdd_map = rdd_flatMap.map(lambda word: (word, 1))
# 2.4 根据key进行分组聚合统计
rdd_res = rdd_map.reduceByKey(lambda agg, curr: agg + curr)
# 3. 打印结果
print(rdd_res.collect())
# 4. 释放资源
sc.stop()
精简写法:
from pyspark import SparkContext, SparkConf
import os
# 锁定远端版本地址, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("spark的入门案例: 从HDFS上读取文件, 实现WordCount案例")
# 1. 创建SparkContext对象
conf = SparkConf().setAppName("wd_02").setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2. 执行相关的操作: 链式编程
rdd_res = sc.textFile(name='hdfs://node1:8020/words.txt')\
.flatMap(lambda line:line.split(' '))\
.map(lambda word:(word,1))\
.reduceByKey(lambda agg,curr:agg+curr)
# 3- 打印结果
print(rdd_res.collect())
# 4- 释放资源
sc.stop()
从HDFS上读取数据, 并且对结果数据进行排序操作:
from pyspark import SparkContext, SparkConf
import os
# 锁定远端版本地址, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("spark的入门案例: 从HDFS上读取文件, 实现WordCount案例")
# 1. 创建SparkContext对象
conf = SparkConf().setAppName("wd_02").setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2. 执行相关的操作
# 2.1 读取外部文件的数据
rdd_init = sc.textFile(name='hdfs://node1:8020/words.txt')
# 2.2 对数据执行切割操作
# rdd_map = rdd_init.map(lambda line: line.split(' '))
rdd_flatMap = rdd_init.flatMap(lambda line: line.split(' '))
# 2.3 将每一个单词 转换为 (单词,1)
rdd_map = rdd_flatMap.map(lambda word: (word, 1))
# 2.4 根据key进行分组聚合统计
rdd_res = rdd_map.reduceByKey(lambda agg, curr: agg + curr)
# 2.5 对数据进行排序操作
# rdd_sort = rdd_res.sortBy(lambda res: res[1],ascending=False)
# rdd_res = rdd_res.map(lambda res: (res[1],res[0]))
# rdd_sort = rdd_res.sortByKey(ascending=False)
# rdd_sort = rdd_sort.map(lambda res:(res[1],res[0]))
# top: 用于获取前N个元素, 如果数据是kv类型, 默认按照key进行倒序排列 如果只有一个值, 根据这个值倒序排序
# top也支持自定义排序规则, 可以自定义排序的字段 只能倒序排序
res = rdd_res.top(5, lambda result: result[1])
# 3. 打印结果
# print(rdd_sort.collect())
print(res)
# 4. 释放资源
sc.stop()
如何将结果输出到目的地呢?
from pyspark import SparkContext, SparkConf
import os
# 锁定远端版本地址, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("spark的入门案例: 从HDFS上读取文件, 实现WordCount案例")
# 1. 创建SparkContext对象
conf = SparkConf().setAppName("wd_02").setMaster('local[*]')
sc = SparkContext(conf=conf)
# 2. 执行相关的操作
# 2.1 读取外部文件的数据
rdd_init = sc.textFile(name='hdfs://node1:8020/words.txt')
# 2.2 对数据执行切割操作
# rdd_map = rdd_init.map(lambda line: line.split(' '))
rdd_flatMap = rdd_init.flatMap(lambda line: line.split(' '))
# 2.3 将每一个单词 转换为 (单词,1)
rdd_map = rdd_flatMap.map(lambda word: (word, 1))
# 2.4 根据key进行分组聚合统计
rdd_res = rdd_map.reduceByKey(lambda agg, curr: agg + curr)
# 3. 将结果输出
print(rdd_res.collect())
rdd_res.saveAsTextFile(path='hdfs://node1:8020/wd/output/')
# 4. 释放资源
sc.stop()
总结:
排序相关的API:
sortBy(参数1,参数2):
参数1: 函数 通过函数指定按照谁来进行排序操作
参数2: 可选的 boolean类型 表示是否为升序 默认为True 表示升序
sortByKey(参数1):
参数1: 可选的 boolean类型 表示是否为升序 默认为True 表示升序
默认是根据key进行排序操作, 需要将排序的字段放置到key上
top(N,函数):
参数1: 获取前N个元素
参数2: 可选的 如果kv类型, 默认是根据key进行排序操作, 如果想根据其他排序, 可以定义函数指定
注意: 只能进行倒序排序, 而且直接将结果返回
将结果数据输出文件上: saveAsTextFile(path) 支持输出到HDFS 也支持输出到本地
文件路径协议:
本地: file:///
hdfs: hdfs://node1:8020/
相关其他API:
textFile(name=路径): 获取外部数据源的数据 支持本地和HDFS
flatMap(函数); 根据指定的函数对元素进行转换操作, 支持将一个元素转换为多个元素
map(函数); 根据指定的函数对元素进行转换操作, 支持一对一的转换操作, 传入一个 返回一个
reduceByKey(函数); 根据key进行分组操作, 将同一分组内的value数据合并为一个列表, 然后执行传入的函数
函数传入的参数有二个, 参数1表示的是agg 局部聚合结果 参数2表示遍历的每一个列表中value值
collect(): 收集, 将程序中全部的结果数据收集回来 形成一个列表 返回
1.2 基于Spark-Submit进行任务提交
对于Spark框架来说, 后续需要将自己编写的spark程序提交到相关的资源平台上, 比如说: local yarn spark集群....
spark为了方便任务的提交操作, 专门提供了一个用于进行任务提交的脚本文件: spark-submit
cd /export/server/spark/bin/
格式:
./spark-submit \
--master local | yarn | spark集群 ....
--其他的资源参数信息 (可选的 基本都是有默认值)
执行脚本 [参数]
示例:
./spark-submit \
--master local \
/export/data/workspace/gz11_pyspark_parent/_01_spark_base/src/_05_pyspark_wd_out.py
对于local模式来说, 一直通过spark-submit操作, 用户不大, 如果是连接远程, 直接右键运行, 相当于local方案
什么时候, 需要使用spark-submit local模式:
比如说: 在本地环境开发代码(不基于远端模式), 编写代码后, 需要在测试环境中运行, 此时将py脚本上传, 然后使用spark-submit运行测试
运行一个Spark应用程序,其实底层本质上有二种类型进程的, 一个是Driver进程 一个是Executor进程
Driver进程: 类似于 MR中 ApplicationMaster
主要负责: 任务的资源申请, 任务分配 任务进度的管理, 等 基本与任务相关的工作 都是交给Driver来处理
Executor程序: 执行器(理解为是一个线程池) Spark最终执行的线程都是运行在executor上, executor是可以有多个的, 每个里面都是可以有多个线程的
扫码二维码 获取免费视频学习资料
- 本文固定链接: http://phpxs.com/post/11489/
- 转载请注明:转载必须在正文中标注并保留原文链接
- 扫码: 扫上方二维码获取免费视频资料
查 看2022高级编程视频教程免费获取