사내 보안 솔루션에 SIEM/Syslog 연동 기능 구현하기
배경
사내 직원 모니터링/보안 솔루션을 개발하고 있다. 의심 활동, 디바이스 제어, 파일 전송, 스크린샷 등 다양한 보안 이벤트를 생성하는데, 엔터프라이즈 고객들은 이 이벤트를 자사 SIEM(Splunk, QRadar, Microsoft Sentinel 등)으로 수집해 통합 보안 관제를 하고 싶어한다.
Organization 단위로 SIEM 서버 연동 설정을 관리하고, 보안 이벤트를 업계 표준 포맷(CEF/RFC 5424)으로 Syslog 전송하는 기능을 구현했다.
설계 포인트
기존 로직에 영향 없는 구조
가장 중요하게 고려한 건 기존 이벤트 핸들러 로직을 건드리지 않는 것이다. 각 이벤트 핸들러(의심 활동 생성, DLP 로그 기록 등)에 emit() 한 줄만 추가하는 방식으로 구현했다.
from common.siem.dispatch import emit
emit(
organization_id=organization_id,
event_type="suspicious_activity",
event_uuid=str(activity.uuid),
severity=8,
details={"name": activity.name},
)
emit()은 내부적으로 모든 예외를 삼킨다. SIEM 서버가 죽어있어도, Redis가 끊겨도, 기존 비즈니스 로직은 절대 영향받지 않는다.
Non-blocking 전송
Syslog 전송은 소켓 I/O가 발생하므로 API 응답 속도에 영향을 줄 수 있다. 이를 방지하기 위해 기존 프로젝트에서 사용하던 thread pool executor에 태워서 백그라운드로 처리한다.
def emit(organization_id, event_type, event_uuid, ...):
try:
config = _get_cached_config(organization_id)
if not config or not config["is_enabled"]:
return
if not _is_event_enabled(config, event_type):
return
thread_pool_executor.submit(
process_siem_event, ...
)
except Exception:
pass # 절대 전파하지 않음
설정 캐시
매 이벤트마다 DB를 조회하면 부하가 생기므로, SiemConfig를 Redis에 5분 TTL로 캐싱한다. 관리자가 설정을 변경하면 즉시 캐시를 무효화해서 다음 이벤트부터 새 설정이 적용된다.
구현 내용
DB 모델
Organization과 1:1 관계인 SiemConfig 테이블과, 전송 이력을 추적하는 SiemTransmissionLog 테이블을 추가했다.
이벤트 유형별 필터(send_suspicious_activity, send_event 등 7개)는 기본값 OFF로 설정했다. 고객이 필요한 이벤트만 선택적으로 활성화하는 방식이다.
포맷 지원
두 가지 업계 표준 포맷을 지원한다:
- CEF (Common Event Format) — Splunk, ArcSight 등에서 주로 사용. 파이프(|)로 구분된 헤더 + key=value 확장 필드
- RFC 5424 — 표준 Syslog 포맷. PRI, 타임스탬프, Structured Data 포함
# CEF 출력 예시
CEF:0|Vendor|Product|1.0|100|suspicious_activity|7|externalId=uuid-123 duser=account-456
# RFC 5424 출력 예시
<131>1 2026-03-10T14:30:00+00:00 - SecurityApp - SUSPICIOUS_ACTIVITY [app@0 eventUuid="uuid-123"]
전송 클라이언트
TCP, UDP, TLS 세 가지 프로토콜을 지원한다. TCP/TLS는 RFC 6587 octet-counting framing을 적용하고, 연결/전송 타임아웃은 각 5초로 설정했다.
재시도 메커니즘
전송 실패 시 Redis 큐에 넣고, 60초 간격 repeat_every 태스크가 재시도한다. 조직별 분산 락으로 멀티 Pod 환경에서도 중복 처리를 방지한다.
API 엔드포인트
| Method | Path | 설명 |
|---|---|---|
| GET | /organization/my/siem | 설정 조회 |
| PUT | /organization/my/siem | 설정 수정 (부분 업데이트 지원) |
| POST | /organization/my/siem/test | 연결 테스트 |
| GET | /organization/my/siem/logs | 전송 로그 조회 |
Syslog의 한계: 수신 확인이 없다
구현하면서 짚고 넘어갈 부분이 있다. Syslog는 HTTP와 달리 응답이 없는 프로토콜이다. 서버에 메시지를 보내고 끝이며, 잘 받았다는 200 같은 응답을 돌려주지 않는다.
그래서 성공/실패 판단은 소켓 레벨에서 한다:
- TCP/TLS: sendall()이 예외 없이 완료되면 sent, 연결 거부/타임아웃/DNS 실패 등 소켓 에러가 발생하면 failed
- UDP: 패킷을 보내기만 하면 성공으로 간주 (connectionless 특성상 상대방이 안 받아도 모름)
SIEM 서버가 메시지를 실제로 파싱하고 저장했는지까지는 확인할 수 없다. 이건 업계 표준 동작이고, 보완하려면 SIEM 쪽에서 별도 수신 확인 API를 제공해야 한다.
테스트
총 40개 단위 테스트를 작성했다:
- 포매터 테스트 14개 — CEF/RFC 5424 출력 형식, severity 매핑, 특수문자 이스케이프
- 디스패치 테스트 11개 — 캐시 히트/미스, 이벤트 필터링, 예외 전파 방지
- 클라이언트 테스트 8개 — TCP/UDP 전송, 연결 테스트, 에러 시 소켓 정리
- 뷰 테스트 7개 — API 엔드포인트 정상/에러 케이스
기존 테스트 포함 전체 543개 통과를 확인했고, 컨테이너 환경에서 curl로 12개 시나리오 API 테스트도 완료했다.
파일 구조
common/siem/
├── __init__.py
├── values.py # Enum 정의, CEF Event ID 매핑
├── entity.py # Pydantic 엔티티, API 응답 모델
├── client.py # TCP/UDP/TLS 소켓 클라이언트
├── formatter.py # CEF, RFC 5424 포매터
├── dispatch.py # emit() 진입점, Redis 캐시
├── processor.py # 백그라운드 포매팅+전송
└── retry.py # 재시도 태스크
마무리
이번 작업의 핵심은 기존 시스템에 영향을 주지 않으면서 엔터프라이즈급 기능을 얹는 것이었다. emit() 한 줄로 연동되는 인터페이스, 예외 전파 차단, 비동기 처리, Redis 캐시 등을 통해 기존 코드 변경을 최소화하면서 요구사항을 충족할 수 있었다.