Showing 16 changed files with 809 additions and 0 deletions.
@@ -0,0 +1,376 @@
import gym
import numpy as np
from sklearn.tree import DecisionTreeClassifier

# Note: accuracy_score and train_test_split are implemented locally below, so
# the sklearn versions are deliberately not imported (they would be shadowed).

def initialize_q_table(num_states, num_actions):
    return np.zeros((num_states, num_actions))


# Example usage:
num_states = 4   # Number of states
num_actions = 2  # Number of actions
Q = initialize_q_table(num_states, num_actions)

def q_table(env):
    # Assuming env is a Gym environment with discrete spaces
    if isinstance(env.observation_space, gym.spaces.Discrete) and isinstance(env.action_space, gym.spaces.Discrete):
        return np.zeros((env.observation_space.n, env.action_space.n))
    else:
        raise ValueError("The environment's state and action space should be discrete for a Q-table approach.")

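# For the default 4x4 FrozenLake-v1 used below this yields a 16 x 4 table:
# Discrete(16) observation space, Discrete(4) action space.
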
def q_learning(env, q_table, learning_rate=0.1, discount_factor=0.9, exploration_prob=0.1, episodes=1000):
    for episode in range(episodes):
        # Gym >= 0.26 API: reset() returns (observation, info)
        state, _ = env.reset()
        done = False
        while not done:
            # Selecting action using epsilon-greedy strategy
            if np.random.uniform(0, 1) < exploration_prob:
                action = env.action_space.sample()
            else:
                action = np.argmax(q_table[state, :])

            # Taking action and observing next state and reward
            # (step() returns observation, reward, terminated, truncated, info)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Updating Q-value
            best_next_action = np.argmax(q_table[next_state, :])
            td_target = reward + discount_factor * q_table[next_state, best_next_action]
            td_error = td_target - q_table[state, action]
            q_table[state, action] += learning_rate * td_error

            state = next_state

    return q_table

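# For intuition, one TD update worked by hand (illustrative numbers only, not
# taken from an actual run): with learning_rate=0.1 and discount_factor=0.9,
# if q_table[s, a] = 0.5, reward = 1.0 and max_a' q_table[s', a'] = 0.8, then
#   td_target = 1.0 + 0.9 * 0.8 = 1.72
#   td_error  = 1.72 - 0.5 = 1.22
#   q_table[s, a] becomes 0.5 + 0.1 * 1.22 = 0.622
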
def shape(space):
    # Size of a discrete space; first dimension of a continuous (Box) space
    if isinstance(space, gym.spaces.Discrete):
        return space.n
    else:
        return space.shape[0]

def update_q_value(Q, state, action, reward, next_state, learning_rate, discount_factor):
    if state < Q.shape[0] and action < Q.shape[1] and next_state < Q.shape[0]:
        # Standard Q-learning update: the discount applies to the best next
        # Q-value only, not to the whole TD error.
        Q[state, action] += learning_rate * (reward + discount_factor * np.max(Q[next_state, :]) - Q[state, action])
    else:
        raise IndexError("Index out of bounds for Q-table")
    return Q

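# A minimal sketch of update_q_value on a toy 2 x 2 table (illustrative values
# and hypothetical names; not part of the FrozenLake run below):
#   Q_toy = np.zeros((2, 2))
#   Q_toy = update_q_value(Q_toy, state=0, action=1, reward=1.0, next_state=1,
#                          learning_rate=0.1, discount_factor=0.9)
#   Q_toy[0, 1] is now 0.1 * (1.0 + 0.9 * 0.0 - 0.0) == 0.1
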
def accuracy_score(y_true, y_pred):
    # Check that the lengths of y_true and y_pred match
    if len(y_true) != len(y_pred):
        raise ValueError("The lengths of y_true and y_pred should match")

    # Count the number of correct predictions
    correct_predictions = sum(1 for true, pred in zip(y_true, y_pred) if true == pred)

    # Calculate the accuracy
    accuracy = correct_predictions / len(y_true)

    return accuracy

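# Quick sanity check of accuracy_score on a toy example (illustrative values):
# two of three predictions match.
assert accuracy_score([0, 1, 1], [0, 1, 0]) == 2 / 3
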
def select_action(q_table, state, exploration_rate, num_actions):
    if np.random.rand() < exploration_rate:
        return np.random.choice(num_actions)  # Exploration: random action
    else:
        return np.argmax(q_table[state])  # Exploitation: greedy action

def train_test_split(X, y, test_size=0.2, random_state=None):
    # Check that the lengths of X and y match
    if len(X) != len(y):
        raise ValueError("The lengths of X and y should match")

    # Combine the features and labels into a single dataset
    dataset = np.column_stack([X, y])

    # Set the random seed for reproducibility
    if random_state is not None:
        np.random.seed(random_state)

    # Shuffle the dataset
    np.random.shuffle(dataset)

    # Calculate the split index
    split_index = int(len(dataset) * (1 - test_size))

    # Split the dataset into training and testing sets
    X_train, y_train = dataset[:split_index, :-1], dataset[:split_index, -1]
    X_test, y_test = dataset[split_index:, :-1], dataset[split_index:, -1]

    return X_train, X_test, y_train, y_test

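# Quick sanity check of the local train_test_split on a toy dataset
# (hypothetical names): 10 samples with test_size=0.2 give an 8/2 split.
_X_demo = np.arange(10).reshape(-1, 1)
_y_demo = np.arange(10)
_X_tr, _X_te, _y_tr, _y_te = train_test_split(_X_demo, _y_demo, test_size=0.2, random_state=0)
assert len(_X_tr) == 8 and len(_X_te) == 2
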
# Example usage:
env = gym.make('FrozenLake-v1')
table = q_table(env)
Q_table = q_learning(env, table)


# Using the learned Q-table for inference
state, _ = env.reset()
done = False
while not done:
    action = np.argmax(Q_table[state, :])
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    state = next_state

class QLearningAgent:
    def __init__(self, num_states, num_actions, learning_rate=0.1, discount_factor=0.9, exploration_rate=0.1):
        self.num_states = num_states
        self.num_actions = num_actions
        self.learning_rate = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.q_table = np.zeros((num_states, num_actions))

    def select_action(self, state):
        if np.random.rand() < self.exploration_rate:
            return np.random.choice(self.num_actions)  # Exploration
        else:
            return np.argmax(self.q_table[state])  # Exploitation

    def update_q_table(self, state, action, reward, next_state):
        best_next_action = np.argmax(self.q_table[next_state])
        td_target = reward + self.discount_factor * self.q_table[next_state][best_next_action]
        td_error = td_target - self.q_table[state][action]
        self.q_table[state][action] += self.learning_rate * td_error

def run_q_learning(agent, env, num_episodes):
    for episode in range(num_episodes):
        state, _ = env.reset()  # Reset the environment to get the initial state
        done = False

        while not done:
            action = agent.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            agent.update_q_table(state, action, reward, next_state)
            state = next_state

        if (episode + 1) % 10 == 0:
            print(f"Episode {episode + 1} completed")

    print("Training finished")

class SupervisedLearningModel:
    def __init__(self):
        self.model = DecisionTreeClassifier()

    def train(self, X_train, y_train):
        self.model.fit(X_train, y_train)

    def predict(self, X_test):
        return self.model.predict(X_test)

def supervised_learning(X, y):
    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Create and train the model
    model = SupervisedLearningModel()
    model.train(X_train, y_train)

    # Make predictions on the testing set
    y_pred = model.predict(X_test)

    # Calculate and print the accuracy
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Accuracy: {accuracy}")

    return model

env = gym.make('FrozenLake-v1')

# Ensure that observation_space and action_space are valid gym.spaces objects
observation_space = env.observation_space
action_space = env.action_space

num_states = env.observation_space.n
num_actions = env.action_space.n
learning_rate = 0.1
discount_factor = 0.9
exploration_rate = 0.1
agent = QLearningAgent(num_states, num_actions, learning_rate, discount_factor, exploration_rate)

# Run Q-learning
run_q_learning(agent, env, 1000)

# After running Q-learning, we can use the learned Q-table to generate a dataset for supervised learning
states = np.arange(env.observation_space.n)
actions = np.argmax(agent.q_table, axis=1)

# The states are the inputs and the actions are the outputs
X = states.reshape(-1, 1)
y = actions

# Train a supervised learning model on the Q-learning data
supervised_model = supervised_learning(X, y)

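# The distilled model can now stand in for the Q-table at inference time.
# A minimal sketch, assuming the FrozenLake env created above; the tree maps a
# state index to the greedy action it learned from the Q-table:
state, _ = env.reset()
done = False
while not done:
    action = int(supervised_model.predict(np.array([[state]]))[0])
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    state = next_state
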
if __name__ == "__main__":
    # Create environment and Q-table
    env = gym.make('FrozenLake-v1')
    table = q_table(env)

    # Define num_states and num_actions
    num_states = env.observation_space.n
    num_actions = env.action_space.n

    # Initialize QLearningAgent with the environment dimensions
    agent = QLearningAgent(num_states, num_actions, learning_rate=0.1, discount_factor=0.9, exploration_rate=0.1)

    # Run Q-learning
    Q_table = q_learning(env, table, learning_rate=0.1, discount_factor=0.9, exploration_prob=0.1)

    # Use Q-table for inference
    state, _ = env.reset()
    done = False
    while not done:
        action = agent.select_action(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = next_state
7 binary files not shown.