09. Generator yang ditentukan pengguna (custom generator)

Generator yang ditentukan pengguna (custom generator)

Fungsi generator (misalnya, dengan menggunakan kata kunci yield dan berfungsi seperti iterable) bisa digunakan di dalam pipelining LCEL.

TSignture dari generator ini harus berupa Iterator[Input] -> Iterator[Output]. Untuk generatore yang asinkron, maka signaturenya menjadi AsyncIterator[Input] -> AsyncIterator[Output].

Hal ini sangat efektif dalam beberapa kegunaan seperti:

  • Implementasi parser output khusus oleh pengguna

  • Mengubah output dari tahap sebelumnya sedangkan menjaga fitur streaming

Contoh di bawah ini, kita akan membuat contoh implementasi parser output khusus untuk daftar yang terdiri dari elemen berisikan oleh koma (comma-separated list).

%pip install -qU langchain langchain-openai
from typing import Iterator, List

from langchain.prompts.chat import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template(
    # Tuliskan daftir 5 perusahaan yang mirip dengan {company} dengan menggunakan koma (comma) sebagai pemisah antar elemen
    "Tuliskan daftir 5 perusahaan yang mirip dengan {company} dengan menggunakan koma (comma) sebagai pemisah antar eleman"
)
# Set temperatur model menjadi 0.0 dan gunakan GPT-4 Turbo Preview model untuk memulai ChatOpenAI
model = ChatOpenAI(temperature=0.0, model="gpt-4-turbo-preview")

# Hubungi prompt dengan model dan tambahkan parser output berupa string pada tahap selanjutnya untuk membuat pipelining (chain)
str_chain = prompt | model | StrOutputParser()

Jalankan chain ini sebagai stream dan tampilkan hasilnya. Hasil akan dibuat secara real-time.

# Melakukan streaming data.
for chunk in str_chain.stream({"company": "Google"}):
    # Mencetak setiap chunk dan langsung flush buffer tanpa baris baru.
    print(chunk, end="", flush=True)
Microsoft, Amazon, Facebook, Apple, Alibaba

Jalankan fungsi invoke() untuk memeriksa hasilnya.

# Jalankan data melalui chain ini.
str_chain.invoke({"company": "Google"})
Microsoft, Amazon, Facebook, Apple, Alibaba

Fungsi split_into_list() menerima iterator dari token LLM sebagai input dan menghasilkan iterator yang berisi daftar teks yang dipisahkan oleh koma (comma).

  • Fungsi ini juga memberikan generator pada bagian terakhir dengan mem-yield hasilnya.

# Parser kustom yang menerima iterator token llm sebagai input dan membagi menjadi daftar string yang dipisahkan oleh koma.
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    # Menyimpan input sebagian sampai koma ditemukan.
    buffer = ""
    for chunk in input:
        # Menambahkan chunk saat ini ke buffer.
        buffer += chunk
        # Ulangi selama ada koma dalam buffer.
        while "," in buffer:
            # Membagi buffer berdasarkan koma.
            comma_index = buffer.index(",")
            # Mengembalikan semua konten sebelum koma.
            yield [buffer[:comma_index].strip()]
            # Sisanya disimpan untuk iterasi berikutnya.
            buffer = buffer[comma_index + 1 :]
    # Mengembalikan chunk terakhir.
    yield [buffer.strip()]
  • Gunakan string dalam variabel str_chain sebagai argumen untuk fungsi split_into_list() dengan menggunakan operator piping (|).

  • Fungsi split_into_list() memiliki tugas mengubah sebuah teks menjadi daftar.

  • Daftar yang dipisahkan diberikan ke variabel list_chain.

list_chain = str_chain | split_into_list  # Memisahkan string chain menjadi sebuah daftar.
  • Panggil metode stream pada objek list_chain dengan menerima input data {"animal": "bear"}.

  • Metode ini akan menghasilkan output dalam blok-blok (chunk) yang harus diproses satu persatu.

  • Setiap chunk akan ditampilkan secara istirahat dengan menggunakan fungsi print, dan pengaturan flush=True memastikan bahwa output tidak disimpan dalam buffer sebelum dioutputkan.

Kode ini mendemonstrasikan proses penggunaan list_chain untuk menghasilkan output untuk data input, memproses output dalam potongan-potongan dan mengeluarkannya dengan segera.

# Memastikan bahwa list_chain yang dibuat dapat melakukan streaming tanpa masalah.
for chunk in list_chain.stream({"company": "Google"}):
    print(chunk, flush=True)  # Mencetak setiap chunk dan segera flush buffer.
['Microsoft']
['Amazon']
['Facebook']
['Apple']
['Alibaba']

Kali ini, kita akan menginjeksikan data ke dalam list_chain dengan invoke(). Mari kita pastikan ini bekerja.

# Memanggil data ke list_chain.
list_chain.invoke({"company": "Google"})['Microsoft', 'Amazon', 'Facebook', 'Apple', 'Alibaba']

Tidak Sinkron (Asynchronous)

Fungsi asplit_into_list menerima AsyncIterator[str] sebagai input dan mengembalikan AsyncIterator[List[str]].

from typing import AsyncIterator


# Mendefinisikan fungsi asinkron
async def asplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]:
    buffer = ""
    # Karena `input` adalah objek `async_generator`, gunakan `async for`
    async for chunk in input:
        buffer += chunk
        while "," in buffer:
            comma_index = buffer.index(",")
            yield [
                buffer[:comma_index].strip()
            ]  # Membagi berdasarkan koma dan mengembalikannya sebagai daftar
            buffer = buffer[comma_index + 1:]
    yield [buffer.strip()]  # Mengembalikan sisa konten buffer sebagai daftar


# Menghubungkan `alist_chain` dan `asplit_into_list` ke dalam pipeline
alist_chain = str_chain | asplit_into_list

Streaming menggunakan fungsi asinkron.

# Menggunakan async for loop untuk melakukan streaming data.
async for chunk in alist_chain.astream({"company": "Google"}):
    # Mencetak setiap chunk dan membersihkan buffer.
    print(chunk, flush=True)
['Microsoft', 'Amazon', 'Facebook', 'Apple', 'Alibaba']

Meng-invoke data ke dalam chain asinkron untuk memastikan bahwa itu berfungsi tanpa masalah.

# Memanggil rantai daftar secara asinkron.
await alist_chain.ainvoke({"company": "Google"})
['Microsoft']
['Amazon']
['Facebook']
['Apple']
['Alibaba']

Last updated