09. Generator yang ditentukan pengguna (custom generator)
Generator yang ditentukan pengguna (custom generator)
Fungsi generator (misalnya, dengan menggunakan kata kunci yield dan berfungsi seperti iterable) bisa digunakan di dalam pipelining LCEL.
TSignture dari generator ini harus berupa Iterator[Input] -> Iterator[Output]. Untuk generatore yang asinkron, maka signaturenya menjadi AsyncIterator[Input] -> AsyncIterator[Output].
Hal ini sangat efektif dalam beberapa kegunaan seperti:
Implementasi parser output khusus oleh pengguna
Mengubah output dari tahap sebelumnya sedangkan menjaga fitur streaming
Contoh di bawah ini, kita akan membuat contoh implementasi parser output khusus untuk daftar yang terdiri dari elemen berisikan oleh koma (comma-separated list).
%pip install -qU langchain langchain-openai
from typing import Iterator, Listfrom langchain.prompts.chat import ChatPromptTemplatefrom langchain_core.output_parsers import StrOutputParserfrom langchain_openai import ChatOpenAIprompt = ChatPromptTemplate.from_template( # Tuliskan daftir 5 perusahaan yang mirip dengan {company} dengan menggunakan koma (comma) sebagai pemisah antar elemen
"Tuliskan daftir 5 perusahaan yang mirip dengan {company} dengan menggunakan koma (comma) sebagai pemisah antar eleman"
)# Set temperatur model menjadi 0.0 dan gunakan GPT-4 Turbo Preview model untuk memulai ChatOpenAImodel =ChatOpenAI(temperature=0.0, model="gpt-4-turbo-preview")# Hubungi prompt dengan model dan tambahkan parser output berupa string pada tahap selanjutnya untuk membuat pipelining (chain)
str_chain = prompt | model |StrOutputParser()
Jalankan chain ini sebagai stream dan tampilkan hasilnya. Hasil akan dibuat secara real-time.
# Melakukan streaming data.for chunk in str_chain.stream({"company": "Google"}):# Mencetak setiap chunk dan langsung flush buffer tanpa baris baru.print(chunk, end="", flush=True)
Microsoft, Amazon, Facebook, Apple, Alibaba
Jalankan fungsi invoke() untuk memeriksa hasilnya.
# Jalankan data melalui chain ini.str_chain.invoke({"company": "Google"})
Microsoft, Amazon, Facebook, Apple, Alibaba
Fungsi split_into_list() menerima iterator dari token LLM sebagai input dan menghasilkan iterator yang berisi daftar teks yang dipisahkan oleh koma (comma).
Fungsi ini juga memberikan generator pada bagian terakhir dengan mem-yield hasilnya.
# Parser kustom yang menerima iterator token llm sebagai input dan membagi menjadi daftar string yang dipisahkan oleh koma.
defsplit_into_list(input: Iterator[str]) -> Iterator[List[str]]:# Menyimpan input sebagian sampai koma ditemukan. buffer =""for chunk ininput:# Menambahkan chunk saat ini ke buffer. buffer += chunk# Ulangi selama ada koma dalam buffer.while","in buffer:# Membagi buffer berdasarkan koma. comma_index = buffer.index(",")# Mengembalikan semua konten sebelum koma.yield [buffer[:comma_index].strip()]# Sisanya disimpan untuk iterasi berikutnya. buffer = buffer[comma_index +1:]# Mengembalikan chunk terakhir.yield [buffer.strip()]
Gunakan string dalam variabel str_chain sebagai argumen untuk fungsi split_into_list() dengan menggunakan operator piping (|).
Fungsi split_into_list() memiliki tugas mengubah sebuah teks menjadi daftar.
Daftar yang dipisahkan diberikan ke variabel list_chain.
list_chain = str_chain | split_into_list # Memisahkan string chain menjadi sebuah daftar.
Panggil metode stream pada objek list_chain dengan menerima input data {"animal": "bear"}.
Metode ini akan menghasilkan output dalam blok-blok (chunk) yang harus diproses satu persatu.
Setiap chunk akan ditampilkan secara istirahat dengan menggunakan fungsi print, dan pengaturan flush=True memastikan bahwa output tidak disimpan dalam buffer sebelum dioutputkan.
Kode ini mendemonstrasikan proses penggunaan list_chain untuk menghasilkan output untuk data input, memproses output dalam potongan-potongan dan mengeluarkannya dengan segera.
# Memastikan bahwa list_chain yang dibuat dapat melakukan streaming tanpa masalah.for chunk in list_chain.stream({"company": "Google"}):print(chunk, flush=True)# Mencetak setiap chunk dan segera flush buffer.
Kali ini, kita akan menginjeksikan data ke dalam list_chain dengan invoke(). Mari kita pastikan ini bekerja.
# Memanggil data ke list_chain.list_chain.invoke({"company": "Google"})['Microsoft','Amazon','Facebook','Apple','Alibaba']
Tidak Sinkron (Asynchronous)
Fungsi asplit_into_list menerima AsyncIterator[str] sebagai input dan mengembalikan AsyncIterator[List[str]].
from typing import AsyncIterator# Mendefinisikan fungsi asinkronasyncdefasplit_into_list(input: AsyncIterator[str]) -> AsyncIterator[List[str]]: buffer =""# Karena `input` adalah objek `async_generator`, gunakan `async for`asyncfor chunk ininput: buffer += chunkwhile","in buffer: comma_index = buffer.index(",")yield [ buffer[:comma_index].strip() ] # Membagi berdasarkan koma dan mengembalikannya sebagai daftar buffer = buffer[comma_index +1:]yield [buffer.strip()] # Mengembalikan sisa konten buffer sebagai daftar# Menghubungkan `alist_chain` dan `asplit_into_list` ke dalam pipelinealist_chain = str_chain | asplit_into_list
Streaming menggunakan fungsi asinkron.
# Menggunakan async for loop untuk melakukan streaming data.asyncfor chunk in alist_chain.astream({"company": "Google"}):# Mencetak setiap chunk dan membersihkan buffer.print(chunk, flush=True)