/*
[package]
name = "client"
version = "0.1.0"
edition = "2018"
[profile.release]
lto = true
# See more keys and their definitions at <https://doc.rust-lang.org/cargo/reference/manifest.html>
[dependencies]
tokio = { version = "*", features = ["full"] }
lazy_static = "*"
rand = "*"
*/
#[macro_use]
extern crate lazy_static;
#[derive(Eq, PartialEq, Clone, Debug)]
enum State {
init,
wait_for_connect,
run,
end
}
lazy_static! {
static ref CONNECT_ADDR: String = "127.0.0.1:9800".to_string();
}
#[tokio::main]
async fn main() {
let (notifier_tx, notifier_rx) = tokio::sync::watch::channel(State::init);
for _ in 0..15000 {
tokio::spawn(process(notifier_rx.clone()));
}
notifier_tx.send(State::wait_for_connect);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
notifier_tx.send(State::run);
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
notifier_tx.send(State::end);
}
async fn process(mut rx: tokio::sync::watch::Receiver<State>) {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(80));
let mut last_state = State::init;
let mut sock = tokio::net::TcpStream::connect(&*CONNECT_ADDR).await.unwrap();
sock.set_nodelay(true); // 패킷이 합쳐지는것을 막기위해 no_delay flag 설정
loop {
tokio::select! {
_ = rx.changed() => {
match *rx.borrow() {
State::run => last_state = State::run,
State::end => break,
_ => ()
}
}
_ = interval.tick(), if last_state == State::run => {
use tokio::io::*;
let random_len = rand::random::<usize>() % 1409; // MTU에 맞추려고.. 1400에 가장 가까운 소수 선택
let random_bytes: Vec<u8> = (0..random_len).map(|_| { rand::random::<u8>() }).collect();
sock.write_u64(random_len as u64).await;
if let Err(e) = sock.write(&random_bytes).await {
println!("write err {:?}", e);
if e.raw_os_error().unwrap_or(0) == 32 { return }
}
}
}
}
}
/*
[package]
name = "server"
version = "0.1.0"
edition = "2018"
[profile.release]
lto = true
# See more keys and their definitions at <https://doc.rust-lang.org/cargo/reference/manifest.html>
[dependencies]
tokio = { version = "*", features = ["full"] }
bytes = "*"
*/
#[derive(Clone, Eq, PartialEq, Debug)]
enum Message{
broadcast(usize, bytes::Bytes)
}
#[derive(Clone)]
struct Shared {
broadcast: tokio::sync::broadcast::Sender<Message>,
message_count: std::sync::Arc<std::sync::atomic::AtomicUsize>
}
#[tokio::main]
async fn main() {
let (broadcast, _) = tokio::sync::broadcast::channel(65536);
let shared = Shared {
broadcast,
message_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0))
};
tokio::spawn(listen(shared.clone()));
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
let mut report_no = 0usize;
loop {
interval.tick().await;
println!("{} : {} message recved", report_no, shared.message_count.load(std::sync::atomic::Ordering::Relaxed));
report_no += 1;
}
}
async fn listen(shared: Shared) {
let listener = tokio::net::TcpListener::bind("127.0.0.1:9800").await.unwrap();
let mut conn_no = 0usize;
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
tokio::select! {
acpt = listener.accept() => {
if let Ok(sock) = acpt {
tokio::spawn(process(conn_no, shared.clone(), sock.1, sock.0));
conn_no += 1;
}
}
_ = interval.tick() => {
println!("accept len = {}", conn_no);
}
}
}
}
async fn process(conn_no: usize, shared: Shared, addr: std::net::SocketAddr, mut sock: tokio::net::TcpStream) {
use tokio::io::*;
use bytes::*;
let mut rx = shared.broadcast.subscribe();
let tx = shared.broadcast;
let mut read_buf = BytesMut::with_capacity(4096);
let mut write_buf = BytesMut::with_capacity(4096);
let mut box_size = 0usize;
write_buf.put_slice(&format!("{}", addr).into_bytes());
sock.write_u64(write_buf.len() as u64);
sock.write_buf(&mut write_buf);
loop {
tokio::select! {
Ok(msg) = rx.recv() => {
match msg {
Message::broadcast(from, data) => {
if from == conn_no { continue; }
write_buf.put_u64(data.len() as u64);
write_buf.extend_from_slice(&data);
sock.write_buf(&mut write_buf).await;
}
_ => ()
}
}
res = sock.read_buf(&mut read_buf) => {
if res.is_err() || res.unwrap() == 0 { println!("return 0"); return; }
'inner: loop {
if read_buf.len() == 0 { break 'inner; }
if box_size == 0 {
if read_buf.len() < 8 { break 'inner; }
box_size = read_buf.get_u64() as usize;
}
if read_buf.len() < box_size { break 'inner }
let msg = read_buf.split_to(box_size).freeze();
box_size = 0;
tx.send(Message::broadcast(conn_no, msg));
shared.message_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
else => {}
}
}
}
/etc/sysctl.conf
fs.file-max = 100000
/etc/security/limits.conf
* soft nproc 65535
* hard nproc 65535
* soft nofile 65535
* hard nofile 65535
reboot now
vultr sent 372k
iwinv sent 334.9k
iwinv recv 328.2k
vultr recv 283.3k
전부 패킷단위
mio가 edge-trigger였다... edge-trigger인지 모르고 level-trigger 방식으로 코딩을 한게 문제였음. 내부 read/write buf 크기와 시스템의 버퍼크기를 맞춰줘야함. 문제는 리눅스의 기본 버퍼크기를 워커에서 그대로 쓰면 1.5만명일때도 메모리 부족으로 터져버림... 우선 서버의 메모리와 rmem, wmem값을 보고 생각하는게 좋음.
sysctl net.ipv4.tcp_rmem -> default 크기는 87380 => 이러면 커널에도 87k, 프로그램에서도 87k가 잡힘. 짜피 텍스트 통신이니까 32768 설정 (write 위주니까)
sysctl net.ipv4.tcp_wmem -> default 크기는 16380 => 오히려 이걸 32k로 바꿔야 할 것 같음.
sysctl -w net.ipv4.tcp_rmem="4096 16384 4194304"
sysctl -w net.ipv4.tcp_wmem="4096 32768 4194304"
아예 대형서버면 budget을 많이 줘서 context-switching 이 적어지게 하자
sudo sysctl -w net.core.netdev_budget=600
sudo sysctl -w net.core.dev_weight=300
53345844 message recved, 1011464 message sent
53398916 message recved, 1593552 message sent
53304339 message recved, 1643044 message sent (1 core only)
53351350 message recved, 1769667 message sent (1 core only)
53176907 message recved, 6270346 message sent (76k)
52531322 message recved, 1766348 message sent (42k)
52710988 message recved, 1419083 message sent (39k)
51394650 message recved, 1905916 message sent (1 core. 97k)
53346451 message recved, 1495038 message sent (1 core, 41k)
만명 이상이면 한 코어에 아예 irq를 맞길것. (3코어 이상의 CPU로 구성할것) irq 처리에만 코어 하나를 다 쓰고있음
15950993 message recved, 1500813 message sent (44k)
이전에는 Rust + broadcast 시스템을 이용해서 패킷을 대량으로 생성함. 설정을 통해 단일코어에서만 Rust 서버 프로그램이 돌도록 했으나 이후 생각해 보기에는 2번 (soft-irq 전용 코어)에도 영향을 끼쳤을것으로 생각됨. 또한 1번 코어가 65%의 사용률 이상으로 작동하지 못한점으로 보아 Rust 프로그램의 한계가 있었을 것으로 추점됨
아예 C++로 Epoll 처리 프로그램을 만들어서 각각 단일 코어에 집어넣고 극한의 패킷 생성을 이루어 보고자 함.