-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
76 lines (61 loc) · 2.36 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- coding: utf-8 -*-
"""
Created on Mon Mar 14 16:57:45 2022
@author: duzen
"""
import numpy as np
import gym
from collections import deque
import random
#random.seed(0)
#np.random.seed(0)
# Ornstein-Ulhenbeck Process
# Taken from #https://github.com/vitchyr/rlkit/blob/master/rlkit/exploration_strategies/ou_strategy.py
class OUNoise(object):
def __init__(self, action_space, mu=0.0, theta=0.3, max_sigma=0.04/ np.sqrt(250), min_sigma=0.04/ np.sqrt(250), decay_period=3500):
self.mu = mu
self.theta = theta
self.sigma = max_sigma
self.max_sigma = max_sigma
self.min_sigma = min_sigma
self.decay_period = decay_period
self.action_dim = action_space.shape[0]
self.low = action_space.low
self.high = action_space.high
self.reset()
def reset(self):
self.state = np.ones(self.action_dim) * self.mu
def evolve_state(self):
x = self.state
dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(self.action_dim)
self.state = x + dx
return self.state
def get_action(self, action, t=0):
ou_state = self.evolve_state()
self.sigma = self.max_sigma - (self.max_sigma - self.min_sigma) * min(1.0, t / self.decay_period)
action = np.clip(action + ou_state, self.low, self.high)
return action
class Memory:
def __init__(self, max_size):
self.max_size = max_size
self.buffer = deque(maxlen=max_size)
def push(self, state, action, reward, next_state, done):
experience = (state, action, np.array([reward]), next_state, done)
self.buffer.append(experience)
def sample(self, batch_size):
state_batch = []
action_batch = []
reward_batch = []
next_state_batch = []
done_batch = []
batch = random.sample(self.buffer, batch_size)
for experience in batch:
state, action, reward, next_state, done = experience
state_batch.append(state)
action_batch.append(action)
reward_batch.append(reward)
next_state_batch.append(next_state)
done_batch.append(done)
return state_batch, action_batch, reward_batch, next_state_batch, done_batch
def __len__(self):
return len(self.buffer)