pandas读取数据
pandas读取数据
csv数据读取
In [ ]:
import pandas as pd
from datetime import datetime
import time
def read_csv(from_path):
    """Load the CSV file at *from_path* and return it as a DataFrame."""
    return pd.read_csv(from_path)
def write_csv(to_path, data):
    """Persist DataFrame *data* to *to_path* as CSV, omitting the index column."""
    data.to_csv(to_path, index=False)
if __name__ == "__main__":
    # Timed demo: read a CSV from the desktop and show its head.
    start_time = time.time()  # start timestamp
    path = 'C:/Users/lenovo/Desktop'
    file_name = 'query_hive_162768.csv'
    from_path = path + "/" + file_name
    print(from_path)
    df = read_csv(from_path)
    # Fix: the original had a bare `df.head()` here whose result was
    # discarded — dead code; the head is printed below instead.
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
    print(df.head())
Excel
Excel保存
In [ ]:
def write_xlsx(to_path, data, sheet_name):
    """Write DataFrame *data* to the Excel workbook at *to_path*.

    Fixes:
    - *sheet_name* was previously ignored in favour of a hard-coded
      'Sheet1'; it is now honoured (callers in this file pass 'Sheet1',
      so their output is unchanged).
    - ``ExcelWriter.save()`` was removed in pandas 2.0; using the writer
      as a context manager also guarantees the file handle is closed on
      error.
    """
    with pd.ExcelWriter(to_path, engine='xlsxwriter') as writer:
        data.to_excel(writer, sheet_name=sheet_name, index=False)
if __name__ == "__main__":
    # Timed demo: save the previously loaded DataFrame `df` to Excel.
    start_time = time.time()  # start timestamp
    path = r'C:\Users\lenovo\Desktop'
    file_name = '点位保存.xlsx'
    sheet_name = 'Sheet1'
    to_path = path + "\\" + file_name
    data = df  # relies on `df` created by an earlier cell
    try:
        write_xlsx(to_path, data, sheet_name)
    except Exception as exc:
        # Fix: was a bare `except:` that hid the failure reason (and would
        # even swallow KeyboardInterrupt/SystemExit).
        print('Excel保存失败', exc)
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
程序耗时45.744701秒.
Excel 读取
In [ ]:
import re
import pandas as pd
from datetime import datetime
import time
def read_xlsx(path, sheet_name):
    """Parse sheet *sheet_name* of the Excel workbook at *path* into a DataFrame."""
    workbook = pd.ExcelFile(path)
    return workbook.parse(sheet_name)
if __name__ == "__main__":
    # Timed demo: load one sheet of a desktop workbook and show its head.
    start_time = time.time()  # start timestamp
    path = 'C:/Users/lenovo/Desktop'
    file_name = '华东报表.xlsx'
    # Map of logical data source -> sheet name inside the workbook.
    sheet_name_list = {
        'hive': 'Sheet',
        'email': 'data',
        'mysql': 'Sheet4',
    }
    path = path + "\\" + file_name
    sheet_name = sheet_name_list['hive']
    # sheet_name = sheet_name_list['email']
    df = read_xlsx(path, sheet_name)
    print(df.head())
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
MYSQL数据操作
MYSQL数据读取
In [ ]:
import pymysql
import time
import pandas as pd
def read_mysql(sql):
    """Run *sql* against the MySQL server and return the result as a DataFrame.

    Fix: the connection is now closed in a ``finally`` block, so it is
    released even when ``pd.read_sql`` raises (previously an exception
    leaked the connection).  Dead commented-out cursor code was removed.
    """
    # NOTE(review): credentials are hard-coded in source — they should be
    # moved to configuration / environment variables.
    db_connection = pymysql.connect(host="127.0.0.1", port=3307, user="bi_pub_r",
                                    passwd="QX1NWqVH8YKf1OTV", db="bi_app_public")
    try:
        return pd.read_sql(sql, con=db_connection)
    finally:
        db_connection.close()
if __name__ == "__main__":
    # Timed demo: run a sample query and show the first rows.
    start_time = time.time()  # start timestamp
    sql = 'select * from bi_app_public.da_goods_quality_daily limit 10'
    try:
        df = read_mysql(sql)
        print(df.head())
    except Exception as exc:
        # Fix: was a bare `except:` that hid connection/SQL errors entirely.
        print('sql查询失败', exc)
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
数据存储到mysql
In [ ]:
import pymysql
from sqlalchemy import create_engine
import mysql.connector as sql
def py_mysql(table, table_name):
    """Write DataFrame *table* into MySQL table *table_name* (replace if it exists).

    Fix: the bare ``except:`` now catches ``Exception`` and reports the
    actual failure reason instead of discarding it.
    """
    try:
        # NOTE(review): credentials are hard-coded — move to configuration.
        # engine = create_engine("mysql+mysqlconnector://root:kemi1016@127.0.0.1:3306/temp?charset=utf8mb4")
        engine = create_engine("mysql+pymysql://root:ps123456@127.0.0.1:3306/temp?charset=utf8mb4")
        table.to_sql(name=table_name, con=engine, if_exists='replace',
                     index=False, chunksize=10000)
        print('数据库写入成功')
    except Exception as exc:
        print('数据库写入失败', exc)
if __name__ == "__main__":
    # Timed demo: push the previously loaded `df` into MySQL.
    start_time = time.time()  # start timestamp
    try:
        py_mysql(df, '点位1206')
    except Exception as exc:
        # Fix: was a bare `except:` that masked the actual failure.
        print('sql查询失败', exc)
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
C:\Users\lenovo\Anaconda3\lib\site-packages\pymysql\cursors.py:170: Warning: (1366, "Incorrect string value: '\\xD6\\xD0\\xB9\\xFA\\xB1\\xEA...' for column 'VARIABLE_VALUE' at row 518") result = self._query(query)
数据库写入成功 程序耗时3.457196秒.
循环读取目录文件
循环读取目录Csv文件
In [ ]:
import os, re
import pandas as pd

# Walk a directory tree, concatenate every CSV file found into one
# DataFrame, and save the result as a single Excel workbook.
dir_path = 'C:/Users/lenovo/Desktop/inv_2'
df = pd.DataFrame([])
for root, dirs, files in os.walk(dir_path):
    for file_name in files:
        # Fix: use endswith() — the old substring test `'.csv' in name`
        # also matched names like 'data.csv.bak'.
        if file_name.endswith('.csv'):
            path = os.path.join(root, file_name)
            print('正在处理的文件是%s' % (path))
            try:
                # Fix: pd.read_table(sep=',') is deprecated; read_csv is the
                # canonical call and already returns a DataFrame, so the
                # redundant pd.DataFrame(...) re-wrap was dropped.
                data = pd.read_csv(path)
                print(data.head())
                df = pd.concat([df, data], ignore_index=True)  # merge
                print(df.shape)
            except Exception as exc:
                # Fix: was a bare `except:` that hid the failure reason.
                print('读取文件,处理数据失败', exc)
        else:
            print('warning:非.csv文件不读取')

# Save the merged data locally.  ExcelWriter.save() was removed in
# pandas 2.0; the context manager closes the file even on error.
out_path = 'C:/Users/lenovo/Desktop/inv_2/output-1.xlsx'
with pd.ExcelWriter(out_path, engine='xlsxwriter') as writer:
    df.to_excel(writer, sheet_name='Sheet1')
循环读取目录Xlsx文件
In [ ]:
import os, re
import pandas as pd

# Walk a directory tree, concatenate the 'data' sheet of every Excel file
# found into one DataFrame, and save the result as a single workbook.
dir_path = 'C:/Users/lenovo/Desktop/email'
df = pd.DataFrame([])
for root, dirs, files in os.walk(dir_path):
    for file_name in files:
        # Fix: use endswith() — the old substring test `'.xls' in name`
        # also matched names like 'report.xls.bak'.
        # (endswith accepts a tuple, covering both .xls and .xlsx.)
        if file_name.endswith(('.xls', '.xlsx')):
            path = os.path.join(root, file_name)
            print('正在处理的文件是%s' % (path))
            try:
                xlsx_file = pd.ExcelFile(path)
                # parse() already returns a DataFrame; the redundant
                # pd.DataFrame(...) re-wrap was dropped.
                data = xlsx_file.parse('data')  # sheet named 'data'
                print(data.head())
                df = pd.concat([df, data], ignore_index=True, sort=True)  # merge
                print(df.shape)
            except Exception as exc:
                # Fix: was a bare `except:` that hid the failure reason.
                print('读取文件,处理数据失败', exc)
        else:
            print('warning:非.xlsx文件不读取')

# Save the merged data locally.  ExcelWriter.save() was removed in
# pandas 2.0; the context manager closes the file even on error.
out_path = 'C:/Users/lenovo/Desktop/email/output-1.xlsx'
with pd.ExcelWriter(out_path, engine='xlsxwriter') as writer:
    df.to_excel(writer, sheet_name='Sheet1')
Mongodb操作
读取mongodb to DataFrame数据框
In [ ]:
#mongo to datafreme
import pandas as pd
from sqlalchemy import create_engine
def read_mongb(db_name, table_name):
    """Fetch every document of MongoDB collection *db_name*.*table_name*
    into a DataFrame.

    Fix: the result was wrapped in ``pd.DataFrame`` twice; a single wrap
    of the document list is sufficient and equivalent.
    """
    from pymongo import MongoClient
    # NOTE(review): the server address is hard-coded — move to configuration.
    client = MongoClient('67.216.204.220', 27017)
    collection = client[db_name][table_name]
    return pd.DataFrame(list(collection.find()))
if __name__ == "__main__":
    # Timed demo: load a spider collection and show selected columns.
    start_time = time.time()  # start timestamp
    db_name = 'spyder'
    table_name = 'dongchediapp_motor_car_show_get_sells_rank'
    try:
        df = read_mongb(db_name=db_name, table_name=table_name)
        print(df[['month', 'series_name', 'count']].head())
    except Exception as exc:
        # Fix: was a bare `except:` that hid connection/query errors.
        print('sql查询失败', exc)
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
month series_name count 0 201808 朗逸 38946 1 201808 轩逸 38357 2 201808 捷达 30363 3 201808 卡罗拉 28975 4 201808 速腾 27407 程序耗时2.623511秒.
将pandas 数据集存入到 mongodb数据中
简要介绍:将pandas 数据集存入到 mongodb数据中
存在即更新,不存在则插入 pandas to mongodb · STAY REAL
In [ ]:
import pandas as pd
import pymongo
from pymongo import MongoClient
from bson.json_util import loads
def insert_many(collection, docs=None, update=True):
    """Bulk-upsert *docs* (iterable of dicts) into MongoDB *collection*.

    Documents carrying the module-global ``index_name`` key are upserted
    on that key; all others are plain inserts.  Returns the bulk-execute
    result, or None when *docs* is empty/None.

    *update=True* uses ``$set`` (overwrite existing fields); *update=False*
    uses ``$setOnInsert`` (insert only, never update).

    NOTE(review): ``pymongo.bulk.BulkOperationBuilder`` was removed in
    pymongo 4.x — migrate to ``collection.bulk_write([...])`` when the
    driver is upgraded.  Also note the implicit dependency on the
    module-global ``index_name``.
    """
    if not docs:
        return
    update_key = "$set" if update else "$setOnInsert"
    bulk = pymongo.bulk.BulkOperationBuilder(collection, ordered=False)
    for doc in docs:
        # Fix: identity comparison `is not None` instead of `!= None`.
        if doc.get(index_name) is not None:
            bulk.find({index_name: doc[index_name]}).upsert().update_one({update_key: doc})
        else:
            bulk.insert(doc)
    return bulk.execute()
def df2db(df):
    """Upsert DataFrame *df* into MongoDB.

    Relies on the module-level globals ``db_name``, ``collection_name`` and
    ``index_name`` being set by the caller before invocation.
    """
    # Write to the database.
    # client = pymongo.MongoClient()
    # NOTE(review): hard-coded server address — move to configuration.
    client = MongoClient('67.216.204.220', 27017)
    db = client[db_name]
    collection = db[collection_name]
    try:
        collection.index_information()
    except pymongo.errors.OperationFailure:
        # Create a unique index on index_name the first time (when the
        # collection has no index information yet).
        collection.create_index(index_name, unique=True)
    # Round-trip through JSON: df.T.to_json() keys rows by index, so
    # .values() yields one dict per DataFrame row.
    data = loads(df.T.to_json()).values()
    rs = insert_many(collection, data)
    print("-" * 30+'success'+"-" * 30)
if __name__ == "__main__":
    # Timed demo: upsert the previously loaded `df` into MongoDB.
    start_time = time.time()  # start timestamp
    db_name = "pt"
    collection_name = "tmp_test_dataframe_to_mongodb"
    index_name = "unique_id"  # unique identifier field
    try:
        df2db(df)
    except Exception as exc:
        # Fix: was a bare `except:` that masked the actual failure.
        print('失败', exc)
    end_time = time.time()  # end timestamp
    print("程序耗时%f秒." % (end_time - start_time))
------------------------------success------------------------------ 程序耗时1.970120秒.
读取剪贴板数据
In [ ]:
# Read tabular data straight from the system clipboard into a DataFrame
# (environment-dependent: requires a clipboard backend).
import pandas as pd
df=pd.read_clipboard()
read_json读取json数据
In [ ]:
import json

# Demo: build a DataFrame from a JSON string and fix the column order.
json_data = [
    {'name': 'Wang', 'sal': 50000, 'job': 'VP'},
    {'name': 'Zhang', 'job': 'Manager', 'report': 'VP'},
    {'name': 'Li', 'sal': 5000, 'report': 'IT'},
]
print('json数据', type(json.dumps(json_data)))
df = pd.read_json(json.dumps(json_data))
df = df.reindex(columns=['name', 'job', 'sal', 'report'])
df
json数据 <class 'str'>
Out[ ]:
name | job | sal | report | |
---|---|---|---|---|
0 | Wang | VP | 50000.0 | NaN |
1 | Zhang | Manager | NaN | VP |
2 | Li | NaN | 5000.0 | IT |
读取Lists and Dictionaries数据
pd.DataFrame.from_dict()
pd.DataFrame.from_records()
不显示科学计数法
In [ ]:
# Display configuration: fixed-point formatting instead of scientific notation.
# Fix: the bare 'precision' option alias was removed in pandas 2.0;
# the canonical name is 'display.precision'.
pd.set_option('display.precision', 5)  # display precision
pd.set_option('display.float_format', lambda x: '%.5f' % x)  # no scientific notation
pd.options.display.max_rows = 200  # show at most 200 rows