为什么要用DataCollator

!!!以下的Transformers源码来自于4.41。

我们在使用Trainer时,需要传入一个DataCollator,如

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=ds_train,
    eval_dataset=ds_val,
    tokenizer=tokenizer,
    data_collator=data_collator,
)

什么是DataCollator?我们先看官方的定义。

Data collators are objects that will form a batch by using a list of dataset elements as input. These elements are of the same type as the elements of train_dataset or eval_dataset.

通过这句话可以知道,DataCollator的作用是将在数据集中的样本根据需求整合成一个Batch作为训练的输入。

各种DataCollator

在使用Huggingface的Trainer进行训练or微调时,我们需要传入一个data_collator: Optional[DataCollator] = None作为Trainer的数据的批量整合器。由于是Optinal,当然也可以不指定,我们看源码

default_collator = (
    DataCollatorWithPadding(tokenizer)
    if tokenizer is not None and isinstance(tokenizer, (PreTrainedTokenizerBase, SequenceFeatureExtractor))
    else default_data_collator
)
self.data_collator = data_collator if data_collator is not None else default_collator

所以,若不指定DataCollator的话,在初始化Trainer时若传入了Tokenizer,那么会用DataCollatorWithPadding,否则使用default_data_collator

DefaultDataCollator

我们先来看default_data_collator,只用于处理Dict-like的对象。且默认所有样本的长度相同,不一样则报错。一般不用。

def torch_default_data_collator(features: List[InputDataClass]) -> Dict[str, Any]:
    import torch
	# 1
    if not isinstance(features[0], Mapping):
        features = [vars(f) for f in features]
    first = features[0]
    # 2
    batch = {}


    # 3
    if "label" in first and first["label"] is not None:
        label = first["label"].item() if isinstance(first["label"], torch.Tensor) else first["label"]
        dtype = torch.long if isinstance(label, int) else torch.float
        batch["labels"] = torch.tensor([f["label"] for f in features], dtype=dtype)
    # 4
    elif "label_ids" in first and first["label_ids"] is not None:
        if isinstance(first["label_ids"], torch.Tensor):
            batch["labels"] = torch.stack([f["label_ids"] for f in features])
        else:
            dtype = torch.long if isinstance(first["label_ids"][0], int) else torch.float
            batch["labels"] = torch.tensor([f["label_ids"] for f in features], dtype=dtype)

    #5. Handling of all other possible keys.
    # Again, we will use the first element to figure out which key/values are not None for this model.
    for k, v in first.items():
        if k not in ("label", "label_ids") and v is not None and not isinstance(v, str):
            if isinstance(v, torch.Tensor):
                batch[k] = torch.stack([f[k] for f in features])
            elif isinstance(v, np.ndarray):
                batch[k] = torch.tensor(np.stack([f[k] for f in features]))
            else:
                batch[k] = torch.tensor([f[k] for f in features])

    return batch

简单来说,会帮你做这些事:

  1. 首先检查样本是否是Mapping对象,若不是则用vars函数返回样本的字典形式,也就是返回obj.__dict__,即所有的属性名和值的Dict。
  2. 初始化一个Batch字典用于返回。
  3. 检查样本是否含有label键,有的话提取所有样本的label值并添加到batch字典中。
  4. 若没有label键,但有label_ids,则将所有的label_ids堆叠成一个新的tensor。

这里有一个问题,为什么处理label是直接将所有样本的label组成一个新的tensor list,而label_ids则是用了stack函数?答案是在约定俗成的命名规则中,label是单标签样本,本身label是一个单独的值,而label_ids,意味着这是一个多标签样本,那么labels本身是一个List,所以需要用stack函数,而不是直接组成一个dim只有1的Tensor。同样我们注意到,batch的标签键最后都被命名为label

举一个例子,假设样本是双标签,label1=[0,1],label2=[1,0]经过stack后会变成[[0,1],[1,0]]。最终会变成一个Shape是(样本数,标签数)的tensor。

5.处理完了label和label_ids,剩下的键值对中只处理数值类型的。所以这里的样本应该是经过tokenizer过或数据预处理过的。将剩下的数值类型的值进行堆叠。

DataCollatorWithPadding

若指定了tokenizer但没有指定DataCollator,那么就会调用DataCollatorWithPadding来作为数据批量处理器。

class DataCollatorWithPadding:
    """
    Data collator that will dynamically pad the inputs received.

    Args:
        tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
            The tokenizer used for encoding the data.
        padding (`bool`, `str` or [`~utils.PaddingStrategy`], *optional*, defaults to `True`):
            Select a strategy to pad the returned sequences (according to the model's padding side and padding index)
            among:

            - `True` or `'longest'` (default): Pad to the longest sequence in the batch (or no padding if only a single
              sequence is provided).
            - `'max_length'`: Pad to a maximum length specified with the argument `max_length` or to the maximum
              acceptable input length for the model if that argument is not provided.
            - `False` or `'do_not_pad'`: No padding (i.e., can output a batch with sequences of different lengths).
        max_length (`int`, *optional*):
            Maximum length of the returned list and optionally padding length (see above).
        pad_to_multiple_of (`int`, *optional*):
            If set will pad the sequence to a multiple of the provided value.

            This is especially useful to enable the use of Tensor Cores on NVIDIA hardware with compute capability >=
            7.5 (Volta).
        return_tensors (`str`, *optional*, defaults to `"pt"`):
            The type of Tensor to return. Allowable values are "np", "pt" and "tf".
    """

    tokenizer: PreTrainedTokenizerBase
    padding: Union[bool, str, PaddingStrategy] = True 
    max_length: Optional[int] = None
    pad_to_multiple_of: Optional[int] = None
    return_tensors: str = "pt"

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, Any]:
        batch = pad_without_fast_tokenizer_warning(
            self.tokenizer,
            features,
            padding=self.padding,
            max_length=self.max_length,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors=self.return_tensors,
        )
        if "label" in batch:
            batch["labels"] = batch["label"]
            del batch["label"]
        if "label_ids" in batch:
            batch["labels"] = batch["label_ids"]
            del batch["label_ids"]
        return batch

从名字可以看到,这个整合器应该是做了pad的任务,调用了一个Call函数来返回处理后的样本。实际上就一句比较重要的语句.

def pad_without_fast_tokenizer_warning(tokenizer, *pad_args, **pad_kwargs):
    padded = tokenizer.pad(*pad_args, **pad_kwargs)

也就是调用了pad函数,返回padding之后的tensor。

总之,DataCollatorWithPadding会为tokenize后的序列添加pad,使得整个batch的序列长度一致,如果选择padding=longest,那么会根据最长的序列来进行padding。

from transformers import DefaultDataCollator,DataCollatorWithPadding,DataCollatorForLanguageModeling,AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('/data/wtf/model/llama-2-7b-hf',padding_side='left')
input_text = ["hello world","hello hello hello hello","hello,My name is "]

c1 = DefaultDataCollator()
c2 = DataCollatorWithPadding(tokenizer)
c3 = DataCollatorForLanguageModeling(tokenizer,mlm=False)
# 直接调用tokenizer的话,每一个样本的长度不同
tokenizer(input_text) # {'input_ids': [[1, 22172, 3186], [1, 22172, 22172, 22172, 22172], [1, 22172, 29892, 3421, 1024, 338, 29871]], 'attention_mask': [[1, 1, 1], [1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1]]}

c1(input_text) # 直接报错
tok_input = tokenizer(input_text)
c2(input_text)
--------可以看到对齐了
{'input_ids': tensor([[    0,     0,     0,     0,     1, 22172,  3186],
        [    0,     0,     1, 22172, 22172, 22172, 22172],
        [    1, 22172, 29892,  3421,  1024,   338, 29871]]), 'attention_mask': tensor([[0, 0, 0, 0, 1, 1, 1],
        [0, 0, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1]])}
decode后的结果:
['<unk><unk><unk><unk><s> hello world',
 '<unk><unk><s> hello hello hello hello',
 '<s> hello,My name is ']

我们看一个例子:

from transformers import DefaultDataCollator,DataCollatorWithPadding,DataCollatorForLanguageModeling,AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('/data/wtf/model/llama-2-7b-hf',padding_side='left')
input_text = ["hello world","hello hello hello hello","hello,My name is "]

c1 = DefaultDataCollator()
c2 = DataCollatorWithPadding(tokenizer)
c3 = DataCollatorForLanguageModeling(tokenizer,mlm=False)
# 直接调用tokenizer的话,每一个样本的长度不同
tokenizer(input_text) 
"""
{'input_ids': [[1, 22172, 3186], [1, 22172, 22172, 22172, 22172], [1, 22172, 29892, 3421, 1024, 338, 29871]], 'attention_mask': [[1, 1, 1], [1, 1, 1, 1, 1], [1, 1, 1, 1, 1, 1, 1]]}
""" 
c1(input_text) # 直接报错
"""
DataCollatorWithPadding的输入是整个tokenizers.Encoding对象,可以把tokenizer处理后的对象直接放进去
"""
tok_input = tokenizer(input_text)
c2(input_text)
"""返回对齐后的结果
{'input_ids': tensor([[    0,     0,     0,     0,     1, 22172,  3186],
        [    0,     0,     1, 22172, 22172, 22172, 22172],
        [    1, 22172, 29892,  3421,  1024,   338, 29871]]), 'attention_mask': tensor([[0, 0, 0, 0, 1, 1, 1],
        [0, 0, 1, 1, 1, 1, 1],
        [1, 1, 1, 1, 1, 1, 1]])}
decode回去,可以看到长度相同
['<unk><unk><unk><unk><s> hello world',
 '<unk><unk><s> hello hello hello hello',
 '<s> hello,My name is ']
        
"""

DataCollatorForLanguageModeling

接下来常用的DataCollatorForLanguageModeling

class DataCollatorForLanguageModeling(DataCollatorMixin):
    """
    Data collator used for language modeling. Inputs are dynamically padded to the maximum length of a batch if they
    are not all of the same length.

    Args:
        tokenizer ([`PreTrainedTokenizer`] or [`PreTrainedTokenizerFast`]):
            The tokenizer used for encoding the data.
        mlm (`bool`, *optional*, defaults to `True`):
            Whether or not to use masked language modeling. If set to `False`, the labels are the same as the inputs
            with the padding tokens ignored (by setting them to -100). Otherwise, the labels are -100 for non-masked
            tokens and the value to predict for the masked token.
        mlm_probability (`float`, *optional*, defaults to 0.15):
            The probability with which to (randomly) mask tokens in the input, when `mlm` is set to `True`.
        pad_to_multiple_of (`int`, *optional*):
            If set will pad the sequence to a multiple of the provided value.
        return_tensors (`str`):
            The type of Tensor to return. Allowable values are "np", "pt" and "tf".

参数列表中带有MLM,说明可以用于MLM任务。若mlm=True,那么除了需要预测的标签,其余标签都不参与损失的计算,label全都设置为-100,被mask的则是正常的标签值。若mlm=False,pad的laebl值设置为-100。当mlm=True的情况下,tokenizer需要设置mask_token。

接下来我们看主要的几个函数。

torch_call是处理函数,首先也会判断样本是否是字典形式,然后做pad并返回tensor形式的样本,这一段和DataCollatorWithPadding所做的一模一样,只是没有删除label键。然后取出mask_token,然后是否采取了mlm策略来处理样本的input_ids,最终返回处理后的tensor。若不采取mlm策略,那么就把padtoken的位置用-100替代,表示不计算损失。

def torch_call(self, examples: List[Union[List[int], Any, Dict[str, Any]]]) -> Dict[str, Any]:
    # Handle dict or lists with proper padding and conversion to tensor.
    if isinstance(examples[0], Mapping):
        batch = pad_without_fast_tokenizer_warning(
            self.tokenizer, examples, return_tensors="pt", pad_to_multiple_of=self.pad_to_multiple_of
        )
    else:
        batch = {
            "input_ids": _torch_collate_batch(examples, self.tokenizer, pad_to_multiple_of=self.pad_to_multiple_of)
        }

    # If special token mask has been preprocessed, pop it from the dict.
    special_tokens_mask = batch.pop("special_tokens_mask", None)
    if self.mlm:
        batch["input_ids"], batch["labels"] = self.torch_mask_tokens(
            batch["input_ids"], special_tokens_mask=special_tokens_mask
        )
    else:
        labels = batch["input_ids"].clone()
        if self.tokenizer.pad_token_id is not None:
            labels[labels == self.tokenizer.pad_token_id] = -100
        batch["labels"] = labels
    return batch

torch_mask_tokens接收input_idsmask_token,做了以下的工作。

  1. 创建概率矩阵,和输入同形状。
  2. 若没有输入mask token,那么首先回生成一个mask列表。special_tokens_mask = [1 if token in all_special_ids else 0 for token in token_ids_0]。所以1表示特殊符号,正常token以0填充。最后转换为bool形式的tensor。若输入了,则直接返回他的bool形式。
  3. 概率矩阵的特殊符号位置以0填充,不参与损失计算。
  4. 按伯努利分布生成mask索引。
  5. 概率矩阵非mask的索引位置全部用-100填充,表示不计算损失。
  6. 默认情况下 (其实就是bert的操作),80%的mask索引,其索引的值被替换成mask_token。要求预测原token。10%被替换成随机token。剩下10%不变。
def torch_mask_tokens(self, inputs: Any, special_tokens_mask: Optional[Any] = None) -> Tuple[Any, Any]:
    """
    Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.
    """
    import torch

    labels = inputs.clone()
    # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
    probability_matrix = torch.full(labels.shape, self.mlm_probability)
    # 生成mask list。
    if special_tokens_mask is None:
        special_tokens_mask = [
            self.tokenizer.get_special_tokens_mask(val, already_has_special_tokens=True) for val in labels.tolist()
        ]
        special_tokens_mask = torch.tensor(special_tokens_mask, dtype=torch.bool)
    else:
        special_tokens_mask = special_tokens_mask.bool()
		
    probability_matrix.masked_fill_(special_tokens_mask, value=0.0)
    masked_indices = torch.bernoulli(probability_matrix).bool()
    labels[~masked_indices] = -100  # We only compute loss on masked tokens

    # 80% of the time, we replace masked input tokens with tokenizer.mask_token ([MASK])
    indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
    inputs[indices_replaced] = self.tokenizer.convert_tokens_to_ids(self.tokenizer.mask_token)

    # 10% of the time, we replace masked input tokens with random word
    indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
    random_words = torch.randint(len(self.tokenizer), labels.shape, dtype=torch.long)
    inputs[indices_random] = random_words[indices_random]

    # The rest of the time (10% of the time) we keep the masked input tokens unchanged
    return inputs, labels

综上,相比于DataCollatorWithPaddingDataCollatorForLanguageModeling具备以下功能:

  1. 可以做MLM任务。
  2. DataCollatorWithPadding并没有对pad的token进行特殊操作,而DataCollatorForLanguageModeling则将padtoken通过设置为-100从而不计算损失。

还是看一个例子:

from transformers import DefaultDataCollator,DataCollatorWithPadding,DataCollatorForLanguageModeling,AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('/data/wtf/model/llama-2-7b-hf',padding_side='left')

input_text = ["hello world","hello hello hello hello","hello,My name is "]

c3 = DataCollatorForLanguageModeling(tokenizer,mlm=False)

注意,输入不可以是tokenizers.Encoding对象,而是要把input_ids单独拿出来,否则会报错'tokenizers.Encoding' object has no attribute 'size'

tok_text = tokenizer(input_text)
# dc = c3(tok_text) 会报错
dc = c3(tok_text['input_ids'])
print(dc)
------------------------
"""结果,可以看到在mlm=False的情况下,pad_id由于是特殊token,直接置为-100,不计算损失
{'input_ids': tensor([[    0,     0,     0,     0,     1, 22172,  3186],
        [    0,     0,     1, 22172, 22172, 22172, 22172],
        [    1, 22172, 29892,  3421,  1024,   338, 29871]]), 'labels': tensor([[ -100,  -100,  -100,  -100,     1, 22172,  3186],
        [ -100,  -100,     1, 22172, 22172, 22172, 22172],
        [    1, 22172, 29892,  3421,  1024,   338, 29871]])}
"""

mlm=True,在tokenizer没有mask_token的情况下,需要指定。

tokenizer.mask_token = "<mask>"
c3 = DataCollatorForLanguageModeling(tokenizer,mlm=True)
tok_text = tokenizer(input_text)
t = tokenizer.batch_decode(tok_text['input_ids'])
dc = c3(tok_text['input_ids'])
------------------
"""labels不等于-100的就是被mask的token
{'input_ids': tensor([[    0,     0,     0,     0,     1,     0,  3186],
        [    0,     0,     1, 22172, 22172, 22172, 22172],
        [    1, 22172, 29892,  3421,  1024,   338, 29871]]), 'labels': tensor([[ -100,  -100,  -100,  -100,  -100, 22172,  -100],
        [ -100,  -100,  -100, 22172, 22172,  -100,  -100],
        [ -100,  -100,  -100,  -100,  -100,  -100,  -100]])}
"""

DataCollatorForWholeWordMask

DataCollatorForWholeWordMask继承了DataCollatorForLanguageModeling。由于tokenizer几乎都采用BPE OR BBPE的分词方式,导致很多时候并不是把整个词汇给mask,而是把词根词缀给mask。DataCollatorForWholeWordMask就解决了这个问题,能把整个词汇给mask了。

DataCollatorForTokenClassification

待更

DataCollatorForSeq2Seq

待更