트러블 슈팅

2. 토이 프로젝트 데이터 전처리 - 패스트캠퍼스 백엔드 부트캠프 3기

gkss2tpt 2025. 4. 30. 19:57

1. 상황

  • 도서 MBTI 점수를 기반으로 사용자에게 도서를 추천하는 AI를 만드는 중...
  • 단어 훈련 데이터를 200개씩 넣어줬더니 책의 MBTI 점수가 고르지 않게 나오는 현상이 발생

  • 단어에서 문장 형식으로 변경 하기로 결정
{
  "text": "겹겹이 쌓인 내면의 매듭을 사진과 문장으로 조용히 들여다보는 감성적인 성찰의 기록입니다.",
  "label": {"S": 25, "I": 75, "F": 100, "D": 0, "N": 75, "M": 25, "Q": 25, "A": 75
  }
},
{
  "text": "현실과 꿈의 경계에서 상징과 파편으로 감각을 표현하는 시적 상상력의 집약입니다.",
  "label": {"S": 25, "I": 75, "F": 75, "D": 25, "N": 40, "M": 60, "Q": 25, "A": 75
  }
},
{
  "text": "사라지지 않기 위해, 자신과 세상을 껴안는 강렬하고 단단한 감정의 서사입니다.",
  "label": {"S": 50, "I": 50, "F": 100, "D": 0, "N": 75, "M": 25, "Q": 100, "A": 0
  }
}
  • 약 500개의 훈련 데이터를 작성해서 다시 훈련을 돌려보기로 하고
  • 다층 퍼셉트론을 활용하여 직접 알고리즘을 작성 해보려하였으나... 활용 방법이 쉽지 않아 파이토치와 사이킷런 텐서를 사용
  • MBTIMLP 클래스를 작성하여  768차원 백터로 변환 시키는 코드를 작성
from torch import nn

class MBTIMLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 8),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.net(x)



 

  • 모델을 훈련시키는 클래스를 작성
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from sentence_transformers import SentenceTransformer
import numpy as np
import os
import json
from MBTIMLP import MBTIMLP

from torch.utils.data import TensorDataset, DataLoader

# 모델 로딩
embedder = SentenceTransformer('jhgan/ko-sroberta-multitask')

# 데이터 로딩(json 파일 사용)
with open('book_mbti_sample_data_fixed.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

texts = [entry['text'] for entry in data]
labels = [list(entry['label'].values()) for entry in data]


# 임베딩 변환
X = [embedder.encode(text) for text in texts]       # 문장 -> 768차원 벡터
y = np.array(labels, dtype=np.float32)              # 점수 -> float32 벡터

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y/100.0, dtype=torch.float32)

X_train, X_val, y_train, y_val = train_test_split(X_tensor, y_tensor, test_size=0.3)

train_ds = TensorDataset(X_train, y_train)
val_ds = TensorDataset(X_val, y_val)

train_loader = DataLoader(train_ds, batch_size=8, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=8)

model = MBTIMLP()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()  # 평균 제곱 오차

# 훈련
for epoch in range(100):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        optimizer.zero_grad()
        preds = model(xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"[Epoch {epoch+1}] Loss: {total_loss:.4f}")

model.eval()

with torch.no_grad():
    input_vec = X_val[0].unsqueeze(0)  # (1, 768)
    output = model(input_vec) * 100    # 다시 0~100으로 되돌리기
    print("예측:", output[0].numpy())
    print("정답:", y_val[0].numpy() * 100)

# 저장 폴더 생성
dest = os.path.join('trainedAI')
if not os.path.exists(dest):
    os.makedirs(dest)

# 훈련된 모델 저장
torch.save(model.state_dict(), os.path.join(dest, 'mbti_model.pt'))
  • 현재 약 2700개의 도서 상품 데이터를 크롤링해서 json형태로 저장한 파일 데이터가 있고, 훈련한 모델을 사용하여 예측 진행2700개의 상품 소개를 임베딩 및 예측 하는 과정에서 시간이 오래 걸려 캐싱을 시켰고, 문장 요약 모델을 사용하여 상품 소개의 길이를 축소시켜보았다.
import random
from datetime import datetime
import os
import torch
import pandas as pd
import numpy as np
from sentence_transformers import SentenceTransformer
from transformers import PreTrainedTokenizerFast, BartForConditionalGeneration
from MBTIMLP import MBTIMLP
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from tqdm import tqdm
import re

# ===== 현재 시간 출력 함수 ===== #
def currentTime():
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# ===== 임베딩 전처리 ===== #
def clean_book_intro(text: str) -> str:
    if not isinstance(text, str):
        return ''

    # 1. 특수문자, 이모지 제거
    text = re.sub(r'[^\w\s.,!?가-힣a-zA-Z]', '', text)

    # 2. HTML 태그 제거
    text = re.sub(r'<.*?>', '', text)

    # 3. 괄호 및 괄호 안 내용 제거
    text = re.sub(r'\(.*?\)', '', text)
    text = re.sub(r'\[.*?\]', '', text)

    # 4. 링크 제거
    text = re.sub(r'http[s]?://\S+|www\.\S+', '', text)

    # 5. 공백 정리
    text = re.sub(r'\s+', ' ', text).strip()

    # # 6. 불용어 제거
    # stopwords = ['그리고', '또한', '등', '더불어', '이런', '저런', '하는', '한다','합니다','입니다']
    # for word in stopwords:
    #     text = text.replace(f' {word} ', ' ')

    # # 7. 200자까지만 자르기
    # text = text[:200]


    return text

# ===== 거리 계산 함수 ===== #
def mbti_distance(user_scores, doc_scores):
    keys = user_scores.keys()
    u = np.array([user_scores[k] for k in keys])
    d = np.array([doc_scores.get(k, 50) for k in keys])

    dist = np.linalg.norm(u - d)
    max_dist = np.linalg.norm(np.array([100]*8) - np.array([0]*8))

    return dist / max_dist * 100

# ===== 요약 함수 정의 ===== #
def batch_summarize_kobart(text_list, batch_size=32):
    results = []
    for i in tqdm(range(0, len(text_list), batch_size)):
        batch = text_list[i:i+batch_size]
        inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)

        with torch.inference_mode():
            summary_ids = summary_model.generate(
                input_ids=inputs['input_ids'],
                attention_mask=inputs['attention_mask'],
                max_length=50,
                min_length=20,
                num_beams=1,
                do_sample=False
            )

        decoded = tokenizer.batch_decode(summary_ids, skip_special_tokens=True)
        results.extend(decoded)
    return results


# ===== 경로 및 파일 설정 ===== #
dest = os.path.join('trainedAI')
DATA_PATH = 'book_data_filltered2.json'
CACHE_DATA_PATH = 'paragraph_cached_data2.json'
CACHE_EMB_PATH = 'paragraph_embeddings2.pt'

# 그래픽카드가 있으면 cuda로, 없으면 cpu로 사용
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"[{currentTime()}] 사용 중인 디바이스: {device}")

# ===== 모델 및 임베딩기 로딩 ===== #
print(f"[{currentTime()}] ko-sroberta-multitask 모델 로딩 중...")
embedder = SentenceTransformer("jhgan/ko-sroberta-multitask")

# ===== 요약 모델 로딩 ===== #
print(f"[{currentTime()}] 요약 모델 로딩 중...")
tokenizer = PreTrainedTokenizerFast.from_pretrained("digit82/kobart-summarization")
summary_model = BartForConditionalGeneration.from_pretrained("digit82/kobart-summarization").to(device)
print(f"[{currentTime()}] 요약 모델 로딩 완료")

model = MBTIMLP().to(device)
model.load_state_dict(torch.load(os.path.join(dest, 'mbti_model.pt')))
model.eval()
print(f"[{currentTime()}] 모델 로딩 완료")

# ===== 데이터 로딩 ===== #
if os.path.exists(CACHE_DATA_PATH):
    print(f"[{currentTime()}] 책 소개 캐시 불러오는 중...")
    df_para = pd.read_json(CACHE_DATA_PATH)
    print(f"[{currentTime()}] 캐시 불러오기 완료")
else:
    print(f"[{currentTime()}] 원본 파일 로딩 시작")
    df = pd.read_json(DATA_PATH)
    print(f"[{currentTime()}] 원본 파일 로딩 완료")

    df['title'] = df['title'].astype(str)
    df['book_intro'] = df['book_intro'].astype(str)
    df = df[~df['book_intro'].str.contains('책 소개 정보')]

    # 누락된 값이 있는 행 삭제
    df.dropna(axis=0)

    # 패턴 처리
    df['book_intro'] = df['book_intro'].apply(clean_book_intro)

    print(f"[{currentTime()}] 책 소개 추출 시작...")
    data = []
    for _, row in df.iterrows():
        intro = row['book_intro'].strip()
        if intro:
            data.append({'title': row['title'], 'paragraph': intro})
    df_para = pd.DataFrame(data)

    # 중복 제거
    df_para = df_para.drop_duplicates(subset='title', keep='first').reset_index(drop=True)

    # ===== 책 소개문 요약 적용 ===== #
    print(f"[{currentTime()}] 책 소개 요약 시작...")
    tqdm.pandas()
    df_para['paragraph'] = batch_summarize_kobart(df_para['paragraph'].tolist(), batch_size=32)
    print(f"[{currentTime()}] 책 소개 요약 완료")

    df_para.to_json(CACHE_DATA_PATH, force_ascii=False, indent=2)
    print(f"[{currentTime()}] 책 소개 캐시 저장 완료")

# ===== 사용자 MBTI 점수 입력 (총합 100점씩) ===== #
s = random.randint(1, 100)
f = random.randint(1, 100)
n = random.randint(1, 100)
q = random.randint(1, 100)

user_mbti_score = {
    'S': s, 'I': 100-s,
    'F': f, 'D': 100-f,
    'N': n, 'M': 100-n,
    'Q': q, 'A': 100-q
    # 'S': 25.941866636276245, 'I': 74.96695518493652,
    # 'F': 47.76947200298309, 'D': 51.448994874954224,
    # 'N': 24.016346037387848, 'M': 77.30849385261536,
    # 'Q': 42.78730750083923, 'A': 56.82256817817688
}

# ===== 문단별 추천 처리 ===== #
if os.path.exists(CACHE_EMB_PATH):
    print(f"[{currentTime()}] 임베딩 캐시 불러오는 중...")
    para_embeddings = torch.load(CACHE_EMB_PATH, map_location=device)
    print(f"[{currentTime()}] 캐시 불러오기 완료")
else:
    all_embeddings = []
    print(f"[{currentTime()}] 임베딩 시작...")
    for i, row in df_para.iterrows():
        paragraph = row['paragraph']

        # 임베딩
        para_emb = embedder.encode(paragraph, convert_to_tensor=True)
        para_emb = para_emb.unsqueeze(0).to(device)
        all_embeddings.append(para_emb.cpu())

    # 임베딩 캐시 저장
    para_embeddings = torch.cat(all_embeddings, dim=0)
    torch.save(para_embeddings, CACHE_EMB_PATH)
    print(f"[{currentTime()}] 임베딩 종료...")

# ===== 예측 및 거리 계산 ===== #
dataset = []
print(f"[{currentTime()}] 예측 시작...")
for i, row in df_para.iterrows():
    title = row['title']
    paragraph = row['paragraph']
    para_emb = para_embeddings[i].unsqueeze(0).to(device)
    with torch.no_grad():
        mbti_tensor = model(para_emb).squeeze()
        mbti_score = {k: v.item()*100 for k, v in zip(['S','I','F','D','N','M','Q','A'], mbti_tensor)}
    distance = mbti_distance(user_mbti_score, mbti_score)
    dataset.append((distance, title, paragraph, mbti_score))
print(f"[{currentTime()}] 예측 종료...")


# ===== 결과 출력 ===== #
print(f"User MBTI 점수: {user_mbti_score}")
dataset.sort(key=lambda x: x[0])
for dist, title, para, mbti in dataset[:5]:
    print(f"\n제목: {title}")
    print(f"거리: {dist:.2f}%")
    print(f"문단: {para[:300]}...")
    print(f"MBTI 점수: {mbti}")
    print("ㅡ" * 80)

print(f"[{currentTime()}] 전체 완료시간")

# 전체 출력하고 싶으면 주석 해제
# for dist, title, para, mbti in dataset:
#     print(f'{mbti}')




# 한글 폰트 설정 (예: 맑은 고딕)
plt.rcParams['font.family'] = 'Malgun Gothic'  # Windows용
# plt.rcParams['font.family'] = 'AppleGothic'  # macOS용
# plt.rcParams['font.family'] = 'NanumGothic'  # Linux용 (nanum 설치 필요)

# ===== MBTI 점수들 → 2차원 PCA =====
book_mbti_vectors = [list(mbti.values()) for _, _, _, mbti in dataset]
book_titles = [title for _, title, _, _ in dataset]

pca = PCA(n_components=2)
book_pca_coords = pca.fit_transform(book_mbti_vectors)

# ===== 사용자 MBTI 점수도 PCA로 변환 =====
user_vector = np.array([user_mbti_score[k] for k in ['S','I','F','D','N','M','Q','A']])
user_pca_coord = pca.transform([user_vector])[0]

# ===== 산점도 시각화 =====
plt.figure(figsize=(12, 8))
plt.scatter(book_pca_coords[:, 0], book_pca_coords[:, 1], alpha=0.6, label='도서', edgecolors='k')

# 가장 가까운 도서 5개는 텍스트로 출력
for i in range(5):
    dist, title, _, _ = dataset[i]
    x, y = book_pca_coords[i]
    plt.scatter(x, y, color='red', label='추천 도서')
    plt.text(x + 1, y + 1, title[:15], fontsize=9)

# 사용자 MBTI 좌표 표시
plt.scatter(user_pca_coord[0], user_pca_coord[1], color='blue', marker='*', s=200, label='사용자 MBTI')
plt.text(user_pca_coord[0]+1, user_pca_coord[1]+1, '사용자', fontsize=10, color='blue')

plt.title("MBTI 기반 도서 추천 분포 (PCA 2D)")
plt.xlabel("PCA1")
plt.ylabel("PCA2")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
  • 768차원 백터의 분포도를 2차원으로 압축한 산점도로 보면 2700개의 상품도 적어 보인다.
  • 현재 고르게 분포해있지 않은 상태로 중간 쪽에 상품들이 몰려있는 상태이다.
  • 분포를 고르게 퍼트리려면 극단적인 훈련 데이터셋들이 필요하다.

  • 현재 하고 있는 것은 예측 훈련만 하고 있었고, 산포도를 분산시키기 위해서는 임베딩을 훈련시켜야 한다는 것을 알게되었다.

  • 책 데이터를 7900개로 늘렸고 한쪽으로 집중 된 것이 더 잘 보여진다.

  • 극단적인 훈련 데이터셋의 비율을 20%, 평균적인 데이터셋의 비율을 80%로 조정해서 다시 훈련시켜보았다.
  • 산포도가 조금 더 퍼졌지만 그럼에도 아직 한쪽으로 집중된 현상이 보여서 훈련데이터를 다시 조정 해보기로했다

  • 조정을 마무리 하고 마지막으로 훈련되지 않은 모델을 사용 시의 산포도 이다.