잘 알려진게... kafaka, radis, rabbiMQ, zeroMQ임.
사실상 새로운 네트워크 레이어 라이브러리.
각 node끼리는 direct connection이 생김. 애초에 브로커가 없기 때문에 직접 TCP로 연결때려야 함.
Reilable하지는 않음. 어디선가 drop이 생길수도 있긴함. ack하는 과정이 없기 때문임. (대신에 더 빠름)
Message Broker를 따로 설치할 필요가 없음. SQLite 느낌으로 바인딩만 받으면 됨.
메세지 브로커가 없기 때문에 동기화 문제가 생길수 있음. 다른 MQ에서는 Queing하는 버퍼가 있지만 여기는 아님.
비동기적으로 작동하기 때문에 예상밖의 결과를 내뿜을수 있음.
이 경우에 Sub에서는 아무것도 안보일 수도 있음. 비동기이기 때문에 실제로는 연결이 안되었고 그 사이에 1000개의 메세지가 전송되어서 임.
그렇기 때문에 "시작"과 "끝"이 없는 데이터에서 사용해야함. 동시에 시작해서 헤더등을 받아야 작동 되는 프로그램이라면... 사용이 불가함. (Request-Reply 패턴으로 옮겨가야함)
주의할건 일단 연결이 되면 수신되는 메세지가 큐에 버퍼링 될 것이나 (이 마저도 최대한계가 있음), 연결이 안되면 수신도 안된다는 것. 그에 더 나아가서 Sub가 아무도 없으면 모든 메세지는 다 drop됨.
프로세스마다 자신의 작업공간이 필요함. 프로그램의 시작과 끝에 zmq_ctx_new()
와 zmq_ctx_destory()
를 호출해야 하며, fork시에도 zmq_ctx_new, destory
를 각각 실행해 줘야함.
zmq는 작업용 쓰레드를 따로 가짐. (한 쓰레드에서 여러개의 socket을 관리) zmq_ctx_new
는 작업용 쓰래드를 만드는데 쓰이는듯. fork를 하면... 메모리 공간은 비슷하게 가져간다고 해도 쓰레드는 없을테니.... 얼추 맞는듯
멀티쓰레드 상황에서 각 쓰레드마다 소켓을 만들어주는게 좋음. 지금 상황에서는 socket 단위의 thread-safe는 확보되지 않은듯 함. ctx는 프로그램 전역으로, 소켓은 각 쓰레드 단위로. 위에서 언급 한 것 처럼 zmq_ctx_destory()
는 각 socket이 모두 안전하게 종료될 때까지 대기하니까 주의해서 처리할 것. → mutex나 lock을 쓰면 되긴하는데... 그럴바에 그냥 소켓을 새로 만들어라는 뜻
쓰래드별로 소켓을 만들라는것을 보아하니, 첫 설계때 멀티 쓰래드용으로는 main thread에서 zmq_ctx_destory
를 join같이 써라고 만든듯.
ctx → socket → message 구조로 되어있음. socket이 제대로 close 처리되지 않으면 zmq_ctx_destory가 무한히 대기하는 상황이 생김. message는 zmq_msg_close()
를 이용해서 항상 닫아주는 작업을 해야함. socket을 많이 생성하고 닫고 해야한다면 안티 패턴을 사용하는것. socket의 생성은 거의 안하는 수준으로 프로그램 구조를 만들어야 함.
static int s_interrupted = 0;
static void s_signal_handler (int signal_value)
{
s_interrupted = 1;
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset (&action.sa_mask);
sigaction (SIGINT, &action, NULL);
sigaction (SIGTERM, &action, NULL);
}
// ...
while(true) {
if (s_interrupted) break;
}
// 적절한 처리