A3: Reinforcement Learning (3) – LunarLander

Author

최규빈

Published

September 1, 2023

My notes are in green

Lecture video

https://www.youtube.com/watch?v=Y55g-okEsjI&list=PLQqh36zP38-zBEizLbjgRE8qMfsJML6Ua&index=1

imports

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import torch
import collections
import IPython

Preliminaries

- What collections.deque does

a = collections.deque([1,2,3], maxlen = 5 )
a
deque([1, 2, 3])
a.append(4)
a
deque([1, 2, 3, 4])
a.append(5)
a
deque([1, 2, 3, 4, 5])
a.append(6)
a
deque([2, 3, 4, 5, 6])

- Downside? It behaves more like a list than a numpy array (it is not specialized for numerical operations).

a + 1
TypeError: can only concatenate deque (not "int") to deque

- But if needed, it can be converted to an np.array.

np.array(a) + 1

- collections.deque is a useful data structure for implementing a replay buffer.

  • The previous approach (the one we used): store all the data and learn from one experience at a time
  • Replay buffer: keep only the most recent \(N\) experiences and learn by sampling several of them at once
  • Advantages of a replay buffer: it saves memory; it stores diverse experiences and reuses them at random, which stabilizes learning; learning does not have to follow a strict real-time "store -> learn -> store" order, which makes parallel processing easier; and consecutive experiences in reinforcement learning can be correlated, a correlation that random sampling removes (a minimal sketch follows below)
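A minimal sketch of a replay buffer built on collections.deque, assuming we store (state, action, reward, next_state, terminated) tuples; the class and method names (ReplayBuffer, push, sample) are mine for illustration, and the actual version used later lives inside the Agent class.

import collections
import numpy as np

class ReplayBuffer:
    # keep only the most recent `maxlen` experiences; older ones fall out automatically
    def __init__(self, maxlen=5000):
        self.buffer = collections.deque(maxlen=maxlen)
    def push(self, state, action, reward, next_state, terminated):
        self.buffer.append((state, action, reward, next_state, terminated))
    def sample(self, batch_size=64):
        # draw a random mini-batch; random sampling breaks the correlation between consecutive steps
        idx = np.random.randint(0, len(self.buffer), size=batch_size)
        batch = [self.buffer[i] for i in idx]
        states, actions, rewards, next_states, terms = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, terms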

Game3: LunarLander

- Create the environment

env = gym.make('LunarLander-v2', render_mode = 'rgb_array') 
env 
<TimeLimit<OrderEnforcing<PassiveEnvChecker<LunarLander<LunarLander-v2>>>>>

- state_space

env.observation_space
Box([-1.5       -1.5       -5.        -5.        -3.1415927 -5.
 -0.        -0.       ], [1.5       1.5       5.        5.        3.1415927 5.        1.
 1.       ], (8,), float32)
env.observation_space.sample()
array([ 0.26299486, -0.8300088 ,  3.0305617 , -1.0865942 ,  1.9163206 ,
       -0.2580665 ,  0.5529532 ,  0.64002186], dtype=float32)

- action_space

env.action_space
Discrete(4)
env.action_space.sample()
1

- env.reset()

type(env.reset())
tuple
env.reset()[0]
array([-0.00568256,  1.4154928 , -0.57559764,  0.203213  ,  0.00659147,
        0.1303815 ,  0.        ,  0.        ], dtype=float32)
env.reset()[1]
{}

The initial state returned by reset, i.e., the starting point?

state, _ = env.reset()
state 
array([ 0.00335846,  1.4095061 ,  0.3401579 , -0.06285264, -0.00388481,
       -0.07705088,  0.        ,  0.        ], dtype=float32)

- env.render()

env.render().shape
(400, 600, 3)
plt.imshow(env.render())

- env.step

The input to env.step(0) is an action (here, 0)

next_state, reward, terminated, _, _ = env.step(0)
next_state, reward, terminated
(array([ 0.00671701,  1.4075147 ,  0.3396984 , -0.08852121, -0.00769225,
        -0.07615532,  0.        ,  0.        ], dtype=float32),
 -0.6954854601109162,
 False)

terminated => whether the episode has ended

- play

env.reset()
plt.imshow(env.render())

for _ in range(7):
    env.step(3)
    env.step(2)
plt.imshow(env.render())

Free fall

for _ in range(7):
    env.step(0)
plt.imshow(env.render())

Left

for _ in range(7):
    env.step(1)
plt.imshow(env.render())

for _ in range(7):
    env.step(2)
plt.imshow(env.render())

Right

for _ in range(7):
    env.step(3)
plt.imshow(env.render())

  • 0 : do nothing
  • 1 : left
  • 2 : up
  • 3 : right

Visualization

jump=10 just means some frames are skipped when displaying the animation

def show(ims,jump=10):
    ims = ims[::jump]   # keep every `jump`-th frame to shorten the animation
    fig = plt.Figure()  # a Figure not managed by pyplot, so it is not rendered twice
    ax = fig.subplots()
    def update(i):
       ax.imshow(ims[i])
    ani = FuncAnimation(fig,update,frames=len(ims))
    display(IPython.display.HTML(ani.to_jshtml()))
current_state, _ = env.reset()
ims = [] 
for t in range(500): 
    action = env.action_space.sample()
    next_state, reward, terminated, _, _ = env.step(action)
    im = env.render()
    ims.append(im) 
    current_state = next_state 
    if terminated: break 
show(ims) 

q_net

- This plays the role of agent.q from before. Previously agent.q was a numpy array of shape (4,4,4), but that is not possible here.

  • 4x4 grid: the state space was 2-dimensional with only 16 possible values, and 4 actions were available in each state -> we only had to estimate rewards (q-values) for 16*4 cases
  • LunarLander: the state space is 8-dimensional and each coordinate ranges over infinitely many values -> tabulating a q-value for every case is practically impossible (a quick back-of-the-envelope calculation follows below)
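To see the scale of the problem, a hypothetical calculation: even if we discretized each of the 8 state dimensions into just 10 bins (a number chosen only for illustration), the table would already be far too large.

n_bins = 10                  # hypothetical: 10 bins per state dimension
n_states = n_bins ** 8       # 100,000,000 discrete states
n_actions = 4
print(f"{n_states * n_actions:,} q-values to fill in")   # 400,000,000, versus 4*4*4 = 64 before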

- Let's collect some data.

current_states = collections.deque(maxlen=50) 
actions = collections.deque(maxlen=50) 
next_states = collections.deque(maxlen=50) 
rewards = collections.deque(maxlen=50) 
terminations = collections.deque(maxlen=50) 

current_state, _ = env.reset()
for t in range(500): 
    ## step1: agent >> env 
    action = env.action_space.sample()
    ## step2: agent << env 
    next_state, reward, terminated, _, _ = env.step(action)
    current_states.append(current_state)
    actions.append(action)
    next_states.append(next_state)
    rewards.append(reward)
    terminations.append(terminated) 
    ## step3: learn 
    ## step4: update state     
    current_state = next_state 
    ## step5: check the termination condition 
    if terminated: break 
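A quick sanity check of what was collected (a sketch; the exact length depends on when the episode terminated):

print(len(current_states))              # at most 50, because maxlen=50
print(np.array(current_states).shape)   # (len, 8): each state is 8 numbers
print(np.array(actions)[:5])            # a few of the randomly sampled actions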

- We only need to re-implement the parts of the previous code corresponding to the following.

## 1. Initialize q[x,y,a]: given a state s, it tells us the q-value of each action 
agent.q = np.zeros([4,4,4]) 

## 2. Compute q_estimated 
x,y = agent.current_state
xx,yy = agent.next_state
a = agent.action 
q_estimated = agent.q[x,y,a] 

## 3. Compute q_realistic = agent.reward + 0.99 * q_future 
if agent.terminated:
    q_realistic = agent.reward
else:
    q_future = agent.q[xx,yy,:].max()
    q_realistic = agent.reward + 0.99 * q_future

## 4. Gradually make q_estimated closer to q_realistic 
diff = q_realistic - q_estimated 
agent.q[x,y,a] = q_estimated + 0.05 * diff 

1. The part corresponding to agent.q

q_net = torch.nn.Sequential(
    torch.nn.Linear(8,128),
    torch.nn.ReLU(),
    torch.nn.Linear(128,64),
    torch.nn.ReLU(),
    torch.nn.Linear(64,32),
    torch.nn.ReLU(),
    torch.nn.Linear(32,4)
)
q_net # <- a function that takes 8 numbers as input and returns 4 numbers 
Sequential(
  (0): Linear(in_features=8, out_features=128, bias=True)
  (1): ReLU()
  (2): Linear(in_features=128, out_features=64, bias=True)
  (3): ReLU()
  (4): Linear(in_features=64, out_features=32, bias=True)
  (5): ReLU()
  (6): Linear(in_features=32, out_features=4, bias=True)
)
q_net(torch.tensor(current_state))
tensor([ 0.2089,  0.0208, -0.0194,  0.1119], grad_fn=<AddBackward0>)
  • q_net is a function that takes 8 numbers as input and returns 4 numbers.
  • Interpreted, the 8 input numbers describe the state and the 4 output numbers are the q-values of the actions.
  • But these numbers are not meaningful yet (they are essentially arbitrary).
  • What makes q_net special: it is not a fixed function but a neural network, i.e. a function that can be trained on data to output increasingly sensible values.

1. The part corresponding to agent.q (batch version)

– get batch –

batch_size = 4 
idx = np.random.randint(0,50,size=batch_size)

current_states_batch = torch.tensor(np.array(current_states))[idx].float()
actions_batch = torch.tensor(np.array(actions))[idx].reshape(batch_size,-1) 
rewards_batch = torch.tensor(np.array(rewards))[idx].reshape(batch_size,-1).float()
next_states_batch = torch.tensor(np.array(next_states))[idx].float()
terminations_batch = torch.tensor(np.array(terminations))[idx].reshape(batch_size,-1)

Convert the dtype from float64 to float32; float64 is more precise, but the network's weights are float32.
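A small sketch of why the cast matters (reusing q_net from above): feeding a float64 tensor into a float32 network raises a dtype error.

x64 = torch.tensor(np.random.randn(1, 8))   # numpy arrays default to float64, so this tensor is float64
# q_net(x64)          # would raise a dtype mismatch error (the Linear weights are float32)
q_net(x64.float())    # works once the input is cast to float32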

– q_net –

current_states_batch
tensor([[ 0.0460,  1.1553,  0.2655, -0.7388, -0.1076, -0.1409,  0.0000,  0.0000],
        [ 0.1101,  0.7167,  0.3612, -1.0887, -0.2963, -0.2328,  0.0000,  0.0000],
        [ 0.1986,  0.0916,  0.4388, -1.4444, -0.4664, -0.0699,  0.0000,  0.0000],
        [ 0.1560,  0.3964,  0.4360, -1.2567, -0.4123, -0.1800,  0.0000,  0.0000]])
q_net(current_states_batch)
tensor([[ 0.1819, -0.0285,  0.0155,  0.0395],
        [ 0.1837, -0.0370,  0.0174,  0.0401],
        [ 0.1934, -0.0340,  0.0255,  0.0449],
        [ 0.1874, -0.0365,  0.0215,  0.0408]], grad_fn=<AddmmBackward0>)

2. q_estimated

q_net(current_states_batch), actions_batch
(tensor([[ 0.1819, -0.0285,  0.0155,  0.0395],
         [ 0.1837, -0.0370,  0.0174,  0.0401],
         [ 0.1934, -0.0340,  0.0255,  0.0449],
         [ 0.1874, -0.0365,  0.0215,  0.0408]], grad_fn=<AddmmBackward0>),
 tensor([[0],
         [1],
         [1],
         [1]]))
q_net(current_states_batch).gather(1,actions_batch)
tensor([[ 0.1819],
        [-0.0370],
        [-0.0340],
        [-0.0365]], grad_fn=<GatherBackward0>)

gather picks out the q-value that matches each chosen action.
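A tiny self-contained example of what gather does here (the numbers are made up for illustration):

q = torch.tensor([[1., 2., 3., 4.],
                  [5., 6., 7., 8.]])
a = torch.tensor([[2], [0]])     # the action taken in each row
q.gather(1, a)                   # tensor([[3.], [5.]]): row i keeps column a[i]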

3. q_realistic = agent.reward + 0.99 * q_future

– q_future –

q_future = q_net(next_states_batch).max(axis=1)[0].reshape(batch_size,1)
q_future
tensor([[0.1820],
        [0.1842],
        [0.1942],
        [0.1883]], grad_fn=<ReshapeAliasBackward0>)
q_realistic = rewards_batch + 0.99 * q_future * (~terminations_batch)

~terminations_batch flips True/False; it plays the role of (1 - terminated), so the future term is zeroed out whenever an episode has ended.
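A tiny check of that behavior (illustrative values):

t = torch.tensor([[True], [False]])
~t                                    # tensor([[False], [ True]])
torch.tensor([[0.5], [0.5]]) * (~t)   # tensor([[0.0000], [0.5000]]): the future value is dropped where terminated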

4. Gradually making q_estimated closer to q_realistic

## This part needs some deep learning / PyTorch background.. if you don't know them yet, skip it for now.. 
optimizer = torch.optim.Adam(q_net.parameters(),lr=0.0001) 
for _ in range(2000):
    ~~~
    ~~~
    q_estimated = ~~~ 
    q_realistic = ~~~ 
    loss = torch.nn.functional.mse_loss(q_estimated,q_realistic)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
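For reference, a hedged sketch of how the elided lines (~~~) could be filled in, reusing the batch code from above; the full version is implemented in Agent.learn() below.

optimizer = torch.optim.Adam(q_net.parameters(), lr=0.0001)
for _ in range(2000):
    # resample a mini-batch from the buffers (same as the "get batch" cell above)
    idx = np.random.randint(0, len(current_states), size=batch_size)
    current_states_batch = torch.tensor(np.array(current_states))[idx].float()
    actions_batch = torch.tensor(np.array(actions))[idx].reshape(batch_size, -1)
    rewards_batch = torch.tensor(np.array(rewards))[idx].reshape(batch_size, -1).float()
    next_states_batch = torch.tensor(np.array(next_states))[idx].float()
    terminations_batch = torch.tensor(np.array(terminations))[idx].reshape(batch_size, -1)

    q_estimated = q_net(current_states_batch).gather(1, actions_batch)
    q_future = q_net(next_states_batch).detach().max(1)[0].reshape(batch_size, 1)
    q_realistic = rewards_batch + 0.99 * q_future * (~terminations_batch)

    loss = torch.nn.functional.mse_loss(q_estimated, q_realistic)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()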

policy

eps = 0.5 
if np.random.rand() < eps:
    action = env.action_space.sample() 
else:
    action = q_net(torch.tensor(current_state)).argmax().item()

item() pulls the plain number out of a tensor.

action
0
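A small aside on the role of .item() (q_values is just an illustrative name):

q_values = q_net(torch.tensor(current_state))
q_values.argmax()         # a 0-dimensional tensor, e.g. tensor(0)
q_values.argmax().item()  # a plain Python int, which is what env.step() expects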

Agent class + run

class Agent():
    def __init__(self,env):
        self.eps = 0
        self.n_experiences = 0
        self.n_episode = 0
        self.score = 0
        self.scores = []
        self.playtimes = []
        self.batch_size = 64
        self.buffer_size = 5000 
        self.action_space = env.action_space
        #self.state_space = env.observation_space

        # Q-Network
        self.q_net = torch.nn.Sequential(
            torch.nn.Linear(8,128), 
            torch.nn.ReLU(),
            torch.nn.Linear(128,64),
            torch.nn.ReLU(),
            torch.nn.Linear(64,32),
            torch.nn.ReLU(),
            torch.nn.Linear(32,4)
        ) 
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=0.0001)

        # ReplayBuffer
        self.current_states = collections.deque(maxlen=self.buffer_size)
        self.actions = collections.deque(maxlen=self.buffer_size)
        self.rewards = collections.deque(maxlen=self.buffer_size)
        self.next_states = collections.deque(maxlen=self.buffer_size)
        self.terminations = collections.deque(maxlen=self.buffer_size)
       
    def save_experience(self):
        """Add a new experience to memory."""
        self.current_states.append(self.current_state)
        self.actions.append(self.action)
        self.rewards.append(self.reward)
        self.next_states.append(self.next_state)
        self.terminations.append(self.terminated) 
        self.n_experiences = self.n_experiences+1
        self.score += self.reward
    
    def act(self):
        if np.random.rand() < self.eps:
            self.action = self.action_space.sample()
        else:
            self.action = self.q_net(torch.tensor(self.current_state)).argmax().item()
            
    def get_batch(self):
        idx = np.random.randint(0,self.buffer_size,size=self.batch_size) 
        self.current_states_batch = torch.tensor(np.array(self.current_states))[idx].float()
        self.actions_batch = torch.tensor(np.array(self.actions))[idx].reshape(self.batch_size,1)
        self.rewards_batch = torch.tensor(np.array(self.rewards))[idx].reshape(self.batch_size,-1).float()
        self.next_states_batch = torch.tensor(np.array(self.next_states))[idx].float()
        self.terminations_batch = torch.tensor(np.array(self.terminations))[idx].reshape(self.batch_size,-1) 
    
    def learn(self):
        if self.n_experiences < self.buffer_size:
            pass  # do not start learning until the replay buffer is full
        else: 
            self.get_batch()
            q_estimated = self.q_net(self.current_states_batch).gather(1, self.actions_batch)
            q_future = self.q_net(self.next_states_batch).detach().max(1)[0].reshape(self.batch_size,1)
            q_realistic = self.rewards_batch + 0.99 * q_future * (~self.terminations_batch)

            loss = torch.nn.functional.mse_loss(q_estimated, q_realistic)
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()
env = gym.make('LunarLander-v2',render_mode='rgb_array')
agent = Agent(env)
agent.eps = 1.0 
for _ in range(2000):
    ### 1. The essential code
    agent.current_state, _  = env.reset() 
    agent.terminated = False
    agent.score = 0 
    for t in range(500):
        # step1: agent >> env 
        agent.act() 
        env.agent_action = agent.action  
        # step2: agent << env 
        agent.next_state, agent.reward, agent.terminated, _,_ = env.step(env.agent_action)
        agent.save_experience() 
        # step3: learn 
        agent.learn()
        # step4: state update 
        agent.current_state = agent.next_state 
        # step5: check the termination condition 
        if agent.terminated: break 
    agent.scores.append(agent.score) 
    agent.playtimes.append(t+1)
    agent.n_episode = agent.n_episode + 1 
    agent.eps = agent.eps*0.995
    ## 2. Non-essential code (logging)
    if (agent.n_episode % 10) == 0:
        print(
            f'Episode {agent.n_episode}\t'
            f'Score: {np.mean(agent.scores[-100:]) : .2f}\t'
            f'Playtime: {np.mean(agent.playtimes[-100:]) : .2f}\t'
            f'n_eps: {agent.eps}\t'
            f'n_experiences: {agent.n_experiences}\t'
        )
    if np.mean(agent.scores[-100:])>=200.0:
        break
Episode 10  Score: -172.47  Playtime:  96.90    n_eps: 0.9511101304657719   n_experiences: 969  
Episode 20  Score: -169.88  Playtime:  100.10   n_eps: 0.9046104802746175   n_experiences: 2002 
Episode 30  Score: -183.41  Playtime:  98.43    n_eps: 0.8603841919146962   n_experiences: 2953 
Episode 40  Score: -206.50  Playtime:  101.62   n_eps: 0.8183201210226743   n_experiences: 4065 
Episode 50  Score: -219.56  Playtime:  103.06   n_eps: 0.778312557068642    n_experiences: 5153 
Episode 60  Score: -222.80  Playtime:  101.28   n_eps: 0.7402609576967045   n_experiences: 6077 
Episode 70  Score: -218.37  Playtime:  100.14   n_eps: 0.7040696960536299   n_experiences: 7010 
Episode 80  Score: -215.56  Playtime:  98.80    n_eps: 0.6696478204705644   n_experiences: 7904 
Episode 90  Score: -209.10  Playtime:  97.70    n_eps: 0.6369088258938781   n_experiences: 8793 
Episode 100 Score: -205.01  Playtime:  97.46    n_eps: 0.6057704364907278   n_experiences: 9746 
Episode 110 Score: -201.82  Playtime:  100.77   n_eps: 0.5761543988830038   n_experiences: 11046    
Episode 120 Score: -213.35  Playtime:  107.39   n_eps: 0.547986285490042    n_experiences: 12741    
Episode 130 Score: -211.61  Playtime:  110.09   n_eps: 0.5211953074858876   n_experiences: 13962    
Episode 140 Score: -196.57  Playtime:  114.99   n_eps: 0.49571413690105054  n_experiences: 15564    
Episode 150 Score: -180.50  Playtime:  117.85   n_eps: 0.47147873742168567  n_experiences: 16938    
Episode 160 Score: -189.93  Playtime:  126.25   n_eps: 0.4484282034609769   n_experiences: 18702    
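As a side note, a quick check of how fast eps shrinks under the multiplicative 0.995 schedule used above:

eps = 1.0
for episode in range(1, 1001):
    eps *= 0.995
    if episode % 200 == 0:
        print(f"episode {episode}: eps ≈ {eps:.3f}")
# roughly: 200 -> 0.367, 400 -> 0.135, 600 -> 0.049, 800 -> 0.018, 1000 -> 0.007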

- Code for the visualization

agent2 = Agent(env) 
agent2.q_net = agent.q_net

agent2.current_state, _ = env.reset()
agent2.terminated = False 
ims = [] 
ims.append(env.render())
for t in range(500):
    agent2.act() 
    agent2.next_state, agent2.reward, agent2.terminated, _, _  = env.step(agent2.action)
    im = env.render()
    ims.append(im)
    agent2.current_state = agent2.next_state
    if agent2.terminated: break 
show(ims)