학습목표¶
- 앞서가려 애쓰기보다, 옳은 방향을 지켜라. 조금 늦더라도 방향이 맞다면 언젠가는 반드시 원하는 목적지에 도착하기 마련입니다.
- 참여형 미니프로젝트 중심(좌충우돌 성장기)
- LangChain(LLM + RAG)활용 실시간 스트리밍
- 대화형 RAG 응답 처리
In [1]:
'''
Quiz
AI 보안 탐정: 해킹 로그에서 단서를 찾아라!
LangChain과 FAISS로 보안 로그를 분석하고, Streamlit으로 시각화하는 RAG 기반 대시보드 만들기
"SQL Injection": "로그인 쿼리에서 의심스러운 SQL 페이로드 탐지",
"XSS": "웹 폼 입력란에서 스크립트 삽입 시도 감지",
"Brute Force": "여러 번의 로그인 실패 시도 관찰됨",
"DDoS": "여러 IP에서 대량의 요청 발생",
"Port Scan": "순차적인 포트 접근 패턴 탐지",
"Ransomware": "랜섬웨어 동작으로 보이는 암호화된 트래픽 탐지",
"Phishing": "악성 이메일 링크 탐지",
"Command Injection": "HTTP 요청 내 쉘 명령어 패턴 발견",
"CSRF": "크로스사이트 요청 위조 시도 감지",
"Directory Traversal": "상위 디렉토리 접근 시도 탐지"
'''
Out[1]:
'\nQuiz\nAI 보안 탐정: 해킹 로그에서 단서를 찾아라!\nLangChain과 FAISS로 보안 로그를 분석하고, Streamlit으로 시각화하는 RAG 기반 대시보드 만들기\n'
In [2]:
# analysis & visualization
import pandas as pd
import numpy as np
import streamlit as st
import plotly.express as px
# llm
import re, os, json, openai, random, time
from openai import OpenAI
from dotenv import load_dotenv
# langchain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.vectorstores import FAISS
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA, LLMChain
In [3]:
frm = pd.read_csv('./data/attack_logs_ko.csv')
frm.head()
Out[3]:
| time | ip | country | attack_type | description | |
|---|---|---|---|---|---|
| 0 | 2025-11-04 06:13:00 | 190.185.214.32 | Russia | Brute Force | 여러 번의 로그인 실패 시도 관찰됨 |
| 1 | 2025-11-06 17:06:00 | 120.136.155.120 | France | CSRF | 크로스사이트 요청 위조 시도 감지 |
| 2 | 2025-11-05 16:40:00 | 130.16.114.226 | UK | CSRF | 크로스사이트 요청 위조 시도 감지 |
| 3 | 2025-11-03 10:52:00 | 17.99.212.168 | France | Phishing | 악성 이메일 링크 탐지 |
| 4 | 2025-11-02 08:32:00 | 101.167.18.159 | UK | Phishing | 악성 이메일 링크 탐지 |
In [17]:
database = []
for idx, row in frm.iterrows() :
txt = f"time: {row['time']}, ip: {row['ip']}, country: {row['country']}, attack: {row['attack_type']}, description: {row['description']}"
database.append(txt)
In [19]:
database[:10]
Out[19]:
['time: 2025-11-04 06:13:00, ip: 190.185.214.32, country: Russia, attack: Brute Force, description: 여러 번의 로그인 실패 시도 관찰됨', 'time: 2025-11-06 17:06:00, ip: 120.136.155.120, country: France, attack: CSRF, description: 크로스사이트 요청 위조 시도 감지', 'time: 2025-11-05 16:40:00, ip: 130.16.114.226, country: UK, attack: CSRF, description: 크로스사이트 요청 위조 시도 감지', 'time: 2025-11-03 10:52:00, ip: 17.99.212.168, country: France, attack: Phishing, description: 악성 이메일 링크 탐지', 'time: 2025-11-02 08:32:00, ip: 101.167.18.159, country: UK, attack: Phishing, description: 악성 이메일 링크 탐지', 'time: 2025-11-07 03:15:00, ip: 42.124.149.195, country: UK, attack: Command Injection, description: HTTP 요청 내 쉘 명령어 패턴 발견', 'time: 2025-11-02 14:35:00, ip: 40.239.17.99, country: India, attack: CSRF, description: 크로스사이트 요청 위조 시도 감지', 'time: 2025-11-03 14:31:00, ip: 100.232.175.193, country: Russia, attack: Port Scan, description: 순차적인 포트 접근 패턴 탐지', 'time: 2025-11-07 10:33:00, ip: 168.131.57.97, country: Germany, attack: Directory Traversal, description: 상위 디렉토리 접근 시도 탐지', 'time: 2025-11-04 01:10:00, ip: 198.156.77.116, country: India, attack: Directory Traversal, description: 상위 디렉토리 접근 시도 탐지']
In [21]:
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
def masking(key) :
# 앞 4자리 , 뒤 4자리만 남기고 * 처리
if len(key) <= 8 :
return '*' * len(key)
return key[:4] + '*' * (len(key) - 8) + key[-4:]
masked_api_key = masking(api_key)
# print('masked api key : ', masked_api_key)
In [23]:
# 텍스트를 분할하고 vector db (숫자배열)
splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)
docs = splitter.create_documents(database)
embeddings = OpenAIEmbeddings()
vectorDB = FAISS.from_documents(docs, embeddings)
In [27]:
retriever = vectorDB.as_retriever(search_kwargs={'k' : 3})
llm = ChatOpenAI(model='gpt-4o-mini', temperature=0.3)
qaChain = RetrievalQA.from_chain_type(llm=llm, chain_type='stuff', retriever=retriever)
In [28]:
query = '어떤 공격 유형이 가장 많이 발생했나요'
response = qaChain.run(query)
print(response)
C:\Users\snower\AppData\Local\Temp\ipykernel_12592\2660451405.py:3: LangChainDeprecationWarning: The method `Chain.run` was deprecated in langchain 0.1.0 and will be removed in 1.0. Use invoke instead. response = qaChain.run(query)
제공된 정보에 따르면, DDoS 공격이 여러 번 발생했습니다. 따라서 DDoS 공격 유형이 가장 많이 발생한 것으로 보입니다.
In [ ]: