Why use a DataCollator
!!! The Transformers source code below is from version 4.41.
When we use the Trainer, we pass in a DataCollator, for example:
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=ds_train,
    eval_dataset=ds_val,
    tokenizer=tokenizer,
    data_collator=data_collator,
)
What exactly is a DataCollator? Let's start with the official definition.
Data collators are objects that will form a batch by using a list of dataset elements as input. These elements are of the same type as the elements of train_dataset or eval_dataset.
From this definition we can see that a DataCollator's job is to take samples from the dataset and assemble them, according to our needs, into a batch that serves as the training input.
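To make the contract concrete, here is a minimal hand-rolled collator (an illustrative sketch, not anything from the Transformers library): it takes a list of samples and returns a single batch dict.

import torch

# Illustrative sketch of the collator contract: a list of dataset samples goes
# in, one dict of batched tensors comes out. Assumes equal-length features.
def my_collator(features):
    batch = {}
    for key in features[0]:
        batch[key] = torch.tensor([f[key] for f in features])
    return batch

samples = [
    {"input_ids": [1, 2, 3], "labels": 0},
    {"input_ids": [4, 5, 6], "labels": 1},
]
print(my_collator(samples))
# {'input_ids': tensor([[1, 2, 3], [4, 5, 6]]), 'labels': tensor([0, 1])}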
The various DataCollators
When using Hugging Face's Trainer for training or fine-tuning, we can pass in data_collator: Optional[DataCollator] = None as the Trainer's batch collator. Since it is Optional, we can also leave it unspecified. Let's look at the source code:
default_collator = (
    DataCollatorWithPadding(tokenizer)
    if tokenizer is not None and isinstance(tokenizer, (PreTrainedTokenizerBase, SequenceFeatureExtractor))
    else default_data_collator
)
self.data_collator = data_collator if data_collator is not None else default_collator
So, if no DataCollator is specified: when a tokenizer was passed to the Trainer at initialization, DataCollatorWithPadding is used; otherwise default_data_collator is used.
DefaultDataCollator
Let's first look at default_data_collator. It only handles dict-like objects and assumes all samples already have the same length; if they don't, it raises an error. It is rarely used in practice.
def torch_default_data_collator(features: List[InputDataClass]) -> Dict[str, Any]:
    import torch

    # 1
    if not isinstance(features[0], Mapping):
        features = [vars(f) for f in features]
    first = features[0]
    # 2
    batch = {}

    # 3
    if "label" in first and first["label"] is not None:
        label = first["label"].item() if isinstance(first["label"], torch.Tensor) else first["label"]
        dtype = torch.long if isinstance(label, int) else torch.float
        batch["labels"] = torch.tensor([f["label"] for f in features], dtype=dtype)
    # 4
    elif "label_ids" in first and first["label_ids"] is not None:
        if isinstance(first["label_ids"], torch.Tensor):
            batch["labels"] = torch.stack([f["label_ids"] for f in features])
        else:
            dtype = torch.long if isinstance(first["label_ids"][0], int) else torch.float
            batch["labels"] = torch.tensor([f["label_ids"] for f in features], dtype=dtype)

    # 5. Handling of all other possible keys.
    # Again, we will use the first element to figure out which key/values are not None for this model.
    for k, v in first.items():
        if k not in ("label", "label_ids") and v is not None and not isinstance(v, str):
            if isinstance(v, torch.Tensor):
                batch[k] = torch.stack([f[k] for f in features])
            elif isinstance(v, np.ndarray):
                batch[k] = torch.tensor(np.stack([f[k] for f in features]))
            else:
                batch[k] = torch.tensor([f[k] for f in features])

    return batch
In short, it does the following for you:
- First, it checks whether each sample is a Mapping object; if not, it calls vars on the sample to get its dict form, i.e. obj.__dict__, a dict of all attribute names and values.
- It initializes a batch dict to be returned.
- It checks whether the samples have a label key; if so, it collects every sample's label value into the batch dict.
- If there is no label key but there is label_ids, it stacks all the label_ids into a new tensor.
- Finally, after label and label_ids are handled, only numeric values among the remaining key/value pairs are processed (so the samples here should already have been tokenized or otherwise preprocessed); those values are stacked as well.

A natural question: why does the label branch simply build a new tensor from all samples' label values, while the label_ids branch uses the stack function? The answer lies in naming convention: label marks a single-label sample, where each label is a single scalar, whereas label_ids marks a multi-label sample, where each label is itself a list, so it has to be stacked rather than flattened into a one-dimensional tensor. Note also that in the returned batch the label key is always renamed to labels.

For example, suppose each sample carries two labels, label1=[0,1] and label2=[1,0]; after stacking they become [[0,1],[1,0]], a tensor of shape (num_samples, num_labels).
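A quick check of the label / label_ids behavior with hand-made, already-equal-length samples (a small sketch; the values are mine, not from the post):

from transformers import default_data_collator

# Single-label samples: "label" is a scalar -> batch["labels"] has shape (2,)
single = [
    {"input_ids": [1, 2, 3], "label": 0},
    {"input_ids": [4, 5, 6], "label": 1},
]
print(default_data_collator(single))
# {'labels': tensor([0, 1]), 'input_ids': tensor([[1, 2, 3], [4, 5, 6]])}

# Multi-label samples: "label_ids" is a list -> batch["labels"] has shape (2, 2)
multi = [
    {"input_ids": [1, 2, 3], "label_ids": [0, 1]},
    {"input_ids": [4, 5, 6], "label_ids": [1, 0]},
]
print(default_data_collator(multi))
# {'labels': tensor([[0, 1], [1, 0]]), 'input_ids': tensor([[1, 2, 3], [4, 5, 6]])}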
DataCollatorWithPadding
If a tokenizer is specified but no DataCollator, then DataCollatorWithPadding is used as the batch collator.
class DataCollatorWithPadding:
    """
    Data collator that will dynamically pad the inputs received.

    Args:
        tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
            The tokenizer used for encoding the data.
        padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:

            - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
              acceptable input length for the model if that argument is not provided.
            - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
        max_length (`int`, *optional*):
            Maximum length of the returned list and optionally padding length (see above).
        pad_to_multiple_of (`int`, *optional*):
            If set will pad the sequence to a multiple of the provided value.

            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
        return_tensors (`str`, *optional*, defaults to `"pt"`):
            The type of Tensor to return. Allowable values are "np", "pt" and "tf".
    """

    tokenizer: PreTrainedTokenizerBase
    padding: Union[bool, str, PaddingStrategy] = True
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    return_tensors: str = "pt"

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, Any]:
        batch = pad_without_fast_tokenizer_warning(
            self.tokenizer,
            features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=self.return_tensors,
        )
        if "label" in batch:
            batch["labels"] = batch["label"]
            del batch["label"]
        if "label_ids" in batch:
            batch["labels"] = batch["label_ids"]
            del batch["label_ids"]
        return batch
As the name suggests, this collator's job is padding. Its __call__ returns the processed samples, and there is really only one important statement inside:
def pad_without_fast_tokenizer_warning(tokenizer, *pad_args, **pad_kwargs):
    # (excerpt: the surrounding warning-suppression logic is omitted)
    padded = tokenizer.pad(*pad_args, **pad_kwargs)
    return padded
That is, it calls the tokenizer's pad function and returns the padded tensors. In short, DataCollatorWithPadding pads the tokenized sequences so that every sequence in the batch has the same length; with padding='longest' (the behavior of padding=True), it pads to the longest sequence in the batch.
Let's look at an example:
from transformers import DefaultDataCollator, DataCollatorWithPadding, DataCollatorForLanguageModeling, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('/data/wtf/model/llama-2-7b-hf', padding_side='left')
input_text = ["hello world", "hello hello hello hello", "hello,My name is "]
c1 = DefaultDataCollator()
c2 = DataCollatorWithPadding(tokenizer)
c3 = DataCollatorForLanguageModeling(tokenizer, mlm=False)

# Calling the tokenizer directly leaves every sample with a different length
tokenizer(input_text)
"""
{'input_ids': [[1, 22172, 3186], [1, 22172, 22172, 22172, 22172], [1, 22172, 29892, 3421, 1024, 338, 29871]], 'attention_mask': [[1, 1, 1], [1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1]]}
"""

c1(input_text)  # raises an error right away

# DataCollatorWithPadding takes the whole encoding object (the BatchEncoding
# returned by the tokenizer), so the tokenizer's output can be passed in directly.
tok_input = tokenizer(input_text)
c2(tok_input)
"""The padded result:
{'input_ids': tensor([[    0,     0,     0,     0,     1, 22172,  3186],
        [    0,     0,     1, 22172, 22172, 22172, 22172],
        [    1, 22172, 29892,  3421,  1024,   338, 29871]]), 'attention_mask': tensor([[0, 0, 0, 0, 1, 1, 1],
        [0, 0, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1]])}

Decoding it back, the lengths are now identical:
['<unk><unk><unk><unk><s> hello world',
 '<unk><unk><s> hello hello hello hello',
 '<s> hello,My name is ']
"""
DataCollatorForLanguageModeling
Next up is the commonly used DataCollatorForLanguageModeling.
class DataCollatorForLanguageModeling(DataCollatorMixin):
    """
    Data collator used for language modeling. Inputs are dynamically padded to the maximum length of a batch if they
    are not all of the same length.

    Args:
        tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
            The tokenizer used for encoding the data.
        mlm (`bool`, *optional*, defaults to `True`):
            Whether or not to use masked language modeling. If set to `False`, the labels are the same as the inputs
            with the padding tokens ignored (by setting them to -100). Otherwise, the labels are -100 for non-masked
            tokens and the value to predict for the masked token.
        mlm_probability (`float`, *optional*, defaults to 0.15):
            The probability with which to (randomly) mask tokens in the input, when `mlm` is set to `True`.
        pad_to_multiple_of (`int`, *optional*):
            If set will pad the sequence to a multiple of the provided value.
        return_tensors (`str`):
            The type of Tensor to return. Allowable values are "np", "pt" and "tf".
    """
The parameter list includes mlm, which means this collator can be used for MLM tasks. With mlm=True, only the masked positions (the tokens to be predicted) contribute to the loss: every other label is set to -100, while the masked positions keep their true token ids as labels. With mlm=False, the labels at pad-token positions are set to -100. When mlm=True, the tokenizer must have a mask_token.
Now let's look at the main functions.

torch_call is the processing function. It first checks whether the samples are dicts, then pads them and returns them as tensors; this part is identical to what DataCollatorWithPadding does, except that the label key is not removed. It then pops any precomputed special_tokens_mask and, depending on whether the MLM strategy is enabled, processes the input_ids and returns the final tensors. If MLM is not used, the pad-token positions in the labels are replaced with -100, meaning no loss is computed there.
def torch_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
    # Handle dict or lists with proper padding and conversion to tensor.
    if isinstance(examples[0], Mapping):
        batch = pad_without_fast_tokenizer_warning(
            self.tokenizer, examples, return_tensors="pt", pad_to_multiple_of=self.pad_to_multiple_of
        )
    else:
        batch = {
            "input_ids": _torch_collate_batch(examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
        }

    # If special token mask has been preprocessed, pop it from the dict.
    special_tokens_mask = batch.pop("special_tokens_mask", None)
    if self.mlm:
        batch["input_ids"], batch["labels"] = self.torch_mask_tokens(
            batch["input_ids"], special_tokens_mask=special_tokens_mask
        )
    else:
        labels = batch["input_ids"].clone()
        if self.tokenizer.pad_token_id is not None:
            labels[labels == self.tokenizer.pad_token_id] = -100
        batch["labels"] = labels
    return batch
torch_mask_tokens receives the input_ids and an optional special_tokens_mask and does the following:
- It builds a probability matrix of the same shape as the input, filled with mlm_probability.
- If no special_tokens_mask was passed in, one is generated first: special_tokens_mask = [1 if token in all_special_ids else 0 for token in token_ids_0], so 1 marks special tokens and normal tokens get 0; it is then converted to a bool tensor. If one was passed in, its bool form is used directly.
- The probability matrix is set to 0 at special-token positions, so they are never masked and never contribute to the loss.
- Masked indices are sampled from a Bernoulli distribution over the probability matrix.
- In the labels, all non-masked positions are filled with -100, meaning no loss is computed there.
- By default (this is exactly BERT's recipe), 80% of the masked positions have their input token replaced with mask_token and the model must predict the original token; 10% are replaced with a random token; the remaining 10% are left unchanged. Note that the random-replacement draw in the code uses p=0.5 because half of the remaining 20% is 10% (a short sanity check of these proportions follows the source below).
def torch_mask_tokens(self, inputs: Any, special_tokens_mask: Optional[Any] = None) -> Tuple[Any, Any]:
    """
    Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.
    """
    import torch

    labels = inputs.clone()
    # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
    probability_matrix = torch.full(labels.shape, self.mlm_probability)
    # Build the special-tokens mask if it was not precomputed.
    if special_tokens_mask is None:
        special_tokens_mask = [
            self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
        ]
        special_tokens_mask = torch.tensor(special_tokens_mask, dtype=torch.bool)
    else:
        special_tokens_mask = special_tokens_mask.bool()

    probability_matrix.masked_fill_(special_tokens_mask, value=0.0)
    masked_indices = torch.bernoulli(probability_matrix).bool()
    labels[~masked_indices] = -100  # We only compute loss on masked tokens

    # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
    indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
    inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)

    # 10% of the time, we replace masked input tokens with random word
    indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
    random_words = torch.randint(len(self.tokenizer), labels.shape, dtype=torch.long)
    inputs[indices_random] = random_words[indices_random]

    # The rest of the time (10% of the time) we keep the masked input tokens unchanged
    return inputs, labels
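To see where the 0.8 and 0.5 in the source come from, here is a quick sanity check (an illustrative sketch, not from the original code): the second Bernoulli draw uses p=0.5 because half of the remaining 20% of masked positions is 10%.

import torch

# Reproduce the two Bernoulli draws on a large dummy tensor and measure the
# resulting proportions among the masked positions.
mlm_probability = 0.15
labels = torch.zeros(1000, 512, dtype=torch.long)

probability_matrix = torch.full(labels.shape, mlm_probability)
masked_indices = torch.bernoulli(probability_matrix).bool()

# 80% of the masked positions -> replaced with the mask token
indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
# half of the remaining 20% (p=0.5) -> random token, i.e. 10% overall
indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced

n_masked = masked_indices.sum().item()
print(indices_replaced.sum().item() / n_masked)  # ~0.8
print(indices_random.sum().item() / n_masked)    # ~0.1
# the remaining ~0.1 are masked positions that keep their original token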
In summary, compared with DataCollatorWithPadding, DataCollatorForLanguageModeling offers the following:
- It can be used for MLM tasks.
- DataCollatorWithPadding does nothing special with pad tokens, whereas DataCollatorForLanguageModeling sets pad-token labels to -100 so that they are excluded from the loss.
Again, let's look at an example:
from transformers import DefaultDataCollator,DataCollatorWithPadding,DataCollatorForLanguageModeling,AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('/data/wtf/model/llama-2-7b-hf',padding_side='left')
input_text = ["hello world","hello hello hello hello","hello,My name is "]
c3 = DataCollatorForLanguageModeling(tokenizer,mlm=False)
Note that the input cannot be the tokenizers.Encoding object itself; you have to pass the input_ids on their own, otherwise you get the error 'tokenizers.Encoding' object has no attribute 'size'.
tok_text = tokenizer(input_text)
# dc = c3(tok_text)  # would raise an error
dc = c3(tok_text['input_ids'])
print(dc)
"""With mlm=False, the pad-token positions in the labels are set to -100, so they are excluded from the loss:
{'input_ids': tensor([[ 0, 0, 0, 0, 1, 22172, 3186],
[ 0, 0, 1, 22172, 22172, 22172, 22172],
[ 1, 22172, 29892, 3421, 1024, 338, 29871]]), 'labels': tensor([[ -100, -100, -100, -100, 1, 22172, 3186],
[ -100, -100, 1, 22172, 22172, 22172, 22172],
[ 1, 22172, 29892, 3421, 1024, 338, 29871]])}
"""
With mlm=True, if the tokenizer has no mask_token, one has to be set:
tokenizer.mask_token = "<mask>"
c3 = DataCollatorForLanguageModeling(tokenizer,mlm=True)
tok_text = tokenizer(input_text)
t = tokenizer.batch_decode(tok_text['input_ids'])
dc = c3(tok_text['input_ids'])
"""The positions where labels != -100 are the tokens that were masked:
{'input_ids': tensor([[ 0, 0, 0, 0, 1, 0, 3186],
[ 0, 0, 1, 22172, 22172, 22172, 22172],
[ 1, 22172, 29892, 3421, 1024, 338, 29871]]), 'labels': tensor([[ -100, -100, -100, -100, -100, 22172, -100],
[ -100, -100, -100, 22172, 22172, -100, -100],
[ -100, -100, -100, -100, -100, -100, -100]])}
"""
DataCollatorForWholeWordMask
DataCollatorForWholeWordMask inherits from DataCollatorForLanguageModeling. Because tokenizers almost always use BPE or byte-level BPE segmentation, masking often hits individual sub-word pieces (roots and affixes) rather than whole words. DataCollatorForWholeWordMask solves this problem by masking entire words at once.
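A minimal usage sketch (assuming a BERT-style WordPiece tokenizer such as bert-base-uncased, since the whole-word grouping keys off the "##" continuation prefix; this model choice is my assumption, not from the post):

from transformers import AutoTokenizer, DataCollatorForWholeWordMask

# bert-base-uncased is an assumed example model; the collator groups sub-words
# by the "##" prefix, so if "huggingface" -> ["hugging", "##face"] is selected
# for masking, both pieces are masked together.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
collator = DataCollatorForWholeWordMask(tokenizer, mlm_probability=0.3)

tok = tokenizer(["huggingface transformers is great"])
batch = collator(tok["input_ids"])
print(batch["input_ids"])   # masked word pieces replaced (mostly by [MASK])
print(batch["labels"])      # -100 everywhere except at the masked positions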
DataCollatorForTokenClassification
To be continued.
DataCollatorForSeq2Seq
To be continued.