使用Policy network和Value network实现CartPole

Policy network

``````H = 50

observate = tf.placeholder(tf.float32, [None, 4], name="input_x")
W1 = tf.get_variable("w1", shape=[4, H],
initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observate, W1))
W2 = tf.get_variable("w2", shape=[H, 1],
initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, W2)
probability = tf.nn.sigmoid(score)
``````

``````def discount_reward(r):
# 根据每个reward:r和gamma来求每次的潜在价值
discount_r = np.zeros_like(r)
for t in reversed(range(r.size)):
return discount_r
``````

``````import numpy as np
import tensorflow as tf
import gym
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
env = gym.make('CartPole-v0')

env.reset()
H = 50
batch_size = 25
learning_rate = 1e-1
D = 4
gamma = 0.99
xs, ys, drs = [], [], []
reward_sum = 0
episode_number = 1
total_episodes = 1000

# 根据当前的环境状态根据隐藏节点求action为1的概率
observate = tf.placeholder(tf.float32, [None, D], name="input_x")
W1 = tf.get_variable("w1", shape=[D, H],
initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observate, W1))
W2 = tf.get_variable("w2", shape=[H, 1],
initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, W2)
probability = tf.nn.sigmoid(score)

# 根据概率来求损失和梯度
input_y = tf.placeholder(tf.float32, [None, 1], name="input_y")
loglik = tf.log(input_y * (input_y - probability) +
(1 - input_y) * (input_y + probability))

tvars = tf.trainable_variables()

# 根据梯度优化训练两层神经网络

def discount_reward(r):
# 根据每个reward:r和gamma来求每次的潜在价值
discount_r = np.zeros_like(r)
for t in reversed(range(r.size)):
return discount_r

# Session执行
with tf.Session() as sess:
rendering = False
init = tf.global_variables_initializer()
sess.run(init)
observation = env.reset()
while episode_number <= total_episodes:

if reward_sum / batch_size > 100 or rendering == True:
rendering = True
env.render()

x = np.reshape(observation, [1, D])

tfprob = sess.run(probability, feed_dict={observate: x})
action = 1 if np.random.uniform() < tfprob else 0
xs.append(x)
y = 1 - action
ys.append(y)

observation, reward, done, info = env.step(action)
reward_sum += reward
drs.append(reward)

if done:
episode_number += 1
epx = np.vstack(xs)
epy = np.vstack(ys)
epr = np.vstack(drs)
xs, ys, drs = [], [], []
discount_epr = discount_reward(epr)
discount_epr -= np.mean(discount_epr)
discount_epr /= np.std(discount_epr)

input_y:epy,

if episode_number % batch_size == 0:
print('Average reward for episode %d: %f.' % \
(episode_number, reward_sum/batch_size))

if reward_sum/batch_size > 200:
break

reward_sum = 0

observation = env.reset()
``````

Value network

``````    W1 = tf.Variable(tf.truncated_normal([STATE, HIDDEN_SIZE]))
b1 = tf.Variable(tf.constant(0.01, shape = [HIDDEN_SIZE]))
W2 = tf.Variable(tf.truncated_normal([HIDDEN_SIZE, ACTION]))
b2 = tf.Variable(tf.constant(0.01, shape=[ACTION]))

state_input = tf.placeholder("float",[None,STATE])
h_layer = tf.nn.relu(tf.matmul(state_input,W1) + b1)
Q_value = tf.matmul(h_layer,W2) + b2
``````

``````buffer = deque()

if len(buffer) > 100:
buffer.popleft()
buffer.append((state,action,reward,next_state,done))
``````

``````action_input = tf.placeholder("float",[None, ACTION])
y_input = tf.placeholder("float",[None])
Q_action = tf.reduce_sum(tf.multiply(Q_value, action_input),reduction_indices=1)
cost = tf.reduce_mean(tf.square(y_input - Q_action))
``````

``````import gym
import tensorflow as tf
import numpy as np
import random
from collections import deque

GAMMA = 0.9              # discount factor for future rewards
INITIAL_EPSILON = 0.5    # starting exploration rate for epsilon-greedy
FINAL_EPSILON = 0.01     # exploration-rate floor
REPLAY_SIZE = 10000      # max transitions kept in the replay buffer
BATCH_SIZE = 32          # minibatch size per training step
HIDDEN_SIZE = 20         # hidden-layer width of the Q network

class DQN():
    """Deep Q-Network agent with an experience-replay buffer and an
    epsilon-greedy behaviour policy (TensorFlow 1.x graph API)."""

    def __init__(self, env):
        # Replay buffer of (state, one-hot action, reward, next_state, done).
        self.replay_buffer = deque()
        self.time_step = 0
        self.epsilon = INITIAL_EPSILON
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n

        self.create_Q_network()
        self.create_training_method()

        self.session = tf.InteractiveSession()
        self.session.run(tf.initialize_all_variables())

    def create_Q_network(self):
        """Build a two-layer MLP mapping a state to one Q-value per action."""
        W1 = self.weight_variable([self.state_dim, HIDDEN_SIZE])
        b1 = self.bias_variable([HIDDEN_SIZE])
        W2 = self.weight_variable([HIDDEN_SIZE, self.action_dim])
        b2 = self.bias_variable([self.action_dim])

        self.state_input = tf.placeholder("float", [None, self.state_dim])
        h_layer = tf.nn.relu(tf.matmul(self.state_input, W1) + b1)
        self.Q_value = tf.matmul(h_layer, W2) + b2

    def create_training_method(self):
        """Define the TD loss and its optimizer."""
        self.action_input = tf.placeholder("float", [None, self.action_dim])  # one-hot action
        self.y_input = tf.placeholder("float", [None])  # Bellman targets
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value, self.action_input),
                                 reduction_indices=1)
        self.cost = tf.reduce_mean(tf.square(self.y_input - Q_action))
        # BUG FIX: the optimizer was never created, so train_Q_network's
        # self.optimizer.run(...) raised AttributeError on the first update.
        self.optimizer = tf.train.AdamOptimizer(0.0001).minimize(self.cost)

    def perceive(self, state, action, reward, next_state, done):
        """Store one transition and train once the buffer holds a minibatch."""
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.replay_buffer.append((state, one_hot_action, reward, next_state, done))
        if len(self.replay_buffer) > REPLAY_SIZE:
            self.replay_buffer.popleft()

        if len(self.replay_buffer) > BATCH_SIZE:
            self.train_Q_network()

    def train_Q_network(self):
        """Sample a minibatch and take one gradient step on the TD loss."""
        self.time_step += 1
        minibatch = random.sample(self.replay_buffer, BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]

        # Bellman targets: r for terminal transitions,
        # r + GAMMA * max_a Q(s', a) otherwise.
        y_batch = []
        Q_value_batch = self.Q_value.eval(feed_dict={self.state_input: next_state_batch})
        for i in range(0, BATCH_SIZE):
            done = minibatch[i][4]
            if done:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + GAMMA * np.max(Q_value_batch[i]))

        self.optimizer.run(feed_dict={
            self.y_input: y_batch,
            self.action_input: action_batch,
            self.state_input: state_batch
        })

    def egreedy_action(self, state):
        """Epsilon-greedy action for training; epsilon anneals each call."""
        value = self.Q_value.eval(feed_dict={self.state_input: [state]})
        self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000
        # BUG FIX: enforce the FINAL_EPSILON floor; the original let epsilon
        # decay past it (eventually negative), silently disabling exploration.
        if self.epsilon < FINAL_EPSILON:
            self.epsilon = FINAL_EPSILON
        Q_value = value[0]
        if random.random() <= self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            return np.argmax(Q_value)

    def action(self, state):
        """Greedy (evaluation-time) action: argmax over predicted Q-values."""
        value = self.Q_value.eval(feed_dict={self.state_input: [state]})
        return np.argmax(value[0])

    def weight_variable(self, shape):
        """Weight tensor initialised from a truncated normal distribution."""
        return tf.Variable(tf.truncated_normal(shape))

    def bias_variable(self, shape):
        """Bias tensor initialised to a small positive constant."""
        return tf.Variable(tf.constant(0.01, shape=shape))

ENV_NAME = 'CartPole-v0'  # Gym environment id
EPISODE = 10000           # total training episodes
STEP = 300                # step limit per episode
TEST = 10                 # evaluation episodes per checkpoint

def main():
env = gym.make(ENV_NAME)
agent = DQN(env)

for episode in range(EPISODE):

state = env.reset()

for step in range(STEP):
action = agent.egreedy_action(state)
next_state,reward,done,_ = env.step(action)

agent.perceive(state,action,reward,next_state,done)
state = next_state
if done:
break

if episode % 100 == 0:
total_reward = 0
for i in range(TEST):
state = env.reset()
for j in range(STEP):
if total_reward/TEST >= 160:
env.render()
action = agent.action(state)
state,reward,done,_ = env.step(action)
total_reward += reward
if done:
break
ave_reward = total_reward/TEST
print ('episode: ',episode,'Evaluation Average Reward:',ave_reward)

if __name__ == '__main__':
main()
``````
原文作者：碧影江白
原文地址: https://www.jianshu.com/p/e46ea0a13666
本文转自网络文章，转载此文章仅为分享知识，如有侵权，请联系博主进行删除。