My notes are in green.

Lecture video: https://www.youtube.com/watch?v=Y55g-okEsjI&list=PLQqh36zP38-zBEizLbjgRE8qMfsJML6Ua&index=1

imports

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import torch
import collections
import IPython

Preliminaries
- What collections.deque can do

a = collections.deque([1,2,3], maxlen=5)
a
deque([1, 2, 3])

a.append(4)
a
deque([1, 2, 3, 4])

a.append(5)
a
deque([1, 2, 3, 4, 5])

a.append(6)
a
deque([2, 3, 4, 5, 6])
- Downside? It behaves more like a list than a numpy array (it is not built for arithmetic).

a + 1
TypeError: can only concatenate deque (not "int") to deque
- But if needed, it can be converted to an np.array.

np.array(a) + 1
- collections.deque is a handy data structure for implementing a replay buffer.
- The approach we used before: store all the data and learn from one experience at a time.
- Replay buffer: keep only the most recent \(N\) experiences and learn by sampling several of them.
- Advantages of a replay buffer: it saves memory; storing diverse experiences and reusing them at random makes learning more stable; since we no longer have to learn strictly in real time in a "store -> learn -> store" order, it is easier to parallelize; and consecutive experiences in reinforcement learning can be correlated, which random sampling removes. (A minimal sketch follows right after this list.)
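To make the idea concrete, here is a minimal replay-buffer sketch of my own (not the lecture's code, which uses five parallel deques later on); it only relies on the collections and numpy imports above.

buffer = collections.deque(maxlen=1000)            # only the most recent 1000 experiences are kept

def store(state, action, reward, next_state, terminated):
    # old experiences fall off the left end automatically once maxlen is reached
    buffer.append((state, action, reward, next_state, terminated))

def sample(batch_size=4):
    # sampling at random breaks the correlation between consecutive experiences
    idx = np.random.randint(0, len(buffer), size=batch_size)
    return [buffer[i] for i in idx]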
Game3: LunarLander
- Creating the environment

env = gym.make('LunarLander-v2', render_mode='rgb_array')
env
<TimeLimit<OrderEnforcing<PassiveEnvChecker<LunarLander<LunarLander-v2>>>>>
- state_space

env.observation_space
Box([-1.5 -1.5 -5. -5. -3.1415927 -5. -0. -0.], [1.5 1.5 5. 5. 3.1415927 5. 1. 1.], (8,), float32)
env.observation_space.sample()
array([ 0.26299486, -0.8300088 , 3.0305617 , -1.0865942 , 1.9163206 ,
-0.2580665 , 0.5529532 , 0.64002186], dtype=float32)
- action_space
env.action_space
Discrete(4)
env.action_space.sample()
1
- env.reset()

type(env.reset())
tuple

env.reset()[0]
array([-0.00568256, 1.4154928 , -0.57559764, 0.203213 , 0.00659147,
0.1303815 , 0. , 0. ], dtype=float32)
env.reset()[1]
{}
The initial state after reset, i.e. the starting point?

state, _ = env.reset()
state
array([ 0.00335846, 1.4095061 , 0.3401579 , -0.06285264, -0.00388481,
-0.07705088, 0. , 0. ], dtype=float32)
- env.render()
env.render().shape
(400, 600, 3)
plt.imshow(env.render())
- env.step

The input to env.step is an action, e.g. env.step(0).

next_state, reward, terminated, _, _ = env.step(0)
next_state, reward, terminated
(array([ 0.00671701, 1.4075147 , 0.3396984 , -0.08852121, -0.00769225,
-0.07615532, 0. , 0. ], dtype=float32),
-0.6954854601109162,
False)
terminated => whether the episode has ended or not
- play

env.reset()
plt.imshow(env.render())

for _ in range(7):
    env.step(3)
    env.step(2)
plt.imshow(env.render())

Free fall

for _ in range(7):
    env.step(0)
plt.imshow(env.render())

Left

for _ in range(7):
    env.step(1)
plt.imshow(env.render())

Up

for _ in range(7):
    env.step(2)
plt.imshow(env.render())

Right

for _ in range(7):
    env.step(3)
plt.imshow(env.render())

- 0 : do nothing
- 1 : left
- 2 : up
- 3 : right
Visualization

jump=10 just means that some frames are skipped in the animation.

def show(ims, jump=10):
    ims = ims[::jump]
    fig = plt.Figure()
    ax = fig.subplots()
    def update(i):
        ax.imshow(ims[i])
    ani = FuncAnimation(fig, update, frames=len(ims))
    display(IPython.display.HTML(ani.to_jshtml()))

current_state, _ = env.reset()
ims = []
for t in range(500):
    action = env.action_space.sample()
    next_state, reward, terminated, _, _ = env.step(action)
    im = env.render()
    ims.append(im)
    current_state = next_state
    if terminated: break
show(ims)
q_net
- This corresponds to what used to be agent.q. Previously agent.q was a numpy array of shape (4,4,4), but that is not possible here.
- 4x4 grid: the state space was 2-dimensional and could take only 16 values, with 4 possible actions in each state -> we only had to fill in values for 16*4 cases in total.
- LunarLander: the state space is 8-dimensional and each coordinate ranges over a continuum -> tabulating values for the infinitely many possible cases is practically impossible. (See the small calculation right after this list.)
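To get a feel for the blow-up, here is a quick back-of-the-envelope calculation of my own: even if we crudely discretized each of the 8 state dimensions into 10 bins (an arbitrary choice, just for illustration), the table would already be enormous.

n_bins = 10                     # arbitrary discretization, for illustration only
n_states = n_bins ** 8          # 8-dimensional state -> 100,000,000 cells
n_actions = 4
n_states * n_actions            # 400,000,000 entries to learn, versus 16*4 = 64 for the 4x4 grid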
- Let's collect some data.

current_states = collections.deque(maxlen=50)
actions = collections.deque(maxlen=50)
next_states = collections.deque(maxlen=50)
rewards = collections.deque(maxlen=50)
terminations = collections.deque(maxlen=50)
current_state, _ = env.reset()
for t in range(500):
    ## step1: agent >> env
    action = env.action_space.sample()
    ## step2: agent << env
    next_state, reward, terminated, _, _ = env.step(action)
    current_states.append(current_state)
    actions.append(action)
    next_states.append(next_state)
    rewards.append(reward)
    terminations.append(terminated)
    ## step3: learn
    ## step4: update state
    current_state = next_state
    ## step5: check the termination condition
    if terminated: break
- We only need to implement the parts of the previous code that correspond to the following.

## 1. Initialize q[x,y,a]: given a state, it tells us the q-value of each action
agent.q = np.zeros([4,4,4])

## 2. Compute q_estimated
x, y = agent.current_state
xx, yy = agent.next_state
a = agent.action
q_estimated = agent.q[x,y,a]

## 3. The step q_realistic = agent.reward + 0.99 * q_future
if agent.terminated:
    q_realistic = agent.reward
else:
    q_future = agent.q[xx,yy,:].max()
    q_realistic = agent.reward + 0.99 * q_future

## 4. The step that gradually pulls q_estimated toward q_realistic
diff = q_realistic - q_estimated
agent.q[x,y,a] = q_estimated + 0.05 * diff
1. The part corresponding to agent.q

q_net = torch.nn.Sequential(
    torch.nn.Linear(8, 128),
    torch.nn.ReLU(),
    torch.nn.Linear(128, 64),
    torch.nn.ReLU(),
    torch.nn.Linear(64, 32),
    torch.nn.ReLU(),
    torch.nn.Linear(32, 4)
)
q_net  # <- a function that takes 8 numbers as input and returns 4 numbers
Sequential(
(0): Linear(in_features=8, out_features=128, bias=True)
(1): ReLU()
(2): Linear(in_features=128, out_features=64, bias=True)
(3): ReLU()
(4): Linear(in_features=64, out_features=32, bias=True)
(5): ReLU()
(6): Linear(in_features=32, out_features=4, bias=True)
)
q_net(torch.tensor(current_state))
tensor([ 0.2089, 0.0208, -0.0194, 0.1119], grad_fn=<AddBackward0>)
- q_net is a function that takes 8 numbers as input and returns 4 numbers.
- Interpretation: the 8 input numbers describe the state, and the 4 output numbers can be read as the q-value of each action.
- For now, though, these numbers are not meaningful (they are essentially random).
- The key property of q_net: it is not a fixed function; it is a function (a neural network) that can be trained on data to produce increasingly plausible numbers. (A quick parameter count follows below.)
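One way to see that q_net is a trainable function rather than a fixed table is to count its parameters, i.e. the weights and biases of the four Linear layers (my own aside):

sum(p.numel() for p in q_net.parameters())
# (8*128+128) + (128*64+64) + (64*32+32) + (32*4+4) = 11620 trainable numbers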
1. The part corresponding to agent.q (batch version)

– get batch –
batch_size = 4
idx = np.random.randint(0, 50, size=batch_size)

current_states_batch = torch.tensor(np.array(current_states))[idx].float()
actions_batch = torch.tensor(np.array(actions))[idx].reshape(batch_size, -1)
rewards_batch = torch.tensor(np.array(rewards))[idx].reshape(batch_size, -1).float()
next_states_batch = torch.tensor(np.array(next_states))[idx].float()
terminations_batch = torch.tensor(np.array(terminations))[idx].reshape(batch_size, -1)
Convert the dtype from float64 to float32 (float64 can represent numbers more precisely, but float32 is what we feed the network).
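A quick check of why .float() is needed (my own aside): numpy builds float64 arrays from the Python-float rewards, while torch.nn.Linear works in float32 by default.

torch.tensor(np.array(rewards)).dtype          # torch.float64
torch.tensor(np.array(rewards)).float().dtype  # torch.float32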
– q_net –
current_states_batch
tensor([[ 0.0460, 1.1553, 0.2655, -0.7388, -0.1076, -0.1409, 0.0000, 0.0000],
[ 0.1101, 0.7167, 0.3612, -1.0887, -0.2963, -0.2328, 0.0000, 0.0000],
[ 0.1986, 0.0916, 0.4388, -1.4444, -0.4664, -0.0699, 0.0000, 0.0000],
[ 0.1560, 0.3964, 0.4360, -1.2567, -0.4123, -0.1800, 0.0000, 0.0000]])
q_net(current_states_batch)
tensor([[ 0.1819, -0.0285, 0.0155, 0.0395],
[ 0.1837, -0.0370, 0.0174, 0.0401],
[ 0.1934, -0.0340, 0.0255, 0.0449],
[ 0.1874, -0.0365, 0.0215, 0.0408]], grad_fn=<AddmmBackward0>)
2. q_estimated
q_net(current_states_batch), actions_batch
(tensor([[ 0.1819, -0.0285, 0.0155, 0.0395],
[ 0.1837, -0.0370, 0.0174, 0.0401],
[ 0.1934, -0.0340, 0.0255, 0.0449],
[ 0.1874, -0.0365, 0.0215, 0.0408]], grad_fn=<AddmmBackward0>),
tensor([[0],
[1],
[1],
[1]]))
q_net(current_states_batch).gather(1, actions_batch)
tensor([[ 0.1819],
[-0.0370],
[-0.0340],
[-0.0365]], grad_fn=<GatherBackward0>)
gather picks out, for each row, the value that matches the chosen action.
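A tiny standalone example of what gather(1, index) does (my own illustration, not from the lecture): from each row of the first tensor it picks the column named by the index tensor.

q = torch.tensor([[10., 11., 12., 13.],
                  [20., 21., 22., 23.]])
a = torch.tensor([[2],
                  [0]])
q.gather(1, a)     # tensor([[12.], [20.]]) -- row 0 takes column 2, row 1 takes column 0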
3. q_realistic = agent.reward + 0.99 * q_future

– q_future –

q_future = q_net(next_states_batch).max(axis=1)[0].reshape(batch_size, 1)
q_future
tensor([[0.1820],
[0.1842],
[0.1942],
[0.1883]], grad_fn=<ReshapeAliasBackward0>)
q_realistic = rewards_batch + 0.99 * q_future * (~terminations_batch)
~terminations_batch: the ~ flips True and False (0 and 1).
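A small check of this trick (my own aside): on a boolean tensor ~ is elementwise negation, and multiplying by it zeroes out the future term exactly for the terminated transitions.

t = torch.tensor([[False], [True]])
~t                                        # tensor([[ True], [False]])
torch.tensor([[0.99], [0.99]]) * (~t)     # tensor([[0.9900], [0.0000]]) -- no future value after termination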
4. Gradually making q_estimated closer to q_realistic

## This part needs some deep learning and PyTorch background.. if you don't have it, just skip it for now..
optimizer = torch.optim.Adam(q_net.parameters(), lr=0.0001)
for _ in range(2000):
    ~~~
    ~~~
    q_estimated = ~~~
    q_realistic = ~~~
    loss = torch.nn.functional.mse_loss(q_estimated, q_realistic)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
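For reference, here is one way the elided ~~~ lines could be filled in, reusing the batch cells above; this is my reading of the sketch, and the same logic appears packaged in Agent.learn() further below.

optimizer = torch.optim.Adam(q_net.parameters(), lr=0.0001)
for _ in range(2000):
    # resample a small batch from the collected experiences each iteration
    idx = np.random.randint(0, len(current_states), size=batch_size)
    current_states_batch = torch.tensor(np.array(current_states))[idx].float()
    actions_batch = torch.tensor(np.array(actions))[idx].reshape(batch_size, -1)
    rewards_batch = torch.tensor(np.array(rewards))[idx].reshape(batch_size, -1).float()
    next_states_batch = torch.tensor(np.array(next_states))[idx].float()
    terminations_batch = torch.tensor(np.array(terminations))[idx].reshape(batch_size, -1)
    # estimate, target, and one gradient step on the mean-squared error
    q_estimated = q_net(current_states_batch).gather(1, actions_batch)
    q_future = q_net(next_states_batch).detach().max(1)[0].reshape(batch_size, 1)
    q_realistic = rewards_batch + 0.99 * q_future * (~terminations_batch)
    loss = torch.nn.functional.mse_loss(q_estimated, q_realistic)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()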
policy
eps = 0.5
if np.random.rand() < eps:
    action = env.action_space.sample()
else:
    action = q_net(torch.tensor(current_state)).argmax().item()

.item() pulls the plain number out of the tensor.
action
0
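The run loop below multiplies eps by 0.995 after every episode; a quick calculation of my own shows how fast exploration decays, and it matches the n_eps column in the training log (about 0.606 at episode 100).

eps = 1.0
for episode in range(100):
    eps = eps * 0.995
eps          # roughly 0.6058 after 100 episodes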
Agent class + run

class Agent():
    def __init__(self, env):
        self.eps = 0
        self.n_experiences = 0
        self.n_episode = 0
        self.score = 0
        self.scores = []
        self.playtimes = []
        self.batch_size = 64
        self.buffer_size = 5000
        self.action_space = env.action_space
        #self.state_space = env.observation_space
        # Q-Network
        self.q_net = torch.nn.Sequential(
            torch.nn.Linear(8, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 32),
            torch.nn.ReLU(),
            torch.nn.Linear(32, 4)
        )
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=0.0001)
        # ReplayBuffer
        self.current_states = collections.deque(maxlen=self.buffer_size)
        self.actions = collections.deque(maxlen=self.buffer_size)
        self.rewards = collections.deque(maxlen=self.buffer_size)
        self.next_states = collections.deque(maxlen=self.buffer_size)
        self.terminations = collections.deque(maxlen=self.buffer_size)
    def save_experience(self):
        """Add a new experience to memory."""
        self.current_states.append(self.current_state)
        self.actions.append(self.action)
        self.rewards.append(self.reward)
        self.next_states.append(self.next_state)
        self.terminations.append(self.terminated)
        self.n_experiences = self.n_experiences + 1
        self.score += self.reward
    def act(self):
        # epsilon-greedy: explore with probability eps, otherwise act greedily w.r.t. q_net
        if np.random.rand() < self.eps:
            self.action = self.action_space.sample()
        else:
            self.action = self.q_net(torch.tensor(self.current_state)).argmax().item()
    def get_batch(self):
        idx = np.random.randint(0, self.buffer_size, size=self.batch_size)
        self.current_states_batch = torch.tensor(np.array(self.current_states))[idx].float()
        self.actions_batch = torch.tensor(np.array(self.actions))[idx].reshape(self.batch_size, 1)
        self.rewards_batch = torch.tensor(np.array(self.rewards))[idx].reshape(self.batch_size, -1).float()
        self.next_states_batch = torch.tensor(np.array(self.next_states))[idx].float()
        self.terminations_batch = torch.tensor(np.array(self.terminations))[idx].reshape(self.batch_size, -1)
    def learn(self):
        # start learning only once the replay buffer is full
        if self.n_experiences < self.buffer_size:
            pass
        else:
            self.get_batch()
            q_estimated = self.q_net(self.current_states_batch).gather(1, self.actions_batch)
            q_future = self.q_net(self.next_states_batch).detach().max(1)[0].reshape(self.batch_size, 1)
            q_realistic = self.rewards_batch + 0.99 * q_future * (~self.terminations_batch)
            loss = torch.nn.functional.mse_loss(q_estimated, q_realistic)
            loss.backward()
            self.optimizer.step()
            self.optimizer.zero_grad()
env = gym.make('LunarLander-v2', render_mode='rgb_array')
agent = Agent(env)
agent.eps = 1.0
for _ in range(2000):
    ### 1. The essential code
    agent.current_state, _ = env.reset()
    agent.terminated = False
    agent.score = 0
    for t in range(500):
        # step1: agent >> env
        agent.act()
        env.agent_action = agent.action
        # step2: agent << env
        agent.next_state, agent.reward, agent.terminated, _, _ = env.step(env.agent_action)
        agent.save_experience()
        # step3: learn
        agent.learn()
        # step4: state update
        agent.current_state = agent.next_state
        # step5: check termination
        if agent.terminated: break
    agent.scores.append(agent.score)
    agent.playtimes.append(t+1)
    agent.n_episode = agent.n_episode + 1
    agent.eps = agent.eps * 0.995
    ## 2. Non-essential code
    if (agent.n_episode % 10) == 0:
        print(
            f'Episode {agent.n_episode}\t'
            f'Score: {np.mean(agent.scores[-100:]) : .2f}\t'
            f'Playtime: {np.mean(agent.playtimes[-100:]) : .2f}\t'
            f'n_eps: {agent.eps}\t'
            f'n_experiences: {agent.n_experiences}\t'
        )
    if np.mean(agent.scores[-100:]) >= 200.0:
        break
Episode 10 Score: -172.47 Playtime: 96.90 n_eps: 0.9511101304657719 n_experiences: 969
Episode 20 Score: -169.88 Playtime: 100.10 n_eps: 0.9046104802746175 n_experiences: 2002
Episode 30 Score: -183.41 Playtime: 98.43 n_eps: 0.8603841919146962 n_experiences: 2953
Episode 40 Score: -206.50 Playtime: 101.62 n_eps: 0.8183201210226743 n_experiences: 4065
Episode 50 Score: -219.56 Playtime: 103.06 n_eps: 0.778312557068642 n_experiences: 5153
Episode 60 Score: -222.80 Playtime: 101.28 n_eps: 0.7402609576967045 n_experiences: 6077
Episode 70 Score: -218.37 Playtime: 100.14 n_eps: 0.7040696960536299 n_experiences: 7010
Episode 80 Score: -215.56 Playtime: 98.80 n_eps: 0.6696478204705644 n_experiences: 7904
Episode 90 Score: -209.10 Playtime: 97.70 n_eps: 0.6369088258938781 n_experiences: 8793
Episode 100 Score: -205.01 Playtime: 97.46 n_eps: 0.6057704364907278 n_experiences: 9746
Episode 110 Score: -201.82 Playtime: 100.77 n_eps: 0.5761543988830038 n_experiences: 11046
Episode 120 Score: -213.35 Playtime: 107.39 n_eps: 0.547986285490042 n_experiences: 12741
Episode 130 Score: -211.61 Playtime: 110.09 n_eps: 0.5211953074858876 n_experiences: 13962
Episode 140 Score: -196.57 Playtime: 114.99 n_eps: 0.49571413690105054 n_experiences: 15564
Episode 150 Score: -180.50 Playtime: 117.85 n_eps: 0.47147873742168567 n_experiences: 16938
Episode 160 Score: -189.93 Playtime: 126.25 n_eps: 0.4484282034609769 n_experiences: 18702
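Not part of the lecture output, but a quick way to look at the learning curve from the scores collected above (the 20-episode window is my arbitrary choice):

plt.plot(agent.scores)                                               # raw score per episode
plt.plot(np.convolve(agent.scores, np.ones(20)/20, mode='valid'))    # 20-episode moving average
plt.xlabel('episode'); plt.ylabel('score')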
- Code for visualization

agent2 = Agent(env)
agent2.q_net = agent.q_net

agent2.current_state, _ = env.reset()
agent2.terminated = False
ims = []
ims.append(env.render())
for t in range(500):
    agent2.act()
    agent2.next_state, agent2.reward, agent2.terminated, _, _ = env.step(agent2.action)
    im = env.render()
    ims.append(im)
    agent2.current_state = agent2.next_state
    if agent2.terminated: break
show(ims)