注:
Admin_Log
本文需要用到pyspark模块,请自行在PyCharm中添加。
数据输入
# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
- 结果
[1, 2, 3, 4, 5] [1, 2, 3, 4, 5] ['a', 'b', 'c', 'd', 'e', 'f', 'g'] [1, 2, 3, 4, 5] ['key1', 'key2']
数据输入 – 文件输入
# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
# rdd1 = sc.parallelize([1, 2, 3, 4, 5])
# rdd2 = sc.parallelize((1, 2, 3, 4, 5))
# rdd3 = sc.parallelize("abcdefg")
# rdd4 = sc.parallelize({1, 2, 3, 4, 5})
# rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})
#
# # 如果要查看RDD里面有什么内容,需要用collect()方法
# print(rdd1.collect())
# print(rdd2.collect())
# print(rdd3.collect())
# print(rdd4.collect())
# print(rdd5.collect())
# 通过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("E:/PycharmProjects/test.txt")
print(rdd.collect())
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
- 结果
['hello world admin', 'hello world admin hello world admin', 'hello world admin', 'hello world admin', 'adminlog www adminlog', 'adminlogcn adminlogcn adminlogcn']
map方法
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 通过map方法将全部数据都乘以10
# def func(data):
# return data * 10
# 组成新的RDD
# rdd2 = rdd.map(func)
# 链式调用,调用类型事一个时,可以一直点下去
rdd2 = rdd.map(lambda x: x *10).map(lambda x: x + 5)
# 链式调用
# rdd3 = rdd2.map(lambda x: x + 5)
print(rdd2.collect())
- 结果
[15, 25, 35, 45, 55]
flatmap方法
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 准备一个rdd
rdd = sc.parallelize(["hello world 666", "admin admin log adminlog", "www admin log cn"])
# 需求:将RDD数据里面的一个个单词提取出来
# rdd2 = rdd.map(lambda x: x.split(" "))
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())
sc.stop()
- 结果
['hello', 'world', '666', 'admin', 'admin', 'log', 'adminlog', 'www', 'admin', 'log', 'cn']
reduceByKey方法
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 66), ('女', 77), ('女', 88)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())
sc.stop()
- 结果
[('男', 187), ('女', 231)]
单词计数综合案例
# 1. 构建执行环境入口对象
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 2. 读取数据文件
rdd = sc.textFile("E:/PycharmProjects/test.txt")
# 3. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# print(word_rdd.collect())
# 4. 将所有单词都转换成二元元组,单词为Key,Value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# print(word_with_one_rdd.collect())
# 5. 分组并求和
result = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. 打印输出结果
print(result.collect())
sc.stop()
- 结果
[('world', 5), ('admin', 5), ('www', 1), ('hello', 5), ('adminlog', 2), ('adminlogcn', 3)]
filter方法
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# 对RDD的胡数据进行过滤 [ 保留偶数 ]
rdd2 = rdd.filter(lambda num: num % 2 == 0)
print(rdd2.collect())
sc.stop()
- 结果
[2, 4]
distinct方法
# distinct算子功能: 对RDD数据进行去重,返回新的RDD
# 语法 rdd.distinct() 无需传参
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 准备一个RDD
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 5, 5, 7, 7, 8, 8, 9, 10])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()
print(rdd2.collect())
sc.stop()
- 结果
[1, 2, 3, 5, 7, 8, 9, 10]
sortBy方法
# rdd.sortBy(func,ascending=False,numPartitions=1)
# func: (T) -> U:告知按照rdd中的哪个数据进行排序,比如lambda x: x[1]表示按照rdd中的第二列元素进行排序# ascending True升序False降序
# numPartitions:用多少分区排序
# 导包
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"
# 创建SparkConf类对象
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 服用单词计数模块
# 2. 读取数据文件
rdd = sc.textFile("E:/PycharmProjects/test.txt")
# 3. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# print(word_rdd.collect())
# 4. 将所有单词都转换成二元元组,单词为Key,Value设置为1
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# print(word_with_one_rdd.collect())
# 5. 分组并求和
result = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. 打印输出结果
# print(result.collect())
# 对结果进行排序
# 将二元元组的1号元素返回出去,sortBy根据返回数据进行排序
# 升降序,False为降序,默认升序?
final_rdd = result.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(final_rdd.collect())
sc.stop()
- 结果
[('world', 5), ('admin', 5), ('hello', 5), ('adminlogcn', 3), ('adminlog', 2), ('www', 1)]



