
Multiprocess encoding for speed #139

Closed
@davidmezzetti

Description


While Model2Vec is fast, I'm wondering if it can be even faster 🚀

The Sentence-Transformers library has a multiprocess encoding method that is useful for multiple GPUs.
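For reference, the Sentence-Transformers pattern looks roughly like this (a sketch using its `start_multi_process_pool` / `encode_multi_process` / `stop_multi_process_pool` methods; the model name is just an example and the call downloads a model, so treat this as illustrative):

```python
from sentence_transformers import SentenceTransformer

if __name__ == "__main__":
    model = SentenceTransformer("all-MiniLM-L6-v2")

    # Spawns one worker per available device (GPUs if present, otherwise CPU workers)
    pool = model.start_multi_process_pool()
    try:
        embeddings = model.encode_multi_process(["1000"] * 100000, pool, batch_size=256)
        print(embeddings.shape)
    finally:
        model.stop_multi_process_pool(pool)
```

The pool is created once and reused across calls, which is the same shape of API proposed below for Model2Vec.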

I was planning to add something similar to txtai for Model2Vec, but first I wanted to check whether it makes more sense to add it directly to this library.

So I benchmarked encoding 10 million inputs with the standard encode method vs a multiprocessing encode. On my machine, the standard method took about 2 minutes 13 seconds and the multiprocessing version took about 21 seconds, roughly a 6x speedup.

Standard

from model2vec import StaticModel

model = StaticModel.from_pretrained("minishlab/potion-base-8M")
print(model.encode(["1000"] * 10000000, batch_size=1024).shape)
$ time python standard.py 
(10000000, 256)

real	2m13.506s
user	3m29.900s
sys	0m11.513s

Multiprocess

import os

from multiprocessing import Process, Queue

import numpy as np

from model2vec import StaticModel


class Execute:
    @staticmethod
    def worker(model, inputs, outputs):
        # Encode batches until this process is terminated by stop().
        # get() blocks indefinitely, so no timeout/exception handling is needed.
        while True:
            index, batch = inputs.get()
            outputs.put((index, model.encode(batch)))

    @staticmethod
    def start(model, size=None):
        # Create queues
        inputs, outputs = Queue(), Queue()

        # Start worker processes
        processes = []
        for _ in range(size if size else os.cpu_count()):
            process = Process(target=Execute.worker, args=(model, inputs, outputs))
            process.start()
            processes.append(process)

        return processes, inputs, outputs

    @staticmethod
    def encode(data, pool, batch):
        _, inputs, outputs = pool

        # Chunk the data, tagging each chunk with its index
        chunks = 0
        for x in range(0, len(data), batch):
            inputs.put((chunks, data[x : x + batch]))
            chunks += 1

        # Collect results - chunks complete in arbitrary order, so place each
        # result by index to keep rows aligned with the input order
        embeddings = [None] * chunks
        for _ in range(chunks):
            index, result = outputs.get()
            embeddings[index] = result

        # Combine into single array and return
        return np.concatenate(embeddings)

    @staticmethod
    def stop(pool):
        if pool:
            processes, inputs, outputs = pool

            for process in processes:
                process.terminate()

            for process in processes:
                process.join()
                process.close()

            inputs.close()
            outputs.close()

if __name__ == "__main__":
    pool = None
    try:
        model = StaticModel.from_pretrained("minishlab/potion-base-8M")
        pool = Execute.start(model)
        print(Execute.encode(["1000"] * 10000000, pool, 1024).shape)

    finally:
        Execute.stop(pool)
$ time python multiprocess.py 
(10000000, 256)

real	0m20.879s
user	3m35.571s
sys	0m29.292s
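One caveat with any queue-based pool like this: workers return results in completion order, not submission order, so each chunk needs an index tag if the output rows must line up with the inputs. A minimal sketch of that bookkeeping, using a stand-in encoder and simulated out-of-order completion instead of real processes and a real model:

```python
import numpy as np

def encode_stub(batch):
    # Stand-in for model.encode: one 4-dim vector per input,
    # filled with the numeric value of the chunk's first element
    return np.full((len(batch), 4), fill_value=float(batch[0]))

data = [str(i) for i in range(10)]
batch = 3

# Tag each chunk with its index before submitting to the input queue
chunks = [(i, data[x : x + batch]) for i, x in enumerate(range(0, len(data), batch))]

# Simulate workers finishing in reverse order
results = [(i, encode_stub(chunk)) for i, chunk in reversed(chunks)]

# Reassemble by index so rows line up with the original inputs
embeddings = [None] * len(chunks)
for i, result in results:
    embeddings[i] = result

combined = np.concatenate(embeddings)
assert combined.shape == (10, 4)
assert combined[0, 0] == 0.0 and combined[9, 0] == 9.0
```

For the benchmark above the inputs are all identical, so ordering does not affect the timing numbers either way, but it matters for real workloads.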

Any interest?
