Note:
This article uses the pyspark module; please add it to your PyCharm project yourself (it can typically be installed with pip install pyspark).







# Imports
from pyspark import SparkConf, SparkContext
# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)
# Load Python objects into Spark as RDDs via the parallelize method
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

# To see what an RDD contains, use the collect() method
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())
    Result

    [1, 2, 3, 4, 5]
    [1, 2, 3, 4, 5]
    ['a', 'b', 'c', 'd', 'e', 'f', 'g']
    [1, 2, 3, 4, 5]
    ['key1', 'key2']
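
As the last line of output shows, passing a dict to parallelize() keeps only its keys. A minimal sketch of loading the key-value pairs instead, assuming the same sc as above (the variable name pairs_rdd is just illustrative):

# Pass dict.items() if the key-value pairs themselves should become RDD elements
pairs_rdd = sc.parallelize({"key1": "value1", "key2": "value2"}.items())
print(pairs_rdd.collect())  # expected: [('key1', 'value1'), ('key2', 'value2')]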






# Imports
from pyspark import SparkConf, SparkContext
# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)

# Load the file's contents into Spark as an RDD via the textFile method
rdd = sc.textFile("E:/PycharmProjects/test.txt")
print(rdd.collect())
# Stop the SparkContext object (end the PySpark program)
sc.stop()
    Result

    ['hello world admin', 'hello world admin hello world admin', 'hello world admin', 'hello world admin', 'adminlog www adminlog', 'adminlogcn adminlogcn adminlogcn']
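
textFile() produces one RDD element per line of the file. A minimal sketch of checking the line count without collecting everything, assuming the same rdd and run before sc.stop():

# count() is an action that returns the number of elements, i.e. the number of lines here
print(rdd.count())  # expected: 6 for the sample test.txt above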








# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)


# Prepare an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Use the map method to multiply every element by 10
# def func(data):
#     return data * 10

# Build a new RDD
# rdd2 = rdd.map(func)
# Chained calls: as long as each call returns the same kind of object, you can keep chaining
rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)

# Chained call
# rdd3 = rdd2.map(lambda x: x + 5)


print(rdd2.collect())
    Result

    [15, 25, 35, 45, 55]
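
Because the two chained map calls simply compose the arithmetic, a single map gives the same result. A minimal sketch assuming the same rdd as above (rdd_single is just an illustrative name):

rdd_single = rdd.map(lambda x: x * 10 + 5)
print(rdd_single.collect())  # expected: [15, 25, 35, 45, 55]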






# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)
# Prepare an RDD
rdd = sc.parallelize(["hello world 666", "admin admin log adminlog", "www admin log cn"])
# Goal: extract the individual words from the RDD data
# rdd2 = rdd.map(lambda x: x.split(" "))
rdd2 = rdd.flatMap(lambda x: x.split(" "))
print(rdd2.collect())

sc.stop()
    Result

    ['hello', 'world', '666', 'admin', 'admin', 'log', 'adminlog', 'www', 'admin', 'log', 'cn']
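
For comparison, the commented-out map version keeps one nested list per input string instead of flattening them. A minimal sketch assuming the same rdd, run before sc.stop() (rdd_nested is just an illustrative name):

rdd_nested = rdd.map(lambda x: x.split(" ")))
print(rdd_nested.collect())
# expected: [['hello', 'world', '666'], ['admin', 'admin', 'log', 'adminlog'], ['www', 'admin', 'log', 'cn']]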






# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)

# Prepare an RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 66), ('女', 77), ('女', 88)])
# Sum the scores for the '男' (male) group and the '女' (female) group
rdd2 = rdd.reduceByKey(lambda a, b: a + b)

print(rdd2.collect())
sc.stop()
    Result

    [('男', 187), ('女', 231)]
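
reduceByKey only merges the values; to count how many scores each group has instead of summing them, map every value to 1 first. A minimal sketch assuming the same rdd, run before sc.stop() (count_rdd is just an illustrative name):

count_rdd = rdd.map(lambda pair: (pair[0], 1)).reduceByKey(lambda a, b: a + b)
print(count_rdd.collect())  # expected: [('男', 2), ('女', 3)] (key order may vary)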






# 1. Build the execution-environment entry object
# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)

# 2. Read the data file
rdd = sc.textFile("E:/PycharmProjects/test.txt")

# 3. Extract all the words
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# print(word_rdd.collect())

# 4. Turn every word into a two-element tuple: the word as the key, 1 as the value
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# print(word_with_one_rdd.collect())

# 5. Group by key and sum
result = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. Print the result
print(result.collect())

sc.stop()
    Result

    [('world', 5), ('admin', 5), ('www', 1), ('hello', 5), ('adminlog', 2), ('adminlogcn', 3)]
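
As a quick cross-check for small data sets, the same word counts can also be obtained with the countByValue() action on word_rdd, which returns a plain Python mapping instead of an RDD. A minimal sketch assuming the same word_rdd, run before sc.stop():

print(dict(word_rdd.countByValue()))
# expected: {'hello': 5, 'world': 5, 'admin': 5, 'adminlog': 2, 'www': 1, 'adminlogcn': 3} (key order may vary)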






# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)

# Prepare an RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# Filter the RDD data [ keep even numbers only ]
rdd2 = rdd.filter(lambda num: num % 2 == 0)

print(rdd2.collect())
sc.stop()
    Result

    [2, 4]
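
filter keeps exactly those elements for which the function returns True, so any boolean condition works. A minimal sketch assuming the same rdd, run before sc.stop() (rdd_big is just an illustrative name):

rdd_big = rdd.filter(lambda num: num > 2)
print(rdd_big.collect())  # expected: [3, 4, 5]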







# The distinct operator removes duplicates from the RDD data and returns a new RDD
# Syntax: rdd.distinct()   -- takes no arguments

# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)

# Prepare an RDD
rdd = sc.parallelize([1, 1, 2, 2, 3, 3, 5, 5, 7, 7, 8, 8, 9, 10])
# Remove duplicates from the RDD data
rdd2 = rdd.distinct()
print(rdd2.collect())

sc.stop()
    Result

    [1, 2, 3, 5, 7, 8, 9, 10]
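
distinct involves a shuffle, so the order of the collected result is not guaranteed. Sorting on the driver gives deterministic output for display; a minimal sketch assuming the same rdd2, run before sc.stop():

print(sorted(rdd2.collect()))  # expected: [1, 2, 3, 5, 7, 8, 9, 10]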






# rdd.sortBy(func, ascending=False, numPartitions=1)
# func: (T) -> U: tells sortBy which part of each element to sort on, e.g. lambda x: x[1] sorts by the second element
# ascending: True for ascending order, False for descending order
# numPartitions: how many partitions to use for the sort

# Imports
from pyspark import SparkConf, SparkContext
import os
# os.environ["PYSPARK_PYTHON"] = "D:/program files/Python310/python.exe"


# Create a SparkConf object
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# Create a SparkContext object based on the SparkConf object
sc = SparkContext(conf=conf)


# Reuse the word-count steps
# 2. Read the data file
rdd = sc.textFile("E:/PycharmProjects/test.txt")

# 3. Extract all the words
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# print(word_rdd.collect())

# 4. Turn every word into a two-element tuple: the word as the key, 1 as the value
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# print(word_with_one_rdd.collect())

# 5. Group by key and sum
result = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. Print the result
# print(result.collect())

# Sort the result
# Return element 1 of each two-element tuple; sortBy sorts by that returned value
# ascending=False sorts in descending order (the default is ascending)
final_rdd = result.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print(final_rdd.collect())
sc.stop()
    Result

    [('world', 5), ('admin', 5), ('hello', 5), ('adminlogcn', 3), ('adminlog', 2), ('www', 1)]
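
When only the top few entries are needed, the take action avoids collecting the whole RDD. A minimal sketch assuming the same final_rdd, run before sc.stop():

print(final_rdd.take(3))  # expected: [('world', 5), ('admin', 5), ('hello', 5)] (order among equal counts may vary)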