În insusire de dezvoltatori și oameni de știință, de multe ori avem amenintare să interacționăm cu aceste modele iele prin intermediul API-urilor. Cu toate acestea, pe măsură ce aplicațiile noastre cresc în plurivalenta și scară, diavol de interacțiuni API eficiente și performante devine crucială. Aoace strălucește programarea asincronă, permițându-ne să maximizăm debitul și să minimizăm latența apoi când lucrăm cu API-urile LLM.
În iest indrumar cuprinzător, vom investiga lumea apelurilor API LLM asincrone în Python. Vom inveli totul, de la elementele de bază ale programării asincrone până la tehnici avansate pentru gestionarea fluxurilor de deala complexe. Până la sfârșitul acestui marfa, veți poseda o înțelegere solidă dupa cum să folosiți programarea asincronă pentru a vă supraalimenta aplicațiile bazate pe LLM.
Înainte de a ne a adanci în specificul apelurilor API LLM asincrone, să stabilim o bază solidă în conceptele de planificare asincronă.
Programarea asincronă a ingadui executarea simultană a mai multor operațiuni fără a a bara firul conducator de execuție. În Python, iest deala se realizează în intaiul rând prin intermediul asincron valoare absoluta, oricare oferă un pervaz pentru scrierea codului emul folosind coroutine, bucle de evenimente și futures.
Concepte lamurire:
- Coroutine: Funcții definite cu asincron def oricare eventual fi întrerupt și reluat.
- Buclă de evenimente: Mecanismul nodal de execuție oricare gestionează și rulează sarcini asincrone.
- Așteptabile: Obiecte oricare pot fi folosite cu cuvântul lamurire await (corutine, sarcini, viitorime).
Iată un oglinda sadea pentru a a infatisa aceste concepte:
import asyncio async def greet(name): await asyncio.sleep(1) # Simulate an I/O operation principe(f"Hello, {name}!") async def main(): await asyncio.gather( greet("Alice"), greet("Bob"), greet("Charlie") ) asyncio.run(main())
În iest oglinda, definim o funcție asincronă greet
oricare simulează o operație I/O cu asyncio.sleep()
. The main
utilizări ale funcției asyncio.gather()
pentru a a rasuci mai multe salutări odata. În invidie întârzierii de margica cucului, toate cele trei salutări vor fi tipărite după aproape 1 secundă, demonstrând puterea execuției asincrone.
Naiba de asincronizare în apelurile API LLM
Când lucrăm cu API-uri LLM, întâlnim deseori scenarii în oricare musai să facem mai multe apeluri API, fie în secvență, fie în concomitent. Codul concomitent tradițional eventual dirigui la blocaje semnificative de performanță, mai deosebit apoi când se ocupă de operațiuni cu latență ridicată, cum ar fi solicitările de rețea către serviciile LLM.
Luați în considerare un arta regizorala în oricare musai să generăm rezumate pentru 100 de articole diferite folosind un API LLM. Cu o abordare sincronă, fiece chemare API s-ar a bara până când primește un răspuns, ceea ce eventual dainui câteva minute pentru a termina toate solicitările. O abordare asincronă, pe de altă menire, ne a ingadui să inițiem mai multe apeluri API odata, reducând zguduitor timpul deplin de execuție.
Configurarea mediului
Pentru a începe cu apelurile API LLM asincrone, va a fi să vă configurați mediul Python cu bibliotecile necesare. Iată ce veți poseda amenintare:
- Python 3.7 sau mai potop (pentru imbold asincron safi)
- aiohttp: O bibliotecă musteriu HTTP asincronă
- openai: clientul guvernamental OpenAI Python (dacă utilizați modelele GPT ale OpenAI)
- langchain: Un pervaz pentru construirea de aplicații cu LLM-uri (opțional, dar nimerit pentru fluxuri de deala complexe)
Puteți monta aceste dependențe folosind pip:
pip install aiohttp openai langchainBasic Async LLM API Calls with asyncio and aiohttp
Let's inceput by making a simple asynchronous call to an LLM API using aiohttp. We'll use OpenAI's GPT-3.5 API as an example, but the concepts apply to other LLM APIs as well.
import asyncio import aiohttp from openai import AsyncOpenAI async def generate_text(grabnic, musteriu): response = await musteriu.chat.completions.create( sistem="gpt-3.5-turbo", messages=({"role": "user", "content": grabnic}) ) return response.choices(0).message.content async def main(): prompts = ( "Explain quantum computing in simple terms.", "Write a haiku about contrafacut intelligence.", "Describe the process of photosynthesis." ) async with AsyncOpenAI() as musteriu: tasks = (generate_text(grabnic, musteriu) for grabnic in prompts) results = await asyncio.gather(*tasks) for grabnic, result in zip(prompts, results): principe(f"Neintarziat: {grabnic}nResponse: {result}n") asyncio.run(main())În iest oglinda, definim o funcție asincronă
generate_text
oricare efectuează un chemare către API-ul OpenAI folosind clientul AsyncOpenAI. Themain
funcția creează mai multe sarcini pentru diferite solicitări și utilizăriasyncio.gather()
pentru a le a rasuci paralel.Această abordare ne a ingadui să trimitem odata mai multe solicitări către API-ul LLM, reducând spunator timpul deplin indispensabil procesării tuturor solicitărilor.
Tehnici avansate: loturi și inspectare odata
În cadenta ce exemplul trecut demonstrează elementele de bază ale apelurilor API LLM asincrone, aplicațiile din lumea reală necesită deseori abordări mai sofisticate. Să explorăm două tehnici importante: loturi de solicitări și controlul concurenței.
Solicitări de partid: apoi când aveți de-a a efectua cu un număr potop de solicitări, este deseori mai eficace să le grupați în grupuri decât să trimiteți solicitări individuale pentru fiece cerinta. Iest deala ocupa supraîncărcarea mai multor apeluri API și eventual dirigui la o performanță mai bună.
import asyncio from openai import AsyncOpenAI async def process_batch(batch, musteriu): responses = await asyncio.gather(*( musteriu.chat.completions.create( sistem="gpt-3.5-turbo", messages=({"role": "user", "content": grabnic}) ) for grabnic in batch )) return (response.choices(0).message.content for response in responses) async def main(): prompts = (f"Tell me a fact about number {i}" for i in range(100)) batch_size = 10 async with AsyncOpenAI() as musteriu: results = () for i in range(0, len(prompts), batch_size): batch = prompts(i:i+batch_size) batch_results = await process_batch(batch, musteriu) results.extend(batch_results) for grabnic, result in zip(prompts, results): principe(f"Neintarziat: {grabnic}nResponse: {result}n") asyncio.run(main())Controlul concurenței: în cadenta ce programarea asincronă a ingadui execuția concomitentă, este insemnat să se controleze nivelul concurenței pentru a a impiedica copleșirea serverului API sau depășirea limitelor ratei. Putem prii asyncio.Semaphore în iest rost.
import asyncio from openai import AsyncOpenAI async def generate_text(grabnic, musteriu, semaphore): async with semaphore: response = await musteriu.chat.completions.create( sistem="gpt-3.5-turbo", messages=({"role": "user", "content": grabnic}) ) return response.choices(0).message.content async def main(): prompts = (f"Tell me a fact about number {i}" for i in range(100)) max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as musteriu: tasks = (generate_text(grabnic, musteriu, semaphore) for grabnic in prompts) results = await asyncio.gather(*tasks) for grabnic, result in zip(prompts, results): principe(f"Neintarziat: {grabnic}nResponse: {result}n") asyncio.run(main())În iest oglinda, folosim un semnalizator pentru a margini numărul de solicitări simultane la 5, asigurându-ne că nu copleșim serverul API.
Gestionarea erorilor și reîncercări în apelurile LLM asincrone
Când lucrați cu API-uri externe, este esențial să implementați mecanisme robuste de tratament a erorilor și de reîncercare. Să ne îmbunătățim codul pentru a gestiona erorile obișnuite și să implementăm backoff exponențial pentru reîncercări.
import asyncio import random from openai import AsyncOpenAI from tenacity import retry, stop_after_attempt, wait_exponential class APIError(Exception): pass @retry(opreste=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10)) async def generate_text_with_retry(grabnic, musteriu): try: response = await musteriu.chat.completions.create( sistem="gpt-3.5-turbo", messages=({"role": "user", "content": grabnic}) ) return response.choices(0).message.content except Exception as e: principe(f"Error occurred: {e}") raise APIError("Failed to generate lucrare") async def process_prompt(grabnic, musteriu, semaphore): async with semaphore: try: result = await generate_text_with_retry(grabnic, musteriu) return grabnic, result except APIError: return grabnic, "Failed to generate response after multiple attempts." async def main(): prompts = (f"Tell me a fact about number {i}" for i in range(20)) max_concurrent_requests = 5 semaphore = asyncio.Semaphore(max_concurrent_requests) async with AsyncOpenAI() as musteriu: tasks = (process_prompt(grabnic, musteriu, semaphore) for grabnic in prompts) results = await asyncio.gather(*tasks) for grabnic, result in results: principe(f"Neintarziat: {grabnic}nResponse: {result}n") asyncio.run(main())Această tra-ducere îmbunătățită contine:
- Un fire
APIError
excepție pentru erorile legate de API.- O
generate_text_with_retry
grad decorata cu@retry
din biblioteca tenacity, implementând backoff exponențial.- Gestionarea erorilor în
process_prompt
funcția de detectare și referire a defecțiunilor.Optimizarea performanței: transmiterea în curent a răspunsurilor
Pentru generarea de conținut de lungă durată, răspunsurile în curent pot îmbunătăți spunator performanța percepută a aplicației dvs. În loc să așteptați întregul răspuns, puteți procesa și afișa bucăți de lucrare pe măsură ce acestea devin disponibile.
import asyncio from openai import AsyncOpenAI async def stream_text(grabnic, musteriu): stream = await musteriu.chat.completions.create( sistem="gpt-3.5-turbo", messages=({"role": "user", "content": grabnic}), stream=True ) full_response = "" async for chunk in stream: if chunk.choices(0).delta.content is not None: content = chunk.choices(0).delta.content full_response += content principe(content, end='', flush=True) principe("n") return full_response async def main(): grabnic = "Write a short story about a time-traveling scientist." async with AsyncOpenAI() as musteriu: result = await stream_text(grabnic, musteriu) principe(f"Full response:n{result}") asyncio.run(main())Iest oglinda demonstrează cum să transmiteți răspunsul de la API, imprimând fiece bucată pe măsură ce sosește. Această abordare este utilă în anumit pentru aplicațiile de chat sau orisice arta regizorala în oricare doriți să oferiți conexiune inversa în cadenta concret utilizatorului.
Crearea fluxurilor de deala asincrone cu LangChain
Pentru aplicații mai complexe bazate pe LLM, cadrul LangChain oferă o abstractie la standard înalt oricare simplifică procesul de înlănțuire a mai multor apeluri LLM și de inglobare a altor instrumente. Să ne uităm la un oglinda de aplicare a LangChain cu capabilități asincrone:
Iest oglinda arată cum LangChain eventual fi utilizat pentru a a face fluxuri de deala mai complexe cu streaming și execuție asincronă. The
AsyncCallbackManager
şiStreamingStdOutCallbackHandler
a ingadui difuzarea în cadenta concret a conținutului generat.import asyncio from langchain.llms import OpenAI from langchain.prompts import PromptTemplate from langchain.chains import LLMChain from langchain.callbacks.manager import AsyncCallbackManager from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler async def generate_story(topic): llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager((StreamingStdOutCallbackHandler()))) grabnic = PromptTemplate( input_variables=("topic"), template="Write a short story about {topic}." ) chain = LLMChain(llm=llm, grabnic=grabnic) return await chain.arun(topic=topic) async def main(): topics = ("a magical forest", "a futuristic city", "an underwater civilization") tasks = (generate_story(topic) for topic in topics) stories = await asyncio.gather(*tasks) for topic, story in zip(topics, stories): principe(f"nTopic: {topic}nStory: {story}n{'='*50}n") asyncio.run(main())Servirea de aplicații LLM asincrone cu FastAPI
Pentru a a efectua aplicația LLM asincronă disponibilă ca post web, FastAPI este o optiune excelentă datorită suportului său safi pentru operațiuni asincrone. Iată un oglinda dupa cum să creați un stadiu sfarsit API sadea pentru generarea de lucrare:
from fastapi import FastAPI, BackgroundTasks from pydantic import BaseModel from openai import AsyncOpenAI app = FastAPI() musteriu = AsyncOpenAI() class GenerationRequest(BaseModel): grabnic: str class GenerationResponse(BaseModel): generated_text: str @app.dieta("/generate", response_model=GenerationResponse) async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks): response = await musteriu.chat.completions.create( sistem="gpt-3.5-turbo", messages=({"role": "user", "content": request.grabnic}) ) generated_text = response.choices(0).message.content # Simulate some post-processing in the background background_tasks.add_task(log_generation, request.grabnic, generated_text) return GenerationResponse(generated_text=generated_text) async def log_generation(grabnic: str, generated_text: str): # Simulate logging or additional processing await asyncio.sleep(2) principe(f"Logged: Neintarziat '{grabnic}' generated lucrare of length {len(generated_text)}") if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", conduita=8000)Această aplicație FastAPI creează un stadiu sfarsit
/generate
oricare acceptă un grabnic și returnează textul generat. De astfel, demonstrează cum să utilizați sarcinile de fond pentru procesarea suplimentară fără a a bara răspunsul.Cele mai bune practici și capcane comune
Pe măsură ce lucrați cu API-uri LLM asincrone, țineți testea de aceste bune practici:
- Utilizați gruparea de conexiuni: Când faceți mai multe solicitări, reutilizați conexiunile pentru a ocupa cheltuielile generale.
- Implementați gestionarea corectă a erorilor: Luați în considerare întotdeauna problemele de rețea, erorile API și răspunsurile neașteptate.
- Respectați limitele ratei: Utilizați semafoare sau alte mecanisme de inspectare al concurenței pentru a a impiedica copleșirea API-ului.
- Monitorizați și înregistrați: implementați o înregistrare completă pentru a urmări performanța și a recunoaste problemele.
- Folosiți streaming pentru conținut prelung: Îmbunătățește experiența utilizatorului și a ingadui procesarea timpurie a rezultatelor parțiale.
(eticheteToTranslate)planificare asincronă