文章目录
- python常用数据存储方法
- txt
- json
- csv
- mysql
- 安装pymysql
- 连接
- 建库建表
- 插入
- 删除
- 修改
- 查询
- mongodb
- 安装
- 连接
- 建库建文档
- 增
- 删
- 改
- 查
- redis
- 安装
- 连接
- 操作
- 公用方法
- 字符串
- 列表
- 集合
- 有序集合
- 散列
- 其他
- sqlalchemy
- postgresql
- mysql
- oracle
- Microsoft SQL Server
- sqlite
python常用数据存储方法
txt
# Write one line of text to test.txt, encoded as UTF-8.
# The `with` block closes the file when it exits, even on error.
with open('test.txt', 'w', encoding='utf8') as f:
    f.write('sdfasdf' + '\n')
json
object = json.loads(jsonstr) : json字符串 转化为 python对象
str = json.dumps(objects,indent=2) :python对象 转化为 json字符串
注意:要输出中文,需要设置 ensure_ascii=False;同时打开文件时要指定正确的编码(如 encoding='utf8')。
import json

# Serialize `objects` to pretty-printed JSON text and write it to test.txt.
# ensure_ascii=False keeps non-ASCII characters (e.g. Chinese) as-is instead
# of emitting \uXXXX escapes.
# NOTE(review): `objects` is not defined in this snippet — it is assumed to be
# a JSON-serializable value supplied by the reader.
with open('test.txt', 'w', encoding='utf8') as f:
    f.write(json.dumps(objects, indent=2, ensure_ascii=False))
操作文件
import json

# Dump the decoded JSON payload of `res` straight into a file.
# NOTE(review): `res` is not defined here; `res.json()` suggests a
# requests-style response object — confirm against the surrounding code.
with open('jiuba.json', 'w', encoding='utf8') as f:
    json.dump(res.json(), f, indent=2, ensure_ascii=False)

# Load the same file back into a Python object.
with open('jiuba.json', encoding='utf8') as f:
    jiuba_dict = json.load(f)
csv
用 csv 模块写入(open() 时建议加上 newline='',避免在 Windows 下出现多余空行),用 pandas.read_csv() 读取很方便
import csv

from pandas import read_csv

# newline='' hands line-ending control to the csv module, as its documentation
# requires; without it, extra blank rows appear on platforms that translate
# '\n' to '\r\n'.
with open('csv.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, delimiter=',')
    writer.writerow(['id', 'name', 'age'])
    writer.writerows((['1', 'bob', '12'], ['2', 'amy', '21']))

# Write rows given as dicts; DictWriter lays the values out in the order of
# `fieldnames`.
with open('dict.csv', 'w', encoding='utf8', newline='') as f:
    field_headers = ['id', 'name', 'age']
    writer = csv.DictWriter(f, fieldnames=field_headers)
    writer.writeheader()
    writer.writerow({'id': 3, 'name': 'Mike', 'age': 22})

# Read the first file back as a DataFrame and show it.
df = read_csv('csv.csv')
print(df)
mysql
安装pymysql
pip install PyMySQL
To use “sha256_password” or “caching_sha2_password” for authentication,
you need to install an additional dependency:
pip install PyMySQL[rsa]
连接
单实例连接
# Open a connection to the MySQL server at 127.0.0.1:3306 and obtain a cursor
# for executing SQL statements.
# NOTE(review): requires `import pymysql`, which this snippet does not show.
db = pymysql.connect(host='127.0.0.1', port=3306,
user='root', password='huajian')
cursor = db.cursor()
建库建表
# Create the database and table on first use; `if not exists` makes both
# statements safe to re-run.
# NOTE(review): `db_name` and `table_name` are interpolated directly into the
# SQL (identifiers cannot be bound as parameters), so they must come from
# trusted code, never from user input. Neither name is defined in this snippet.
cursor.execute(f'create database if not exists {db_name};')
cursor.execute(f'use {db_name};')
cursor.execute(
    f'create table if not exists {table_name}'
    '(id varchar(20), sex varchar(30),name varchar(20), age int, primary key (id));'
)
插入
data = {
    'id': '465674646',
    'name': 'Mike',
    'age': 25,
    'sex': 'Male',
}
table_name = 'xxxx'

# Column names come from the keys of `data` (trusted, written in code).
# The values are sent as bound parameters (%s) so the driver does the
# quoting/escaping — hand-quoting them into the SQL string breaks on values
# containing quotes and is open to SQL injection.
keys = ','.join(data)
placeholders = ','.join(['%s'] * len(data))
params = tuple(data.values())

try:
    # ================================================
    # insert ignore: with a unique constraint, an existing row is kept and
    # the new row is not inserted.
    # ================================================
    sql = f'insert ignore into {table_name}({keys}) values({placeholders});'
    try:
        cursor.execute(sql, params)
        db.commit()
    except pymysql.MySQLError as exc:
        # Report the failure instead of swallowing it silently.
        db.rollback()
        print('insert ignore failed:', exc)

    # ================================================
    # insert ... on duplicate key update: with a unique constraint, an
    # existing row is overwritten with the new data.
    # ================================================
    update_sql = ','.join(f'{k} = %s' for k in data)
    sql = (
        f'insert into {table_name}({keys}) values({placeholders}) '
        f'on duplicate key update {update_sql};'
    )
    try:
        # The values are bound twice: once for VALUES(...), once for UPDATE.
        cursor.execute(sql, params + params)
        db.commit()
    except pymysql.MySQLError as exc:
        db.rollback()
        print('insert/update failed:', exc)
finally:
    # Always release the cursor and connection, even on unexpected errors.
    cursor.close()
    db.close()
删除
修改
查询
# Run the query and pull every result row into memory at once.
sql = 'select * from xtable;'
cursor.execute(sql)
res = cursor.fetchall()
# res = cursor.fetchone()
……
# =======================================================
# Fetch the rows one at a time
# =======================================================
sql = 'select * from xtable;'
try:
    cursor.execute(sql)
    print('Count:', cursor.rowcount)
    row = cursor.fetchone()
    # Print first, then advance: fetchone() returns None once the result set
    # is exhausted, and that sentinel ends the loop without being printed.
    while row:
        print(row)
        row = cursor.fetchone()
except pymysql.MySQLError as exc:
    # Report database errors with their cause instead of hiding every
    # exception behind a bare `except:`.
    print('error', exc)
mongodb
安装
pip install --upgrade pymongo
连接
# Create a client for the MongoDB server at localhost:27017.
# NOTE(review): requires `import pymongo`, which this snippet does not show.
client = pymongo.MongoClient(host='localhost', port=27017)
建库建文档
# Select, by name, the database and the collection to work with.
db = client['test_db']
collection = db['test_collection']
增
collection.insert_one({……})
collection.insert_many({……},{……})
删
collection.remove({……})
改
# Usual flow: find the document, then modify it.
# The call below is an upsert keyed on 'id': it updates the matching document,
# or inserts `data1` when none matches, so repeated runs do not duplicate it.
collection.update_one({'id': data1['id']}, {'$set': data1}, upsert=True)
查
collection.find()
collection.find_one()
# Cursor.count() was deprecated in PyMongo 3.7 and removed in 4.0;
# count_documents() with an empty filter counts every document.
collection.count_documents({})
redis
安装
pip install redis hiredis
连接
使用连接池时,不需要手动关闭连接
import redis

# Build one shared connection pool for database 15 on the local server and
# create a client on top of it.
# decode_responses=True asks redis-py to return str instead of bytes.
pool = redis.ConnectionPool(host='localhost', port=6379, db=15, decode_responses=True)
r = redis.StrictRedis(connection_pool=pool)
操作
公用方法
方法 | 作用 | 例子 |
---|---|---|
exists() | 判断是否存在 | |
delete() | 删除一个键 | |
type() | 判断键类型 | |
keys(pattern) | 获取所有符合规则的键 | keys('*') |
randomkey() | 随机获取一个键 | |
rename(src,dst) | ||
dbsize() | ||
expire(name,time) | ||
ttl() | ||
move(name, db) | 把键移动到其他数据库 | |
flushdb() | 清空当前数据库 | |
flushall() | 清空所有数据库 | |
expire() |
字符串
方法 | 作用 | 例子 |
---|---|---|
set | ||
get | ||
getset() | ||
mget() | ||
mset() | ||
incr(name,amount=1) | ||
decr(name, amount=1) | ||
append(key,value) | ||
列表
方法 | 作用 | 例子 |
---|---|---|
rpush() | 末尾添加一个元素 | |
lpush() | ||
lpop() | ||
rpop() | ||
llen(name) | ||
lrange(name,start,end) | ||
lindex(name,index) | ||
lset(name,index,value) | lset(‘list’,1,5) 索引为1的元素,赋值为5 | |
lrem(name,count,value) | lrem(‘list’,2,3) 删除两个3 |
集合
方法 | 作用 | 例子 |
---|---|---|
sadd(setname,*values) | ||
srem(setname,*values) | ||
spop(setname) | ||
smove(src,dst,value) | 把value移动到另外一个集合中去 | |
scard(name) | 获取元素个数 | |
sismember(setname,value) | 判断 | |
sinter([setname,setname,……]) | 返回交集 | |
sunion([setname,setname,……]) | 返回并集 | |
sdiff([setname,setname,……]) | 返回差集 | |
smembers(setname) | 查看全部元素 | |
srandmember(setname) | 随机返回一个元素,不删除 | |
有序集合
方法 | 作用 | 例子 |
---|---|---|
zadd() | zadd(‘grade’,100,‘bob’,98,‘mike’) | |
zrem() | zrem(‘grade’,‘Mike’) | |
zincrby() | zincrby(‘grade’,‘bob’,-2) | |
zrank(setname,value) | 返回名次,从小到大排序 | zrank(‘grade’,‘amy’) |
zrevrank(setname,value) | 返回名次,从大到小排序 | |
zrevrange(setname,start,end) | zrevrange(‘grade’,0,3) | |
zrangebyscore(setname,start,end) | zrangebyscore(‘grade’,85,100) | |
zcount(setname,min,max) | min->max之间的个数 | |
zcard(setname) | 返回个数 | |
zremrangebyrank(setname,min,max) | 删除 | |
zremrangebyscore(setname,min,max) | 删除元素 | |
散列
方法 | 作用 | 例子 |
---|---|---|
hset() | hset(‘price’,‘cake’,5) | |
hget() | ||
hmset() | ||
hmget() | ||
hincrby() | ||
hexists() | ||
hdel() | ||
hlen() | ||
hkeys() | ||
hvals() | ||
hgetall() |
其他
sqlalchemy
dialect+driver://username:password@host:port/database
例如:
from sqlalchemy import create_engine

# URL format: dialect+driver://username:password@host:port/database
# echo=True logs the emitted SQL; pool_size sets the connection-pool size.
# NOTE(review): the `encoding` argument was deprecated in SQLAlchemy 1.4 and
# removed in 2.0 — drop it when targeting 2.x.
engine = create_engine(
    "mysql://scott:tiger@hostname/dbname",
    encoding='latin1',
    echo=True,
    pool_size=10,
)
例子:
from sqlalchemy import create_engine

DB_NAME = 'only_test'
TABLE_NAME = 'students'
DB_URI = f'mysql://root:huajian@localhost:3306/{DB_NAME}'

# NOTE(review): `encoding` and passing raw SQL strings to execute() are
# SQLAlchemy 1.x features; 2.x removed `encoding` and requires text().
engine = create_engine(DB_URI, encoding='utf8', echo=True)

sql_cre_table = f'''
create table if not exists {TABLE_NAME}(
id int(11) auto_increment,
name varchar(20),
age int,
grade int,
primary key (id)
)
'''
sql_insert_1 = f'insert ignore into {TABLE_NAME}(name,age,grade) values ("bob",21,98);'
sql_insert_2 = f'insert ignore into {TABLE_NAME}(name,age,grade) values ("amy",21,98);'

# Run everything in one transaction. `conn.begin()` used as a context manager
# commits on normal exit and rolls back on exception, so no explicit commit is
# needed; the outer `with` returns the connection to the pool afterwards.
with engine.connect() as conn:
    with conn.begin():
        conn.execute(sql_cre_table)
        conn.execute(sql_insert_1)
        conn.execute(sql_insert_2)
        # Count rows in the table created above, not an unrelated table.
        res = conn.execute(f'select count(*) from {TABLE_NAME};')
        for i in res:
            print(i)
postgresql
# PostgreSQL engine URLs; the part after '+' names the driver to use.
# default
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')
# psycopg2
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')
# pg8000
engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')
mysql
# MySQL engine URLs; the part after '+' names the driver to use.
# default
engine = create_engine('mysql://scott:tiger@localhost/foo')
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
# PyMySQL
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo')
oracle
# Oracle engine URLs.
# default driver, with host, port and a database name ('sidname' — presumably the SID)
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
# explicit cx_oracle driver ('tnsname' is presumably a TNS alias — confirm)
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')
Microsoft SQL Server
# Microsoft SQL Server engine URLs; the part after '+' names the driver to use.
# pyodbc
engine = create_engine('mssql+pyodbc://scott:tiger@mydsn')
# pymssql
engine = create_engine('mssql+pymssql://scott:tiger@hostname:port/dbname')
sqlite
# SQLite database stored in the file foo.db
engine = create_engine('sqlite:///foo.db')
# to use a SQLite :memory: database, specify an empty URL:
engine = create_engine('sqlite://')