# analysis & visualization
import pandas as pd
import numpy as np
import streamlit as st
import plotly.express as px
# llm
import re, os, json, openai, random, time
from openai import OpenAI
from dotenv import load_dotenv
# langchain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.vectorstores import FAISS
from langchain.text_splitter import CharacterTextSplitter, RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA, LLMChain, ConversationalRetrievalChain
load_dotenv()
api_key = os.getenv('OPENAI_API_KEY')
# 캐싱을 이용해서 데이터를 효율적으로 관리할 수 있다.
@st.cache_data
def load_data() :
frm = pd.read_csv('../data/attack_logs_ko.csv')
return frm
frm = load_data()
# VectorDB(리소스)
# 캐싱의 리소스를 이용해서 데이터를 효율적으로 관리할 수 있다.
@st.cache_resource
def create_vector_store(frm) :
database = []
for idx, row in frm.iterrows() :
txt = f"time: {row['time']}, ip: {row['ip']}, country: {row['country']}, attack: {row['attack_type']}, description: {row['description']}"
database.append(txt)
splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)
docs = splitter.create_documents(database)
embeddings = OpenAIEmbeddings()
vectorDB = FAISS.from_documents(docs, embeddings)
return vectorDB
vectorDB = create_vector_store(frm)
# langchain(리소스)
# 캐싱의 리소스를 이용해서 데이터를 효율적으로 관리할 수 있다.
@st.cache_resource
def model() :
retriever = vectorDB.as_retriever(search_kwargs={'k' : 10})
llm = ChatOpenAI(model='gpt-4o-mini', temperature=0.9)
qaChain = ConversationalRetrievalChain.from_llm(llm=llm, retriever=retriever)
return qaChain
# streamlit view 구성
# 대화기억(세션을 유지) : st.session_state
def view():
st.set_page_config(page_title='AI 보안 탐정')
st.title('대화형 해킹 로그분석 RAG 시스템')
st.markdown('LangChain + FAISS(RAG) + Streamlit (feat. Memory) 활용한 로그분석 보드')
# 세션 상태를 초기화
if "messages" not in st.session_state :
st.session_state["messages"] = [
{'role' : 'assistant', 'content' : "👋 보안팀 섭섭이입니다. 무엇을 도와드릴까요?"}
]
# langchain memory
if "chat_history" not in st.session_state :
st.session_state["chat_history"] = []
# 기존에 대화가 있다면 출력
# 최초 실행시에는 대화 내용이 없음.
for msg in st.session_state['messages'] :
with st.chat_message(msg['role']) :
st.markdown(msg['content'])
# 사용자 입력
prompt = st.chat_input('궁금한 것을 물어보세요 : ')
if prompt :
st.session_state['messages'].append({"role" : "user", "content" : prompt })
with st.chat_message("user") :
st.markdown(prompt)
# model response
with st.chat_message("assistant") :
with st.spinner('분석중입니다...'):
chain = model()
response = chain({"question" : prompt, "chat_history" : st.session_state["chat_history"]})
answer = response['answer']
st.markdown(answer)
# 대화 이력 업데이트
st.session_state['chat_history'].append((prompt, answer))
st.session_state['messages'].append({"role" : "assistant", "content" : answer })
if __name__=='__main__' :
view()