Apeluri API LLM asincrone în Python: un ghid cuprinzător

14 Min Read

În insusire de dezvoltatori și oameni de știință, de multe ori avem amenintare să interacționăm cu aceste modele iele prin intermediul API-urilor. Cu toate acestea, pe măsură ce aplicațiile noastre cresc în plurivalenta și scară, diavol de interacțiuni API eficiente și performante devine crucială. Aoace strălucește programarea asincronă, permițându-ne să maximizăm debitul și să minimizăm latența apoi când lucrăm cu API-urile LLM.

În iest indrumar cuprinzător, vom investiga lumea apelurilor API LLM asincrone în Python. Vom inveli totul, de la elementele de bază ale programării asincrone până la tehnici avansate pentru gestionarea fluxurilor de deala complexe. Până la sfârșitul acestui marfa, veți poseda o înțelegere solidă dupa cum să folosiți programarea asincronă pentru a vă supraalimenta aplicațiile bazate pe LLM.

Înainte de a ne a adanci în specificul apelurilor API LLM asincrone, să stabilim o bază solidă în conceptele de planificare asincronă.

Programarea asincronă a ingadui executarea simultană a mai multor operațiuni fără a a bara firul conducator de execuție. În Python, iest deala se realizează în intaiul rând prin intermediul asincron valoare absoluta, oricare oferă un pervaz pentru scrierea codului emul folosind coroutine, bucle de evenimente și futures.

Concepte lamurire:

  • Coroutine: Funcții definite cu asincron def oricare eventual fi întrerupt și reluat.
  • Buclă de evenimente: Mecanismul nodal de execuție oricare gestionează și rulează sarcini asincrone.
  • Așteptabile: Obiecte oricare pot fi folosite cu cuvântul lamurire await (corutine, sarcini, viitorime).

Iată un oglinda sadea pentru a a infatisa aceste concepte:

import asyncio
async def greet(name):
    await asyncio.sleep(1)  # Simulate an I/O operation
    principe(f"Hello, {name}!")
async def main():
    await asyncio.gather(
        greet("Alice"),
        greet("Bob"),
        greet("Charlie")
    )
asyncio.run(main())

În iest oglinda, definim o funcție asincronă greet oricare simulează o operație I/O cu asyncio.sleep(). The main utilizări ale funcției asyncio.gather() pentru a a rasuci mai multe salutări odata. În invidie întârzierii de margica cucului, toate cele trei salutări vor fi tipărite după aproape 1 secundă, demonstrând puterea execuției asincrone.

Naiba de asincronizare în apelurile API LLM

Când lucrăm cu API-uri LLM, întâlnim deseori scenarii în oricare musai să facem mai multe apeluri API, fie în secvență, fie în concomitent. Codul concomitent tradițional eventual dirigui la blocaje semnificative de performanță, mai deosebit apoi când se ocupă de operațiuni cu latență ridicată, cum ar fi solicitările de rețea către serviciile LLM.

Luați în considerare un arta regizorala în oricare musai să generăm rezumate pentru 100 de articole diferite folosind un API LLM. Cu o abordare sincronă, fiece chemare API s-ar a bara până când primește un răspuns, ceea ce eventual dainui câteva minute pentru a termina toate solicitările. O abordare asincronă, pe de altă menire, ne a ingadui să inițiem mai multe apeluri API odata, reducând zguduitor timpul deplin de execuție.

Configurarea mediului

Pentru a începe cu apelurile API LLM asincrone, va a fi să vă configurați mediul Python cu bibliotecile necesare. Iată ce veți poseda amenintare:

  • Python 3.7 sau mai potop (pentru imbold asincron safi)
  • aiohttp: O bibliotecă musteriu HTTP asincronă
  • openai: clientul guvernamental OpenAI Python (dacă utilizați modelele GPT ale OpenAI)
  • langchain: Un pervaz pentru construirea de aplicații cu LLM-uri (opțional, dar nimerit pentru fluxuri de deala complexe)

Puteți monta aceste dependențe folosind pip:

pip install aiohttp openai langchain

Basic Async LLM API Calls with asyncio and aiohttp

Let's inceput by making a simple asynchronous call to an LLM API using aiohttp. We'll use OpenAI's GPT-3.5 API as an example, but the concepts apply to other LLM APIs as well.

import asyncio
import aiohttp
from openai import AsyncOpenAI
async def generate_text(grabnic, musteriu):
    response = await musteriu.chat.completions.create(
        sistem="gpt-3.5-turbo",
        messages=({"role": "user", "content": grabnic})
    )
    return response.choices(0).message.content
async def main():
    prompts = (
        "Explain quantum computing in simple terms.",
        "Write a haiku about contrafacut intelligence.",
        "Describe the process of photosynthesis."
    )
    
    async with AsyncOpenAI() as musteriu:
        tasks = (generate_text(grabnic, musteriu) for grabnic in prompts)
        results = await asyncio.gather(*tasks)
    
    for grabnic, result in zip(prompts, results):
        principe(f"Neintarziat: {grabnic}nResponse: {result}n")
asyncio.run(main())

În iest oglinda, definim o funcție asincronă generate_text oricare efectuează un chemare către API-ul OpenAI folosind clientul AsyncOpenAI. The main funcția creează mai multe sarcini pentru diferite solicitări și utilizări asyncio.gather() pentru a le a rasuci paralel.

Această abordare ne a ingadui să trimitem odata mai multe solicitări către API-ul LLM, reducând spunator timpul deplin indispensabil procesării tuturor solicitărilor.

Tehnici avansate: loturi și inspectare odata

În cadenta ce exemplul trecut demonstrează elementele de bază ale apelurilor API LLM asincrone, aplicațiile din lumea reală necesită deseori abordări mai sofisticate. Să explorăm două tehnici importante: loturi de solicitări și controlul concurenței.

Solicitări de partid: apoi când aveți de-a a efectua cu un număr potop de solicitări, este deseori mai eficace să le grupați în grupuri decât să trimiteți solicitări individuale pentru fiece cerinta. Iest deala ocupa supraîncărcarea mai multor apeluri API și eventual dirigui la o performanță mai bună.

import asyncio
from openai import AsyncOpenAI
async def process_batch(batch, musteriu):
    responses = await asyncio.gather(*(
        musteriu.chat.completions.create(
            sistem="gpt-3.5-turbo",
            messages=({"role": "user", "content": grabnic})
        ) for grabnic in batch
    ))
    return (response.choices(0).message.content for response in responses)
async def main():
    prompts = (f"Tell me a fact about number {i}" for i in range(100))
    batch_size = 10
    
    async with AsyncOpenAI() as musteriu:
        results = ()
        for i in range(0, len(prompts), batch_size):
            batch = prompts(i:i+batch_size)
            batch_results = await process_batch(batch, musteriu)
            results.extend(batch_results)
    
    for grabnic, result in zip(prompts, results):
        principe(f"Neintarziat: {grabnic}nResponse: {result}n")
asyncio.run(main())

Controlul concurenței: în cadenta ce programarea asincronă a ingadui execuția concomitentă, este insemnat să se controleze nivelul concurenței pentru a a impiedica copleșirea serverului API sau depășirea limitelor ratei. Putem prii asyncio.Semaphore în iest rost.

import asyncio
from openai import AsyncOpenAI
async def generate_text(grabnic, musteriu, semaphore):
    async with semaphore:
        response = await musteriu.chat.completions.create(
            sistem="gpt-3.5-turbo",
            messages=({"role": "user", "content": grabnic})
        )
        return response.choices(0).message.content
async def main():
    prompts = (f"Tell me a fact about number {i}" for i in range(100))
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as musteriu:
        tasks = (generate_text(grabnic, musteriu, semaphore) for grabnic in prompts)
        results = await asyncio.gather(*tasks)
    
    for grabnic, result in zip(prompts, results):
        principe(f"Neintarziat: {grabnic}nResponse: {result}n")
asyncio.run(main())

În iest oglinda, folosim un semnalizator pentru a margini numărul de solicitări simultane la 5, asigurându-ne că nu copleșim serverul API.

Gestionarea erorilor și reîncercări în apelurile LLM asincrone

Când lucrați cu API-uri externe, este esențial să implementați mecanisme robuste de tratament a erorilor și de reîncercare. Să ne îmbunătățim codul pentru a gestiona erorile obișnuite și să implementăm backoff exponențial pentru reîncercări.

import asyncio
import random
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
class APIError(Exception):
    pass
@retry(opreste=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
async def generate_text_with_retry(grabnic, musteriu):
    try:
        response = await musteriu.chat.completions.create(
            sistem="gpt-3.5-turbo",
            messages=({"role": "user", "content": grabnic})
        )
        return response.choices(0).message.content
    except Exception as e:
        principe(f"Error occurred: {e}")
        raise APIError("Failed to generate lucrare")
async def process_prompt(grabnic, musteriu, semaphore):
    async with semaphore:
        try:
            result = await generate_text_with_retry(grabnic, musteriu)
            return grabnic, result
        except APIError:
            return grabnic, "Failed to generate response after multiple attempts."
async def main():
    prompts = (f"Tell me a fact about number {i}" for i in range(20))
    max_concurrent_requests = 5
    semaphore = asyncio.Semaphore(max_concurrent_requests)
    
    async with AsyncOpenAI() as musteriu:
        tasks = (process_prompt(grabnic, musteriu, semaphore) for grabnic in prompts)
        results = await asyncio.gather(*tasks)
    
    for grabnic, result in results:
        principe(f"Neintarziat: {grabnic}nResponse: {result}n")
asyncio.run(main())

Această tra-ducere îmbunătățită contine:

  • Un fire APIError excepție pentru erorile legate de API.
  • O generate_text_with_retry grad decorata cu @retry din biblioteca tenacity, implementând backoff exponențial.
  • Gestionarea erorilor în process_prompt funcția de detectare și referire a defecțiunilor.

Optimizarea performanței: transmiterea în curent a răspunsurilor

Pentru generarea de conținut de lungă durată, răspunsurile în curent pot îmbunătăți spunator performanța percepută a aplicației dvs. În loc să așteptați întregul răspuns, puteți procesa și afișa bucăți de lucrare pe măsură ce acestea devin disponibile.

import asyncio
from openai import AsyncOpenAI
async def stream_text(grabnic, musteriu):
    stream = await musteriu.chat.completions.create(
        sistem="gpt-3.5-turbo",
        messages=({"role": "user", "content": grabnic}),
        stream=True
    )
    
    full_response = ""
    async for chunk in stream:
        if chunk.choices(0).delta.content is not None:
            content = chunk.choices(0).delta.content
            full_response += content
            principe(content, end='', flush=True)
    
    principe("n")
    return full_response
async def main():
    grabnic = "Write a short story about a time-traveling scientist."
    
    async with AsyncOpenAI() as musteriu:
        result = await stream_text(grabnic, musteriu)
    
    principe(f"Full response:n{result}")
asyncio.run(main())

Iest oglinda demonstrează cum să transmiteți răspunsul de la API, imprimând fiece bucată pe măsură ce sosește. Această abordare este utilă în anumit pentru aplicațiile de chat sau orisice arta regizorala în oricare doriți să oferiți conexiune inversa în cadenta concret utilizatorului.

Crearea fluxurilor de deala asincrone cu LangChain

Pentru aplicații mai complexe bazate pe LLM, cadrul LangChain oferă o abstractie la standard înalt oricare simplifică procesul de înlănțuire a mai multor apeluri LLM și de inglobare a altor instrumente. Să ne uităm la un oglinda de aplicare a LangChain cu capabilități asincrone:

Iest oglinda arată cum LangChain eventual fi utilizat pentru a a face fluxuri de deala mai complexe cu streaming și execuție asincronă. The AsyncCallbackManager şi StreamingStdOutCallbackHandler a ingadui difuzarea în cadenta concret a conținutului generat.

import asyncio
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from langchain.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
async def generate_story(topic):
    llm = OpenAI(temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager((StreamingStdOutCallbackHandler())))
    grabnic = PromptTemplate(
        input_variables=("topic"),
        template="Write a short story about {topic}."
    )
    chain = LLMChain(llm=llm, grabnic=grabnic)
    return await chain.arun(topic=topic)
async def main():
    topics = ("a magical forest", "a futuristic city", "an underwater civilization")
    tasks = (generate_story(topic) for topic in topics)
    stories = await asyncio.gather(*tasks)
    
    for topic, story in zip(topics, stories):
        principe(f"nTopic: {topic}nStory: {story}n{'='*50}n")
asyncio.run(main())

Servirea de aplicații LLM asincrone cu FastAPI

Pentru a a efectua aplicația LLM asincronă disponibilă ca post web, FastAPI este o optiune excelentă datorită suportului său safi pentru operațiuni asincrone. Iată un oglinda dupa cum să creați un stadiu sfarsit API sadea pentru generarea de lucrare:

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from openai import AsyncOpenAI
app = FastAPI()
musteriu = AsyncOpenAI()
class GenerationRequest(BaseModel):
    grabnic: str
class GenerationResponse(BaseModel):
    generated_text: str
@app.dieta("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest, background_tasks: BackgroundTasks):
    response = await musteriu.chat.completions.create(
        sistem="gpt-3.5-turbo",
        messages=({"role": "user", "content": request.grabnic})
    )
    generated_text = response.choices(0).message.content
    
    # Simulate some post-processing in the background
    background_tasks.add_task(log_generation, request.grabnic, generated_text)
    
    return GenerationResponse(generated_text=generated_text)
async def log_generation(grabnic: str, generated_text: str):
    # Simulate logging or additional processing
    await asyncio.sleep(2)
    principe(f"Logged: Neintarziat '{grabnic}' generated lucrare of length {len(generated_text)}")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", conduita=8000)

Această aplicație FastAPI creează un stadiu sfarsit /generate oricare acceptă un grabnic și returnează textul generat. De astfel, demonstrează cum să utilizați sarcinile de fond pentru procesarea suplimentară fără a a bara răspunsul.

Cele mai bune practici și capcane comune

Pe măsură ce lucrați cu API-uri LLM asincrone, țineți testea de aceste bune practici:

  1. Utilizați gruparea de conexiuni: Când faceți mai multe solicitări, reutilizați conexiunile pentru a ocupa cheltuielile generale.
  2. Implementați gestionarea corectă a erorilor: Luați în considerare întotdeauna problemele de rețea, erorile API și răspunsurile neașteptate.
  3. Respectați limitele ratei: Utilizați semafoare sau alte mecanisme de inspectare al concurenței pentru a a impiedica copleșirea API-ului.
  4. Monitorizați și înregistrați: implementați o înregistrare completă pentru a urmări performanța și a recunoaste problemele.
  5. Folosiți streaming pentru conținut prelung: Îmbunătățește experiența utilizatorului și a ingadui procesarea timpurie a rezultatelor parțiale.

mm

Mi-am inconstient ultimii cinci ani scufundându-mă în lumea fascinantă a învățării automate și a învățării profunde. Pasiunea și expertiza mea m-au stabilit să contribui la proxenet 50 de proiecte diverse de inginerie soft, cu un intonatie anumit pe AI/ML. Curiozitatea mea continuă m-a atras și către Procesarea limbajului genuin, un mosie pe oricare sunt doritor să îl explorez în prelungire.

(eticheteToTranslate)planificare asincronă

Share This Article
Leave a comment