처음부터 바둑 AI 만들기
이 문서는 간단한 AlphaGo Zero 스타일의 바둑 AI를 단계별로 구현하는 방법을 안내합니다. 게임 로직, 신경망, 몬테카를로 트리 탐색(MCTS), 학습 과정을 차례로 다룹니다.
학습 목표
이 튜토리얼을 완료하면 다음을 할 수 있는 바둑 AI를 갖게 됩니다:
- 9×9 바둑판에서 자가 대국
- 강화학습을 통한 지속적 향상
- 아마추어 초급 수준의 기력 달성
프로젝트 구조
mini-alphago/
├── game/
│ ├── __init__.py
│ ├── board.py # 바둑판 로직
│ ├── rules.py # 규칙 구현
│ └── state.py # 게임 상태
├── model/
│ ├── __init__.py
│ ├── network.py # 신경망
│ └── features.py # 특성 인코딩
├── mcts/
│ ├── __init__.py
│ ├── node.py # MCTS 노드
│ └── search.py # MCTS 탐색
├── training/
│ ├── __init__.py
│ ├── self_play.py # 자가 대국
│ └── trainer.py # 학습기
├── main.py # 메인 프로그램
└── requirements.txt
1단계: 바둑판과 규칙
바둑판 구현
# game/board.py
import numpy as np
class Board:
    """Go board: stone placement, captures, simple ko and a naive scorer.

    Points are addressed as ``(x, y)`` and used directly as indices into the
    ``size x size`` ``board`` array.  A pass is stored in ``history`` as
    ``(-1, -1, player)``.
    """

    EMPTY = 0
    BLACK = 1
    WHITE = 2

    def __init__(self, size=9):
        """Create an empty ``size x size`` board with Black to move."""
        self.size = size
        self.board = np.zeros((size, size), dtype=np.int8)
        self.current_player = self.BLACK
        # Point the player to move may not play because of ko, or None.
        self.ko_point = None
        # One (x, y, player) entry per move; a pass is (-1, -1, player).
        self.history = []

    def copy(self):
        """Return an independent copy of the position, including history."""
        new_board = Board(self.size)
        new_board.board = self.board.copy()
        new_board.current_player = self.current_player
        new_board.ko_point = self.ko_point
        new_board.history = self.history.copy()
        return new_board

    def get_opponent(self, player):
        """Return the colour opposing ``player``."""
        return self.WHITE if player == self.BLACK else self.BLACK

    def is_on_board(self, x, y):
        """Return True when ``(x, y)`` lies inside the board."""
        return 0 <= x < self.size and 0 <= y < self.size

    def get_neighbors(self, x, y):
        """Return the orthogonally adjacent points that are on the board."""
        candidates = ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1))
        return [(nx, ny) for nx, ny in candidates if self.is_on_board(nx, ny)]

    def get_group(self, x, y):
        """Return ``(group, liberties)`` for the stone at ``(x, y)``.

        ``group`` is the set of connected same-coloured stones and
        ``liberties`` the set of empty points adjacent to that group.
        Both sets are empty when ``(x, y)`` holds no stone.
        """
        color = self.board[x, y]
        if color == self.EMPTY:
            return set(), set()

        group = set()
        liberties = set()
        stack = [(x, y)]
        while stack:
            cx, cy = stack.pop()
            if (cx, cy) in group:
                continue
            group.add((cx, cy))
            for nx, ny in self.get_neighbors(cx, cy):
                if self.board[nx, ny] == self.EMPTY:
                    liberties.add((nx, ny))
                elif self.board[nx, ny] == color and (nx, ny) not in group:
                    stack.append((nx, ny))
        return group, liberties

    def count_liberties(self, x, y):
        """Return the number of liberties of the group at ``(x, y)``."""
        _, liberties = self.get_group(x, y)
        return len(liberties)

    def remove_group(self, group):
        """Clear every point in ``group`` from the board."""
        for x, y in group:
            self.board[x, y] = self.EMPTY

    def is_legal(self, x, y, player=None):
        """Return True when ``player`` may place a stone at ``(x, y)``.

        ``player`` defaults to the player to move.  A move is illegal when
        the point is off the board, occupied, forbidden by ko, or suicide
        (no liberties after placement and nothing captured).
        """
        if player is None:
            player = self.current_player

        # Reject off-board points explicitly: negative indices would
        # otherwise wrap around to the far edge of the numpy array.
        if not self.is_on_board(x, y):
            return False
        if self.board[x, y] != self.EMPTY:
            return False
        if self.ko_point == (x, y):
            return False

        # Place the stone temporarily instead of copying the whole board
        # (and its history) for every query; it is always removed again.
        opponent = self.get_opponent(player)
        self.board[x, y] = player
        try:
            # A move that captures something always ends up with a liberty.
            for nx, ny in self.get_neighbors(x, y):
                if self.board[nx, ny] == opponent:
                    _, liberties = self.get_group(nx, ny)
                    if not liberties:
                        return True
            # Otherwise the move is legal only if it is not suicide.
            _, own_liberties = self.get_group(x, y)
            return bool(own_liberties)
        finally:
            self.board[x, y] = self.EMPTY

    def play(self, x, y):
        """Play the current player's stone at ``(x, y)``.

        Removes captured opponent groups, updates the ko point, records the
        move and switches the player to move.

        Returns:
            True if the move was played, False if it was illegal (in which
            case the position is unchanged).
        """
        if not self.is_legal(x, y):
            return False

        player = self.current_player
        opponent = self.get_opponent(player)
        self.board[x, y] = player

        captured = []
        for nx, ny in self.get_neighbors(x, y):
            if self.board[nx, ny] == opponent:
                group, liberties = self.get_group(nx, ny)
                if not liberties:
                    captured.extend(group)
                    self.remove_group(group)

        # Ko arises only when a lone stone captures exactly one stone and is
        # left with a single liberty.  If the capturing stone is part of a
        # larger group, the recapture takes several stones and cannot repeat
        # the position, so it must stay legal.
        self.ko_point = None
        if len(captured) == 1:
            own_group, own_liberties = self.get_group(x, y)
            if len(own_group) == 1 and len(own_liberties) == 1:
                self.ko_point = captured[0]

        self.history.append((x, y, player))
        self.current_player = opponent
        return True

    def pass_move(self):
        """Pass: record the pass, switch players and clear any ko."""
        self.history.append((-1, -1, self.current_player))
        self.current_player = self.get_opponent(self.current_player)
        self.ko_point = None

    def is_game_over(self):
        """Return True once the last two recorded moves were both passes."""
        if len(self.history) < 2:
            return False
        return self.history[-1][0] == -1 and self.history[-2][0] == -1

    def get_legal_moves(self):
        """Return every legal point for the player to move, plus the pass.

        The pass move ``(-1, -1)`` is always the last entry, so the result
        is never empty.
        """
        moves = [
            (x, y)
            for x in range(self.size)
            for y in range(self.size)
            if self.is_legal(x, y)
        ]
        moves.append((-1, -1))
        return moves

    def score(self):
        """Return Black's margin: positive means Black wins.

        Counts stones on the board plus every empty point whose directly
        adjacent stones are all of one colour, then subtracts komi
        (5.5 on a 9x9 board, 7.5 otherwise).  This is a deliberately
        simple approximation: empty points deeper inside a territory are
        not counted.
        """
        black_score = int(np.sum(self.board == self.BLACK))
        white_score = int(np.sum(self.board == self.WHITE))

        for x in range(self.size):
            for y in range(self.size):
                if self.board[x, y] != self.EMPTY:
                    continue
                colors = {
                    int(self.board[nx, ny])
                    for nx, ny in self.get_neighbors(x, y)
                }
                colors.discard(self.EMPTY)
                if len(colors) == 1:
                    if self.BLACK in colors:
                        black_score += 1
                    else:
                        white_score += 1

        komi = 5.5 if self.size == 9 else 7.5
        return black_score - white_score - komi
2단계: 특성 인코딩
입력 특성
# model/features.py
import numpy as np
def encode_board(board):
    """Encode a position as a ``(7, size, size)`` float32 array.

    Planes:
        0: stones of the player to move
        1: stones of the opponent
        2: empty points
        3: location of the last move (left zero for a pass or no move)
        4: location of the second-to-last move (same convention)
        5: points where ``board.is_legal`` is true for the player to move
        6: all ones when Black is to move, otherwise all zeros
    """
    n = board.size
    me = board.current_player
    them = board.get_opponent(me)
    grid = board.board

    planes = np.zeros((7, n, n), dtype=np.float32)

    # Stone occupancy, relative to the player to move.
    planes[0] = (grid == me).astype(np.float32)
    planes[1] = (grid == them).astype(np.float32)
    planes[2] = (grid == board.EMPTY).astype(np.float32)

    # Most recent move goes to plane 3, the one before it to plane 4.
    # Passes are stored with negative coordinates and leave the plane empty.
    for plane, entry in zip((3, 4), reversed(board.history[-2:])):
        row, col, _ = entry
        if row >= 0:
            planes[plane, row, col] = 1.0

    # Legal points for the player to move.
    for row in range(n):
        for col in range(n):
            if board.is_legal(row, col):
                planes[5, row, col] = 1.0

    # Side-to-move indicator.
    if me == board.BLACK:
        planes[6] = 1.0

    return planes
3단계: 신경망
듀얼 헤드 네트워크 아키텍처
# model/network.py
import torch
import torch.nn as nn
import torch.nn.functional as F
class ResidualBlock(nn.Module):
    """Two 3x3 convolution + batch-norm stages with an identity skip.

    The channel count and spatial size of the input are preserved.
    """

    def __init__(self, channels):
        super().__init__()
        # Layers are declared in the order they are applied in forward().
        self.conv1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(channels)

    def forward(self, x):
        """Return ``relu(bn2(conv2(relu(bn1(conv1(x))))) + x)``."""
        out = self.conv1(x)
        out = self.bn1(out)
        out = F.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        # Skip connection: add the block input back before the final ReLU.
        return F.relu(out + x)
class PolicyValueNetwork(nn.Module):
    """Residual network with a shared trunk and separate policy/value heads.

    ``forward`` takes a ``(batch, input_channels, board_size, board_size)``
    tensor and returns ``(policy, value)``: ``policy`` holds log-probabilities
    over ``board_size**2 + 1`` moves (the extra entry is the last index) and
    ``value`` is a ``(batch, 1)`` tensor squashed by ``tanh``.
    """

    def __init__(self, board_size=9, input_channels=7, num_filters=64, num_blocks=4):
        super().__init__()
        self.board_size = board_size
        num_points = board_size * board_size

        # Shared trunk: input convolution followed by the residual tower.
        self.conv_input = nn.Conv2d(input_channels, num_filters, kernel_size=3, padding=1)
        self.bn_input = nn.BatchNorm2d(num_filters)
        self.residual_blocks = nn.ModuleList(
            [ResidualBlock(num_filters) for _ in range(num_blocks)]
        )

        # Policy head: 1x1 conv down to 2 planes, then one linear layer.
        self.policy_conv = nn.Conv2d(num_filters, 2, kernel_size=1)
        self.policy_bn = nn.BatchNorm2d(2)
        self.policy_fc = nn.Linear(2 * num_points, num_points + 1)

        # Value head: 1x1 conv down to 1 plane, then a two-layer MLP.
        self.value_conv = nn.Conv2d(num_filters, 1, kernel_size=1)
        self.value_bn = nn.BatchNorm2d(1)
        self.value_fc1 = nn.Linear(num_points, 64)
        self.value_fc2 = nn.Linear(64, 1)

    def forward(self, x):
        """Run the trunk and both heads; return ``(log_policy, value)``."""
        trunk = F.relu(self.bn_input(self.conv_input(x)))
        for block in self.residual_blocks:
            trunk = block(trunk)

        # Policy head -> log-softmax over all moves.
        p = F.relu(self.policy_bn(self.policy_conv(trunk)))
        p = p.flatten(start_dim=1)
        log_policy = F.log_softmax(self.policy_fc(p), dim=1)

        # Value head -> scalar in [-1, 1] per sample.
        v = F.relu(self.value_bn(self.value_conv(trunk)))
        v = v.flatten(start_dim=1)
        v = F.relu(self.value_fc1(v))
        value = torch.tanh(self.value_fc2(v))

        return log_policy, value
def create_network(board_size=9):
    """Build a ``PolicyValueNetwork`` with the tutorial's default sizes.

    Uses 7 input planes, 64 filters and 4 residual blocks; only the board
    size is configurable.
    """
    hyperparams = {
        "input_channels": 7,
        "num_filters": 64,
        "num_blocks": 4,
    }
    return PolicyValueNetwork(board_size=board_size, **hyperparams)
4단계: MCTS 구현
노드 클래스
# mcts/node.py
import numpy as np
class MCTSNode:
    """One node of the search tree.

    Holds the visit count, the accumulated value, the prior probability
    assigned when the node was created, and the children keyed by move
    ``(x, y)`` (``(-1, -1)`` is the pass).
    """

    def __init__(self, prior=0.0):
        self.visit_count = 0
        self.value_sum = 0.0
        self.prior = prior
        self.children = {}

    @property
    def value(self):
        """Mean backed-up value, or 0.0 for a node that was never visited."""
        if self.visit_count == 0:
            return 0.0
        return self.value_sum / self.visit_count

    def expand(self, policy, legal_moves, board_size=None):
        """Create a child for every move in ``legal_moves`` not yet present.

        Args:
            policy: 1-D sequence of log-probabilities laid out as
                ``x * board_size + y`` for board points, with the pass as
                the final entry.
            legal_moves: iterable of ``(x, y)`` moves; ``(-1, -1)`` is pass.
            board_size: side length of the board.  When omitted it is
                inferred from ``len(policy) == board_size**2 + 1`` rather
                than assuming a 9x9 board.
        """
        if board_size is None:
            board_size = int(round(np.sqrt(len(policy) - 1)))
        pass_index = board_size * board_size

        for move in legal_moves:
            if move in self.children:
                continue
            idx = move[0] * board_size + move[1] if move[0] >= 0 else pass_index
            # The policy holds log-probabilities; priors are probabilities.
            self.children[move] = MCTSNode(prior=np.exp(policy[idx]))

    def select_child(self, c_puct=1.5):
        """Pick the child with the highest PUCT score.

        The score is ``child.value + c_puct * prior * sqrt(N) / (1 + n)``
        where ``N`` is this node's visit count (at least 1) and ``n`` the
        child's.  A higher ``child.value`` is treated as better for the
        player choosing at this node.

        Returns:
            ``(move, child)``, or ``(None, None)`` if there are no children.
        """
        best_score = -float('inf')
        best_move = None
        best_child = None
        sqrt_total = np.sqrt(max(1, self.visit_count))

        for move, child in self.children.items():
            # ``value`` is already 0.0 for unvisited children.
            q_value = child.value
            u_value = c_puct * child.prior * sqrt_total / (1 + child.visit_count)
            score = q_value + u_value
            if score > best_score:
                best_score = score
                best_move = move
                best_child = child

        return best_move, best_child
탐색 구현
# mcts/search.py
import numpy as np
import torch
from .node import MCTSNode
class MCTS:
    """Monte Carlo tree search guided by a policy/value network.

    Node statistics are stored from the point of view of the player who
    made the move leading into the node, so that ``select_child`` (which
    maximises ``child.value``) picks the move that is best for the player
    choosing at the parent.
    """

    def __init__(self, network, board_size=9, num_simulations=100, c_puct=1.5):
        self.network = network
        self.board_size = board_size
        self.num_simulations = num_simulations
        self.c_puct = c_puct

    def search(self, board, add_noise=False):
        """Run ``num_simulations`` simulations from ``board``.

        Args:
            board: position to search from; it is copied, never modified.
            add_noise: mix Dirichlet noise into the root priors (used
                during self-play to encourage exploration).

        Returns:
            The root node, whose children carry the visit counts.
        """
        root = MCTSNode()

        # Expand the root with the network's priors.
        policy, _ = self.evaluate(board)
        root.expand(policy, board.get_legal_moves())

        if add_noise:
            self.add_dirichlet_noise(root)

        for _ in range(self.num_simulations):
            node = root
            scratch_board = board.copy()
            path = [node]

            # Selection: walk down until an unexpanded node or game end.
            while node.children:
                move, node = node.select_child(self.c_puct)
                if move[0] >= 0:
                    scratch_board.play(move[0], move[1])
                else:
                    scratch_board.pass_move()
                path.append(node)
                if scratch_board.is_game_over():
                    break

            # Evaluation: ``value`` is always expressed from the point of
            # view of the player to move at the leaf position.
            if scratch_board.is_game_over():
                score = scratch_board.score()
                # score > 0 means Black is ahead.
                value = 1.0 if score > 0 else (-1.0 if score < 0 else 0.0)
                if scratch_board.current_player != scratch_board.BLACK:
                    value = -value
            else:
                policy, value = self.evaluate(scratch_board)
                node.expand(policy, scratch_board.get_legal_moves())

            # Backpropagation: the leaf was reached by a move of the *other*
            # player, so its statistic is the negated leaf value; the sign
            # then alternates on the way up to the root.
            value = -value
            for visited in reversed(path):
                visited.visit_count += 1
                visited.value_sum += value
                value = -value

        return root

    def evaluate(self, board):
        """Evaluate ``board`` with the network.

        Returns:
            ``(policy, value)``: the first network output for the single
            position as a numpy array, and the second as a Python float.
        """
        from model.features import encode_board

        features = torch.tensor(encode_board(board)).unsqueeze(0)
        self.network.eval()
        with torch.no_grad():
            policy, value = self.network(features)
        return policy[0].cpu().numpy(), value[0].item()

    def add_dirichlet_noise(self, root, alpha=0.3, epsilon=0.25):
        """Blend Dirichlet(``alpha``) noise into the priors of ``root``'s children.

        Each prior becomes ``(1 - epsilon) * prior + epsilon * noise``.
        Does nothing when the root has no children.
        """
        if not root.children:
            return
        noise = np.random.dirichlet([alpha] * len(root.children))
        for sample, child in zip(noise, root.children.values()):
            child.prior = (1 - epsilon) * child.prior + epsilon * sample

    def get_policy(self, root, temperature=1.0):
        """Turn the root's visit counts into a move distribution.

        Args:
            root: searched root node.
            temperature: 0 selects the most visited move deterministically;
                otherwise counts are raised to ``1 / temperature`` and
                normalised.

        Returns:
            Array of length ``board_size**2 + 1`` (last entry is the pass)
            that sums to 1.
        """
        num_points = self.board_size ** 2
        visits = np.zeros(num_points + 1)
        child_indices = []
        for move, child in root.children.items():
            idx = move[0] * self.board_size + move[1] if move[0] >= 0 else num_points
            visits[idx] = child.visit_count
            child_indices.append(idx)

        if visits.sum() == 0:
            # No simulation reached any child (e.g. num_simulations == 0).
            # Dividing by the zero total would yield NaNs, so fall back to a
            # uniform distribution over the known children, or to the pass
            # when there are none.
            policy = np.zeros_like(visits)
            if child_indices:
                policy[child_indices] = 1.0 / len(child_indices)
            else:
                policy[num_points] = 1.0
            return policy

        if temperature == 0:
            policy = np.zeros_like(visits)
            policy[np.argmax(visits)] = 1.0
        else:
            visits = visits ** (1 / temperature)
            policy = visits / visits.sum()
        return policy

    def select_move(self, root, temperature=1.0):
        """Sample a move from ``get_policy``; returns ``(-1, -1)`` for a pass."""
        policy = self.get_policy(root, temperature)
        idx = int(np.random.choice(len(policy), p=policy))
        if idx == self.board_size ** 2:
            return (-1, -1)
        return (idx // self.board_size, idx % self.board_size)
5단계: 자가 대국
# training/self_play.py
import numpy as np
from game.board import Board
from model.features import encode_board
def self_play_game(mcts, temperature=1.0, temp_threshold=30, max_moves=200):
    """Play one self-play game and return its training samples.

    Args:
        mcts: searcher providing ``search``, ``get_policy`` and
            ``select_move``.  The board is created with ``mcts.board_size``
            (9 if the attribute is missing) so that the policy indexing used
            by the searcher always matches the board being played on.
        temperature: sampling temperature for the first ``temp_threshold``
            moves; afterwards temperature 0 is used.
        temp_threshold: number of opening moves played with ``temperature``.
        max_moves: hard cap on the game length; the position is scored as
            it stands when the cap is reached.

    Returns:
        A list with one dict per move containing ``'features'``,
        ``'policy'``, ``'player'`` and ``'value'`` (+1 if that player won,
        -1 if they lost, 0 for a tie).
    """
    board = Board(size=getattr(mcts, "board_size", 9))
    game_history = []
    move_count = 0

    while not board.is_game_over() and move_count < max_moves:
        root = mcts.search(board, add_noise=True)

        # Explore in the opening, then play the most visited move.
        temp = temperature if move_count < temp_threshold else 0.0
        policy = mcts.get_policy(root, temp)

        # Record the position before the move is played.
        game_history.append({
            'features': encode_board(board),
            'policy': policy,
            'player': board.current_player
        })

        move = mcts.select_move(root, temp)
        if move[0] >= 0:
            board.play(move[0], move[1])
        else:
            board.pass_move()
        move_count += 1

    # score > 0 means Black won, < 0 White; exactly 0 is a tie.
    score = board.score()
    winner = Board.BLACK if score > 0 else (Board.WHITE if score < 0 else 0)

    # Label every sample from the point of view of the player to move.
    for data in game_history:
        if winner == 0:
            data['value'] = 0.0
        elif data['player'] == winner:
            data['value'] = 1.0
        else:
            data['value'] = -1.0

    return game_history
def generate_training_data(mcts, num_games=100):
    """Play ``num_games`` self-play games and pool their samples.

    Prints one progress line per game and returns a single flat list
    containing the samples of every game in playing order.
    """
    samples = []
    for i in range(num_games):
        print(f"Self-play game {i+1}/{num_games}")
        samples.extend(self_play_game(mcts))
    return samples
6단계: 학습기
# training/trainer.py
import torch
import torch.nn.functional as F
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
class Trainer:
    """Trains a policy/value network on self-play samples with Adam."""

    def __init__(self, network, learning_rate=0.001):
        self.network = network
        self.optimizer = torch.optim.Adam(network.parameters(), lr=learning_rate)

    def train_step(self, batch):
        """Run one optimisation step.

        Args:
            batch: ``(features, target_policy, target_value)`` tensors.  The
                network is expected to return ``(log_policy, value)``.

        Returns:
            Dict with the float losses ``'total_loss'``, ``'policy_loss'``
            and ``'value_loss'``.
        """
        features, target_policy, target_value = batch

        self.network.train()
        self.optimizer.zero_grad()

        pred_policy, pred_value = self.network(features)

        # kl_div expects log-probabilities as input and probabilities as target.
        policy_loss = F.kl_div(pred_policy, target_policy, reduction='batchmean')
        # Flatten both sides to (batch,).  A bare squeeze() would turn a
        # batch of one into a 0-d tensor whose shape no longer matches the
        # target.
        value_loss = F.mse_loss(pred_value.reshape(-1), target_value.reshape(-1))
        total_loss = policy_loss + value_loss

        total_loss.backward()
        self.optimizer.step()

        return {
            'total_loss': total_loss.item(),
            'policy_loss': policy_loss.item(),
            'value_loss': value_loss.item()
        }

    def train_epoch(self, data, batch_size=32):
        """Train for one pass over ``data`` in shuffled mini-batches.

        Args:
            data: sequence of dicts with ``'features'``, ``'policy'`` and
                ``'value'`` entries.
            batch_size: mini-batch size.

        Returns:
            Mean total loss over the batches.

        Raises:
            ValueError: if ``data`` is empty.
        """
        if len(data) == 0:
            raise ValueError("train_epoch() needs at least one training sample")

        features = torch.tensor(
            np.array([d['features'] for d in data]), dtype=torch.float32
        )
        policies = torch.tensor(
            np.array([d['policy'] for d in data]), dtype=torch.float32
        )
        values = torch.tensor(
            np.array([d['value'] for d in data]), dtype=torch.float32
        )

        dataset = TensorDataset(features, policies, values)
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        total_losses = [self.train_step(batch)['total_loss'] for batch in loader]
        return float(np.mean(total_losses))

    def save(self, path):
        """Write the network's ``state_dict`` to ``path``."""
        torch.save(self.network.state_dict(), path)

    def load(self, path):
        """Load a ``state_dict`` saved by ``save`` into the network.

        Tensors are mapped to the CPU first so that a checkpoint written on
        a GPU machine also loads where no GPU is present;
        ``load_state_dict`` then copies them into the network's parameters.
        NOTE: ``torch.load`` unpickles the file; only load trusted checkpoints.
        """
        self.network.load_state_dict(torch.load(path, map_location="cpu"))
7단계: 메인 프로그램
# main.py
from model.network import create_network
from mcts.search import MCTS
from training.self_play import generate_training_data
from training.trainer import Trainer
def main():
    """Run the self-play / training loop and save a checkpoint per iteration."""
    # Training schedule.
    num_iterations = 100
    games_per_iteration = 50
    epochs_per_iteration = 10

    # The searcher and the trainer share one network, so every training
    # round immediately affects the next round of self-play.
    network = create_network(board_size=9)
    mcts = MCTS(network, board_size=9, num_simulations=100)
    trainer = Trainer(network)

    for iteration in range(num_iterations):
        print(f"\n=== Iteration {iteration + 1}/{num_iterations} ===")

        # Self-play
        print("Generating self-play games...")
        training_data = generate_training_data(
            mcts, num_games=games_per_iteration
        )

        # Training
        print("Training...")
        for epoch in range(epochs_per_iteration):
            loss = trainer.train_epoch(training_data)
            print(f"  Epoch {epoch + 1}: loss = {loss:.4f}")

        # Checkpoint
        checkpoint_path = f"model_iter_{iteration + 1}.pt"
        trainer.save(checkpoint_path)

    print("\nTraining complete!")


if __name__ == "__main__":
    main()
실행과 테스트
의존성 설치
pip install torch numpy
학습 실행
python main.py
예상 출력
=== Iteration 1/100 ===
Generating self-play games...
Self-play game 1/50
Self-play game 2/50
...
Training...
Epoch 1: loss = 2.3456
Epoch 2: loss = 1.8765
...
개선 제안
단기 개선
| 개선 항목 | 설명 |
|---|---|
| 잔차 블록 증가 | 4 → 8 → 16 블록 |
| 채널 수 증가 | 64 → 128 → 256 |
| 시뮬레이션 횟수 증가 | 100 → 400 → 800 |
| 더 큰 학습 세트 | 50 → 200 → 1000 게임/반복 |
장기 개선
- 19×19 바둑판 지원
- 보조 학습 목표 추가 (영역 예측)
- 병렬 자가 대국 구현
- GPU 가속 추가
더 읽을거리
- 신경망 아키텍처 상세 분석 — 더 깊은 네트워크 설계
- MCTS 구현 세부사항 — 고급 탐색 기술
- KataGo 학습 메커니즘 분석 — 프로덕션급 학습 시스템
- 핵심 논문 가이드 — 이론적 기초