Python大文件处理

For Data Mining

Posted by Jiayue Cai on September 21, 2018

Last updated on 2019-3-21…

基础知识

进程与线程

  • 进程在执行过程中拥有独立的内存单元,而多个线程共享内存。
  • 进程切换时,耗费资源较大,效率要差一些。对于一些要求同时进行并且又要共享某些变量的并发操作,只能用线程,不能用进程。
  • 一个程序至少有一个进程,一个进程可以有多个线程。

Python多线程

  • Python中的多线程是假的多线程! 虽然Python解释器可以运行多个线程,但是只有一个线程在解释器中运行(时间片轮转)。
  • 对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。
  • 要真正利用多核,除非重写一个不带GIL的解释器。

Python 垃圾回收

  • Python中的垃圾回收是以引用计数为主(如果一个对象的引用数为0,Python虚拟机就会回收这个对象的内存),分代收集为辅。引用计数的缺陷是循环引用的问题。

数据横向切分

from csv import DictWriter
import pandas as pd
import os

def mkSubFile(lines, head, srcName, sub):
    [des_filename, extname] = os.path.splitext(srcName)
    filename = des_filename + '_' + str(sub) + extname
    print('make file: %s' % filename)
    fout = open(filename, 'w')
    try:
        fout.writelines([head])
        fout.writelines(lines)
        return sub + 1
    finally:
        fout.close()
		
def splitByLineCount(filename, count): #count为每份的行数

    fin = open(filename, 'r')
    try:
        head = fin.readline()
        buf = []
        sub = 1
        for line in fin:
            buf.append(line)
            if len(buf) == count:
                sub = mkSubFile(buf, head, filename, sub)
                buf = []
        if len(buf) != 0:
            sub = mkSubFile(buf, head, filename, sub)
    finally:
        fin.close()

’|’分割的文件转换成csv

#读取文件,并且对用户特征预处理

path = 'data/'

if os.path.exists(path+'userFeature.csv'):
    user_feature=pd.read_csv(path+'userFeature.csv')
else:
    userFeature_data = []
    with open(path+'userFeature.data', 'r') as f:
        for i, line in enumerate(f):
            line = line.strip().split('|')
            userFeature_dict = {}
            for each in line:
                each_list = each.split(' ')
                userFeature_dict[each_list[0]] = ' '.join(each_list[1:])
            userFeature_data.append(userFeature_dict)
            if i % 100000 == 0:
                print(i)
        user_feature = pd.DataFrame(userFeature_data)
        user_feature.to_csv(path+'userFeature.csv', index=False)

下面转载自Bryan

数据分片

可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。应用场景:多进程爬虫,类mapreduce任务。缺点是子进程会拷贝父进程所有状态,内存浪费严重。

import math
from multiprocessing import Pool
 
def run(data, index, size):  # data 传入数据,index 数据分片索引,size进程数

    size = math.ceil(len(data) / size)
    start = size * index
    end = (index + 1) * size if (index + 1) * size < len(data) else len(data)
    temp_data = data[start:end]
    # do something
    
    return data  # 可以返回数据,在后面收集起来
    
processor = 40
res = []
p = Pool(processor)
for i in range(processor):
    res.append(p.apply_async(run, args=(data, i, processor,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
for i in res:
    print(i.get())  # 使用get获得多进程处理的结果

分文件处理

当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。

from multiprocessing import Pool
import pandas as pd
import math
data=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})
users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])
processor=4
p=Pool(processor)
l_data = len(users)
size = math.ceil(l_data / processor)
res = []
def run(i):
    data=pd.read_csv('../data/user_'+str(i)+'.csv')
    #todo
    return data

for i in range(processor):
    start = size * i
    end = (i + 1) * size if (i + 1) * size < l_data else l_data
    user = users[start:end]
    t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)
    t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)
    print(len(t_data))

del data,l_data,users
for i in range(processor):
    res.append(p.apply_async(run, args=(i,)))
    print(str(i) + ' processor started !')
p.close()
p.join()
data = pd.concat([i.get() for i in res])

多进程数据共享

当需要修改共享的数据时,那么这个时候可以使用数据共享。

from multiprocessing import Process, Manager
# 每个子进程执行的函数

# 参数中,传递了一个用于多进程之间数据共享的特殊字典

def func(i, d):
    d[i] = i + 100
    print(d.values())
# 在主进程中创建特殊字典

m = Manager()
d = m.dict()
for i in range(5):
    # 让子进程去修改主进程的特殊字典
    
    p = Process(target=func, args=(i, d))
    p.start()
p.join()

------------
[100]
[100, 101]
[100, 101, 102, 103]
[100, 101, 102, 103]
[100, 101, 102, 103, 104]
---------------------