05. LCEL Interface

Untuk memudahkan pembuatan custom chain, protokol Runnable telah diimplementasikan.

Protokol Runnable diimplementasikan pada sebagian besar komponen.

Ini adalah antarmuka standar yang memudahkan pembuatan custom chain dan memanggilnya dengan cara standar. Antarmuka standar ini mencakup hal-hal berikut:

  • stream: Melakukan streaming chunk dari respons.

  • invoke: Memanggil chain untuk input tertentu.

  • batch : Memanggil chain untuk daftar input.

Terdapat juga metode asinkron:

  • astream : Melakukan streaming chunk dari respons secara asinkron.

  • ainvoke : Memanggil chain untuk input secara asinkron.

  • abatch : Memanggil chain untuk daftar input secara asinkron.

  • astream_log : Melakukan streaming tidak hanya respons akhir tetapi juga langkah-langkah perantara yang terjadi.

# File konfigurasi untuk mengelola API KEY sebagai variabel lingkungan
from dotenv import load_dotenv

# Memuat informasi API KEY
load_dotenv()
# Mengatur LangSmith tracing. https://smith.langchain.com
from langchain_altero import logging

# Masukkan nama project.
logging.langsmith("CH01-Basic")
Starting a LangSmith trace.
[project name].
CH01-Basic

Membuat chain menggunakan sintaks LCEL.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Menginstansiasi model ChatOpenAI.
model = ChatOpenAI()
# Membuat template prompt yang meminta penjelasan 3 kalimat tentang topik yang diberikan.
prompt = PromptTemplate.from_template("Jelaskan dalam 3 kalimat tentang {topic}.")
# Menghubungkan prompt dan model untuk membuat chain percakapan.
chain = prompt | model | StrOutputParser()

Stream: Keluaran secara real-time

Fungsi ini menggunakan metode chain.stream untuk membuat aliran data tentang topik yang diberikan, dan mengulangi aliran ini untuk mencetak konten setiap data secara langsung. Parameter end="" diatur agar tidak melakukan pemisah baris setelah pencetakan, sedangkan parameter flush=True mengatur agar buffer output langsung dikosongkan.

# Menggunakan metode chain.stream untuk membuat dan mengulang aliran data tentang topik 'multimodal'.
for token in chain.stream({"topic": "multimodal"}):
    # Mencetak konten data yang diterima dari aliran. Mencetak tanpa pemisah baris dan langsung mengosongkan buffer.
    print(token, end="", flush=True)

Invoke

Metode invoke dari objek chain menerima topic sebagai argumen dan melakukan pemrosesan terkait topic tersebut.

# Panggil metode invoke dari objek chain, dengan menggunakan dictionary yang berisi topik 'ChatGPT'.
chain.invoke({"topic": "ChatGPT"})

Batch (eksekusi unit)

Fungsi chain.batch menerima daftar yang berisi beberapa dictionary sebagai argumen, dan melakukan pemrosesan batch dengan menggunakan nilai kunci topic yang ada di setiap dictionary.

# Memanggil fungsi batch untuk memproses daftar topic yang diberikan
chain.batch([{"topic": "ChatGPT"}, {"topic": "Instagram"}])

Anda dapat mengatur jumlah permintaan bersamaan menggunakan parameter max_concurrency.

Dictionary config mengatur jumlah maksimum pekerjaan yang dapat diproses secara bersamaan melalui kunci max_concurrency. Di sini, diatur untuk memproses hingga 3 pekerjaan secara bersamaan.

chain.batch(
    [
        {"topic": "ChatGPT"},
        {"topic": "Instagram"},
        {"topic": "multimodal"},
        {"topic": "pemrograman"},
        {"topic": "machine learning"},
    ],
    config={"max_concurrency": 3},
)

async stream: Aliran asinkron

Fungsi chain.astream menghasilkan aliran asinkron dan memproses pesan tentang topik yang diberikan secara asinkron.

Menggunakan loop async for untuk menerima pesan secara berurutan dari aliran, dan mencetak konten pesan (s.content) secara langsung menggunakan fungsi print. Parameter end="" diatur agar tidak menambahkan pemisah baris setelah pencetakan, dan flush=True mengosongkan buffer output secara paksa untuk memastikan output langsung

import asyncio

# Membuat fungsi ini melakukan pemrosesan asinkron
async def process_astream(chain):
    # Memproses pesan tentang topik 'YouTube' menggunakan aliran asinkron.
    async for token in chain.astream({"topic": "YouTube"}):
        # Mencetak konten pesan. Mencetak tanpa pemisah baris dan langsung mengosongkan buffer.
        print(token, end="", flush=True)

# Memanggil fungsi
asyncio.run(process_astream(chain))

YouTube adalah platform berbagi video yang memungkinkan pengguna untuk mengunggah, menonton, dan berinteraksi dengan berbagai jenis konten video. YouTube memiliki jutaan video yang mencakup berbagai topik mulai dari hiburan, pendidikan, musik, hingga tutorial. Para pengguna dapat berlangganan saluran favorit mereka, memberi komentar, dan berbagi video dengan mudah melalui platform ini.

async invoke: Pemanggilan asinkron

Metode ainvoke dari objek chain melakukan pekerjaan secara asinkron menggunakan argumen yang diberikan. Dalam hal ini, dictionary dengan kunci topic dan nilai NVDA (ticker NVIDIA) diberikan sebagai argumen. Metode ini dapat digunakan untuk meminta pemrosesan secara asinkron untuk topik tertentu.

async def process_topic(chain):
    # Memanggil metode 'ainvoke' dari objek chain asinkron untuk memproses topik 'NVDA'.
    my_process = chain.ainvoke({"topic": "NVDA"})
    
    # Menunggu hingga `my_process` yang diproses secara asinkron selesai.
    return print(await my_process)

asyncio.run(process_topic(chain))

async batch: Kumpulkan asinkron

Fungsi abatch melakukan pemrosesan batch secara asinkron.

Dalam contoh ini, metode abatch dari objek chain digunakan untuk memproses pekerjaan tentang topik secara asinkron.

Kata kunci await digunakan untuk menunggu hingga pekerjaan asinkron tersebut selesai.

async def process_my_abatch(chain):
    # Melakukan pemrosesan batch secara asinkron untuk topik yang diberikan.
    my_abatch_process = chain.abatch(
    [{"topic": "YouTube"}, {"topic": "Instagram"}, {"topic": "Facebook"}]
    )
    
    # Menunggu hingga proses batch asinkron selesai.
    return print(await my_abatch_process)

asyncio.run(process_my_abatch(chain))

Paralell

Mari kita lihat bagaimana LangChain Expression Language mendukung permintaan paralel. Misalnya, saat menggunakan RunnableParallel (sering kali ditulis dalam bentuk dictionary form), setiap elemen dijalankan secara paralel.

Berikut adalah contoh menggunakan kelas RunnableParallel dari modul langchain_core.runnables untuk menjalankan dua tugas secara paralel.

Kita membuat dua chain (chain1, chain2) untuk mendapatkan ibu kota dan luas dari negara yang diberikan menggunakan metode ChatPromptTemplate.from_template.

Chain-chain ini dihubungkan dengan model dan operator pipa (|). Akhirnya, kita menggunakan kelas RunnableParallel untuk menggabungkan kedua chain ini dengan kunci capital dan area, menghasilkan objek combined yang dapat dijalankan secara bersamaan.

from langchain_core.runnables import RunnableParallel

# Membuat chain untuk menanyakan ibu kota dari {country}.
chain1 = (
    PromptTemplate.from_template("{country} ibu kota adalah apa?")
    | model
    | StrOutputParser()
)

# Membuat chain untuk menanyakan luas dari {country}.
chain2 = (
    PromptTemplate.from_template("{country} luasnya berapa?")
    | model
    | StrOutputParser()
)

# Membuat chain eksekusi paralel untuk menjalankan kedua chain di atas secara bersamaan.
combined = RunnableParallel(capital=chain1, area=chain2)

Metode chain1.invoke() memanggil metode invoke dari objek chain1.

Pada saat itu, dictionary dengan kunci country dan nilai Korea Selatan diberikan sebagai argumen.

# Menjalankan chain1.
chain1.invoke({"country": "Korea Selatan"})

Kali ini, panggil chain2.invoke(), dengan menyampaikan negara lain, yaitu Amerika Serikat, sebagai nilai untuk kunci country.

# Menjalankan chain2.
chain2.invoke({"country": "Amerika Serikat"})

Metode invoke dari objek combined melakukan pemrosesan untuk negara yang diberikan.

Dalam contoh ini, topik Korea Selatan disampaikan ke metode invoke untuk dijalankan.

# Menjalankan chain eksekusi paralel.
combined.invoke({"country": "Korea Selatan"})

Pemrosesan Paralel dalam Batch

Fungsi chain2.batch menerima daftar yang berisi beberapa dictionary dan melakukan pemrosesan batch.

Dalam contoh ini, pemrosesan diminta untuk dua negara: Korea Selatan dan Amerika Serikat.

# Melakukan pemrosesan batch.
chain2.batch([{"country": "Korea Selatan"}, {"country": "Amerika Serikat"}])

Fungsi combined.batch digunakan untuk memproses data dalam batch. Dalam contoh ini, fungsi ini menerima daftar yang berisi dua objek dictionary dan memproses data untuk kedua negara, yaitu Korea Selatan dan Amerika Serikat, dalam satu batch.

# Memproses data yang diberikan dalam batch.
combined.batch([{"country": "Korea Selatan"}, {"country": "Amerika Serikat"}])

Last updated