diff --git a/.gitignore b/.gitignore index 52376fc..a249be0 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ cache/ checkpoints/ training_summary/ visualizations/ +diagnostics/ diff --git a/images/cat.jpg b/images/cat.jpg new file mode 100644 index 0000000..6557c35 Binary files /dev/null and b/images/cat.jpg differ diff --git a/images/dreamed_cat.jpg b/images/dreamed_cat.jpg new file mode 100644 index 0000000..68ebdd9 Binary files /dev/null and b/images/dreamed_cat.jpg differ diff --git a/requirements.txt b/requirements.txt index fd399cc..edd7314 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,6 +21,7 @@ matplotlib plotly datasets huggingface-hub +pgmpy # Bayesian Networks # Image Processing pillow diff --git a/src/20_bayes_medical_explanability.py b/src/20_bayes_medical_explanability.py new file mode 100644 index 0000000..6d023c5 --- /dev/null +++ b/src/20_bayes_medical_explanability.py @@ -0,0 +1,781 @@ +""" +Medical Bayesian Networks and Large Language Models: A Historical Perspective + +Historical Significance: +------------------------ +Bayesian networks in medicine trace back to the 1980s with systems like MYCIN and +INTERNIST-1. These early expert systems demonstrated both the potential and limitations +of rule-based medical reasoning: + +1. Early Systems (1970s-1980s): + - MYCIN: Used certainty factors for bacterial infections + - INTERNIST-1: Attempted comprehensive internal medicine diagnosis + - Key limitation: Rigid, rule-based reasoning + +2. Bayesian Revolution (1990s): + - Introduction of probabilistic graphical models + - QMR-DT: First major Bayesian medical diagnosis system + - Enabled handling of uncertainty and incomplete information + +3. Modern Integration (2020s): + - Combination of Bayesian networks with LLMs + - Natural language understanding meets probabilistic reasoning + - Explainable AI becomes crucial for medical applications + +Key Innovations in This Implementation: +------------------------------------- +1. Hybrid Architecture: + - Bayesian networks provide probabilistic reasoning + - LLMs enable natural language understanding + - Combines structured and unstructured data processing + +2. Explainability: + - Every decision has a traceable reasoning path + - Natural language explanations for medical professionals + - Audit trail for accountability + +3. Medical Knowledge Integration: + - Dynamic knowledge structure creation + - Causal relationship extraction + - Evidence-based reasoning paths + +Technical Components: +------------------- +1. Bayesian Network: + - Nodes: Medical conditions/symptoms + - Edges: Causal relationships + - CPTs: Conditional probabilities + +2. LLM Integration: + - Structure learning from text + - Evidence extraction + - Natural language generation + +3. Logging System: + - Diagnostic process tracking + - Decision auditing + - Quality control + +This system represents a step toward more interpretable and reliable medical AI, +addressing key challenges in healthcare automation: +- Uncertainty handling +- Decision transparency +- Knowledge integration +- Clinical workflow integration + +Usage Example: +------------- +patient_story = ''' +I am experiencing severe fatigue, especially in the mornings, +along with persistent headaches and occasional dizziness. +''' + +system = BayesianLLM() +system.setup_medical_network(patient_story) +diagnosis = system.generate_diagnostic_reasoning(evidence) + +The system will: +1. Extract relevant medical concepts +2. Build a Bayesian network structure +3. 
Generate probabilistic diagnoses +4. Provide natural language explanations +""" + +import json +import logging +from typing import Dict, List, Tuple, Optional, NamedTuple +from langchain_ollama import ChatOllama +from langchain_core.output_parsers import StrOutputParser +from langchain_core.prompts import ChatPromptTemplate +from langchain.chains import LLMChain +from pgmpy.models import BayesianNetwork +from pgmpy.factors.discrete import TabularCPD +from dataclasses import dataclass +import csv +from datetime import datetime +from pathlib import Path + +logging.basicConfig(level=logging.INFO) +logging.getLogger("httpx").setLevel(logging.WARNING) +logger = logging.getLogger(__name__) + +MODEL_NAME = "hermes3:latest" + + +@dataclass +class DiagnosticReasoning: + conclusion: str + confidence: float + evidence_path: List[str] + alternative_explanations: List[Tuple[str, float]] + + +class BayesianLLM: + """ + A Bayesian network-based medical diagnosis system that uses LLMs for: + 1. Network structure learning + 2. Evidence extraction + 3. Diagnostic reasoning + 4. Natural language explanations + + Key Components: + - LLM Integration: Uses Ollama for natural language understanding + - Bayesian Network: Captures causal relationships between medical concepts + - Logging System: Tracks diagnostic processes for accountability + - Explanation Generation: Provides human-readable reasoning paths + """ + + def __init__(self, model_name: str = MODEL_NAME): + """Initialize the BayesianLLM system""" + self.model_name = model_name + self.llm = ChatOllama(model=model_name) + self.nodes: Dict[str, List[str]] = {} + self.network: Optional[BayesianNetwork] = None + self.patient_story: str = "" + self.log_file = Path("diagnostics/diagnostic_logs.csv") + self._initialize_log_file() + + def _initialize_log_file(self): + """Initialize the CSV log file with headers if it doesn't exist""" + if not self.log_file.exists(): + logger.info(f"Creating log file: {self.log_file}") + if not self.log_file.parent.exists(): + self.log_file.parent.mkdir(parents=True, exist_ok=True) + logger.info(f"Created parent directory: {self.log_file.parent}") + with open(self.log_file, "w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow( + [ + "timestamp", + "patient_story", + "extracted_evidence", + "primary_conclusion", + "confidence", + "evidence_path", + "alternative_explanations", + "network_structure", + ] + ) + + def log_diagnostic_process( + self, evidence: Dict[str, str], diagnosis: DiagnosticReasoning + ) -> None: + """ + Creates an audit trail of diagnostic decisions. + + Purpose: + 1. Accountability: Track decision-making process + 2. Learning: Analyze patterns in successful diagnoses + 3. 
Quality Control: Monitor system performance + + Stores: + - Timestamp: When diagnosis was made + - Patient Story: Original description + - Evidence: What was observed + - Reasoning: How conclusions were reached + - Network State: System configuration + + This is crucial for: + - Medical documentation + - System improvement + - Potential legal requirements + """ + try: + # Convert network structure to string representation + network_structure = ( + [f"{cause} โ†’ {effect}" for cause, effect in self.network.edges()] + if self.network + else [] + ) + + # Prepare the log entry + log_entry = { + "timestamp": datetime.now().isoformat(), + "patient_story": self.patient_story.strip(), + "extracted_evidence": json.dumps(evidence), + "primary_conclusion": diagnosis.conclusion, + "confidence": str(diagnosis.confidence), # Convert float to string + "evidence_path": json.dumps(diagnosis.evidence_path), + "alternative_explanations": json.dumps( + diagnosis.alternative_explanations + ), + "network_structure": json.dumps(network_structure), + } + + logger.debug(f"Preparing to log entry: {log_entry}") + + # Write to CSV + with open(self.log_file, "a", newline="", encoding="utf-8") as f: + writer = csv.DictWriter( + f, + fieldnames=[ + "timestamp", + "patient_story", + "extracted_evidence", + "primary_conclusion", + "confidence", + "evidence_path", + "alternative_explanations", + "network_structure", + ], + ) + + writer.writerow(log_entry) + + logger.info(f"Successfully logged diagnostic process to {self.log_file}") + + except Exception as e: + logger.error(f"Failed to log diagnostic process: {e}", exc_info=True) + raise + + def create_node(self, description: str) -> Tuple[str, List[str]]: + """ + Converts natural language descriptions into Bayesian network nodes. + + The Process: + 1. Takes a medical concept description (e.g., "patient's fatigue level") + 2. Uses LLM to generate: + - A standardized node name (snake_case) + - 5 possible states for that node + 3. Returns structured format for network building + + Example: + Input: "patient's fatigue level" + Output: ("fatigue_level", ["none", "mild", "moderate", "severe", "extreme"]) + """ + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are a helpful assistant that creates nodes for Bayesian networks. Return only valid JSON.", + ), + ( + "user", + """Create a node for a Bayesian network based on this description: + "{description}" + + Return a JSON object with: + 1. A short snake_case name for the node + 2. 
A list of 5 possible states for this node + + Return ONLY the JSON object, no additional text or formatting: + {{"name": "node_name", "states": ["state1", "state2", "state3", "state4", "state5"]}}""", + ), + ] + ) + + print(f"๐Ÿ”„ Creating node from description: '{description}'") + print("๐Ÿ“ค Sending request to LLM...") + + try: + chain = prompt | self.llm | StrOutputParser() + content = chain.invoke({"description": description}) + + # Clean up the response + content = content.strip() + # Remove any markdown formatting + if "```" in content: + content = content.split("```")[1] + if "json" in content.split("\n")[0]: + content = "\n".join(content.split("\n")[1:]) + # Remove any trailing backticks + content = content.replace("`", "").strip() + + try: + node_info = json.loads(content) + if ( + not isinstance(node_info, dict) + or "name" not in node_info + or "states" not in node_info + ): + raise ValueError("Invalid JSON structure") + return node_info["name"], node_info["states"] + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON: {content}") + logger.error(f"JSON error: {e}") + raise + except ValueError as e: + logger.error(f"Invalid JSON structure: {content}") + raise + + except Exception as e: + logger.error(f"Unexpected error in create_node: {e}") + logger.error(f"Failed description: {description}") + raise + + def extract_relationships(self, text: str) -> List[Tuple[str, str]]: + """ + Identifies causal relationships between medical concepts. + + The Process: + 1. Analyzes patient story for cause-effect relationships + 2. Maps relationships to existing network nodes + 3. Validates relationships against known nodes + + Example: + "Fatigue is causing decreased activity" -> + [("fatigue_level", "activity_level")] + + This forms the structure of our Bayesian network, showing how + different medical conditions influence each other. 
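+        Any relationship whose cause or effect is not one of the existing node
+        names is discarded during validation, so the network only ever contains
+        edges between known nodes.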
+ """ + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are a helpful assistant that identifies causal relationships in text.", + ), + ( + "user", + """ + From this text, identify causal relationships between concepts: + {text} + + Use ONLY these exact node names in your response: + {nodes} + + Return a JSON array of objects with cause and effect properties: + [ + {{"cause": "node_name1", "effect": "node_name2"}} + ] + """, + ), + ] + ) + + try: + chain = prompt | self.llm | StrOutputParser() + # Pass both text and nodes as variables to the prompt + content = chain.invoke({"text": text, "nodes": list(self.nodes.keys())}) + + # Handle markdown formatting if present + if "```" in content: + content = content.split("```")[1].strip() + if content.startswith("json\n"): + content = content[5:] + + relationships = json.loads(content) + + # Map and validate relationships + valid_relationships = [ + (rel["cause"], rel["effect"]) + for rel in relationships + if rel["cause"] in self.nodes and rel["effect"] in self.nodes + ] + + logger.info(f"Extracted relationships: {valid_relationships}") + return valid_relationships + + except Exception as e: + logger.error(f"Failed to extract relationships: {e}") + return [] + + def build_network(self): + """Build the Bayesian network structure""" + print("\n๐Ÿ”— Building network structure...") + self.network = BayesianNetwork() + + # Add nodes + for node in self.nodes: + self.network.add_node(node) + + # Add edges from relationships using the actual patient story + relationships = self.extract_relationships(self.patient_story) + print(f"Found {len(relationships)} relationships") + + for cause, effect in relationships: + self.network.add_edge(cause, effect) + + def extract_medical_concepts(self, story: str) -> List[str]: + """Extract relevant medical concepts from patient story""" + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """You are a medical expert that identifies key medical concepts. + Return ONLY a JSON array of descriptions, no additional text or formatting. 
+ Example: ["concept1", "concept2", "concept3"]""", + ), + ( + "user", + """From this patient story, identify all key medical concepts that should be modeled: + {story} + + Return ONLY the JSON array, no explanation or additional text.""", + ), + ] + ) + + try: + chain = prompt | self.llm | StrOutputParser() + content = chain.invoke({"story": story}) + + # Clean up the response + content = content.strip() + + # Remove any markdown formatting if present + if "```json" in content: + content = content.split("```json")[1] + if "```" in content: + content = content.split("```")[0] + + # Remove any trailing or leading whitespace or special characters + content = content.strip("`\n\r\t ") + + logger.debug(f"Cleaned medical concepts response: {content}") + + try: + concepts = json.loads(content) + if not isinstance(concepts, list): + raise ValueError("Response is not a list") + + # Ensure all elements are strings + concepts = [str(concept) for concept in concepts] + + if not concepts: + logger.warning( + "No medical concepts extracted, using fallback concepts" + ) + return ["mood state", "energy level", "fatigue symptoms"] + + logger.info(f"Extracted medical concepts: {concepts}") + return concepts + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON response: {content}") + logger.error(f"JSON error: {e}") + # Provide fallback concepts + return ["mood state", "energy level", "fatigue symptoms"] + + except Exception as e: + logger.error(f"Failed to extract medical concepts: {e}") + # Provide fallback concepts + return ["mood state", "energy level", "fatigue symptoms"] + + def extract_evidence(self, story: str) -> Dict[str, str]: + """Extract evidence from patient story matching node states""" + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are a medical expert that extracts patient information.", + ), + ( + "user", + """ + From this patient story, extract relevant states for our nodes. + Story: {story} + + Available nodes and states: + {nodes_and_states} + + Return a JSON object mapping node names to their states based on the story. + Only include nodes where there is clear evidence in the story. + """, + ), + ] + ) + + nodes_str = "\n".join( + [f"{name}: {states}" for name, states in self.nodes.items()] + ) + chain = prompt | self.llm | StrOutputParser() + content = chain.invoke({"story": story, "nodes_and_states": nodes_str}) + return json.loads(content) + + def setup_medical_network(self, story: str): + """Set up a medical diagnosis network from patient story""" + self.patient_story = story + print("\n๐Ÿ“‹ Setting up medical diagnosis network from patient story...") + + # Extract concepts from story + concepts = self.extract_medical_concepts(story) + + # Create nodes for each concept + print("\n๐Ÿ—๏ธ Creating nodes...") + for i, desc in enumerate(concepts, 1): + print(f"\nNode {i}/{len(concepts)}\n") + name, states = self.create_node(desc) + self.nodes[name] = states + print(f"โœ… Created node: {name} with states: {states}") + + self.build_network() + + def generate_explanation(self, evidence: Dict[str, str]) -> str: + """Generate a natural language explanation of the network state given evidence""" + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """You are a medical expert that explains Bayesian network states. 
+ Explain the relationships between variables and likely outcomes based on evidence.""", + ), + ( + "user", + """Given this Bayesian network structure and evidence, explain the likely medical implications: + + Network Nodes: {nodes} + + Current Evidence: {evidence} + + Please provide: + 1. An interpretation of the evidence + 2. Likely implications for other variables + 3. Key relationships between variables that are relevant + + Keep the explanation clear and medical-focused.""", + ), + ] + ) + + try: + chain = prompt | self.llm | StrOutputParser() + + # Format nodes for better readability + nodes_str = "\n".join( + [ + f"- {name}: {', '.join(states)}" + for name, states in self.nodes.items() + ] + ) + + # Format evidence for better readability + evidence_str = "\n".join( + [f"- {node}: {state}" for node, state in evidence.items()] + ) + + explanation = chain.invoke({"nodes": nodes_str, "evidence": evidence_str}) + + return explanation + + except Exception as e: + logger.error(f"Failed to generate explanation: {e}") + return "Unable to generate explanation due to an error." + + def generate_diagnostic_reasoning( + self, evidence: Dict[str, str] + ) -> DiagnosticReasoning: + """ + Produces structured diagnostic analysis using LLM reasoning. + + The Process: + 1. Takes observed evidence (symptoms, test results, etc.) + 2. Uses network structure to understand relationships + 3. Generates: + - Primary diagnosis with confidence + - Step-by-step reasoning path + - Alternative explanations with probabilities + + This mimics medical differential diagnosis where doctors: + - Consider multiple possibilities + - Weigh evidence strength + - Rule out alternatives systematically + """ + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + """You are a medical expert that provides detailed diagnostic reasoning. 
+ You must respond ONLY with a JSON object in this exact format: + {{ + "conclusion": "Primary diagnostic conclusion", + "confidence": 0.XX, + "evidence_path": ["step1", "step2", "step3"], + "alternative_explanations": [["alternative1", 0.XX], ["alternative2", 0.XX]], + }} + Do not include any additional text, markdown formatting, or explanations.""", + ), + ( + "user", + """Based on this evidence and network structure, provide diagnostic reasoning: + Network Structure: {network_structure} + Evidence: {evidence} + Nodes and States: {nodes_states}""", + ), + ] + ) + + try: + chain = prompt | self.llm | StrOutputParser() + + network_structure = [ + f"{cause} โ†’ {effect}" for cause, effect in self.network.edges() + ] + nodes_states = {node: states for node, states in self.nodes.items()} + + response = chain.invoke( + { + "network_structure": network_structure, + "evidence": evidence, + "nodes_states": nodes_states, + } + ) + + # Clean up the response + response = response.strip() + + # Remove any markdown formatting if present + if "```json" in response: + response = response.split("```json")[1] + if "```" in response: + response = response.split("```")[0] + + # Remove any trailing or leading whitespace or special characters + response = response.strip("`\n\r\t ") + + logger.debug(f"Cleaned response: {response}") + + try: + result = json.loads(response) + + # Validate required fields + required_fields = { + "conclusion", + "confidence", + "evidence_path", + "alternative_explanations", + } + if not all(field in result for field in required_fields): + missing = required_fields - set(result.keys()) + raise ValueError(f"Missing required fields: {missing}") + + # Ensure confidence is float + result["confidence"] = float(result["confidence"]) + + # Ensure alternative_explanations format is correct + result["alternative_explanations"] = [ + [str(alt), float(conf)] + for alt, conf in result["alternative_explanations"] + ] + + return DiagnosticReasoning(**result) + + except json.JSONDecodeError as e: + logger.error(f"Failed to parse JSON response: {response}") + logger.error(f"JSON error: {e}") + # Provide a fallback response + return DiagnosticReasoning( + conclusion="Unable to generate proper diagnosis due to system error", + confidence=0.0, + evidence_path=["System encountered an error in processing"], + alternative_explanations=[], + ) + + except Exception as e: + logger.error(f"Failed to generate diagnostic reasoning: {e}") + raise + + def explain_decision_path(self, diagnosis: DiagnosticReasoning) -> str: + """Generate a human-readable explanation of the diagnostic decision path""" + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are a medical expert explaining diagnostic reasoning to other medical professionals.", + ), + ( + "user", + """Create a detailed explanation of this diagnostic reasoning: + Conclusion: {conclusion} + Confidence: {confidence} + Evidence Path: {evidence_path} + Alternatives: {alternatives} + + Format the explanation with: + 1. Primary conclusion and confidence level + 2. Step-by-step reasoning path + 3. Key evidence relationships + 4. 
Alternative considerations""", + ), + ] + ) + + chain = prompt | self.llm | StrOutputParser() + return chain.invoke( + { + "conclusion": diagnosis.conclusion, + "confidence": diagnosis.confidence, + "evidence_path": diagnosis.evidence_path, + "alternatives": diagnosis.alternative_explanations, + } + ) + + def verify_log_file(self): + """Verify that the log file exists and contains data""" + try: + if not self.log_file.exists(): + logger.error("Log file does not exist!") + return False + + with open(self.log_file, "r", encoding="utf-8") as f: + reader = csv.DictReader(f) + rows = list(reader) + logger.info(f"Log file contains {len(rows)} entries") + if rows: + logger.debug(f"Last entry: {rows[-1]}") + return True + except Exception as e: + logger.error(f"Error verifying log file: {e}", exc_info=True) + return False + + +def main(): + try: + print("\n๐Ÿš€ Initializing BayesianLLM system...") + llm = BayesianLLM() + + patient_story = """ + I am Lydia and I'm not feeling well. I feel so somber and tired. + """ + + # Store patient story + llm.patient_story = patient_story + + # Setup network + llm.setup_medical_network(patient_story) + + # Extract evidence from story + evidence = llm.extract_evidence(patient_story) + logger.info(f"Extracted Evidence: {evidence}") + + # Generate detailed diagnostic reasoning + diagnosis = llm.generate_diagnostic_reasoning(evidence) + logger.info(f"Generated diagnosis: {diagnosis}") + + # Log the diagnostic process + llm.log_diagnostic_process(evidence, diagnosis) + + print("\n๐Ÿ“Š Diagnostic Analysis:") + print( + f"Primary Conclusion: {diagnosis.conclusion} (Confidence: {diagnosis.confidence*100:.1f}%)" + ) + print("\nReasoning Path:") + for step in diagnosis.evidence_path: + print(f"- {step}") + + print("\nAlternative Explanations:") + for alt, conf in diagnosis.alternative_explanations: + print(f"- {alt} ({conf*100:.1f}% confidence)") + + # Generate detailed explanation + print("\n๐Ÿ“ Detailed Medical Explanation:") + explanation = llm.explain_decision_path(diagnosis) + print(explanation) + + print(f"\nโœ… Diagnostic process has been logged to: {llm.log_file}") + + # Verify the log file + llm.verify_log_file() + + except Exception as e: + logger.error(f"Error in main: {e}", exc_info=True) + raise + + +if __name__ == "__main__": + # Set up logging + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + main() diff --git a/src/journey_to_transformer/01_xor_network.py b/src/journey_to_transformer/01_xor_network.py new file mode 100644 index 0000000..00d9c67 --- /dev/null +++ b/src/journey_to_transformer/01_xor_network.py @@ -0,0 +1,201 @@ +""" +The XOR Problem and Its Historical Significance + +The XOR (exclusive OR) problem was a pivotal challenge in AI history that helped lead to +the first AI winter in the 1970s. The controversy began when Marvin Minsky and Seymour +Papert published their 1969 book "Perceptrons", which demonstrated that single-layer +perceptrons could not solve the XOR problem. + +The XOR function returns: +- 1 when inputs are different (1,0) or (0,1) +- 0 when inputs are same (0,0) or (1,1) + +This created a crisis because: +1. XOR is a simple logical operation that humans can easily understand +2. Single-layer perceptrons could not learn this pattern +3. It wasn't clear if adding layers would help or if they could be trained effectively + +The solution emerged in the 1980s with: +1. Multi-layer networks (adding hidden layers) +2. Backpropagation algorithm for training +3. 
Non-linear activation functions + +This combination allowed neural networks to learn the complex decision boundaries needed +for XOR, helping to end the first AI winter. The XOR problem demonstrates that: +- Sometimes simple-looking problems require complex solutions +- The limitations of one approach can drive innovation in new directions +- Understanding failure cases is crucial for advancing the field + +The network below uses: +- 2 input neurons (for the two binary inputs) +- 4 hidden neurons (to create complex decision boundaries) +- 1 output neuron (for the binary output) +- ReLU activation (to introduce non-linearity) + + +It doesn't always learn correctly. This is a classic case of the network getting stuck in a +local minimum - in this case, it's actually stuck at its initial state where it's just +predicting 0.5 for everything. The constant loss of 0.6931 (which is approximately -ln(0.5)) +is a telltale sign that the network isn't learning at all. + +This happens because: +- Neural networks are initialized with random weights +- Sometimes these initial weights lead to a configuration where the gradients aren't + strong enough to push the network out of this "lazy" state +- The network finds it's "comfortable" just predicting 0.5 for everything, as this + minimizes its maximum error for any input + +Solutions typically include: +- Just restart training with new random weights (reinitialize the model) +- Try different learning rates +- Use different weight initialization strategies +- Add momentum to the optimizer + +This is actually a great learning example because it shows how neural networks can sometimes +get stuck, just like humans can get stuck in suboptimal thinking patterns! The good news is +that if you just run the code again, the new random initialization will likely give you +better results. + +Modern XOR networks use: +- Batch normalization to stabilize values +- Leaky ReLU activation to improve learning +- Sigmoid at the end to ensure output is between 0 and 1 + +class ModernXORNetwork(nn.Module): + def __init__(self): + super().__init__() + self.network = nn.Sequential( + nn.Linear(2, 4), + nn.BatchNorm1d(4), # Stabilize values + nn.LeakyReLU(), # Better activation + nn.Linear(4, 1), + nn.Sigmoid(), # Only at end for 0-1 output + ) + + def forward(self, x): + return self.network(x) +""" + +import torch +import torch.nn as nn +import matplotlib.pyplot as plt + +# Configure matplotlib to work in VS Code +plt.switch_backend("TkAgg") + + +class XORNetwork(nn.Module): + """ + A simple neural network for solving the XOR problem. 
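+    Architecture: 2 inputs -> Linear(2, 4) -> ReLU -> Linear(4, 1) -> Sigmoid,
+    matching the layer stack defined in __init__ below.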
+ """ + + def __init__(self): + super().__init__() + self.layers = nn.Sequential( + # First layer: 2 inputs -> 4 neurons + # We need 4 neurons because XOR is a complex pattern: + # - 2 neurons aren't enough to separate the data properly + # - 4 neurons give us more "decision boundaries" to work with + nn.Linear(2, 4), + # ReLU activation function + # - Converts negative numbers to 0 + # - Keeps positive numbers as they are + # - Helps network learn non-linear patterns + nn.ReLU(), + # Output layer: 4 neurons -> 1 output + # - Takes the 4 intermediate values + # - Combines them into final yes/no decision + nn.Linear(4, 1), + # Sigmoid squishes output between 0 and 1 + # - Perfect for yes/no decisions + # - 0 = false, 1 = true + nn.Sigmoid(), + ) + + def forward(self, x): + return self.layers(x) + + +# Create training data +# XOR truth table: output is 1 if inputs are different, 0 if same +X = torch.tensor( + [ + [0.0, 0.0], # Input: (0,0) -> Output should be 0 + [0.0, 1.0], # Input: (0,1) -> Output should be 1 + [1.0, 0.0], # Input: (1,0) -> Output should be 1 + [1.0, 1.0], + ] +) # Input: (1,1) -> Output should be 0 + +y = torch.tensor( + [ + [0.0], # Expected output for (0,0) + [1.0], # Expected output for (0,1) + [1.0], # Expected output for (1,0) + [0.0], + ] +) # Expected output for (1,1) + +# Create network and training tools +model = XORNetwork() +# Binary Cross Entropy Loss: good for yes/no problems +criterion = nn.BCELoss() +# Adam optimizer: automatically adjusts learning speed +# lr=0.05 means "take bigger steps" when learning +optimizer = torch.optim.Adam(model.parameters(), lr=0.05) + +# Keep track of how well we're learning +losses = [] + +print("Training the network to solve XOR...") +print("Epoch Loss") +print("-" * 20) + +# Train for 1000 rounds +for epoch in range(1000): + # 1. Make a prediction with current network + output = model(X) + # 2. Calculate how wrong we were + loss = criterion(output, y) + # 3. Reset gradients from last time + optimizer.zero_grad() + # 4. Calculate how to adjust the network + loss.backward() + # 5. Update the network + optimizer.step() + + # Store loss for plotting + losses.append(loss.item()) + + # Show progress every 100 epochs + if (epoch + 1) % 100 == 0: + print(f"{epoch+1:5d} {loss.item():.4f}") + +# Test how well we learned +print("\nTesting the network:") +print("Input Target Prediction Result") +print("-" * 40) +with torch.no_grad(): # Don't need gradients for testing + predictions = model(X) + for i in range(len(X)): + prediction = predictions[i].item() + target = y[i].item() + # Consider prediction wrong if it's more than 0.2 away from target + is_correct = abs(prediction - target) < 0.2 + result = "โœ…" if is_correct else "๐Ÿ’ฅ" + print(f"{X[i].numpy()} {target:.0f} {prediction:.3f} {result}") + +print("\nNetwork parameters (weights and biases):") +for name, param in model.named_parameters(): + print(f"{name}: {param.data}") +print() +# Plot how the learning progressed +plt.figure(figsize=(10, 5)) +plt.plot(losses) +plt.title("Training Loss Over Time") +plt.xlabel("Epoch") +plt.ylabel("Loss") +plt.grid(True) +plt.show() + +print("\nLook how quickly it learns! Much faster than waiting 17 years... 
๐Ÿ˜‰") diff --git a/src/journey_to_transformer/02_neural_net_evolution.py b/src/journey_to_transformer/02_neural_net_evolution.py new file mode 100644 index 0000000..6642cfe --- /dev/null +++ b/src/journey_to_transformer/02_neural_net_evolution.py @@ -0,0 +1,216 @@ +""" +Neural Network Evolution: From Basic to Modern Architectures + +This file demonstrates the historical evolution of neural network architectures, +showing how various improvements helped solve fundamental problems: + +1. BasicNetwork: Uses sigmoid activation (historical approach from 1980s) +2. ImprovedNetwork: Uses tanh activation (1990s improvement) +3. ModernNetwork: Implements batch normalization and ReLU (2010s best practices) +4. SimpleMemoryNetwork: Demonstrates early memory concepts (precursor to LSTM) + +Each network shows key innovations that helped advance deep learning. +""" + +import torch +import torch.nn as nn +import torch.optim as optim +import matplotlib.pyplot as plt + + +# 1. Basic Network with Sigmoid (prone to vanishing gradients) +class BasicNetwork(nn.Module): + """ + Represents the earliest practical neural networks (1980s-style). + + Problems with this architecture: + - Sigmoid activation suffers from vanishing gradients + - Gradients become very small for extreme values + - Network learns very slowly in deep layers + """ + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.layer1 = nn.Linear(input_size, hidden_size) + self.sigmoid = nn.Sigmoid() # Historical activation function + self.layer2 = nn.Linear(hidden_size, output_size) + + def forward(self, x): + x = self.sigmoid(self.layer1(x)) # Sigmoid squashes values to (0,1) + x = self.layer2(x) + return x + + +# 2. Improved Network with Better Activation +class ImprovedNetwork(nn.Module): + """ + Represents 1990s improvements with tanh activation. + + Advantages over sigmoid: + - Outputs centered around 0 (-1 to 1 range) + - Stronger gradients + - Generally faster convergence + """ + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.layer1 = nn.Linear(input_size, hidden_size) + self.tanh = nn.Tanh() # Centered activation function + self.layer2 = nn.Linear(hidden_size, output_size) + + def forward(self, x): + x = self.tanh(self.layer1(x)) + x = self.layer2(x) + return x + + +# 3. Deep Network with Modern Solutions +class ModernNetwork(nn.Module): + """ + Represents current best practices (2010s onwards). + + Key modern features: + - ReLU activation (solves vanishing gradient) + - Batch Normalization (stabilizes training) + - Deeper architecture (more layers) + - Xavier/Glorot initialization (built into PyTorch) + """ + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + # Modern architecture with multiple improvements + self.layer1 = nn.Linear(input_size, hidden_size) + self.bn1 = nn.BatchNorm1d(hidden_size) # Normalizes layer outputs + self.relu = nn.ReLU() # Modern activation function + self.layer2 = nn.Linear(hidden_size, hidden_size) + self.bn2 = nn.BatchNorm1d(hidden_size) + self.layer3 = nn.Linear(hidden_size, output_size) + + def forward(self, x): + # Each layer follows the pattern: Linear -> BatchNorm -> ReLU + x = self.relu(self.bn1(self.layer1(x))) + x = self.relu(self.bn2(self.layer2(x))) + x = self.layer3(x) # No activation on final layer + return x + + +# 4. Early LSTM-like Memory (simplified for demonstration) +class SimpleMemoryNetwork(nn.Module): + """ + Demonstrates early attempts at networks with memory (pre-LSTM). 
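+    Unlike a full LSTM, this simplified cell has no forget gate and no persistent
+    cell state: the memory is recomputed from the current input and hidden state
+    at every step.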
+ + Key concepts: + - Input gate: Controls what information to store + - Memory cell: Maintains state over time + - Output gate: Controls what information to output + + This is a simplified version showing the concept that led to LSTM/GRU. + """ + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.hidden_size = hidden_size + + # Gates control information flow + self.input_gate = nn.Linear(input_size + hidden_size, hidden_size) + self.memory_transform = nn.Linear(input_size + hidden_size, hidden_size) + self.output_gate = nn.Linear(input_size + hidden_size, hidden_size) + + self.output = nn.Linear(hidden_size, output_size) + + # Activation functions for different purposes + self.tanh = nn.Tanh() # For memory content + self.sigmoid = nn.Sigmoid() # For gates + + def forward(self, x, hidden_state=None): + batch_size = x.size(0) + + # Initialize hidden state if none provided + if hidden_state is None: + hidden_state = torch.zeros(batch_size, self.hidden_size).to(x.device) + + # Combine current input with previous state + combined = torch.cat((x, hidden_state), dim=1) + + # Gate mechanisms + input_gate = self.sigmoid(self.input_gate(combined)) + memory_write = self.tanh(self.memory_transform(combined)) + output_gate = self.sigmoid(self.output_gate(combined)) + + # Update memory state + memory_cell = input_gate * memory_write + + # Generate output using gated memory + hidden_state = output_gate * self.tanh(memory_cell) + output = self.output(hidden_state) + + return output, hidden_state + + +# Demonstration +def train_and_compare(): + """ + Trains all network variants on a simple task and compares their performance. + + The task is to sum input features - chosen because: + - It's simple enough to learn quickly + - Complex enough to show differences between architectures + - Easy to verify results + """ + # Generate some sample data + X = torch.randn(100, 10) # 100 samples, 10 features + y = torch.sum(X, dim=1).unsqueeze(1) # Simple sum task + + # Create networks + networks = { + "Basic (Sigmoid)": BasicNetwork(10, 20, 1), + "Improved (Tanh)": ImprovedNetwork(10, 20, 1), + "Modern (ReLU+BN)": ModernNetwork(10, 20, 1), + "Memory Net": SimpleMemoryNetwork(10, 20, 1), + } + + # Training settings + epochs = 500 + losses = {name: [] for name in networks} + + for name, net in networks.items(): + print(f"\nTraining {name}...") + optimizer = optim.Adam(net.parameters(), lr=0.01) + criterion = nn.MSELoss() + + for epoch in range(epochs): + optimizer.zero_grad() + + # Forward pass (handle memory network separately) + if isinstance(net, SimpleMemoryNetwork): + output, _ = net(X) + else: + output = net(X) + + # Compute loss + loss = criterion(output, y) + losses[name].append(loss.item()) + + # Backward pass + loss.backward() + + # Gradient clipping for stability + torch.nn.utils.clip_grad_norm_(net.parameters(), max_norm=1.0) + + optimizer.step() + + if (epoch + 1) % 20 == 0: + print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}") + + return losses + + +# Run training and plot results +losses = train_and_compare() + +plt.figure(figsize=(10, 6)) +for name, loss_values in losses.items(): + plt.plot(loss_values, label=name) +plt.xlabel("Epoch") +plt.ylabel("Loss") +plt.title("Training Loss Comparison") +plt.legend() +plt.yscale("log") # Better visualization of loss differences +plt.grid(True) +plt.show() diff --git a/src/journey_to_transformer/03_rnn_mood_pred.py b/src/journey_to_transformer/03_rnn_mood_pred.py new file mode 100644 index 0000000..2335d1a --- /dev/null 
+++ b/src/journey_to_transformer/03_rnn_mood_pred.py @@ -0,0 +1,239 @@ +import torch +import torch.nn as nn +import matplotlib.pyplot as plt + + +""" +RNN Mood Predictor: Understanding Sequential Data and Memory + +This example demonstrates how Recurrent Neural Networks (RNNs) can process sequences +of events and maintain a "memory" of past events to make predictions. It's like how +your mood throughout the day is influenced by the sequence of events you experience. + +Key Concepts: +1. Sequential Processing - RNNs handle data that comes in sequences (like events in a day) +2. Hidden State - The network maintains a "memory" of previous events +3. Time Steps - Each event is processed one at a time, updating the memory +4. Non-linear Transformations - Using activation functions to model complex patterns + +Historical Significance: +- RNNs were a breakthrough in handling sequential data +- They enabled applications like: + * Natural language processing + * Time series prediction + * Music generation + * Speech recognition + +The network uses: +- Input layer: Transforms each event into a hidden representation +- RNN cell: Updates the memory based on current event and previous state +- Output layer: Makes predictions based on current memory state +""" + +# Our Simple RNN - like a friend who remembers your day's events! +class MoodPredictor(nn.Module): + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.hidden_size = hidden_size # size of the memory + + # Transform input events + self.input_layer = nn.Linear( + input_size, hidden_size + ) # used to transform the input events + + # The "memory" layer - remembers previous events + self.rnn_cell = nn.RNNCell( + hidden_size, hidden_size + ) # used to update the memory + + # Final prediction layer + self.output_layer = nn.Linear( + hidden_size, output_size + ) # used to make a prediction + + # Activation functions + self.tanh = nn.Tanh() # used to squash the values between -1 and 1 + self.sigmoid = nn.Sigmoid() # used to squash the values between 0 and 1 + + def forward(self, x, hidden=None): + # For first event of day, start with neutral state + if hidden is None: + hidden = torch.zeros(x.size(0), self.hidden_size).to(x.device) + + # Lists to store predictions at each time step + outputs = [] + + # Process each event in the sequence + for t in range(x.size(1)): + # Get current event + current_input = x[:, t, :] + + # Transform input + transformed = self.tanh(self.input_layer(current_input)) + + # Update memory with new event + hidden = self.rnn_cell(transformed, hidden) + + # Make prediction + output = self.sigmoid(self.output_layer(hidden)) + outputs.append(output) + + # Stack all predictions together + outputs = torch.stack(outputs, dim=1) + return outputs, hidden + + +# Let's create some example data! +def generate_day_sequences(num_sequences=100): + """ + Generate synthetic day sequences to train our model. + + Each day is represented as a sequence of 5 events: + - Events are one-hot encoded: [good, neutral, bad] + - Final mood is calculated based on the balance of good vs bad events + - Some randomness is added to make it more realistic + + This is like how your actual day might have a mix of events that + collectively influence your final mood. 
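+    Encoding used below: index 0 = good event, index 1 = neutral, index 2 = bad,
+    so for example a good event is the one-hot vector [1, 0, 0].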
+ """ + sequences = [] + labels = [] + + for _ in range(num_sequences): + # Generate random day sequence + day = torch.zeros(5, 3) + for t in range(5): + # Random event type (one-hot encoded) + event_type = torch.randint(0, 3, (1,)) + day[t, event_type] = 1 + + # Calculate mood based on events (with some randomness) + good_events = day[:, 0].sum() + bad_events = day[:, 2].sum() + mood = torch.sigmoid(torch.tensor([(good_events - bad_events) / 2])) + + sequences.append(day) + labels.append(mood) + + return torch.stack(sequences), torch.stack(labels) + + +# Training time! +def train_and_test(): + """ + Train the mood predictor and evaluate its performance. + + The training process: + 1. Split data into training and test sets + 2. Train model for 100 epochs + 3. Use Binary Cross Entropy loss (good for 0-1 predictions) + 4. Use Adam optimizer (adaptive learning rates) + 5. Evaluate on test set + 6. Visualize training progress + + This mimics how we might train a real mood prediction system, + though real-world data would be much more complex! + """ + # Generate data + X, y = generate_day_sequences() + + # Split into train and test + train_size = int(0.8 * len(X)) + X_train, X_test = X[:train_size], X[train_size:] + y_train, y_test = y[:train_size], y[train_size:] + + # Create model + model = MoodPredictor(input_size=3, hidden_size=12, output_size=1) + + # Training setup + criterion = nn.BCELoss() + optimizer = torch.optim.Adam(model.parameters(), lr=0.01) + + # Training loop + print("Training the mood predictor...") + losses = [] + + for epoch in range(100): + optimizer.zero_grad() + + # Forward pass + outputs, _ = model(X_train) + loss = criterion(outputs[:, -1], y_train) # Only care about final prediction + + # Backward pass + loss.backward() + optimizer.step() + + losses.append(loss.item()) + + if (epoch + 1) % 10 == 0: + print(f"Epoch {epoch+1}/100, Loss: {loss.item():.4f}") + + # Test the model + model.eval() + with torch.no_grad(): + _calculate_test_loss_and_accuracy(model, X_test, criterion, y_test) + # Plot training progress + plt.figure(figsize=(10, 5)) + plt.plot(losses) + plt.title("Training Progress") + plt.xlabel("Epoch") + plt.ylabel("Loss") + plt.grid(True) + plt.show() + + return model + + +def _calculate_test_loss_and_accuracy(model, X_test, criterion, y_test): + test_outputs, _ = model(X_test) + test_predictions = test_outputs[:, -1] + test_loss = criterion(test_predictions, y_test) + + # Convert predictions to binary decisions with a threshold of 0.5 + binary_preds = (test_predictions >= 0.5).float() + binary_targets = (y_test >= 0.5).float() + accuracy = (binary_preds == binary_targets).float().mean() + + print(f"\nTest Loss: {test_loss:.4f}") + print(f"Accuracy: {accuracy:.2%}") + + +# Let's run it! 
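+# The demo below trains the predictor, then compares its final-mood predictions
+# for a hand-crafted mostly-good day and a mostly-bad day.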
+if __name__ == "__main__": + model = train_and_test() + + # Try a specific day sequence + good_day = torch.tensor( + [ + [1, 0, 0], # Good morning + [1, 0, 0], # Nice lunch + [0, 1, 0], # Normal afternoon + [0, 0, 1], # Minor setback + [1, 0, 0], # Great evening + ], + dtype=torch.float32, # Specify float32 data type + ).unsqueeze(0) + + bad_day = torch.tensor( + [ + [0, 0, 1], # Bad morning + [0, 1, 0], # Meh lunch + [0, 0, 1], # Bad afternoon + [0, 0, 1], # Bad evening + [1, 0, 0], # Good night + ], + dtype=torch.float32, # Add float32 data type + ).unsqueeze( + 0 + ) # Add batch dimension + + with torch.no_grad(): + predictions, _ = model(good_day) + final_mood = predictions[0, -1].item() + print(f"\nPredicted mood for the good day: {final_mood:.2%}") + + with torch.no_grad(): + predictions, _ = model(bad_day) + final_mood = predictions[0, -1].item() + print(f"Predicted mood for the bad day: {final_mood:.2%}") diff --git a/src/journey_to_transformer/04_rnn_vs_lstm_mem.py b/src/journey_to_transformer/04_rnn_vs_lstm_mem.py new file mode 100644 index 0000000..b6f7bfc --- /dev/null +++ b/src/journey_to_transformer/04_rnn_vs_lstm_mem.py @@ -0,0 +1,144 @@ +import torch +import torch.nn as nn +import matplotlib.pyplot as plt + + +class SimpleRNN(nn.Module): + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.hidden_size = hidden_size + self.rnn_cell = nn.RNNCell(input_size, hidden_size) + self.output = nn.Linear(hidden_size, output_size) + + def forward(self, x, hidden=None): + if hidden is None: + hidden = torch.zeros(x.size(0), self.hidden_size) + + outputs = [] + for t in range(x.size(1)): + hidden = self.rnn_cell(x[:, t, :], hidden) + output = self.output(hidden) + outputs.append(output) + return torch.stack(outputs, 1) + + +class SimpleLSTM(nn.Module): + def __init__(self, input_size, hidden_size, output_size): + super().__init__() + self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True) + self.output = nn.Linear(hidden_size, output_size) + + def forward(self, x): + lstm_out, _ = self.lstm(x) + return self.output(lstm_out) + + +def create_tricky_memory_test(sequence_length=100, batch_size=32): + """ + Creates a MUCH harder memory test: + - Longer sequences (100 timesteps) + - Multiple important events to remember + - Random noise to distract the network + - Multiple sequences at once (batch_size) + """ + # Initialize input and target sequences + x = torch.zeros(batch_size, sequence_length, 5) # 5 input features now! 
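+    # Channel 0 will carry the sparse "important event" spikes; the remaining
+    # four channels are filled with Gaussian distractor noise further down.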
+ y = torch.zeros(batch_size, sequence_length, 1) + + for b in range(batch_size): + # Place important events (1s) at random positions in first channel + important_positions = torch.randint(0, sequence_length // 2, (2,)) + x[b, important_positions, 0] = 1 + + # Add random noise in other channels + x[b, :, 1:] = torch.randn(sequence_length, 4) * 0.5 + + # Target: Remember the important events forever + for pos in important_positions: + y[b, pos:, 0] = 1 + + return x, y + + +# Training function with visualization +def train_and_compare(sequence_length=100, hidden_size=32, epochs=200): + # Create models + rnn_model = SimpleRNN(input_size=5, hidden_size=hidden_size, output_size=1) + lstm_model = SimpleLSTM(input_size=5, hidden_size=hidden_size, output_size=1) + + # Training setup + criterion = nn.BCEWithLogitsLoss() + rnn_optimizer = torch.optim.Adam(rnn_model.parameters(), lr=0.01) + lstm_optimizer = torch.optim.Adam(lstm_model.parameters(), lr=0.01) + + # Training history + rnn_losses = [] + lstm_losses = [] + + print("Training both models...") + for epoch in range(epochs): + # Generate new random sequences each epoch + x, y = create_tricky_memory_test(sequence_length) + + # Train RNN + rnn_optimizer.zero_grad() + rnn_out = rnn_model(x) + rnn_loss = criterion(rnn_out, y) + rnn_loss.backward() + rnn_optimizer.step() + rnn_losses.append(rnn_loss.item()) + + # Train LSTM + lstm_optimizer.zero_grad() + lstm_out = lstm_model(x) + lstm_loss = criterion(lstm_out, y) + lstm_loss.backward() + lstm_optimizer.step() + lstm_losses.append(lstm_loss.item()) + + if (epoch + 1) % 20 == 0: + print(f"Epoch {epoch+1}/{epochs}") + print(f"RNN Loss: {rnn_loss.item():.4f}") + print(f"LSTM Loss: {lstm_loss.item():.4f}\n") + + # Plot training progress + plt.figure(figsize=(10, 5)) + plt.plot(rnn_losses, label="RNN") + plt.plot(lstm_losses, label="LSTM") + plt.title("Training Loss Over Time") + plt.xlabel("Epoch") + plt.ylabel("Loss") + plt.legend() + plt.grid(True) + plt.show() + + # Test with a single sequence for visualization + x_test, y_test = create_tricky_memory_test(sequence_length, batch_size=1) + + with torch.no_grad(): + rnn_test = torch.sigmoid(rnn_model(x_test)) + lstm_test = torch.sigmoid(lstm_model(x_test)) + + # Plot test sequence predictions + plt.figure(figsize=(15, 5)) + + plt.subplot(1, 2, 1) + plt.plot(x_test[0, :, 0].numpy(), label="Important Events", marker="o") + plt.plot(rnn_test[0, :, 0].numpy(), label="RNN Prediction", alpha=0.7) + plt.title("RNN Memory Test") + plt.legend() + plt.grid(True) + + plt.subplot(1, 2, 2) + plt.plot(x_test[0, :, 0].numpy(), label="Important Events", marker="o") + plt.plot(lstm_test[0, :, 0].numpy(), label="LSTM Prediction", alpha=0.7) + plt.title("LSTM Memory Test") + plt.legend() + plt.grid(True) + + plt.tight_layout() + plt.show() + + +# Run the comparison! +train_and_compare() diff --git a/src/journey_to_transformer/05_lstm_next_char_pred.py b/src/journey_to_transformer/05_lstm_next_char_pred.py new file mode 100644 index 0000000..b2d953d --- /dev/null +++ b/src/journey_to_transformer/05_lstm_next_char_pred.py @@ -0,0 +1,225 @@ +""" +Long Short-Term Memory (LSTM) Networks and Their Significance + +LSTMs were introduced in 1997 by Hochreiter & Schmidhuber to solve the vanishing gradient +problem in traditional RNNs. They're particularly good at learning long-term dependencies +in sequential data. + +Key Components of an LSTM: +1. Forget Gate: Decides what information to throw away from the cell state +2. 
Input Gate: Decides which new information to store in the cell state +3. Candidate Memory: Creates new candidate values that could be added to the state +4. Output Gate: Decides what parts of the cell state to output + +The LSTM's power comes from its cell state (C_t), which acts like a conveyor belt. +Information can flow along it unchanged, and the network can learn to add or remove +information from the cell state, regulated by the gates. + +The gates are the key innovation: +- They use sigmoid functions that output numbers between 0 and 1 +- These numbers are used as filters (0 = "let nothing through", 1 = "let everything through") +- The network learns what information is important to keep or throw away + +Mathematical Formulation: +f_t = ฯƒ(W_f ยท [h_{t-1}, x_t] + b_f) # Forget gate +i_t = ฯƒ(W_i ยท [h_{t-1}, x_t] + b_i) # Input gate +Cฬƒ_t = tanh(W_c ยท [h_{t-1}, x_t] + b_c) # Candidate memory +o_t = ฯƒ(W_o ยท [h_{t-1}, x_t] + b_o) # Output gate +C_t = f_t * C_{t-1} + i_t * Cฬƒ_t # Cell state update +h_t = o_t * tanh(C_t) # Hidden state update + +Where: +- ฯƒ is the sigmoid function +- * is element-wise multiplication +- [h_{t-1}, x_t] is concatenation of previous hidden state and current input +""" + +import torch +import torch.nn as nn +import string + + +class TextPredictor(nn.Module): + """ + Neural network for predicting the next character in a sequence. + Uses LSTM (Long Short-Term Memory) architecture for understanding patterns in text. + """ + + def __init__(self, vocab_size, embedding_dim=32, hidden_size=128): + super().__init__() + self.hidden_size = hidden_size + + # Embedding layer: converts character indices to dense vectors + # - Each character gets a learned vector representation + # - Similar to word embeddings but for individual characters + # - embedding_dim controls how detailed these representations are + self.embedding = nn.Embedding(vocab_size, embedding_dim) + + # LSTM layer: processes sequences and maintains memory + # - input_size: size of embedded character vectors + # - hidden_size: how much information to remember + # - num_layers=2: stacked LSTMs for more complex patterns + # - batch_first=True: expect data in (batch, sequence, features) format + # - dropout=0.2: randomly drop 20% of connections for regularization + self.lstm = nn.LSTM( + input_size=embedding_dim, + hidden_size=hidden_size, + num_layers=2, + batch_first=True, + dropout=0.2, + ) + + # Final layer: convert LSTM output to character probabilities + # - Takes LSTM's hidden state + # - Outputs scores for each possible character + self.fc = nn.Linear(hidden_size, vocab_size) + + def forward(self, x, hidden=None): + # 1. Convert character indices to embeddings + embeds = self.embedding(x) + + # 2. Process sequence through LSTM + # - Returns processed sequence and updated hidden state + # - hidden state carries memory between batches + lstm_out, hidden = self.lstm(embeds, hidden) + + # 3. Convert LSTM output to character predictions + output = self.fc(lstm_out) + return output, hidden + + +# Text processing utilities +class TextProcessor: + """ + Handles conversion between text and the numerical format needed by the network. + Think of it as a translator between human-readable text and network-readable numbers. 
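+    For example, encode('Hi') returns a tensor of two character indices, and
+    decode maps those indices back to the string 'Hi'.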
+ """ + + def __init__(self): + # Create character mappings using all printable ASCII characters + # - Includes letters, numbers, punctuation, and whitespace + # - char_to_idx: converts characters to unique numbers + # - idx_to_char: converts numbers back to characters + self.chars = string.printable + self.char_to_idx = {ch: i for i, ch in enumerate(self.chars)} + self.idx_to_char = dict(enumerate(self.chars)) + self.vocab_size = len(self.chars) + + def encode(self, text): + """Convert text string to tensor of indices.""" + return torch.tensor([self.char_to_idx[ch] for ch in text]) + + def decode(self, indices): + """Convert tensor of indices back to text string.""" + return "".join([self.idx_to_char[idx.item()] for idx in indices]) + + +def generate_text(model, processor, start_text="Hello", length=100, temperature=0.8): + """ + Generate new text by sampling from the model's predictions. + + Parameters: + - start_text: initial text to seed the generation + - length: how many characters to generate + - temperature: controls randomness of sampling + - Lower (e.g., 0.5): more conservative, predictable text + - Higher (e.g., 1.2): more creative, potentially chaotic text + """ + model.eval() # Switch to evaluation mode + current_text = start_text + hidden = None # LSTM's memory state + + with torch.no_grad(): # Don't track gradients during generation + for _ in range(length): + # 1. Prepare input sequence + x = processor.encode(current_text) + x = x.unsqueeze(0) # Add batch dimension + + # 2. Get model's predictions + output, hidden = model(x, hidden) + + # 3. Apply temperature to adjust prediction randomness + # - Higher temperature = more uniform probabilities + # - Lower temperature = more peaked probabilities + probs = torch.softmax(output[0, -1] / temperature, dim=0) + + # 4. Sample next character from probability distribution + next_char_idx = torch.multinomial(probs, 1) + next_char = processor.decode([next_char_idx]) + + # 5. Add to generated text + current_text += next_char + + return current_text + + +def train_model(): + # Sample training text (you can replace this with your own text) + text = """The quick brown fox jumps over the lazy dog. + Smalltalk is a fantastic programming language. + LSTMs are great for processing sequential data. 
+ Neural networks learn from examples.""" + + # Setup + processor = TextProcessor() + model = TextPredictor(processor.vocab_size) + criterion = nn.CrossEntropyLoss() + optimizer = torch.optim.Adam(model.parameters(), lr=0.001) + + # Prepare data + sequence_length = 20 + sequences = [] + targets = [] + + # Create training sequences + for i in range(len(text) - sequence_length): + sequences.append(text[i : i + sequence_length]) + targets.append(text[i + sequence_length]) + + print("Training the model...") + for epoch in range(100): + model.train() + total_loss = 0 + + for seq, target in zip(sequences, targets): + # Prepare data + x = processor.encode(seq).unsqueeze(0) # Shape: [1, seq_len] + y = processor.encode(target) # Shape: [1] + + # Forward pass + output, _ = model(x) # output shape: [1, seq_len, vocab_size] + + # Get only the last prediction and reshape + last_output = output[:, -1, :] # Shape: [1, vocab_size] + + # Loss calculation + loss = criterion(last_output, y) + + # Backward pass + optimizer.zero_grad() + loss.backward() + optimizer.step() + + total_loss += loss.item() + + if (epoch + 1) % 10 == 0: + print(f"Epoch {epoch+1}/100, Loss: {total_loss/len(sequences):.4f}") + # Generate sample text + sample = generate_text(model, processor, "The quick brown ", length=50) + print(f"Sample text: {sample}\n") + + return model, processor + + +if __name__ == "__main__": + # Train the model + model, processor = train_model() + + # Generate some text + print("\nGenerating text with different temperatures:") + for temp in [0.5, 0.8, 1.2]: + print(f"\nTemperature: {temp}") + generated = generate_text( + model, processor, "The quick brown ", length=100, temperature=temp + ) + print(generated) diff --git a/src/journey_to_transformer/06_word2vec.py b/src/journey_to_transformer/06_word2vec.py new file mode 100644 index 0000000..83ab0be --- /dev/null +++ b/src/journey_to_transformer/06_word2vec.py @@ -0,0 +1,243 @@ +""" +Word2Vec: Understanding Words Through Context + +Word2Vec, introduced by Mikolov et al. at Google in 2013, revolutionized how computers understand +words by learning their meaning from context. The key insight was: words that appear in similar +contexts probably have similar meanings. + +For example, in these sentences: +- "The cat drinks milk" +- "The dog drinks water" +We can guess that 'cat' and 'dog' are similar because they appear in similar contexts. + +The model works by: +1. Converting each word to a dense vector (embedding) +2. Learning to predict context words from target words (or vice versa) +3. Similar words end up with similar vectors + +Two main architectures: +- Skip-gram: Predict context words from target word +- CBOW (Continuous Bag of Words): Predict target word from context words + +This implementation uses Skip-gram with negative sampling: +- For each word, look at nearby words (within a window) +- Learn to predict these context words (positive samples) +- Also learn to NOT predict random other words (negative samples) + +The resulting word embeddings capture semantic relationships: +king - man + woman โ‰ˆ queen +paris - france + italy โ‰ˆ rome +""" + +import torch +import torch.nn as nn +import torch.optim as optim +import numpy as np +from collections import Counter, deque +import random + + +class Word2Vec(nn.Module): + """ + Neural network for learning word embeddings. 
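+    Trained with the skip-gram-with-negative-sampling objective described in the
+    module docstring above.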
+ Uses two embedding layers: + - target_embeddings: for the main word we're looking at + - context_embeddings: for the surrounding words + """ + + def __init__(self, vocab_size, embedding_dim): + super().__init__() + # Two separate embedding layers: + # - When a word is the target, we use target_embeddings + # - When a word is in the context, we use context_embeddings + # This asymmetry helps learn richer representations + self.target_embeddings = nn.Embedding(vocab_size, embedding_dim) + self.context_embeddings = nn.Embedding(vocab_size, embedding_dim) + + # Initialize with small random values to break symmetry + # Without this, all words would start too similar + self.target_embeddings.weight.data.uniform_(-0.1, 0.1) + self.context_embeddings.weight.data.uniform_(-0.1, 0.1) + + def forward(self, target_word, context_word): + # Get vector representations + target_embed = self.target_embeddings(target_word) + context_embed = self.context_embeddings(context_word) + + # Compute similarity using dot product + # Similar words should have vectors pointing in similar directions + similarity = torch.sum(target_embed * context_embed, dim=1) + + return torch.sigmoid(similarity) + + def get_embedding(self, word_idx): + # For using the trained model, we only need target embeddings + # Context embeddings are just for training + return self.target_embeddings(torch.tensor([word_idx])).detach() + + +class Word2VecTrainer: + """ + Handles the training process for Word2Vec: + 1. Creates vocabulary from text + 2. Generates training pairs (target word + context) + 3. Trains the model using negative sampling + """ + + def __init__(self, text, embedding_dim=64, window_size=2, min_count=5): + self.window_size = window_size # How many words to look at on each side + + # Create vocabulary from text + words = text.lower().split() + word_counts = Counter(words) + + # Filter out rare words (appear less than min_count times) + # This reduces noise and speeds up training + filtered_words = [ + (word, count) for word, count in word_counts.items() if count >= min_count + ] + + # Create word-to-index mappings + self.vocab = {word: idx for idx, (word, _) in enumerate(filtered_words)} + self.idx_to_word = {idx: word for word, idx in self.vocab.items()} + self.vocab_size = len(self.vocab) + + # Generate training pairs + self.training_pairs = self._create_training_pairs(words) + + # Initialize model and training tools + self.model = Word2Vec(self.vocab_size, embedding_dim) + self.optimizer = optim.Adam(self.model.parameters()) + self.criterion = nn.BCELoss() + + def _create_training_pairs(self, words): + """ + Creates training pairs using sliding window approach: + - For each word (target), look at nearby words (context) + - Create positive pairs (target + actual context word) + - Create negative pairs (target + random word) + """ + pairs = [] + window = deque(maxlen=2 * self.window_size + 1) + + for word in words: + if word in self.vocab: + window.append(word) + if len(window) == 2 * self.window_size + 1: + target = window[self.window_size] # Middle word + # Get context words (words before and after target) + context = ( + list(window)[: self.window_size] + + list(window)[self.window_size + 1 :] + ) + + for ctx_word in context: + if ctx_word in self.vocab: + # Positive pair: target word + context word (label = 1) + pairs.append( + (self.vocab[target], self.vocab[ctx_word], 1.0) + ) + + # Negative pair: target word + random word (label = 0) + # Keep sampling until we get a word not in current context + neg_idx = 
random.randint(0, self.vocab_size - 1) + while self.idx_to_word[neg_idx] in context + [target]: + neg_idx = random.randint(0, self.vocab_size - 1) + + pairs.append((self.vocab[target], neg_idx, 0.0)) + return pairs + + def train(self, epochs=100, batch_size=24): + """ + Trains the model using mini-batch gradient descent: + 1. Split data into batches + 2. For each batch: + - Make predictions + - Calculate loss + - Update model weights + """ + print(f"Training Word2Vec model with {self.vocab_size} words...") + for epoch in range(epochs): + total_loss = 0 + # Shuffle pairs to prevent learning order dependencies + random.shuffle(self.training_pairs) + + # Process in batches for efficiency + for i in range(0, len(self.training_pairs), batch_size): + batch = self.training_pairs[i : i + batch_size] + targets, contexts, labels = zip(*batch) + + # Convert to PyTorch tensors + target_tensor = torch.tensor(targets) + context_tensor = torch.tensor(contexts) + label_tensor = torch.tensor(labels, dtype=torch.float32) + + # Training step + self.optimizer.zero_grad() # Reset gradients + outputs = self.model(target_tensor, context_tensor) # Forward pass + loss = self.criterion(outputs, label_tensor) # Calculate loss + loss.backward() # Backward pass + self.optimizer.step() # Update weights + + total_loss += loss.item() + + # Print progress + avg_loss = total_loss / (len(self.training_pairs) / batch_size) + print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}") + + def get_similar_words(self, word, n=5): + """ + Finds words with similar meanings by: + 1. Getting the target word's embedding + 2. Computing similarity with all other words + 3. Returning the most similar ones + """ + if word not in self.vocab: + return [] + + # Get embedding for input word + word_embedding = self.model.get_embedding(self.vocab[word]) + + # Compare with all other words using cosine similarity + similarities = [] + for other_word, idx in self.vocab.items(): + if other_word != word: + other_embedding = self.model.get_embedding(idx) + similarity = torch.cosine_similarity(word_embedding, other_embedding) + similarities.append((other_word, similarity.item())) + + # Return top N most similar words + return sorted(similarities, key=lambda x: x[1], reverse=True)[:n] + + +if __name__ == "__main__": + # Example text with related concepts + text = """ + The quick brown fox jumps over the lazy dog. + The fox is quick and brown and jumps high. + The dog is lazy and sleeps all day. + The quick rabbit jumps over the fence. + The brown bear likes honey and fish. + Fish swim in the river all day. + Dogs and foxes are related animals. + Bears and foxes live in the forest. 
+ """ + + # Create and train model + trainer = Word2VecTrainer( + text, + embedding_dim=64, # Size of word vectors + window_size=2, # Words to consider as context + min_count=2, # Minimum word frequency + ) + + trainer.train(epochs=100, batch_size=24) + + # Test the model by finding similar words + test_words = ["quick", "fox", "dog", "river", "bear"] + for word in test_words: + similar = trainer.get_similar_words(word) + print(f"\nWords similar to '{word}':") + for similar_word, similarity in similar: + if similarity > 0.4: + print(f" {similar_word}: {similarity:.3f}") diff --git a/src/journey_to_transformer/07_softmax.py b/src/journey_to_transformer/07_softmax.py new file mode 100644 index 0000000..261a716 --- /dev/null +++ b/src/journey_to_transformer/07_softmax.py @@ -0,0 +1,101 @@ +""" +Understanding Softmax: The Neural Network's Decision Maker + +Softmax is a crucial function in neural networks that converts raw scores (logits) into +probabilities. It's used when we want our network to make decisions between multiple options. + +Key Properties of Softmax: +1. Converts any real numbers into probabilities (0-1) +2. Ensures all outputs sum to 1.0 +3. Maintains relative differences (bigger inputs = bigger probabilities) +4. Handles both positive and negative numbers + +Historical Significance: +- Introduced in 1959 by R. Duncan Luce in "Individual Choice Behavior" +- Became fundamental in neural networks during the 1980s +- Critical for modern classification tasks + +Why We Need Softmax: +- Raw neural network outputs can be any number +- We often need probabilities for decision making +- Helps with training stability +- Makes outputs interpretable +""" + +import torch +import torch.nn as nn + +# Raw scores for where to get lunch +scores = torch.tensor([10.0, 2.0, 5.0]) # Pizza, Salad, Tacos +print("Raw scores:", scores) + +def softmax(x): + """ + Converts raw scores into probabilities using the softmax function: + P(i) = exp(x[i]) / sum(exp(x)) + + Why exp()? + - Always positive (we can't have negative probabilities) + - Maintains relative differences + - Differentiable (important for training) + """ + exp_x = torch.exp(x) # Step 1: Convert to positive numbers + return exp_x / exp_x.sum() # Step 2: Normalize to sum to 1 + +# Apply softmax to our lunch scores +probabilities = softmax(scores) +print("\nAfter softmax (probabilities):", probabilities) +print("Notice they sum to 1:", probabilities.sum()) + +class SimpleClassifier(nn.Module): + """ + A basic neural network classifier that demonstrates softmax in action. + + Architecture: + - Input layer (2 features) + - Single linear layer + - Output layer (3 classes) + """ + def __init__(self): + super().__init__() + self.layer = nn.Linear(2, 3) # 2 inputs, 3 classes + # Note: PyTorch's CrossEntropyLoss includes softmax! + + def forward(self, x): + # Raw scores (logits) + scores = self.layer(x) + + # Compare outputs before and after softmax + raw_output = scores # Raw network outputs (can be any number) + probability_output = softmax(scores) # Converted to probabilities + + return raw_output, probability_output + +# Demonstrate with real data +model = SimpleClassifier() +# Two samples with two features each +input_data = torch.tensor([[2.0, 1.0], [1.0, 3.0]]) + +raw, probs = model(input_data) +print("\nRaw network outputs (can be any number):") +print(raw) +print("\nAfter softmax (nice probabilities between 0-1):") +for i in range(len(probs)): + print(f"Sample {i+1}: {probs[i]}") +print("\nEach row sums to:", probs.sum(dim=1)) # Always 1! 
+ +# Demonstrate how softmax handles different scenarios +print("\nScenario 1: Similar inputs") +small_diffs = torch.tensor([2.0, 2.1, 2.2]) +print("Input:", small_diffs) +print("Output (notice gentle preferences):", softmax(small_diffs)) + +print("\nScenario 2: Very different inputs") +big_diffs = torch.tensor([2.0, 4.0, 2.2]) +print("Input:", big_diffs) +print("Output (notice strong preference):", softmax(big_diffs)) + +print("\nScenario 3: Mixed positive/negative") +mixed_numbers = torch.tensor([-1.0, 5.0, 2.0]) +print("Input:", mixed_numbers) +print("Output (still works!):", softmax(mixed_numbers)) diff --git a/src/journey_to_transformer/08_attention.py b/src/journey_to_transformer/08_attention.py new file mode 100644 index 0000000..7ae3e34 --- /dev/null +++ b/src/journey_to_transformer/08_attention.py @@ -0,0 +1,133 @@ +""" +Understanding Attention Mechanisms in Neural Networks + +Attention mechanisms are a fundamental concept in modern deep learning, especially in +transformers. This example demonstrates a simple attention mechanism that can: +1. Encode sentences into vector representations +2. Calculate attention scores between words +3. Find relevant sentences based on attention + +Historical Significance: +- Introduced in 2014 by Bahdanau et al. for machine translation +- Revolutionary because it allowed models to "focus" on relevant parts of input +- Led directly to the transformer architecture (2017) and modern LLMs + +Key Concepts Demonstrated: +1. Word Embeddings: Converting words to vectors +2. Attention Scores: Measuring relevance between vectors +3. Dot Product Attention: Simplest form of attention mechanism + +The network below uses: +- Word embeddings (5 dimensions per word) +- Simple dot product attention +- Mean pooling for sentence-level scores +""" + +import torch +import torch.nn as nn + + +class CoolAttention(nn.Module): + def __init__(self): + super().__init__() + + # Our dataset: simple sentences about food preferences + # Each sentence follows pattern: [Person] [Verb] [Food] + self.story = [ + "Alice loves pizza", + "Bob hates broccoli", + "Charlie eats cookies", + "Alice likes cake", + "Bob loves sushi", + ] + + # Vocabulary mapping: convert words to unique indices + # Organized by semantic categories (people, verbs, foods) + self.word2idx = { + # People embeddings (indices 0-2) + "Alice": 0, + "Bob": 1, + "Charlie": 2, + # Verb embeddings (indices 3-6) + "loves": 3, + "hates": 4, + "likes": 5, + "eats": 6, + # Food embeddings (indices 7-11) + "pizza": 7, + "broccoli": 8, + "cookies": 9, + "cake": 10, + "sushi": 11, + } + + # Create learnable word embeddings + # - Each word gets a 5-dimensional vector + # - These vectors are randomly initialized and could be trained + # - 5 dimensions is arbitrary (could be larger for more complex relationships) + self.embeddings = nn.Embedding(len(self.word2idx), 5) + + def encode_sentence(self, sentence): + """ + Convert a sentence into its vector representation. + + Args: + sentence (str): Input sentence to encode + + Returns: + torch.Tensor: Tensor of word embeddings (shape: [num_words, embedding_dim]) + """ + # Split sentence into words and convert to indices + words = sentence.split() + indices = [self.word2idx[word] for word in words] + # Look up embeddings for each word + return self.embeddings(torch.tensor(indices)) + + def attention_search(self, person): + """ + Find sentences relevant to a specific person using attention. 
+ + Args: + person (str): Person to search for + + Returns: + list: Sorted list of (sentence, attention_score) tuples + """ + results = [] + + # Step 1: Convert all sentences to vector representations + encoded_sentences = [self.encode_sentence(s) for s in self.story] + + # Step 2: Calculate attention scores for each sentence + for i, sentence_embedding in enumerate(encoded_sentences): + # Calculate attention using dot product between: + # - First word of sentence (usually the person) + # - Embedding of the search query (person) + score = torch.mean( + sentence_embedding[0] + * self.embeddings(torch.tensor([self.word2idx[person]])) + ) + results.append((self.story[i], score.item())) + + # Step 3: Sort results by attention score (highest first) + results.sort(key=lambda x: x[1], reverse=True) + return results + + +# Demo the attention mechanism +attention = CoolAttention() + +# Search for Alice's food preferences +print("๐Ÿ” Searching for Alice's food preferences...") +results = attention.attention_search("Alice") +for sentence, score in results: + # Visualize attention scores with stars + attention_emojis = "๐ŸŒŸ" * int(score * 5) + print(f"{attention_emojis} {sentence}") + +# Search for Bob's food preferences +print("\n๐Ÿ” Now searching for Bob's food preferences...") +results = attention.attention_search("Bob") +for sentence, score in results: + attention_emojis = "๐ŸŒŸ" * int(score * 5) + print(f"{attention_emojis} {sentence}") diff --git a/src/journey_to_transformer/09_mini_gpt.py b/src/journey_to_transformer/09_mini_gpt.py new file mode 100644 index 0000000..495e38c --- /dev/null +++ b/src/journey_to_transformer/09_mini_gpt.py @@ -0,0 +1,452 @@ +""" +MiniGPT: A Small But Powerful Transformer Implementation + +This implementation demonstrates core concepts of the transformer architecture: +1. Multi-head self-attention for capturing relationships between tokens +2. Position embeddings to maintain sequence order information +3. Feed-forward networks for processing token representations +4. Layer normalization and residual connections for stable training + +Historical Significance: +- Transformers revolutionized NLP when introduced in "Attention Is All You Need" (2017) +- GPT (Generative Pre-trained Transformer) showed that transformers could be used for + general language understanding +- The architecture scales remarkably well, leading to models like GPT-3 and GPT-4 + +Key Components: +1. Token Embeddings: Convert discrete tokens to continuous vectors +2. Position Embeddings: Add position information to tokens +3. Self-Attention: Learn relationships between tokens +4. Feed-Forward: Process token representations +5. Layer Norm: Stabilize training +""" + +import torch +import torch.nn as nn +import torch.nn.functional as F +import math +import json +from tqdm import tqdm + + +class MultiHeadAttention(nn.Module): + """ + Multi-head attention mechanism that allows the model to jointly attend to information + from different representation subspaces at different positions. 
+ + Key Concepts: + - Query, Key, Value: Different projections of input for attention computation + - Multiple heads: Allow attention to focus on different aspects of the input + - Causal masking: Ensures model only looks at past tokens (for autoregressive generation) + """ + + def __init__(self, config): + super().__init__() + self.num_heads = config.num_heads + self.head_size = config.head_size + self.dropout = config.dropout + + # Create separate projections for Q,K,V + # Each head gets its own portion of the embedding dimension + self.query = nn.Linear(config.n_embd, config.n_embd) + self.key = nn.Linear(config.n_embd, config.n_embd) + self.value = nn.Linear(config.n_embd, config.n_embd) + + # Final projection to combine all heads + self.proj = nn.Linear(config.n_embd, config.n_embd) + + # Causal mask ensures autoregressive property + # Each token can only attend to previous tokens and itself + self.register_buffer( + "mask", torch.tril(torch.ones(config.block_size, config.block_size)) + ) + + def forward(self, x): + B, T, C = x.shape # batch, sequence length, embedding dim + + # Split heads and transpose for parallel attention computation + q = self.query(x).view(B, T, self.num_heads, self.head_size).transpose(1, 2) + k = self.key(x).view(B, T, self.num_heads, self.head_size).transpose(1, 2) + v = self.value(x).view(B, T, self.num_heads, self.head_size).transpose(1, 2) + + # Scaled dot-product attention + # Scale factor prevents softmax saturation with large embedding dimensions + att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1))) + att = att.masked_fill( + self.mask[:T, :T] == 0, float("-inf") + ) # Apply causal mask + att = F.softmax(att, dim=-1) # Convert to probabilities + att = F.dropout(att, p=self.dropout, training=self.training) # Apply dropout + + # Combine attention weights with values + out = att @ v + + # Restore original dimensions and project + out = out.transpose(1, 2).contiguous().view(B, T, C) + return self.proj(out) + + +class FeedForward(nn.Module): + def __init__(self, config): + super().__init__() + self.net = nn.Sequential( + # First we expand + nn.Linear(config.n_embd, 4 * config.n_embd), + nn.ReLU(), + # Then we shrink back down + nn.Linear(4 * config.n_embd, config.n_embd), + nn.Dropout(config.dropout), + ) + + def forward(self, x): + return self.net(x) + + +class TransformerBlock(nn.Module): + def __init__(self, config): + super().__init__() + self.attention = MultiHeadAttention(config) + self.feed_forward = FeedForward(config) + self.ln1 = nn.LayerNorm(config.n_embd) + self.ln2 = nn.LayerNorm(config.n_embd) + + def forward(self, x): + # Attention with residual connection + x = x + self.attention(self.ln1(x)) + # Feed forward with residual connection + x = x + self.feed_forward(self.ln2(x)) + return x + + +class MiniGPT(nn.Module): + def __init__(self, config): + super().__init__() + # Store config as instance variable + self.config = config + + # Token embedding table + self.token_embedding = nn.Embedding(config.vocab_size, config.n_embd) + # Position embedding table + self.position_embedding = nn.Embedding(config.block_size, config.n_embd) + + # Transformer blocks + self.blocks = nn.ModuleList( + [TransformerBlock(config) for _ in range(config.n_layer)] + ) + + # Final layer norm + self.ln_f = nn.LayerNorm(config.n_embd) + + # Language model head + self.lm_head = nn.Linear(config.n_embd, config.vocab_size) + + # Initialize weights + self.apply(self._init_weights) + + def _init_weights(self, module): + if isinstance(module, nn.Linear): + 
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) + if module.bias is not None: + torch.nn.init.zeros_(module.bias) + elif isinstance(module, nn.Embedding): + torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) + + def forward(self, idx, targets=None): + B, T = idx.shape + + # Get token embeddings + tok_emb = self.token_embedding(idx) + # Get position embeddings + pos_emb = self.position_embedding(torch.arange(T, device=idx.device)) + # Combine them + x = tok_emb + pos_emb + + # Apply transformer blocks + for block in self.blocks: + x = block(x) + + # Apply final layer norm + x = self.ln_f(x) + + # Get logits + logits = self.lm_head(x) + + # If we have targets, compute the loss + if targets is not None: + loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1)) + return logits, loss + + return logits + + @torch.no_grad() + def generate(self, idx, max_new_tokens, temperature=1.0, sample_fn=None): + # idx is (B, T) array of indices in the current context + for _ in range(max_new_tokens): + # Crop context to block_size + context = idx[:, -self.config.block_size :] + # Get predictions + logits = self(context) + # Focus only on the last time step + logits = logits[:, -1, :] + + # Use custom sampling function if provided, otherwise default sampling + if sample_fn is not None: + idx_next = sample_fn(logits) + else: + # Default sampling logic + logits = logits / temperature + probs = F.softmax(logits, dim=-1) + idx_next = torch.multinomial(probs, num_samples=1) + + # Append sampled index to the running sequence + idx = torch.cat((idx, idx_next), dim=1) + + return idx + + +# Configuration class to hold hyperparameters +class GPTConfig: + def __init__( + self, + vocab_size, + block_size, + n_layer=6, + n_embd=384, + num_heads=6, + dropout=0.1, + ): + self.vocab_size = vocab_size + self.block_size = block_size + self.n_layer = n_layer + self.n_embd = n_embd + self.num_heads = num_heads + self.head_size = n_embd // num_heads # Derived from n_embd + self.dropout = dropout + + +class CharacterTokenizer: + def __init__(self): + # Simplified special tokens - keep only what we use + self.special_tokens = { + "BOS": "<|bos|>", # Beginning of sequence + "EOS": "<|eos|>", # End of sequence + } + + self.char_to_idx = {token: idx for idx, token in enumerate(self.special_tokens.values())} + self.idx_to_char = {idx: token for idx, token in enumerate(self.special_tokens.values())} + self.vocab_size = len(self.special_tokens) + + # Store only needed special token indices + self.bos_idx = self.char_to_idx[self.special_tokens["BOS"]] + self.eos_idx = self.char_to_idx[self.special_tokens["EOS"]] + + def fit(self, text): + """Build vocabulary from text.""" + for char in sorted(set(text)): + if char not in self.char_to_idx: + idx = len(self.char_to_idx) + self.char_to_idx[char] = idx + self.idx_to_char[idx] = char + self.vocab_size = len(self.char_to_idx) + return self + + def encode(self, text, add_special_tokens=True): + """Convert text to token indices.""" + indices = [] + if add_special_tokens: + indices.append(self.bos_idx) + indices.extend(self.char_to_idx[char] for char in text) + if add_special_tokens: + indices.append(self.eos_idx) + return indices + + def decode(self, indices, remove_special_tokens=True): + """Convert token indices back to text.""" + chars = [] + special_values = set(self.special_tokens.values()) + + for idx in indices: + char = self.idx_to_char[idx] + if not (remove_special_tokens and char in special_values): + chars.append(char) + return "".join(chars) + + def 
batch_encode(self, texts, max_length=None, padding=True): + """Encode a batch of texts.""" + encoded = [self.encode(text) for text in texts] + + if max_length is None and padding: + max_length = max(len(seq) for seq in encoded) + + if padding: + # Pad sequences to max_length + encoded = [ + seq + [self.pad_idx] * (max_length - len(seq)) for seq in encoded + ] + + return encoded + + def save_vocab(self, path): + """Save vocabulary to file.""" + vocab_data = { + "char_to_idx": self.char_to_idx, + "special_tokens": self.special_tokens, + } + with open(path, "w") as f: + json.dump(vocab_data, f, indent=2) + + @classmethod + def load_vocab(cls, path): + """Load vocabulary from file.""" + with open(path) as f: + vocab_data = json.load(f) + + tokenizer = cls() + tokenizer.char_to_idx = vocab_data["char_to_idx"] + tokenizer.special_tokens = vocab_data["special_tokens"] + tokenizer.idx_to_char = { + idx: char for char, idx in tokenizer.char_to_idx.items() + } + tokenizer.vocab_size = len(tokenizer.char_to_idx) + + return tokenizer + + +def get_batch(data, batch_size, block_size, device="cpu"): + """Generate a small batch of data for training""" + ix = torch.randint(len(data) - block_size, (batch_size,)) + x = torch.stack([torch.tensor(data[i : i + block_size]) for i in ix]) + y = torch.stack([torch.tensor(data[i + 1 : i + block_size + 1]) for i in ix]) + x, y = x.to(device), y.to(device) + return x, y + + +def train_model( + model, train_data, config, epochs=10, batch_size=32, learning_rate=3e-4 +): + model.train() + optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate) + + # Create progress bar for epochs + pbar = tqdm(range(epochs), desc="Training") + + losses = [] + for epoch in pbar: + # Get random batch and compute loss + X, Y = get_batch(train_data, batch_size, config.block_size) + logits, loss = model(X, Y) + + # Backward pass and optimize + optimizer.zero_grad() + loss.backward() + optimizer.step() + + losses.append(loss.item()) + + # Update progress bar + pbar.set_postfix({"loss": f"{loss.item():.4f}"}) + + return losses + + +def generate_text(model, tokenizer, start_text, max_new_tokens=50, temperature=0.7, top_k=10): + model.eval() + context = torch.tensor(tokenizer.encode(start_text), dtype=torch.long).unsqueeze(0) + + def sample(logits, top_k=top_k): + # Apply temperature + logits = logits / temperature + + # Apply top-k filtering + k = min(top_k, logits.size(-1)) # Safety check + values, _ = torch.topk(logits, k) + min_value = values[:, -1].unsqueeze(-1) + logits = torch.where(logits < min_value, float('-inf'), logits) + + # Get probabilities and sample + probs = F.softmax(logits, dim=-1) + return torch.multinomial(probs, num_samples=1) + + generated = model.generate( + context, + max_new_tokens=max_new_tokens, + temperature=temperature, + sample_fn=sample, + ) + + return tokenizer.decode(generated[0].tolist()) + + +# Fun training data - a mix of movie quotes! +movie_quotes = """ +To infinity and beyond! +I'll be back. +May the Force be with you. +Life is like a box of chocolates. +Here's looking at you, kid. +There's no place like home. +I am your father. Or your mother. +E.T. phone home. Or not. +I see dead people and I'm not afraid. +You're gonna need a bigger boat. +Elementary, my dear Watson. +I'll have what she's having. +You can't handle the truth! +Houston, we have a problem. +Do, or do not. There is no try. +I feel the need... the need for speed! +They may take our lives, but they'll never take our freedom! +Why so serious? +I'm king of the world! 
+Hasta la vista, baby. +My name is Bond, James Bond. +I'm going to make him an offer he can't refuse. +You're gonna need a bigger boat. +Let's put a smile on that face. +I'm the king of the world! +What's the matter with you people? +I'm not even supposed to be here today. +Give me a break! Give peace a chance. +All right, Mr. DeMille, I'm ready for my close-up. +C'mon, let's go bowling! +Big Lebowski was a great movie. +Ich bin ein Berliner, while my name is Billy Turf. +Dude, where's my car? +Positively fourth street. +A little bit of South Philly never hurt nobody. +""" + +if __name__ == "__main__": + # Create and fit tokenizer + tokenizer = CharacterTokenizer() + tokenizer.fit(movie_quotes) + print(f"Vocabulary size: {tokenizer.vocab_size}") + + # Convert text to tokens + data = tokenizer.encode(movie_quotes) + + # Create model config with simplified parameters + config = GPTConfig( + vocab_size=tokenizer.vocab_size, + block_size=64, + n_layer=6, + n_embd=256, + num_heads=8, + dropout=0.2, + ) + + # Create model + model = MiniGPT(config) + print("Training model...") + + # Train model + losses = train_model(model, data, config, epochs=750, batch_size=8) + + # Generate some text! + print("\nGenerating text...\n") + prompts = ["I am", "Life is", "May the", "To infinity", "My name is"] + + for prompt in prompts: + generated = generate_text(model, tokenizer, prompt, max_new_tokens=50) + print(f"Prompt: '{prompt}'") + print(f"Generated: {generated}") diff --git a/src/poc/deepdream.py b/src/poc/deepdream.py new file mode 100644 index 0000000..2ba77db --- /dev/null +++ b/src/poc/deepdream.py @@ -0,0 +1,157 @@ +import torch +import torch.nn as nn +import torchvision.models as models +import torchvision.transforms as transforms +from PIL import Image +import numpy as np +import matplotlib.pyplot as plt + + +class DeepDreamer: + def __init__(self, model_name="inception_v3", layer_name="Mixed_5b"): + # Update model initialization to use weights parameter + weights = models.Inception_V3_Weights.IMAGENET1K_V1 + self.model = models.inception_v3(weights=weights) + self.model.eval() + + # Dictionary to store activations + self.activations = {} + self.layer_name = layer_name + + # Register forward hook + for name, layer in self.model.named_modules(): + if name == layer_name: + layer.register_forward_hook(self._get_activation(name)) + + def _get_activation(self, name): + def hook(model, input, output): + self.activations[name] = output + + return hook + + def preprocess_image(self, image_path, size=512): + image = Image.open(image_path) + # Resize while maintaining aspect ratio + ratio = size / min(image.size) + new_size = tuple(int(x * ratio) for x in image.size) + image = image.resize(new_size, Image.LANCZOS) + + # Convert to tensor and normalize + loader = transforms.Compose( + [ + transforms.ToTensor(), + transforms.Normalize( + mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225] + ), + ] + ) + image = loader(image).unsqueeze(0) + return image + + def deprocess_image(self, tensor): + # Convert back to image + tensor = tensor.squeeze(0) + # Denormalize + tensor = tensor * torch.tensor([0.229, 0.224, 0.225]).view( + 3, 1, 1 + ) + torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1) + tensor = torch.clamp(tensor, 0, 1) + + # Convert to PIL image + transform = transforms.ToPILImage() + return transform(tensor) + + def dream( + self, image_path, num_iterations=20, lr=0.01, octave_scale=1.4, num_octaves=4 + ): + # Load base image + base_img = self.preprocess_image(image_path) + + # Create octaves pyramid + 
octaves = [] + for _ in range(num_octaves - 1): + octaves.append(base_img) + base_img = torch.nn.functional.interpolate( + base_img, + scale_factor=1 / octave_scale, + mode="bicubic", + align_corners=False, + ) + + detail = None + for octave_idx, octave_base in enumerate(reversed(octaves)): + if detail is not None: + detail = torch.nn.functional.interpolate( + detail, + size=octave_base.shape[2:], + mode="bilinear", + align_corners=False, + ) + + # Add detail from previous octave to current + input_img = octave_base + detail if detail is not None else octave_base + input_img = input_img.detach() # Detach from previous graph + input_img.requires_grad_(True) # Enable gradients for new iteration + + for i in range(num_iterations): + # Clear gradients at start of iteration + if input_img.grad is not None: + input_img.grad.zero_() + + # Forward pass + out = self.model(input_img) + activation = self.activations[self.layer_name] + + # Calculate loss + loss = activation.norm() # Remove negative sign for maximization + + # Backward pass + loss.backward() + + # Ensure we have gradients + if input_img.grad is not None: + # Gradient normalization and update + grad = input_img.grad.data + grad_mean = grad.abs().mean() + grad_norm = grad / (grad_mean + 1e-8) + input_img.data += lr * grad_norm + + # Apply image regularization + input_img.data = torch.clamp(input_img.data, -1, 1) + + if (i + 1) % 5 == 0: + print( + f"Octave {octave_idx+1}/{num_octaves}, " + f"Iteration {i+1}/{num_iterations}, " + f"Loss: {loss.item():.3f}" + ) + + # Extract detail produced in this octave + detail = input_img.data - octave_base + + return self.deprocess_image(input_img.detach()) + + +# Example usage +def generate_dream(image_path, output_path, iterations=7, lr=0.09): + dreamer = DeepDreamer() + dreamed_image = dreamer.dream( + image_path, num_iterations=iterations, lr=lr, octave_scale=1.9, num_octaves=4 + ) + dreamed_image.save(output_path) + + # Display original and dreamed images + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 7)) + ax1.imshow(Image.open(image_path)) + ax1.set_title("Original Image") + ax1.axis("off") + + ax2.imshow(dreamed_image) + ax2.set_title("DeepDream Image") + ax2.axis("off") + + plt.show() + + +if __name__ == "__main__": + generate_dream("images/cat.jpg", "images/dreamed_cat.jpg")
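+
+    # Illustrative variation (an assumed follow-up, not part of the original example):
+    # hooking a deeper Inception layer such as "Mixed_6b" tends to amplify larger, more
+    # abstract patterns than "Mixed_5b". The output filename below is made up.
+    dreamer = DeepDreamer(layer_name="Mixed_6b")
+    dreamed = dreamer.dream(
+        "images/cat.jpg", num_iterations=7, lr=0.09, octave_scale=1.9, num_octaves=4
+    )
+    dreamed.save("images/dreamed_cat_mixed6b.jpg")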