在整理数据,处理数据上。对于大规模数据分析,相较于hadoop来说,spark是个更为方便的工具。今天为大家带来python基础编程例子之PySpark,希望对大家的工作和学习有帮助。
基本概念介绍
首先介绍一下spark中常见的基本概念:
RDD:弹性分布式数据集的简称,是一个分布式对象集合,「本质上是一个只读的分区记录集合。不能直接修改,只能通过一定的转换操作(map, reduce, join, group by)来创建新的RDD。」
DAG:有向无环图,反应了RDD之间的依赖关系。
Executor:一个进程,负责运行任务。
Application:用户编写的spark应用程序。
Task:运行在Excutor上的工作单元。
Job:一个job包含多个RDD以及对应的RDD上的各种操作。
Stage:作业的基本调度单位。一个作业会被分为多组Task,每组任务称为一个stage。
其中,RDD是一种高度受限的内存模型,一次只能对RDD全集进行修改。听完上述说明,大家可能理解起来很抽象,接下来我将介绍RDD编程模型,并通过程序例子来说明,方便大家理解。
RDD编程例子
1. 从文件系统中加载数据并转化成RDD格式
下面的例程可以将文本文件转化成RDD数据格式读入,便于Spark对RDD数据并行处理。
from pyspark import SparkConf, SparkContext
sc = SparkContext()
# 可以通过sc.textFiles来将text文件转化成RDD格式的数据。
# 如果是本地文件, 要加上 "file:///"
lines = sc.textFiles("file:///usr/local/sparl/example.txt")
# 下面三条语句是完全等价的
lines = sc.textFiles("hdfs://localhost:9000/user/hadoop/example.txt")
lines = sc.textFiles("/user/hadoop/example.txt")
lines = sc.textFiles("example.txt")
lines.foreach(print)
2. 将数组转化成RDD格式
array = [1, 2, 3, 4, 5]
# 通过sc.parallelize将数组转化成RDD格式
rdd = sc.parallelize(array)
rdd.foreach(print)
#1
#2
#3
#4
#5
3. RDD操作:Transformation
1. Filter
lines = sc.parallelize(['Spark is very fast', 'My name is LiLei'])
# 筛选出含有“Spark”的行,操作为并行。
linesWithSpark = lines.filter(lambda line: "Spark" in line)
# 每行并行打印
linesWithSpark.foreach(print)
# Spark is very fast
2. Map
lines = sc.parallelize(['Spark is very fast', 'My name is LiLei'])
# 每一行通过map并行处理。
words = lines.map(lambda line:line.split(" "))
words.foreach(print)
# ['Spark', 'is', 'very', 'fast']
# ['My', 'name', 'is', 'LiLie']
3. groupByKey
words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
# groupByKey() 应用于 (K,V) 键值对的数据集时, 返回一个新的 (K, Iterable) 形式的数据集
words1 = words.groupByKey()
words1.foreach(print)
#('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
#('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
#('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
#('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
#('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
#('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)
4. reduceByKey
words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
# reduceByKey:相同的key通过指定操作进行聚合,下方代码利用求和进行聚合
words1 = words.reduceByKey(lambda a,b:a+b)
words1.foreach(print)
#('good', 1)
#('Hadoop', 1)
#('better', 1)
#('Spark', 2)
#('fast', 1)
#('is', 3)
4. RDD操作:Action
由于Spark的惰性机制,当RDD通过Transformation操作,直到遇到Action操作后,才会执行真正的计算, 从文件中加载数据, 完成一次又一次Transformation操作, 最终, 完成Action操作得到结果。
rdd = sc.parallelize([1,2,3,4,5])
## rdd的数量
rdd.count()
#5
## 第一行rdd
rdd.first()
#1
## 前三行rdd
rdd.take(3)
#[1, 2, 3]
rdd.reduce(lambda a,b:a+b)
#15
## 以数组的形式返回rdd中所有元素
rdd.collect()
#[1, 2, 3, 4, 5]
rdd.foreach(lambda elem:print(elem))
总结
通过将输入(文件,数组)转化成RDD,并将多个简单的Transformation和Action操作进行串联,Spark可以高效的完成很多复杂数据的处理。同时,在完成大规模的数据处理后,我们也可以利用Spark中内置的机器学习算法来对这些大规模的数据进行学习和建模。想要了解更多Python教程欢迎持续关注编程学习网
扫码二维码 获取免费视频学习资料
- 本文固定链接: http://www.phpxs.com/post/8515/
- 转载请注明:转载必须在正文中标注并保留原文链接
- 扫码: 扫上方二维码获取免费视频资料