import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
뭔가 데이터는 다운 받아서 왔고, 학습에 6만개, 테스트에 1만개인 것은 알겠는데, numpy에서 보던 (60000, 28, 28) 뭐 이런 식의 shape이 아니라서 알 수가 없다. 게다가 batch 크기로 나누어 놓은 loader의 경우도 데이터의 크기를 바로 알 수 있는 형태가 아니다.
그래서 Gemini한테 물어보니 iter() 함수를 통해 iteration 반복자로 변환한 후 next() 함수로 불러오는 식으로 하면 된다더라.
# 2. DataLoader에서 첫 번째 배치 가져오기
# iter()로 반복자(iterator)를 만든 후 next()를 호출합니다.
data_iter = iter(train_loader)
images, labels = next(data_iter) # images: 이미지 텐서, labels: 레이블 텐서
# 3. 속성 확인 및 출력
print("\n--- DataLoader를 통해 얻은 배치(Batch) 속성 ---")
print(f"이미지 텐서 모양 (Image Tensor Shape): {images.shape}")
print(f"이미지 텐서 자료형 (Image Tensor Data Type): {images.dtype}")
print("-" * 30)
print(f"레이블 텐서 모양 (Label Tensor Shape): {labels.shape}")
print(f"레이블 텐서 자료형 (Label Tensor Data Type): {labels.dtype}")
이렇게 첫 batch에 있는 자료를 확인할 수 있었다.
Keras 때는 channel 정보를 shape의 마지막에 넣어두었었는데, 여기서는 그게 먼저 오는 것 같고, 값을 하나씩 보니, 이미지는 각 픽셀이 0에서 1 사이 값으로, label은 0, 1, 2, ..., 9 이렇게 들어있다.
사실 이제 그냥 해도 되긴 하는데, 예시 코드를 보니 평균과 표준편차를 이용해서 normalize를 하더라. 계산해야 겠지만, 이미 잘 알려져 있는 데이터이므로, 그 값은, MEAN = 0.1307; STD = 0.3081 이라고 한다.
이제 CNN 구조를 만들어 보자.
class CNNModel(nn.Module):
def __init__(self):
super().__init__()
# 1. 컨볼루션 계층 1 (Convolutional Layer 1)
# 입력 채널(Input Channel): 1 (흑백 이미지)
# 출력 채널(Output Channel): 32
# 커널 크기(Kernel Size): 3x3
self.conv1 = nn.Conv2d(1, 32, 3)
# 2. 컨볼루션 계층 2 (Convolutional Layer 2)
# 입력 채널: 32
# 출력 채널: 64
self.conv2 = nn.Conv2d(32, 64, 3)
# 3. 풀링 계층 (Pooling Layer) - Max Pooling
# nn.MaxPool2d는 보통 forward()에서 F.max_pool2d로 사용됩니다.
# 4. 드롭아웃 (Dropout)
# 과적합(Overfitting) 방지를 위해 사용됩니다.
self.dropout1 = nn.Dropout(0.25)
# 5. 선형 계층 (Linear Layer)
# Conv/Pooling을 거친 Flatten된 출력을 받아 10개의 클래스(0~9)로 분류합니다.
# 입력 크기는 (28x28 이미지 기준) 9216 (Flatten된 크기)을 계산하여 얻어야 합니다.
self.fc1 = nn.Linear(9216, 128) # fc: Fully Connected (전결합)
self.fc2 = nn.Linear(128, 10) # 출력: 10개 클래스 (0~9)
def forward(self, x):
# 1. Conv1 -> ReLU -> Max Pooling
x = self.conv1(x)
x = F.relu(x)
# 2. Conv2 -> ReLU -> Max Pooling (커널 크기 2x2)
x = self.conv2(x)
x = F.relu(x)
x = F.max_pool2d(x, 2)
# 3. 드롭아웃 적용
x = self.dropout1(x)
# 4. Flatten (평탄화): 2차원/3차원 데이터를 1차원 벡터로 만듦
# x.shape: (배치 크기, 채널, 높이, 너비) -> (배치 크기, 채널*높이*너비)
x = torch.flatten(x, 1)
# 5. Fully Connected Layers
x = self.fc1(x)
x = F.relu(x)
x = self.fc2(x)
# 최종 출력: 10개의 클래스에 대한 로짓(Logits)
return x
이미 위 코드에 주석으로 설명이 다 들어갔지만, 전체적인 설명을 하자면, 이전 글에서는 sequential하게 이미 __init__() 함수 내에서 layer 간의 연결이 다 이루어졌지만, 여기서는 __init__() 함수 안에서는 layer들만 정의가 되고, forward() 함수에서 모델 간의 관계가 정의되는 식이다. Keras에 익숙한 내가 보기에는 functional api 방식으로 모델을 만들 때와 비슷한 느낌이다. 다소 번거로울 수 있지만, 입출력 데이터의 모양을 명확하게 알아야 하는 부분이기도 하다.
이제 모델도 만들었겠다, 장치 설정도 하고 손실함수와 옵티마이저도 정의하자.
# GPU 사용 가능 여부를 확인하고 장치 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 모델 인스턴스 생성 및 GPU로 이동
model = CNNModel().to(device)
print(f"모델은 {device}에서 실행됩니다.")
# 손실 함수(Loss Function): 분류 문제에 적합한 CrossEntropyLoss 사용
criterion = nn.CrossEntropyLoss()
# 옵티마이저(Optimizer): Adam 사용
optimizer = optim.Adam(model.parameters(), lr=0.001)
이제 학습 루프와 테스트 루프를 함수 형태로 제작하자. 그리고 원레 데이터 다운로드하면서 하는 게 일반적이지만, 본 글에서는 데이터를 normalize하지 않고 가져왔기 때문에, 여기서는 학습과 테스트 함수 내에서 normalize까지 함께 진행한다.
MEAN = 0.1307
STD = 0.3081
# 데이터 정규화(Normalization)를 수행하는 함수 정의
def normalize_data(data, mean, std):
# (data - mean) / std 수식 적용
return (data - mean) / std
# ----------------- train 함수 수정 (손실만 반환) ------------------
def train(model, device, train_loader, optimizer, epoch, mean, std):
model.train()
total_loss = 0 # Epoch 전체 손실을 누적할 변수
for batch_idx, (data, target) in enumerate(train_loader):
data = normalize_data(data, mean, std)
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
total_loss += loss.item() * len(data) # 배치 크기를 곱해 실제 손실 기여도를 누적
if batch_idx % 100 == 0:
print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}')
# 에포크 전체 평균 손실 반환
return total_loss / len(train_loader.dataset)
# ----------------- evaluate_train 함수 추가 (정확도 계산) ------------------
def evaluate_train(model, device, train_loader, mean, std):
model.eval()
correct = 0
with torch.no_grad():
for data, target in train_loader:
data = normalize_data(data, mean, std)
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
accuracy = 100. * correct / len(train_loader.dataset)
# print(f'Train set: Accuracy: {correct}/{len(train_loader.dataset)} ({accuracy:.0f}%)')
return accuracy
# ----------------- test 함수는 이전과 동일 (손실/정확도 모두 반환) ------------------
def test(model, device, test_loader, mean, std):
model.eval()
test_loss = 0
correct = 0
with torch.no_grad():
for data, target in test_loader:
data = normalize_data(data, mean, std)
data, target = data.to(device), target.to(device)
output = model(data)
test_loss += criterion(output, target).item()
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
test_loss /= len(test_loader.dataset)
print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n')
return test_loss, 100. * correct / len(test_loader.dataset)
이제 학습을 실행해보자. normalize를 위해 mean과 std를 함수에 전달해줘야 한다.... 역시나 데이터 다운로드 할 때 함께 할 걸 그랬다. ㅎㅎ;
keras에서는 학습하라는 쉬운 명령어가 있었던 것 같은데 pytorch는 일일히 다 만들어줘야 하는 건가? 아니면 교육자료로 Gemini한테 만들어달라고 해서 굳이 디테일하게 보여주는 건가? ㅎㅎ 학습 history 기록도 챙겨줘야 하니 뭔가 좀 번거롭다.
시간을 절약하기 위해 epoch 수는 10번으로 실행
num_epochs = 10
train_loss_history = [] # 훈련 손실 기록
test_loss_history = [] # 테스트 손실 기록
train_accuracy_history = [] # 훈련 정확도 기록
test_accuracy_history = [] # 테스트 정확도 기록
print("CNN 모델 학습 및 결과 기록 시작...")
for epoch in range(1, num_epochs + 1):
# 1. 훈련 및 훈련 손실 기록 (Training and Train Loss Recording)
# train 함수는 이제 Epoch의 평균 손실을 반환합니다.
avg_train_loss = train(model, device, train_loader, optimizer, epoch, MEAN, STD)
train_loss_history.append(avg_train_loss)
# 2. 훈련 정확도 계산 및 기록 (Train Accuracy Calculation)
train_accuracy = evaluate_train(model, device, train_loader, MEAN, STD)
train_accuracy_history.append(train_accuracy)
# 3. 테스트 손실 및 정확도 계산 및 기록 (Test Loss & Accuracy Recording)
avg_test_loss, test_accuracy = test(model, device, test_loader, MEAN, STD)
test_loss_history.append(avg_test_loss)
test_accuracy_history.append(test_accuracy)
print("CNN 모델 학습 완료!")
test loss가 0이라고??? 뭔가 이상한데;; 어쨌든 정확도가 엄청 높게 나왔다.
그래프로 결과를 확인해보자.
# ----------------------------------------------------
# 그래프 시각화 (Visualization)
# ----------------------------------------------------
# Epoch 인덱스 리스트 (X축)
epochs = range(1, num_epochs + 1)
plt.figure(figsize=(12, 5))
# A. 손실(Loss) 그래프: 훈련 손실 vs. 테스트 손실
plt.subplot(1, 2, 1)
plt.plot(epochs, train_loss_history, 'b-o', label='Training Loss') # 훈련 손실
plt.plot(epochs, test_loss_history, 'r-o', label='Test Loss') # 테스트 손실
plt.title('Loss over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Loss (Cross Entropy)')
plt.grid(True)
plt.legend()
# B. 정확도(Accuracy) 그래프: 훈련 정확도 vs. 테스트 정확도
plt.subplot(1, 2, 2)
plt.plot(epochs, train_accuracy_history, 'b-o', label='Training Accuracy') # 훈련 정확도
plt.plot(epochs, test_accuracy_history, 'r-o', label='Test Accuracy') # 테스트 정확도
plt.title('Accuracy over Epochs')
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()
test loss는 뭔가 이상하지만 어쨌든 accuracy는 그럴듯하게 나오는 것 같다.
뭔가 test loss 계산식에 오류가 있었는 것 같은데... 그건 다음에 확인해보도록 하자;; (정신력 부족)