import importlib, subprocess, sys

def import_or_install(package):
    """Import a package, installing it with pip first if it is missing."""
    try:
        importlib.import_module(package)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])

# collections and functools ship with the standard library; only the
# third-party dependencies used below need to be checked.
modules = ['numpy', 'gym', 'torch', 'tqdm']
for m in modules:
    import_or_install(m)
from IPython.display import clear_output
clear_output(wait=False)
import gym
import torch
import torch.nn as nn
import numpy as np
import random
def init_weights(m):
    if type(m) == nn.Linear:
        torch.nn.init.xavier_normal_(m.weight)
class Value(nn.Module):
def __init__(self,ni,nh,action_space=[0,1],device = 'cpu'):
super(Value,self).__init__()
self.linear = nn.Sequential(nn.Linear(ni,nh),
nn.Tanh(),
nn.Linear(nh,1))
self.action_space = action_space
self.device = device
self.to(self.device)
        self.apply(init_weights)  # apply to every nn.Linear submodule
    def forward(self, obs, action, batch=False):
        if batch:
            convert = lambda x: x if isinstance(x, np.ndarray) else np.array(x)
            bo = convert(obs)
            ba = convert(action)
            state_action = torch.tensor(np.hstack([bo, ba.reshape(-1, 1)])).float().to(self.device)
        else:
            state_action = self.create_pair(obs, action)
        return self.linear(state_action)

    def act(self, obs, epsilon=0.2, batch=False):
        # Epsilon-greedy action selection over the discrete action space.
        if batch:
            if random.random() < epsilon:
                return np.random.choice(self.action_space, len(obs))
            return np.argmax([self(obs, np.ones(len(obs)) * action, True).detach().numpy()
                              for action in self.action_space], 0)
        if random.random() < epsilon:
            return np.random.choice(self.action_space)
        return np.argmax([self(obs, action).item() for action in self.action_space])

    def create_pair(self, obs, action):
        # Concatenate the observation and the action into a single input vector.
        return torch.tensor(np.append(obs, action)).float().to(self.device)
env = gym.make("Blackjack-v0")
def run(env,policy,k=5000):
win = 0
for _ in range(k):
done=False
obs = env.reset()
while not done:
action = policy(obs)
next_obs, reward, done, _ = env.step(action)
if reward==1:
win+= 1
print(f"Win ratio = {win/k}")
value = Value(3+1, 128)
policy = lambda obs: value.act(obs, epsilon=0.0)
run(env, policy)
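# Baseline check (illustrative only): the Value network above is untrained, so the
# greedy policy it induces is essentially arbitrary. Comparing it against the trivial
# "always stick" policy (action 0 in Blackjack-v0) gives a reference win ratio.
run(env, lambda obs: 0)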
from torch.optim import Adam
from tqdm import tqdm
def grad_mc(env, k=1, gamma=0.9, lr=1e-3, device = 'cpu'):
ni = 3 + 1
nh = 64
value = Value(ni, nh, device = device)
optimizer = Adam(value.parameters(), lr = lr)
pb = tqdm(range(k))
for i in pb:
value.train()
        policy = lambda obs: value.act(obs, epsilon=1/(i+1))
episode = create_episode(env,policy)
G = 0
loss = 0
for obs, action, reward in reversed(episode):
G = gamma*G + reward
yhat = value(obs,action)
loss += 0.5*(G - yhat)**2
optimizer.zero_grad()
loss.backward()
optimizer.step()
value.eval()
    policy = lambda obs: value.act(obs, epsilon=0.0)
return policy
def create_episode(env, policy):
    # Roll out one full episode and record (state, action, reward) triples,
    # pairing each action and reward with the state they were taken from.
    obs = env.reset()
    done = False
    episode = []
    while not done:
        action = policy(obs)
        next_obs, reward, done, _ = env.step(action)
        episode.append((obs, action, reward))
        obs = next_obs
    return episode
env = gym.make("Blackjack-v0")
policy = grad_mc(env,10000)
run(env, policy)
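# A quick illustration (not part of the algorithm) of the return computation used in
# grad_mc: iterating the rewards in reverse with G = gamma*G + reward. For a
# three-step episode with rewards [0, 0, 1] and gamma = 0.9, the per-step returns
# come out to roughly [0.81, 0.9, 1.0].
_G, _returns = 0, []
for _r in reversed([0, 0, 1]):
    _G = 0.9*_G + _r
    _returns.insert(0, _G)
print(_returns)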
import torch.nn.functional as F
def semigrad_sarsa(env, k=1, gamma=0.9, lr=1e-3, device = 'cpu'):
ni = 3 + 1
nh = 64
value = Value(ni, nh, device = device)
optimizer = Adam(value.parameters(), lr = lr)
obs = env.reset()
pb = tqdm(range(k))
for i in pb:
value.train()
        action = value.act(obs, epsilon=1/(i+1))
        next_obs, reward, done, _ = env.step(action)
        # SARSA is on-policy: pick the next action with the same epsilon-greedy policy.
        next_action = value.act(next_obs, epsilon=1/(i+1))
        target = reward + gamma*value(next_obs, next_action).detach()
        yhat = value(obs, action)
        loss = F.mse_loss(yhat, target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
obs = next_obs
if done:
obs = env.reset()
value.eval()
    policy = lambda obs: value.act(obs, epsilon=0)
return policy
env = gym.make("Blackjack-v0")
policy = semigrad_sarsa(env,10000)
run(env, policy)
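# Note on semigrad_sarsa: unlike grad_mc, which waits for the full Monte Carlo return,
# each update bootstraps from the current estimate via the one-step TD target
# r + gamma * Q(s', a'). The target is detached, so gradients flow only through
# Q(s, a) -- hence "semi-gradient".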
import gym
import random
env = gym.make("CartPole-v0")
"""
Cartpole Environment
Observation:
Type: Box(4)
Num Observation Min Max
0 Cart Position -4.8 4.8
1 Cart Velocity -Inf Inf
2 Pole Angle -0.418 rad (-24 deg) 0.418 rad (24 deg)
3 Pole Angular Velocity -Inf Inf
Actions:
Type: Discrete(2)
Num Action
0 Push cart to the left
1 Push cart to the right
Reward:
Reward is 1 for every step taken, including the termination step
Starting State:
All observations are assigned a uniform random value in [-0.05..0.05]
Episode Termination:
Pole Angle is more than 12 degrees.
Cart Position is more than 2.4 (center of the cart reaches the edge of
the display).
Episode length is greater than 200.
Solved Requirements:
Considered solved when the average return is greater than or equal to
195.0 over 100 consecutive trials.
"""
def run(env,policy,k=100):
crewards = 0
for _ in range(k):
done=False
obs = env.reset()
while not done:
action = policy(obs)
next_obs, reward, done, _ = env.step(action)
crewards += reward
obs = next_obs
print(f"Avg. Reward per Attempt = {crewards/k}")
value = Value(4+1, 128)
policy = lambda obs: value.act(obs, epsilon=0.0)
run(env, policy)
from collections import deque
import random
class ExperienceReplay(object):
def __init__(self, capacity):
self.buffer = deque(maxlen=capacity)
    def sample(self, nsamples=64):
        # Draw indices uniformly with replacement from the stored transitions.
        indices = np.random.choice(len(self.buffer), nsamples)
        samples = np.array(self.buffer, dtype=object)[indices]
        return samples
def add(self,state, action, reward, next_state, done):
self.buffer.append([state, action, reward, next_state, done])
experience_replay = ExperienceReplay(2)
env = gym.make("CartPole-v0")
done= False
obs = env.reset()
for i in range(4):
action = env.action_space.sample()
next_obs, reward, done, _ = env.step(action)
experience_replay.add(obs, action, reward, next_obs, done)
obs = next_obs
print(experience_replay.buffer[0])
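# Because the buffer above was created with capacity 2, only the last two of the four
# transitions are kept; sample() draws indices with replacement, so the same
# transition can appear more than once in a batch.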
from tqdm.notebook import tqdm
import random
import torch.nn.functional as F
class DQN(nn.Module):
def __init__(self,ni,nh,action_space = [0,1], capacity=1000000,device='cpu'):
super(DQN,self).__init__()
self.value = Value(ni,nh, action_space=action_space,device = device)
self.experience_replay = ExperienceReplay(capacity)
self.to(device)
def forward(self, obs, action, batch=False):
return self.value(obs,action,batch=batch)
    def act(self, obs, epsilon=0.2, batch=False):
        return self.value.act(obs, epsilon=epsilon, batch=batch)
def dqn_train(env, N, sample_size = 1, k = 1, gamma=0.9, lr = 1e-3, device='cpu'):
ni = env.observation_space.shape[0] + 1
nh = 128
dqn = DQN(ni,nh,capacity=N,device=device)
optimizer= Adam(dqn.parameters(), lr=lr)
obs = env.reset()
pb = tqdm(range(k))
creward = 0
for i in pb:
dqn.train()
        action = dqn.act(obs, epsilon=1/(i+1)+0.1)
next_obs, reward, done, _ = env.step(action)
dqn.experience_replay.add(obs, action, reward, next_obs, done)
obs = next_obs
creward += reward
if done:
pb.set_description(f"{creward}")
obs = env.reset()
creward = 0
        if i > sample_size:
            e = dqn.experience_replay.sample(sample_size)
            obs_, action_, reward_, next_obs_, done_ = [np.stack(e[:, j]) for j in range(5)]
            values = dqn(obs_, action_, batch=True)
            # Greedy next actions for the bootstrapped target (no exploration here).
            next_actions = dqn.act(next_obs_, epsilon=0.0, batch=True)
            # TD target: r + gamma * Q(s', a'), zeroed on terminal transitions and
            # detached so gradients only flow through the current Q estimate.
            targets = (torch.tensor(reward_.astype(np.float32)).unsqueeze(1)
                       + gamma * dqn(next_obs_, next_actions, batch=True).detach()
                       * torch.tensor(1.0 - done_.astype(np.float32)).unsqueeze(1))
            loss = F.mse_loss(values, targets)
optimizer.zero_grad()
loss.backward()
optimizer.step()
dqn.eval()
    policy = lambda obs: dqn.act(obs, epsilon=0.0)
return policy
env = gym.make("CartPole-v0")
policy = dqn_train(env,k=5000,sample_size=256, N=100000)
run(env,policy)
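# Note: this DQN keeps two classic ingredients -- an experience replay buffer and
# epsilon-greedy exploration with a decaying epsilon -- but, to stay short, it omits
# the separate target network that many DQN implementations add to stabilise the
# bootstrapped target.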
def init_weights(m):
    if type(m) == nn.Linear:
        torch.nn.init.kaiming_normal_(m.weight)
class Policy(nn.Module):
def __init__(self,ni,nh,no,device = 'cpu'):
super(Policy,self).__init__()
self.linear = nn.Sequential(nn.Linear(ni,nh),
nn.ReLU(),
nn.Linear(nh,no))
self.device = device
self.to(self.device)
        self.apply(init_weights)
self.no = no
    def forward(self, obs, batch=False):
        convert = lambda x: x if isinstance(x, torch.Tensor) else torch.tensor(x).float()
        obs = convert(obs)
        # Softmax over the action dimension (dim 1 for a batch, dim 0 for a single obs).
        return F.softmax(self.linear(obs), dim=1 if batch else 0)

    def act(self, obs, batch=False):
        # Sample an action from the predicted action probabilities.
        probs = self(obs, batch=batch).detach().numpy()
        if batch:
            return np.array([np.random.choice(np.arange(self.no), p=p) for p in probs])
        return np.random.choice(np.arange(self.no), p=probs)
policy = Policy(4,64,2)
env = gym.make("CartPole-v0")
print(policy(env.reset()))
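# Aside (not used below): the same sampling can be done with
# torch.distributions.Categorical, which also exposes log_prob directly.
from torch.distributions import Categorical
_dist = Categorical(probs=policy(env.reset()))
_a = _dist.sample()
print(_a.item(), _dist.log_prob(_a).item())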
def run(env,policy,k=100):
crewards = 0
for _ in range(k):
done=False
obs = env.reset()
while not done:
action = policy.act(obs)
next_obs, reward, done, _ = env.step(action)
crewards += reward
obs = next_obs
print(f"Avg. Reward per Attempt = {crewards/k}")
run(env, policy)
def reinforce(env, k = 1, gamma=0.99, lr = 1e-3, device='cpu', verbose=False):
nh = 128
policy = Policy(4,nh,2)
optimizer= Adam(policy.parameters(), lr=lr)
pb = tqdm(range(k)) if verbose else range(k)
obs = env.reset()
for i in pb:
policy.train()
episode = create_episode(env,policy)
creward = 0
G = []
R = 0
        for obs, prob, action, reward in reversed(episode):
            R = gamma*R + reward
            G.insert(0, R)
            creward += reward
        loss = 0
        # REINFORCE loss: -sum_t G_t * log pi(a_t | s_t).
        for R, (obs, prob, action, reward) in zip(G, episode):
            loss -= R*torch.log(prob)
optimizer.zero_grad()
loss.backward()
optimizer.step()
policy.eval()
return policy
def create_episode(env, policy):
    # Roll out one episode, keeping the probability of the chosen action so the
    # policy-gradient loss can take its log during training.
    obs = env.reset()
    done = False
    episode = []
    while not done:
        probs = policy(obs)
        action = policy.act(obs)
        next_obs, reward, done, _ = env.step(action)
        episode.append((obs, probs[action], action, reward))
        obs = next_obs
    return episode
policy = reinforce(env,k=500,verbose=True)
run(env,policy)
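# The reinforce loop above implements the Monte Carlo policy gradient
#   loss = - sum_t G_t * log pi(a_t | s_t),
# where G_t is the discounted return from step t onwards; a full episode is
# generated and scored before a single gradient update is taken.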
from collections import defaultdict
from torch.distributions.normal import Normal
def create_policy(ni,nh,no,ref):
policy = Policy(ni,nh,no)
for layer in policy.linear:
if hasattr(layer,"weight"):
layer.weight.data = Normal(ref[str(layer)]['mu'],
ref[str(layer)]['std']).rsample()
return policy
ref = defaultdict(dict)
for layer in policy.linear:
if hasattr(layer,"weight"):
ref[str(layer)] = {'mu':torch.rand(layer.weight.data.shape),
"std":torch.rand(layer.weight.data.shape)}
from torch.distributions.normal import Normal
def cross_entropy(env, k = 1, pop_size=100, nelites = 10, device='cpu', verbose=False):
nh = 32
pb = tqdm(range(k)) if verbose else range(k)
pop = [Policy(4,nh,2) for _ in range(pop_size)]
ref = defaultdict(dict)
for i in pb:
scores = run_episode(env,pop)
if verbose: pb.set_description(f"{np.mean(scores)}")
elites_args = np.argsort(scores)
elites = np.array(pop)[elites_args][-nelites:]
#Get all the weights
tmp = defaultdict(list)
for p in elites:
for layer in p.linear:
if hasattr(layer,"weight"):
tmp[str(layer)].append(layer.weight.data)
        # Fit a diagonal Gaussian (mean and std per weight) to the elite weights.
        for name, v in tmp.items():
            weights = torch.stack(v)
            ref[name]['mu'] = torch.mean(weights, 0)
            ref[name]['std'] = torch.std(weights, 0)
pop = [create_policy(4,nh,2,ref) for _ in range(pop_size)]
return pop,scores
def run_episode(env, policies):
scores = np.zeros(len(policies))
for i,policy in enumerate(policies):
done = False
obs = env.reset()
while not done:
action = policy.act(obs)
obs, reward, done, _ = env.step(action)
scores[i]+=reward
return scores
pop,scores = cross_entropy(env,k=100,verbose=True)
policy = pop[np.argmax(scores)]
run(env,policy)
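# cross_entropy above is a derivative-free method: each generation keeps the nelites
# highest-scoring policies, fits a per-weight Gaussian (mu, std) to their Linear
# weights, and samples a fresh population from it. Only the weights are resampled;
# the biases of each new Policy come from its default initialisation.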
class ActorCritic(nn.Module):
def __init__(self,ni,nh,no,device='cpu'):
super(ActorCritic,self).__init__()
self.actor = nn.Sequential(nn.Linear(ni,nh),
nn.LeakyReLU(),
nn.Linear(nh,no),
nn.Softmax(0))
self.critic = nn.Sequential(nn.Linear(ni,nh),
nn.LeakyReLU(),
nn.Linear(nh,1))
self.to(device)
        self.apply(init_weights)
self.no=no
def forward(self,obs):
obs= torch.tensor(obs).float()
prob = self.actor(obs)
action = np.random.choice(np.arange(self.no),p=prob.detach().numpy())
value = self.critic(obs)
return action, torch.log(prob[action]), value
def act(self,obs):
return self(obs)[0]
def fit(self,env,gamma = 0.99,k = 1,lr = 1e-4):
optimizer= Adam(self.parameters(), lr=lr)
pb = tqdm(range(k))
for i in pb:
self.train()
obs = env.reset()
done = False
log_probs = []
rewards = []
values = []
while not done:
action, log_prob, value = self(obs)
obs, reward, done, _ = env.step(action)
log_probs.append(log_prob)
values.append(value)
rewards.append(reward)
returns = []
R = 0
for r in reversed(rewards):
R = r + gamma * R
returns.insert(0, R)
returns = np.array(returns)
loss = 0
for log_prob, value, R in zip(log_probs,values, returns):
loss += value.detach() * -log_prob + F.smooth_l1_loss(torch.tensor([R]),value)
optimizer.zero_grad()
loss.backward()
optimizer.step()
env = gym.make("Blackjack-v0")
def run(env,policy,k=5000):
wins = 0
for _ in range(k):
done=False
obs = env.reset()
while not done:
action = policy.act(obs)
next_obs, reward, done, _ = env.step(action)
obs = next_obs
if reward>=1:
wins += 1
print(f"Win Percentage = {wins/k}")
ni = 3
no = env.action_space.n
nh = 64
model = ActorCritic(ni,nh,no)
model.fit(env, k = 5000)
run(env, model)
class A2C(ActorCritic):
def __init__(self,ni,nh,no,device='cpu'):
super(A2C,self).__init__(ni,nh,no,device)
def fit(self,env,gamma = 0.99,k = 1,lr = 1e-3):
optimizer= Adam(self.parameters(), lr=lr)
pb = tqdm(range(k))
for i in pb:
self.train()
obs = env.reset()
done = False
log_probs = []
rewards = []
values = []
while not done:
action, log_prob, value = self(obs)
obs, reward, done, _ = env.step(action)
log_probs.append(log_prob)
values.append(value)
rewards.append(reward)
returns = []
R = 0
for r in reversed(rewards):
R = r + gamma * R
returns.insert(0, R)
returns = np.array(returns)
loss = 0
for log_prob, value, R in zip(log_probs,values, returns):
advantage = R - value.item()
loss += advantage * -log_prob + F.smooth_l1_loss(torch.tensor([R]),value)
optimizer.zero_grad()
loss.backward()
optimizer.step()
pb.set_description(str(sum(rewards)))
env = gym.make("CartPole-v0")
def run(env,policy,k=100):
crewards = 0
for _ in range(k):
done=False
obs = env.reset()
while not done:
action = policy.act(obs)
next_obs, reward, done, _ = env.step(action)
obs = next_obs
crewards += reward
print(f"Average crewards= {crewards/k}")
ni = 4
no = env.action_space.n
nh = 256
model = A2C(ni,nh,no)
model.fit(env, k = 300)
run(env, model)
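# The only change from ActorCritic.fit is the policy-gradient weight: A2C weights
# -log pi(a|s) by the advantage R - V(s) (with V detached via .item()), the standard
# variance-reducing baseline, whereas ActorCritic.fit above weighted it by the
# critic's value estimate itself.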
class Model(nn.Module):
def __init__(self, ni, nh,device='cpu'):
super(Model,self).__init__()
self.next_obs = nn.Sequential(nn.Linear(ni,nh),
nn.LeakyReLU(),
nn.Linear(nh,ni-1))
self.reward = nn.Sequential(nn.Linear(ni,nh),
nn.Tanh(),
nn.Linear(nh,1))
        self.device = device
        self.to(self.device)
def forward(self,obs,action,batch=False):
if batch:
            convert = lambda x: x if isinstance(x, np.ndarray) else np.array(x)
bo = convert(obs)
ba = convert(action)
state_action = torch.tensor(np.hstack([bo, ba.reshape(-1,1)])).float().to(self.device)
else:
state_action = self.create_pair(obs,action)
return self.next_obs(state_action), self.reward(state_action)
def create_pair(self,obs,action): return torch.tensor(np.append(obs,action)).float().to(self.device)
env = gym.make("CartPole-v0")
model = Model(4+1,64)
obs = env.reset()
model(obs,1)
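# Model takes a (state, action) pair and predicts both the next state (ni - 1 outputs)
# and the scalar reward; dyna_q below uses these predictions as simulated experience
# for its planning updates.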
def dyna_q(env, n_planning=10, k=1, gamma=0.9, lr=1e-3, device = 'cpu'):
samples = []
ni = 3 + 1
nh = 64
value = Value(ni, nh, device = device)
model = Model(ni, nh, device = device)
value_optimizer = Adam(value.parameters(), lr = lr)
model_optimizer = Adam(model.parameters(), lr = lr)
obs = env.reset()
pb = tqdm(range(k))
for i in pb:
value.train()
        action = value.act(obs, epsilon=1/(i+1))
next_obs, reward, done, _ = env.step(action)
        next_action = value.act(next_obs, epsilon=0.0)
target = reward + gamma*value(next_obs,next_action).detach()
yhat = value(obs,action)
loss = F.mse_loss(target,yhat)
value_optimizer.zero_grad()
loss.backward()
value_optimizer.step()
        # Train the model to predict the observed next state and reward.
        _next_obs, _reward = model(obs, action)
        loss = F.mse_loss(_next_obs, torch.tensor(next_obs).float()) + F.mse_loss(_reward, torch.tensor([reward]).float())
model_optimizer.zero_grad()
loss.backward()
model_optimizer.step()
samples.append((obs,action))
        if len(samples) > n_planning:
            # Planning: replay previously seen (state, action) pairs through the
            # learned model and update the value function on the simulated transitions.
            for j in range(n_planning):
                _obs, _action = random.choice(samples)
                _next_obs, _reward = model(_obs, _action)
                _next_obs = _next_obs.detach().numpy()
                _next_action = value.act(_next_obs, epsilon=0.0)
                target = _reward.detach() + gamma*value(_next_obs, _next_action).detach()
                yhat = value(_obs, _action)
                loss = F.mse_loss(yhat, target)
                value_optimizer.zero_grad()
                loss.backward()
                value_optimizer.step()
obs = next_obs
if done:
obs = env.reset()
value.eval()
    policy = lambda obs: value.act(obs, epsilon=0)
return policy
env = gym.make("Blackjack-v0")
def run(env,policy,k=50000):
wins = 0
for _ in range(k):
done=False
obs = env.reset()
while not done:
action = policy(obs)
next_obs, reward, done, _ = env.step(action)
obs = next_obs
if reward ==1:
wins += 1
print(f"Win Percentage = {wins/k}")
policy = dyna_q(env,k=500)
run(env, policy)
print("End")