08 Januari 2021 [Async programming, Programming, Rust]
Saya belajar banyak hari ini tentang cara berpikir tentang konkurensi di Rust. Saya mencoba menggunakan Semaphore untuk membatasi berapa banyak soket terbuka yang diizinkan oleh pendengar TCP saya, dan saya mengalami kesulitan dalam membuatnya berfungsi. Entah itu tidak benar-benar berfungsi, memungkinkan sejumlah klien untuk terhubung, atau kompiler mengatakan kepada saya bahwa saya tidak dapat melakukan apa yang ingin saya lakukan, karena masa pakai Semaphore saya tidak ‘statis. Inilah perjalanan yang saya ambil menuju kode kerja yang menurut saya benar (umpan balik diterima).
Motivasi
Dalam tutorial tokio terdapat bagian pendek berjudul “Tekanan balik dan saluran terbatas” (di bagian bawah halaman Saluran). Ini berisi pernyataan ini:
…berhati-hatilah untuk memastikan jumlah total konkurensi dibatasi. Misalnya, saat menulis loop penerimaan TCP, pastikan jumlah total soket terbuka dibatasi.
Jelas sekali, ketika saya mulai mengerjakan loop penerimaan TCP, saya ingin mengikuti saran ini.
Seperti banyak hal dalam perjalanan saya bersama Rust, perjalanan ini lebih sulit dari yang saya perkirakan, dan akhirnya memberikan pencerahan.
Kode
Berikut adalah program Rust singkat yang mendengarkan port TCP dan menerima koneksi masuk.
Kargo.toml:
[package]
name = "tcp-listener-example"
version = "1.0.0"
edition = "2018"
include = ["src/"]
[dependencies]
tokio = { version = ">=1.0.1", features = ["full"] }
src/main.rs:
use tokio::io::AsyncReadExt;
use tokio::net::TcpListener;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap();
loop {
let (mut tcp_stream, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
let mut buf: [u8; 1024] = [0; 1024];
loop {
let n = tcp_stream.read(&mut buf).await.unwrap();
if n == 0 {
return;
}
print!("{}", String::from_utf8_lossy(&buf[0..n]));
}
});
}
}
Program ini mendengarkan pada port 8080, dan setiap kali klien terhubung, program ini memunculkan tugas asinkron untuk menanganinya.
Jika saya menjalankannya dengan:
cargo run
Itu dimulai, dan saya dapat menghubungkannya dari beberapa proses lain seperti ini:
telnet 0.0.0.0 8080
Apa pun yang saya ketik di jendela terminal telnet akan dicetak di terminal tempat saya menjalankan kargo. Program ini berfungsi: ia mendengarkan pada port TCP 8080 dan mencetak semua pesan yang diterimanya.
Jadi apa masalahnya?
Masalahnya adalah program ini bisa kewalahan: jika banyak proses yang terhubung dengannya, program ini akan menerima semua koneksi, dan pada akhirnya kehabisan soket. Hal ini mungkin menghalangi hal-hal lain untuk berfungsi dengan baik di komputer, atau mungkin membuat program kita crash, atau hal lainnya. Kita memerlukan semacam batasan yang masuk akal, seperti yang disebutkan dalam tutorial tokio.
Jadi bagaimana kita membatasi jumlah orang yang diperbolehkan terhubung pada saat yang bersamaan?
Gunakan saja semaphore, bodoh
Semafor melakukan apa yang kita perlukan di sini – ia menghitung berapa banyak orang yang melakukan sesuatu, dan mencegah jumlah tersebut menjadi terlalu besar. Jadi yang perlu kita lakukan hanyalah membatasi jumlah klien yang kita izinkan untuk terhubung menggunakan semaphore.
Inilah upaya pertama saya:
use tokio::io::AsyncReadExt; use tokio::net::TcpListener; use tokio::sync::Semaphore; #[tokio::main] async fn main() { let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap(); let sem = Semaphore::new(2); loop { let (mut tcp_stream, _) = listener.accept().await.unwrap(); // Don't copy this code: it doesn't work let aq = sem.try_acquire(); if let Ok(_guard) = aq { tokio::spawn(async move { let mut buf: [u8; 1024] = [0; 1024]; loop { let n = tcp_stream.read(&mut buf).await.unwrap(); if n == 0 { return; } print!("{}", String::from_utf8_lossy(&buf[0..n])); } }); } else { println!("Rejecting client: too many open sockets"); } } }
Ini dikompilasi dengan baik, tetapi tidak menghasilkan apa-apa! Meskipun kami memanggil Semaphore::new dengan argumen 2, bermaksud untuk mengizinkan 2 klien saja yang terhubung, nyatanya saya masih bisa terhubung lebih dari itu. Sepertinya perubahan kode kita tidak berpengaruh sama sekali.
Apa yang kami harapkan terjadi adalah setiap kali klien terhubung, kami membuat _guard, yang merupakan SemaphoreGuard, yang menempati salah satu slot di semaphore. Kami mengharapkan penjaga itu tetap hidup sampai klien terputus, dan pada saat itulah slot akan dilepaskan.
Mengapa itu tidak berhasil? Sangat mudah untuk memahami ketika Anda memikirkan tentang apa yang tokio::spawn lakukan. Ini menciptakan tugas dan memintanya untuk dieksekusi di masa depan, tetapi sebenarnya tidak menjalankannya. Jadi tokio::spawn segera kembali, dan _guard dihapus, sebelum kode yang menangani permintaan tersebut dieksekusi. Jadi, tentu saja, perubahan kami tidak membatasi berapa banyak permintaan yang ditangani karena slot semaphore dikosongkan sebelum permintaan diproses.
Tahan saja penjaganya lebih lama, bodoh
Jadi, mari kita pertahankan SemaphoreGuard lebih lama lagi:
use tokio::io::AsyncReadExt; use tokio::net::TcpListener; use tokio::sync::Semaphore; #[tokio::main] async fn main() { let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap(); let sem = Semaphore::new(2); loop { let (mut tcp_stream, _) = listener.accept().await.unwrap(); let aq = sem.try_acquire(); if let Ok(guard) = aq { tokio::spawn(async move { let mut buf: [u8; 1024] = [0; 1024]; loop { let n = tcp_stream.read(&mut buf).await.unwrap(); if n == 0 { drop(guard); return; } print!("{}", String::from_utf8_lossy(&buf[0..n])); } }); } else { println!("Rejecting client: too many open sockets"); } } }
Idenya adalah meneruskan objek SemaphoreGuard ke dalam kode yang benar-benar berhubungan dengan permintaan klien. Cara saya mencobanya adalah dengan merujuk ke penjaga di suatu tempat dalam penutupan gerakan async. Apa yang sebenarnya saya lakukan adalah menyuruhnya untuk berhenti berjaga ketika kita selesai dengan permintaannya, namun sebenarnya setiap penyebutan variabel tersebut di dalam penutupan sudah cukup untuk memberi tahu kompiler bahwa kita ingin memindahkannya, dan hanya membuangnya ketika kita selesai.
Kedengarannya masuk akal, tetapi sebenarnya kode ini tidak dapat dikompilasi. Inilah kesalahan yang saya dapatkan:
error[E0597]: `sem` does not live long enough --> src/main.rs:12:18 | 12 | let aq = sem.try_acquire(); | ^^^-------------- | | | borrowed value does not live long enough | argument requires that `sem` is borrowed for `'static` ... 29 | } | - `sem` dropped here while still borrowed
Apa yang dikatakan kompiler adalah bahwa SemaphoreGuard mengacu pada sem (objek Semaphore), tetapi penjaga tersebut mungkin hidup lebih lama daripada semaphore.
Mengapa? Tentunya sem diadakan dalam lingkup yang mencakup seluruh kode penanganan klien, sehingga harus bertahan cukup lama?
Sebenarnya, penutupan perpindahan asinkron yang kami teruskan ke tokio::spawn sedang ditambahkan ke daftar tugas yang akan dijalankan di masa mendatang, sehingga bisa bertahan lebih lama. Fakta bahwa kita berada di dalam lingkaran tak terbatas membuat saya semakin bingung di sini, namun prinsipnya tetap sama: setiap kali kita membuat penutupan seperti ini dan memasukkan sesuatu ke dalamnya, penutupan tersebut harus memilikinya, atau jika kita meminjamnya, ia harus hidup selamanya (yang merupakan apa yang dimaksud dengan ‘seumur hidup statis).
Kode di atas meneruskan kepemilikan penjaga ke penutupan, tetapi penjaga itu sendiri mengacu pada (meminjam) sem. Inilah sebabnya kompiler mengatakan bahwa “sem dipinjam untuk ‘statis”.
Hal-hal salah yang saya coba
Karena saya tidak mengerti apa yang saya lakukan, saya mencoba berbagai hal lain seperti membuat sem sebuah Arc, membuat guard sebuah Arc, membuat guard di dalam penutupan, dan bahkan mencoba membuat sem benar-benar memiliki ‘penyimpanan statis dengan menjadikannya sebuah konstanta. (Yang terakhir tidak berhasil karena hanya tipe yang sangat sederhana seperti angka dan string yang dapat menjadi konstanta.)
Solusi: Bagikan Semaphore dalam Arc
Setelah merasa terlalu banyak meronta-ronta, saya menemukan apa yang menurut saya adalah jawaban yang tepat:
use std::sync::Arc; use tokio::io::AsyncReadExt; use tokio::net::TcpListener; use tokio::sync::Semaphore; #[tokio::main] async fn main() { let listener = TcpListener::bind("0.0.0.0:8080").await.unwrap(); let sem = Arc::new(Semaphore::new(2)); loop { let (mut tcp_stream, _) = listener.accept().await.unwrap(); let sem_clone = Arc::clone(&sem); tokio::spawn(async move { let aq = sem_clone.try_acquire(); if let Ok(_guard) = aq { let mut buf: [u8; 1024] = [0; 1024]; loop { let n = tcp_stream.read(&mut buf).await.unwrap(); if n == 0 { return; } print!("{}", String::from_utf8_lossy(&buf[0..n])); } } else { println!("Rejecting client: too many open sockets"); } }); } }
Kode ini:
- Membuat Semaphore dan menyimpannya di dalam Arc, yang merupakan penunjuk penghitungan referensi yang dapat dibagikan antar tugas. Artinya, ini akan tetap ada selama seseorang memegang referensinya.
- Mengkloning Arc sehingga kita memiliki salinan yang dapat dipindahkan dengan aman ke penutupan pemindahan async. Kita tidak bisa memindahkan sem ke dalam penutupan karena itu akan digunakan lagi pada putaran berikutnya. Kita dapat memindahkan sem_clone ke dalam penutupan karena tidak digunakan di tempat lain. sem dan sem_clone keduanya mengacu pada objek Semaphore yang sama, jadi keduanya menyetujui jumlah klien yang terhubung, namun keduanya merupakan instance Arc yang berbeda, sehingga dapat dipindahkan ke penutupan.
- Hanya memperoleh SemaphoreGuard setelah kita berada di dalam penutupan. Dengan cara ini kita tidak melakukan sesuatu yang sulit seperti meminjam referensi ke sesuatu yang berada di luar penutupan. Sebaliknya, kita meminjam referensi melalui sem_clone, yang dimiliki oleh penutupan tempat kita berada di dalamnya, sehingga kita tahu referensi tersebut akan bertahan cukup lama.
Ini benar-benar berhasil! Setelah dua klien terhubung, pendengar.accept sebenarnya membuka soket ke klien baru mana pun, tetapi karena kami segera kembali dari penutupan, kami hanya membukanya sebentar sebelum menjatuhkannya. Tampaknya ini lebih baik daripada menolak untuk membukanya sama sekali, yang menurut saya mungkin akan membuat klien menunggu, menunggu koneksi yang mungkin tidak akan pernah datang.
Seumur hidup itu keren dan rumit
Sekali lagi, saya telah belajar banyak tentang apa yang sebenarnya dilakukan kode saya dari kompiler Rust. Menurut saya hal ini sangat membingungkan, namun mudah-mudahan dengan menuliskan pemahaman saya di postingan ini, saya telah membantu diri saya saat ini dan di masa depan, dan mungkin bahkan Anda, menjadi lebih jelas tentang cara berbagi semaphore di antara beberapa tugas asinkron.
Sangat menyenangkan dan memberdayakan untuk menulis kode yang saya yakini benar, dan juga berfungsi. Perasaan bahwa “kompiler mendukung saya” kuat, dan saya menyukainya.
News
Berita Teknologi
Berita Olahraga
Sports news
sports
Motivation
football prediction
technology
Berita Technologi
Berita Terkini
Tempat Wisata
News Flash
Football
Gaming
Game News
Gamers
Jasa Artikel
Jasa Backlink
Agen234
Agen234
Agen234
Resep
Download Film
A gaming center is a dedicated space where people come together to play video games, whether on PCs, consoles, or arcade machines. These centers can offer a range of services, from casual gaming sessions to competitive tournaments.
