注:本文所用数据均来自百度开源数据
数据会在下方展示下载按钮,请自行提取。
本文中使用Python – 面向对象 – 数据分析 – 每日销售额项目中的data_define.pyfile_define.py进行复写。
本文需要用到pymysql模块,请自行在PyCharm中添加。

Admin_Log







项目介绍:

将文件内的数据写入到MySQL中,后续将写入MySQL中的数据读取并保存到本地。
实现步骤:
1. 设计一个类,可以完成数据的封装
2. 设计一个抽象类,定义文件读取的相关功能,并使用子类实现具体操作
3. 读取文件,生产数据对象
4. 将数据写入到MySQL中
5. 将MySQL中的数据读取并保存到本地
代码模块分析:
data_define.py:数据定义封装类
file_define.py:文件定义封装类
main.py:主逻辑代码[ 数据写入MySQL ]
read_write.py:读取MySQL数据将数据写出到本地

Admin_log

 

 


 

 

python操作MySQL写入数据

python操作MySQL写出数据






"""
数据定义的类
"""


class Record:

    def __init__(self, data, order_id, money, province):
        self.date = data  # 订单日期
        self.order_id = order_id  # 订单ID
        self.money = money  # 订单金额
        self.province = province  # 销售省份

    def __str__(self):
        return f"{self.date}, {self.order_id}, {self.money}, {self.province}"






"""
和文件相关的类定义
"""
import json

# 先定义一个抽象类,用来做顶层设计,确定有哪些功能需要实现
from data_define import Record


class FileReader:

    def read_data(self) -> list[Record]:
        """读取文件数据,读到的每一条数据都转换为Record对象,将它们都封装到list内返回即可"""
        pass


class TextFileReader(FileReader):

    def __init__(self, path):
        self.path = path  # 定义成员变量,记录文件的路径

    # 复写(实现抽象方法)父类的方法
    def read_data(self) -> list[Record]:
        f = open(self.path, "r", encoding="UTF-8")

        record_list: list[Record] = []
        for line in f.readlines():
            # print(line)
            line = line.strip()  # 消除读取到的每一行数据中的"\n"
            # print(line)
            data_list = line.split(",")
            # print(data_list)
            record = Record(data_list[0], data_list[1], int(data_list[2]), data_list[3])
            record_list.append(record)

        f.close()
        return record_list


class JsonFileReader(FileReader):

    def __init__(self, path):
        self.path = path  # 定义成员变量,记录文件的路径

    def read_data(self) -> list[Record]:
        f = open(self.path, "r", encoding="UTF-8")

        record_list: list[Record] = []
        for line in f.readlines():
            data_dict = json.loads(line)
            record = Record(data_dict["date"], data_dict["order_id"], int(data_dict["money"]), data_dict["province"])
            record_list.append(record)
            # print(record_list)
        f.close()
        return record_list

if __name__ == '__main__':
    text_file_reader = TextFileReader("E:/PycharmProjects/面向对象数据/2011年1月销售数据.txt")
    json_file_reader = JsonFileReader("E:/PycharmProjects/面向对象数据/2011年2月销售数据JSON.txt")
    list1 = text_file_reader.read_data()
    list2 = json_file_reader.read_data()
    for l in list1:
        print(l)

    # print("==========")

    for l in list2:
        print(l)






"""
SQL综合案例,读取文件,写入到MySQL数据库中
"""
import pymysql

from file_define import FileReader, TextFileReader, JsonFileReader
from data_define import Record
# from pyecharts.charts import Bar
# from pyecharts.options import *
# from pyecharts.globals import ThemeType
from pymysql import Connection


text_file_reader = TextFileReader("E:/PycharmProjects/面向对象数据/2011年1月销售数据.txt")
json_file_reader = JsonFileReader("E:/PycharmProjects/面向对象数据/2011年2月销售数据JSON.txt")

jan_data: list[Record] = text_file_reader.read_data()
feb_data: list[Record] = json_file_reader.read_data()

# 将两个月份的数据合并为一个list存储
all_data: list[Record] = jan_data + feb_data
# print(all_data)


# 构建MySQL链接对象
conn = Connection(
    host='localhost',   # 主机名(IP)
    port=3306,          # 端口
    user='root',        # 账户(用户名)
    password='123456',  # 密码
    autocommit=True     # 自动提交(确认)(事务)
)
# 获得游标对象
cursor = conn.cursor()
# 选择数据库
conn.select_db('py_sql')
# 组织SQL语句
for record in all_data:
    sql = f"insert into orders(order_date, order_id, money, province) "\
          f"values('{record.date}', '{record.order_id}', {record.money}, '{record.province}')"
    # print(sql)
    cursor.execute(sql)
# 执行SQL语句

# 关闭MySQL链接对象
conn.close()






import json

from pymysql import Connection

conn = Connection(
    host="localhost",
    port=3306,
    user="root",
    password="123456",
)

cursor = conn.cursor()
conn.select_db("py_sql")
cursor.execute("select * from orders")
data_tuple = cursor.fetchall()

f = open("E:\PycharmProjects\Python操作MySQL基础使用\读取写出操作.txt", "a", encoding="UTF-8")

data_dict = {}
for record in data_tuple:
    data_dict["data"] = str(record[0])
    data_dict["order_id"] = record[1]
    data_dict["money"] = int(record[2])
    data_dict["province"] = record[3]
    f.write(json.dumps(data_dict, ensure_ascii=False))
    f.write("\n")

f.close()

conn.close()






下载信息

Admin_Log