基础

这段代码,可以看到一个倒立摆在胡乱操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import gym

env_name = "CartPole-v0"
env = gym.make(env_name) # 导入环境

episodes = 10
for episode in range(1, episodes + 1):
state = env.reset()
done = False
score = 0

while not done:
env.render() # 渲染环境
action = env.action_space.sample() # 随机采样动作
n_state, reward, done, info = env.step(action) # 和环境交互,得到下一个状态,奖励等信息
score += reward # 计算分数
print("Episode : {}, Score : {}".format(episode, score))

env.close() # 关闭窗口

用Stable_baseline3来训练的强化学习模型,可以很好地控制这个环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env.dummy_vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
import gym

env_name = "CartPole-v0"
env = gym.make(env_name)
# 把环境向量化,如果有多个环境写成列表传入DummyVecEnv中,可以用一个线程来执行多个环境,提高训练效率
env = DummyVecEnv([lambda : env])
# 定义一个DQN模型,设置其中的各个参数
model = DQN(
"MlpPolicy", # MlpPolicy定义策略网络为MLP网络
env=env,
learning_rate=5e-4,
batch_size=128,
buffer_size=50000,
learning_starts=0,
target_update_interval=250,
policy_kwargs={"net_arch" : [256, 256]}, # 这里代表隐藏层为2层256个节点数的网络
verbose=1, # verbose=1代表打印训练信息,如果是0为不打印,2为打印调试信息
tensorboard_log="./tensorboard/CartPole-v0/" # 训练数据保存目录,可以用tensorboard查看
)
# 开始训练
model.learn(total_timesteps=1e5)
# 策略评估,可以看到倒立摆在平稳运行了
mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10, render=true)
#env.close()
print("mean_reward:",mean_reward,"std_reward:",std_reward)
# 保存模型到相应的目录
model.save("./model/CartPole.pkl")

自定义环境

需要继承gym.Env类,然后重新其中的方法,配置一定的参数即可,格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gym
from gym import spaces

class CustomEnv(gym.Env):
"""Custom Environment that follows gym interface"""
metadata = {'render.modes': ['human']}

def __init__(self, arg1, arg2, ...):
super(CustomEnv, self).__init__()
# Define action and observation space
# They must be gym.spaces objects
# Example when using discrete actions:
self.action_space = spaces.Discrete(N_DISCRETE_ACTIONS)
# Example for using image as input (channel-first; channel-last also works):
self.observation_space = spaces.Box(low=0, high=255,
shape=(N_CHANNELS, HEIGHT, WIDTH), dtype=np.uint8)

def step(self, action):
...
return observation, reward, done, info
def reset(self):
...
return observation # reward, done, info can't be included
def render(self, mode='human'):
...
def close (self):
pass

主要三个函数需要实现:

reset() 在每个回合最开始时执行,返回当前的观测(observation)

step(action) 输入 action,智能体执行 action 与环境交互,返回获得的(新的观测、奖励、是否结束、其他)

可选render(method=’human’)` 渲染环境

  • gym.spaces.Box 任意 shape 的连续空间
  • spaces.Discrete 维度为 1,且有 n 个枚举值的空间

检查环境是否符合gym接口:

1
2
3
4
5
from stable_baselines3.common.env_checker import check_env

env = CustomEnv(arg1, ...)
# It will check your custom environment and output additional warnings if needed
check_env(env)

创建一个让智能体学习如何一直向左边走的1D环境,观测是智能体的当前位置,智能体有两种行为,向左和向右,分别用0和1代表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68

import numpy as np
import gym
from gym import spaces


class GoLeftEnv(gym.Env):
"""
这是一个让智能体学习一直向左走的 1D grid 环境
"""
metadata = {'render.modes': ['console']}
LEFT = 0
RIGHT = 1

def __init__(self, grid_size=10):
super(GoLeftEnv, self).__init__()

# 1D-grid 的大小
self.grid_size = grid_size
# agent 初始化在 grid 的最右边
self.agent_pos = grid_size - 1

# 定义 action observation
# 离散行为空间: left、 right
n_actions = 2
self.action_space = spaces.Discrete(n_actions)
# 观测是智能体现在的位置
self.observation_space = spaces.Box(low=0, high=self.grid_size,
shape=(1,), dtype=np.float32)

def reset(self):
"""
Important: 观测必须是一个 np.array
:return: (np.array)
"""
# Initialize the agent at the right of the grid
self.agent_pos = self.grid_size - 1
# here we convert to float32 to make it more general (in case we want to use continuous actions)
return np.array([self.agent_pos]).astype(np.float32)

def step(self, action):
if action == self.LEFT:
self.agent_pos -= 1
elif action == self.RIGHT:
self.agent_pos += 1
else:
raise ValueError("Received invalid action={} which is not part of the action space".format(action))
# 如果走到边缘就不能继续走了
self.agent_pos = np.clip(self.agent_pos, 0, self.grid_size)
# 如果走到最左边代表结束了
done = bool(self.agent_pos == 0)
# 走到最左边就给一个正的 reward
reward = 1 if self.agent_pos == 0 else 0
# 目前没有需要额外输出的信息
info = {}
return np.array([self.agent_pos]).astype(np.float32), reward, done, info

def render(self, mode='console'):
# 在命令行中渲染
if mode != 'console':
raise NotImplementedError()
# agent is represented as a cross, rest as a dot
print("." * self.agent_pos, end="")
print("x", end="")
print("." * (self.grid_size - self.agent_pos))

def close(self):
pass

构建环境和智能体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from stable_baselines3 import PPO, A2C # DQN coming soon
from stable_baselines3.common.env_util import make_vec_env

# 构建环境
env = GoLeftEnv(grid_size=10)
env = make_vec_env(lambda: env, n_envs=1)

# 训练智能体
model = A2C('MlpPolicy', env, verbose=1).learn(5000)

# 测试智能体:
# Test the trained agent
obs = env.reset()
n_steps = 20
for step in range(n_steps):
action, _ = model.predict(obs, deterministic=True)
print("Step {}".format(step + 1))
print("Action: ", action)
obs, reward, done, info = env.step(action)
print('obs=', obs, 'reward=', reward, 'done=', done)
env.render(mode='console')
if done:
# Note that the VecEnv resets automatically
# when a done signal is encountered
print("Goal reached!", "reward=", reward)
break

也可以是连续动作空间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# _*_coding:utf-8-*-
import sys
import gym
from sympy import *
import math
import numpy as np
gym.logger.set_level(40)
# sys.path.append('这里写其上层文件见的绝对路径,如'~/autodl-nas/robot/'')
# import Params
class RobotEnv(gym.Env):
# 初始化参数
def __init__(self):
# 状态空间为18(关节角度)+18(关节角速度)+2(控制方向)+1(控制速度)=39
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(39,))
# 动作空间为18(关节角度)
self.action_space = gym.spaces.Box(low=-1, high=1, shape=(18,))
# 附加功能,可选,对应第35行
# self.reward_fun = Params.PARAMS_ENV['reward_fun']

# 获取原始数据,并生成状态,同时更新done
def get_state(self):
origin_data = [这里写获取数据的函数]
tmp_data = [将原始数据按照要求组成状态,注意角度\角速度\方向和速度归一化到[-1,-1],速度可以除以系数归一化到0-1]]
state = np.array(tmp_data).reshape(1,39)
self.done = True if[写判断终止条件]else False
return state

# 根据强化学习输出,发送控制指令,控制机器人运动
def move(self, action):
[这里将动作对应的控制指令发送给仿真环境中的机器人]
pass

# 奖励函数
def get_reward(self):
reward = [根据任务需求定义奖励函数,建议三个方面:1、存活时间长短(即能否满足站立并运动的要求)2、方向是否为给定方向3、速度是否为给定速度]
# reward = self.reward_fun#如果使用这个,就不需要上面个这句,上面这句就可以放到参数文件中进行定义
return reward

# 主程序
def step(self, action):
# 执行动作
self.move(action)
# 获取动作对应奖励
reward = self.get_reward()
# 获取下一状态
state = self.get_state()
# 返回
return state, reward, self.done, {}

# 重置环境
def reset(self):
[初始化机器人]
# 获得对应状态
state = self.get_state()
return state

# 关闭机器人
def close(self):
[关闭机器人]