Python – 深度学习系列38 重塑实体识别5-预测并行化改造

作者 : admin 本文共7611个字,预计阅读时间需要20分钟 发布时间: 2024-06-9 共4人阅读

说明

在重塑实体识别4中梳理了数据流,然后我发现pipeline的串行效率太低了,所以做了并行化改造。里面还是有不少坑的,记录一下。

内容

1 pipeline

官方的pipeline看起来的确是比较好用的,主要是实现了比较好的数据预处理。因为在训练/使用过程中都要进行数据的令牌化与反令牌化,有些字符会被特殊处理,例如 ‘##A’等。
Python – 深度学习系列38 重塑实体识别5-预测并行化改造插图
在使用过程中,我用200条新闻数据进行测试,用pipeline方法花了11分钟处理完毕,期间CUDA的使用率大约为10%。按此估算,即使用多接口并行的方式,那么一分钟最多处理2000条,一天最多处理0.14*2000~30万条数据。这个效率太低了。

2 并行化

最终的结论是不到30秒处理200条,显存只占用2.6G,理论上可以支持3个服务并行(以确保GPU的完全利用)。按最保守的估计,改造后的并行化应该可以提升3倍的效率,稍微激进一点,可以提升10倍的效率。这个之后可以进行测试。

一些主要的点如下

2.1 结果解析

结果可以分为:

  • 1 仅含解析出的实体列表,用逗号连接字符串表示。
  • 2 含实体及其起始位置的表示,这个用于标注反馈、二次增强处理。
  • 3 仅含BIO标签,主要用于和测试数据进行效果比对。

对应的相关函数,看起来有点繁杂,我自己都不太想看第二眼。

from datasets import ClassLabel
# 定义标签列表
label_list = ['B', 'I', 'O']
# 创建 ClassLabel 对象
class_label = ClassLabel(names=label_list)
def convert_entity_label_batch(x):
x1 = x
return class_label.int2str(x1)
# 定义函数将整数Tensor转换为字符串 | 反令牌函数,但是用不上;因为predict label列表的长度和 ss_padding相同
def tensor_to_string(tensor, tokenizer = None , skip_special_tokens = True):
return tokenizer.decode(tensor.tolist(), skip_special_tokens=skip_special_tokens).replace(' ','')
from datasets import ClassLabel
def detokenize(word_piece):
"""
将 WordPiece 令牌还原为原始句子。
"""
if word_piece.startswith('##'):
x = word_piece[2:]
else:
x = word_piece
return x
import re
def extract_bio_positions(bio_string):
pattern = re.compile(r'B(I+)(O|$)')
matches = pattern.finditer(bio_string)
results = []
for match in matches:
start, end = match.span()
results.append((start, end - 1))  # end-1 to include the last 'I'
return results
# 0.1ms
def parse_ent_pos_map_batch(some_dict = None):
word_list = some_dict['token_words']
label_list = [int(x) for x in list(some_dict['label_list'])]
min_len = min(len(word_list),len(label_list))
word_list = word_list[:min_len]
label_list = label_list[:min_len]
label_list1 =  list(map(convert_entity_label_batch,label_list))
oriword_list1 = list(map(detokenize,word_list))
ori_word_str =''.join(oriword_list1)
# 补到等长
label_str = ''
for i in range(len(label_list1)):
len_of_ori_word = len(oriword_list1[i])
if len_of_ori_word == 1:
tem_str = label_list1[i]
else:
if label_list1[i] in ['I','O']:
tem_str = label_list1[i] * len_of_ori_word
else:
tem_str = 'B' + 'I' * (len_of_ori_word -1)        
label_str += tem_str
pos_list = extract_bio_positions(label_str)
part_ent_list = [(ori_word_str[x[0]:x[1]] , *x) for x in pos_list]
return part_ent_list
# =============
def make_BIO_by_len(some_len):
default_str = 'I' * some_len
str_list = list(default_str)
str_list[0] ='B'
return str_list
def gen_BIO_list2(some_dict):
the_content = some_dict['clean_data']
ent_list =  some_dict['ent_tuple_list']
content_list = list(the_content)
tag_list = list('O'* len(content_list))
for ent_info in ent_list:
start = ent_info[1]
end = ent_info[2]
label_len = end-start
tem_bio_list = make_BIO_by_len(label_len)
tag_list[start:end] = tem_bio_list
res_dict = {}
res_dict['x'] = content_list
res_dict['y'] = tag_list
return res_dict
def trim_len(some_dict = None):
padding_BIO = some_dict['padding_BIO']
ss_len = some_dict['ss_len']
return padding_BIO[:ss_len]

2.2 批量预测

看起来同样很繁杂,但是不得不细看。首先,数据会按照几个长度 20,50,198分为三部分处理,batch_predict每次仅处理一个批次。在这里,将数据转为定长的令牌长度,然后转入CUDA进行批量预测。结果再按照实体-位置 tuple, 实体列表和BIO三种方式进行解析。

from functools import partial
import transformers 
import torch 
from transformers import AutoModelForMaskedLM, AutoTokenizer,AutoModelForTokenClassification
from functools import partial
# some_batch 是原文经过padding的数据,['ss_hash','ss','ss_len', 'ss_padding'], 其中ss_padding的长度是固定的
# 模型文件和令牌文件都放在model_path之下,model比较大,避免重载;而tokenize会有padding过程,必须重载
# 模型先载入cuda
def batch_predict(some_batch, ss_padding_len = None, model = None, model_path = None):
# 因为tokenize会在令牌的前后加上分隔令牌,所以+2
if ss_padding_len is None:
ss_padding_len = some_batch['ss_padding'].apply(len).max()
print('ss_padding_len is %s ' % ss_padding_len)
max_len = ss_padding_len+2
tokenizer = AutoTokenizer.from_pretrained(model_path)
tencoder = partial(tokenizer.encode,truncation=True, max_length=max_len, is_split_into_words=True, return_tensors="pt",  padding='max_length')
some_batch['ss_padding_token'] = some_batch['ss_padding'].apply(list).apply(tencoder)
# 构成矩阵
minput = torch.cat(list(some_batch['ss_padding_token'].values))
# 将数据搬到GPU中处理再返回
with torch.no_grad():
input_cuda = minput.to(device)
outputs_cuda = model(input_cuda).logits
predictions = torch.argmax(outputs_cuda, dim=2)
predictions_list = list(predictions.to('cpu').numpy())
predict_list1 = []
for predictions in predictions_list:
tem_pred_tag = [int(x) for x in predictions[1:-1]]
predict_list1.append(tem_pred_tag)
some_batch['label_list'] = predict_list1
_s = cols2s(some_df =some_batch, cols= ['ss_padding','label_list'], cols_key_mapping= ['token_words', 'label_list'])
_s1 = _s.apply(parse_ent_pos_map_batch)
some_batch['ent_tuple_list'] = list(_s1)
some_batch['ent_list'] = some_batch['ent_tuple_list'].apply(lambda x: ','.join([a[0] for a in x ]))
_s = cols2s(some_batch, cols= ['ss_padding', 'ent_tuple_list'], cols_key_mapping= ['clean_data', 'ent_tuple_list'])
s1 = _s.apply(gen_BIO_list2)
ent_tuple_res_df1 = pd.DataFrame(s1.to_list())
some_batch['padding_BIO'] = list(ent_tuple_res_df1['y'].apply(lambda x: ''.join(x)))
_s00 = cols2s(some_batch, cols = ['ss_len', 'padding_BIO'], cols_key_mapping=['ss_len', 'padding_BIO'])
some_batch['BIO'] = list(_s00.apply(trim_len))
return some_batch    

3 迭代器

在推送数据处理时,可以采用迭代器来控制不同的批次数据

# 迭代器切分
import pandas as pd
class DataFrameBatchIterator:
def __init__(self, dataframe, batch_size):
self.dataframe = dataframe
self.batch_size = batch_size
# 【我增加的】
self.fail_batch_list = []
def __iter__(self):
num_rows = len(self.dataframe)
num_batches = (num_rows - 1) // self.batch_size + 1
for i in range(num_batches):
start_idx = i * self.batch_size
end_idx = (i + 1) * self.batch_size
batch_data = self.dataframe.iloc[start_idx:end_idx]
yield batch_data
# 【我增加的】
def clear_fail(self):
self.fail_batch_list = []
# 【我增加的】
def get_some_batch(self, batch_idx):
return self.dataframe.iloc[self.batch_size * batch_idx: self.batch_size * (batch_idx + 1)]
# 【我增加的】记录失败的批次
def rec_fail_batch_idx(self, batch_idx):
self.fail_batch_list.append(batch_idx)
# 创建一个示例 DataFrame
data = {'Name': ['John', 'Jane', 'Mike', 'Alice', 'Bob'],
'Age': [25, 30, 35, 28, 32],
'City': ['New York', 'Paris', 'London', 'Tokyo', 'Sydney']}
df = pd.DataFrame(data)
# 创建 DataFrame 迭代器
batch_iterator = DataFrameBatchIterator(df, batch_size=2)
import tqdm
# 使用迭代器逐批次处理数据
for i,batch in tqdm.tqdm(enumerate(batch_iterator)):
try:
# 在这里可以对当前批次的数据进行相应的操作
# 例如进行数据清洗、特征处理、模型训练等
# 示例:打印当前批次的数据
#         raise Exception(e) 
print(batch)
except:
print('>>> %s Fail' % i)
batch_iterator.rec_fail_batch_idx(i)

以下是实际的调度

# 假设处理长度为1万的句子
# 20 * 2000 ~ 4w
# 50 * 800 ~  4w
# 200 * 200 ~ 4w
import warnings 
warnings.filterwarnings('ignore')
batch_slice_para = {20:2000, 50:800, 200:200}
batch_len_list = sorted(list(batch_slice_para.keys()))
batch_len_list.insert(0,0)
batch_df_list = []
for i in range(len(batch_len_list)):
if i >0:
sel = (ss_df['ss_len'] >=batch_len_list[i-1]) & (ss_df['ss_len'] < batch_len_list[i])
if sel.sum():
padding_len = batch_len_list[i]
padding_batch = batch_slice_para[padding_len]
tem_df= ss_df[sel]
# tem_df['ss_padding'] = tem_df['ss'].apply(lambda x: x.ljust(padding_len,'a'))
tem_df['ss_padding'] = tem_df['ss']
tem_df_iterator = DataFrameBatchIterator(tem_df, padding_batch)
batch_df_list.append(tem_df_iterator)
else:
batch_df_list.append(None)

对每个批次执行处理,载入模型

label_list = ['B','I','O']
model_checkpoint = 'model03'
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list))
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Device: %s' % device)
# 自动切换设备
if model.device.type != device:
model.to(device)
print('>>> 检测到模型设备与当前指定不一致,切换 %s' % device )
else:
print('>>> 模型设备一致,不切换 %s' % device)

分批次预测(主要是确保显存不溢出)

batch_res_list = []
for some_iter in batch_df_list:
for some_batch in some_iter:
batch_res = batch_predict(some_batch, model = model, model_path = 'model03')
batch_res_list.append(batch_res)

结果合并

batch_res_df = pd.concat(batch_res_list, ignore_index= True)
mdf = pd.merge(input_df , batch_res_df[['ss_hash', 'ent_list']],how='left', on ='ss_hash')

Python – 深度学习系列38 重塑实体识别5-预测并行化改造插图(1)

4 总结

一个在理论上证明可以显著提升效率的点在于,模型进行实体识别时先切分了短句,然后按短句进行了去重:相同短句的实体结果一定是相同的。

实验中,200条新闻产生了约5万个短句,去重后只剩下约3.5万。所以即使在这一步也是有提升的。当然,这种方式同样也可以被用于pipeline。

还有就是在处理填充时,并不按照最大长度统一填充。而是按照句子长度的统计特性分为了短、中、长三种方式。从统计上看,约70%的短句长度是在20个字符以内的,真正超过50个字符的短句(中间无分隔符),即使从语法上来看也是比较奇怪的。
这样在填充数据时浪费就比pipeline要小,同样显存可以装下更多的数据。

本站无任何商业行为
个人在线分享 » Python – 深度学习系列38 重塑实体识别5-预测并行化改造
E-->