!pip install -q aiohttp==3.8.3 datasets==2.14.6 pandas==1.5.3 requests==2.31.0 tqdm==4.66.1 huggingface-hub>=0.20
Inference Endpoints
How to use Inference Endpoints to Embed Documents
Authored by: Derek Thomas
Goal
I have a dataset I want to embed for semantic search (or QA, or RAG), I want the easiest way to do embed this and put it in a new dataset.
Approach
I’m using a dataset from my favorite subreddit r/bestofredditorupdates. Because it has long entries, I will use the new jinaai/jina-embeddings-v2-base-en since it has an 8k context length. I will deploy this using Inference Endpoint to save time and money. To follow this tutorial, you will need to have already added a payment method. If you haven’t, you can add one here in billing. To make it even easier, I’ll make this fully API based.
To make this MUCH faster I will use the Text Embeddings Inference image. This has many benefits like: - No model graph compilation step - Small docker images and fast boot times. Get ready for true serverless! - Token based dynamic batching - Optimized transformers code for inference using Flash Attention, Candle and cuBLASLt - Safetensors weight loading - Production ready (distributed tracing with Open Telemetry, Prometheus metrics)
Requirements
Imports
import asyncio
from getpass import getpass
import json
from pathlib import Path
import time
from typing import Optional
from aiohttp import ClientSession, ClientTimeout
from datasets import load_dataset, Dataset, DatasetDict
from huggingface_hub import notebook_login, create_inference_endpoint, list_inference_endpoints, whoami
import numpy as np
import pandas as pd
import requests
from tqdm.auto import tqdm
Config
DATASET_IN
is where your text data is DATASET_OUT
is where your embeddings will be stored
Note I used 5 for the MAX_WORKERS
since jina-embeddings-v2
are quite memory hungry.
= 'derek-thomas/dataset-creator-reddit-bestofredditorupdates'
DATASET_IN = "processed-subset-bestofredditorupdates"
DATASET_OUT = "boru-jina-embeddings-demo-ie"
ENDPOINT_NAME
= 5 # This is for how many async workers you want. Choose based on the model and hardware
MAX_WORKERS = 100 # Choose None to use all rows, Im using 100 just for a demo ROW_COUNT
Hugging Face offers a number of GPUs that you can choose from a number of GPUs that you can choose in Inference Endpoints. Here they are in table form:
GPU | instanceType | instanceSize | vRAM |
---|---|---|---|
1x Nvidia Tesla T4 | g4dn.xlarge | small | 16GB |
4x Nvidia Tesla T4 | g4dn.12xlarge | large | 64GB |
1x Nvidia A10G | g5.2xlarge | medium | 24GB |
4x Nvidia A10G | g5.12xlarge | xxlarge | 96GB |
1x Nvidia A100* | p4de | xlarge | 80GB |
2x Nvidia A100* | p4de | 2xlarge | 160GB |
*Note that for A100s you might get a note to email us to get access.
# GPU Choice
="aws"
VENDOR="us-east-1"
REGION="medium"
INSTANCE_SIZE="g5.2xlarge" INSTANCE_TYPE
notebook_login()
Some users might have payment registered in an organization. This allows you to connect to an organization (that you are a member of) with a payment method.
Leave it blank is you want to use your username.
= whoami()
who = getpass(prompt="What is your Hugging Face 🤗 username or organization? (with an added payment method)")
organization
= organization or who['name'] namespace
What is your Hugging Face 🤗 username or organization? (with an added payment method) ········
Get Dataset
= load_dataset(DATASET_IN)
dataset 'train'] dataset[
Dataset({
features: ['id', 'content', 'score', 'date_utc', 'title', 'flair', 'poster', 'permalink', 'new', 'updated'],
num_rows: 10042
})
= dataset['train'].to_pandas().to_dict('records')[:ROW_COUNT]
documents len(documents), documents[0]
(100,
{'id': '10004zw',
'content': '[removed]',
'score': 1,
'date_utc': Timestamp('2022-12-31 18:16:22'),
'title': 'To All BORU contributors, Thank you :)',
'flair': 'CONCLUDED',
'poster': 'IsItAcOnSeQuEnCe',
'permalink': '/r/BestofRedditorUpdates/comments/10004zw/to_all_boru_contributors_thank_you/',
'new': False,
'updated': False})
Inference Endpoints
Create Inference Endpoint
We are going to use the API to create an Inference Endpoint. This should provide a few main benefits: - It’s convenient (No clicking) - It’s repeatable (We have the code to run it easily) - It’s cheaper (No time spent waiting for it to load, and automatically shut it down)
try:
= create_inference_endpoint(
endpoint
ENDPOINT_NAME,="jinaai/jina-embeddings-v2-base-en",
repository="7302ac470bed880590f9344bfeee32ff8722d0e5",
revision="sentence-embeddings",
task="pytorch",
framework="gpu",
accelerator=INSTANCE_SIZE,
instance_size=INSTANCE_TYPE,
instance_type=REGION,
region=VENDOR,
vendor=namespace,
namespace={
custom_image"health_route": "/health",
"env": {
"MAX_BATCH_TOKENS": str(MAX_WORKERS * 2048),
"MAX_CONCURRENT_REQUESTS": "512",
"MODEL_ID": "/repository"
},"url": "ghcr.io/huggingface/text-embeddings-inference:0.5.0",
},type="protected",
)except:
= [ie for ie in list_inference_endpoints(namespace=namespace) if ie.name == ENDPOINT_NAME][0]
endpoint print('Loaded endpoint')
There are a few design choices here: - As discussed before we are using jinaai/jina-embeddings-v2-base-en
as our model. - For reproducibility we are pinning it to a specific revision. - If you are interested in more models, check out the supported list here. - Note that most embedding models are based on the BERT architecture. - MAX_BATCH_TOKENS
is chosen based on our number of workers and the context window of our embedding model. - type="protected"
utilized the security from Inference Endpoints detailed here. - I’m using 1x Nvidia A10 since jina-embeddings-v2
is memory hungry (remember the 8k context length). - You should consider further tuning MAX_BATCH_TOKENS
and MAX_CONCURRENT_REQUESTS
if you have high workloads
Wait until it’s running
%%time
endpoint.wait()
CPU times: user 48.1 ms, sys: 15.7 ms, total: 63.8 ms
Wall time: 52.6 s
InferenceEndpoint(name='boru-jina-embeddings-demo-ie', namespace='HF-test-lab', repository='jinaai/jina-embeddings-v2-base-en', status='running', url='https://k7l1xeok1jwnpbx5.us-east-1.aws.endpoints.huggingface.cloud')
When we use endpoint.client.post
we get a bytes string back. This is a little tedious because we need to convert this to an np.array
, but it’s just a couple quick lines in python.
= endpoint.client.post(json={"inputs": 'This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music!', 'truncate': True}, task="feature-extraction")
response = np.array(json.loads(response.decode()))
response 0][:20] response[
array([-0.05630935, -0.03560849, 0.02789049, 0.02792823, -0.02800371,
-0.01530391, -0.01863454, -0.0077982 , 0.05374297, 0.03672185,
-0.06114018, -0.06880157, -0.0093503 , -0.03174005, -0.03206085,
0.0610647 , 0.02243694, 0.03217408, 0.04181686, 0.00248854])
You may have inputs that exceed the context. In such scenarios, it’s up to you to handle them. In my case, I’d like to truncate rather than have an error. Let’s test that it works.
= 'This input will get multiplied' * 10000
embedding_input print(f'The length of the embedding_input is: {len(embedding_input)}')
= endpoint.client.post(json={"inputs": embedding_input, 'truncate': True}, task="feature-extraction")
response = np.array(json.loads(response.decode()))
response 0][:20] response[
The length of the embedding_input is: 300000
array([-0.03088215, -0.0351537 , 0.05749275, 0.00983467, 0.02108356,
0.04539965, 0.06107162, -0.02536954, 0.03887688, 0.01998681,
-0.05391388, 0.01529677, -0.1279156 , 0.01653782, -0.01940958,
0.0367411 , 0.0031748 , 0.04716022, -0.00713609, -0.00155313])
Get Embeddings
Here I send a document, update it with the embedding, and return it. This happens in parallel with MAX_WORKERS
.
async def request(document, semaphore):
# Semaphore guard
async with semaphore:
= await endpoint.async_client.post(json={"inputs": document['content'], 'truncate': True}, task="feature-extraction")
result = np.array(json.loads(result.decode()))
result 'embedding'] = result[0] # Assuming the API's output can be directly assigned
document[return document
async def main(documents):
# Semaphore to limit concurrent requests. Adjust the number as needed.
= asyncio.BoundedSemaphore(MAX_WORKERS)
semaphore
# Creating a list of tasks
= [request(document, semaphore) for document in documents]
tasks
# Using tqdm to show progress. It's been integrated into the async loop.
for f in tqdm(asyncio.as_completed(tasks), total=len(documents)):
await f
= time.perf_counter()
start
# Get embeddings
await main(documents)
# Make sure we got it all
= 0
count for document in documents:
if 'embedding' in document.keys() and len(document['embedding']) == 768:
+= 1
count print(f'Embeddings = {count} documents = {len(documents)}')
# Print elapsed time
= time.perf_counter() - start
elapsed_time = divmod(elapsed_time, 60)
minutes, seconds print(f"{int(minutes)} min {seconds:.2f} sec")
Embeddings = 100 documents = 100
0 min 21.33 sec
Pause Inference Endpoint
Now that we have finished, let’s pause the endpoint so we don’t incur any extra charges, this will also allow us to analyze the cost.
= endpoint.pause()
endpoint
print(f"Endpoint Status: {endpoint.status}")
Endpoint Status: paused
Push updated dataset to Hub
We now have our documents updated with the embeddings we wanted. First we need to convert it back to a Dataset
format. I find it easiest to go from list of dicts -> pd.DataFrame
-> Dataset
= pd.DataFrame(documents)
df = DatasetDict({'train': Dataset.from_pandas(df)}) dd
I’m uploading it to the user’s account by default (as opposed to uploading to an organization) but feel free to push to wherever you want by setting the user in the repo_id
or in the config by setting DATASET_OUT
=DATASET_OUT) dd.push_to_hub(repo_id
print(f'Dataset is at https://huggingface.co/datasets/{who["name"]}/{DATASET_OUT}')
Dataset is at https://huggingface.co/datasets/derek-thomas/processed-subset-bestofredditorupdates
Analyze Usage
- Go to your
dashboard_url
printed below - Click on the Usage & Cost tab
- See how much you have spent
= f'https://ui.endpoints.huggingface.co/{namespace}/endpoints/{ENDPOINT_NAME}'
dashboard_url print(dashboard_url)
https://ui.endpoints.huggingface.co/HF-test-lab/endpoints/boru-jina-embeddings-demo-ie
input("Hit enter to continue with the notebook")
Hit enter to continue with the notebook
''
We can see that it only took $0.04
to pay for this!
Delete Endpoint
Now that we are done, we don’t need our endpoint anymore. We can delete our endpoint programmatically.
= endpoint.delete()
endpoint
if not endpoint:
print('Endpoint deleted successfully')
else:
print('Delete Endpoint in manually')
Endpoint deleted successfully