Last updated on 2019-3-21…
基础知识
进程与线程
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存。
- 进程切换时,耗费资源较大,效率要差一些。对于一些要求同时进行并且又要共享某些变量的并发操作,只能用线程,不能用进程。
- 一个程序至少有一个进程,一个进程可以有多个线程。
Python多线程
- Python中的多线程是假的多线程! 虽然Python解释器可以运行多个线程,但是只有一个线程在解释器中运行(时间片轮转)。
- 对Python虚拟机的访问由
全局解释器锁(GIL)
来控制,正是这个锁能保证同时只有一个线程在运行。 - 要真正利用多核,除非重写一个不带GIL的解释器。
Python 垃圾回收
- Python中的垃圾回收是以引用计数为主(如果一个对象的引用数为0,Python虚拟机就会回收这个对象的内存),分代收集为辅。引用计数的缺陷是循环引用的问题。
数据横向切分
from csv import DictWriter
import pandas as pd
import os
def mkSubFile(lines, head, srcName, sub):
[des_filename, extname] = os.path.splitext(srcName)
filename = des_filename + '_' + str(sub) + extname
print('make file: %s' % filename)
fout = open(filename, 'w')
try:
fout.writelines([head])
fout.writelines(lines)
return sub + 1
finally:
fout.close()
def splitByLineCount(filename, count): #count为每份的行数
fin = open(filename, 'r')
try:
head = fin.readline()
buf = []
sub = 1
for line in fin:
buf.append(line)
if len(buf) == count:
sub = mkSubFile(buf, head, filename, sub)
buf = []
if len(buf) != 0:
sub = mkSubFile(buf, head, filename, sub)
finally:
fin.close()
’|’分割的文件转换成csv
#读取文件,并且对用户特征预处理
path = 'data/'
if os.path.exists(path+'userFeature.csv'):
user_feature=pd.read_csv(path+'userFeature.csv')
else:
userFeature_data = []
with open(path+'userFeature.data', 'r') as f:
for i, line in enumerate(f):
line = line.strip().split('|')
userFeature_dict = {}
for each in line:
each_list = each.split(' ')
userFeature_dict[each_list[0]] = ' '.join(each_list[1:])
userFeature_data.append(userFeature_dict)
if i % 100000 == 0:
print(i)
user_feature = pd.DataFrame(userFeature_data)
user_feature.to_csv(path+'userFeature.csv', index=False)
下面转载自Bryan
数据分片
可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。应用场景:多进程爬虫,类mapreduce任务。缺点是子进程会拷贝父进程所有状态,内存浪费严重。
import math
from multiprocessing import Pool
def run(data, index, size): # data 传入数据,index 数据分片索引,size进程数
size = math.ceil(len(data) / size)
start = size * index
end = (index + 1) * size if (index + 1) * size < len(data) else len(data)
temp_data = data[start:end]
# do something
return data # 可以返回数据,在后面收集起来
processor = 40
res = []
p = Pool(processor)
for i in range(processor):
res.append(p.apply_async(run, args=(data, i, processor,)))
print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
print(i.get()) # 使用get获得多进程处理的结果
分文件处理
当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。
from multiprocessing import Pool
import pandas as pd
import math
data=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})
users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])
processor=4
p=Pool(processor)
l_data = len(users)
size = math.ceil(l_data / processor)
res = []
def run(i):
data=pd.read_csv('../data/user_'+str(i)+'.csv')
#todo
return data
for i in range(processor):
start = size * i
end = (i + 1) * size if (i + 1) * size < l_data else l_data
user = users[start:end]
t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)
print(len(t_data))
del data,l_data,users
for i in range(processor):
res.append(p.apply_async(run, args=(i,)))
print(str(i) + ' processor started !')
p.close()
p.join()
data = pd.concat([i.get() for i in res])
多进程数据共享
当需要修改共享的数据时,那么这个时候可以使用数据共享。
from multiprocessing import Process, Manager
# 每个子进程执行的函数
# 参数中,传递了一个用于多进程之间数据共享的特殊字典
def func(i, d):
d[i] = i + 100
print(d.values())
# 在主进程中创建特殊字典
m = Manager()
d = m.dict()
for i in range(5):
# 让子进程去修改主进程的特殊字典
p = Process(target=func, args=(i, d))
p.start()
p.join()
------------
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
---------------------