import sys

import numpy as np


class NeuralProbabilisticLanguageModel:
    def __init__(self, vocab_size, embedding_dim=100, context_size=3,
                 hidden_dim=50, learning_rate=0.01):
        """
        Initialize Bengio's Neural Probabilistic Language Model

        Args:
            vocab_size: Size of the vocabulary
            embedding_dim: Dimension of word embeddings
            context_size: Number of previous words to consider
            hidden_dim: Dimension of hidden layer
            learning_rate: Learning rate for optimization
        """
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.context_size = context_size
        self.hidden_dim = hidden_dim
        self.learning_rate = learning_rate

        # Initialize model parameters
        # C: Word embedding matrix
        self.C = np.random.randn(vocab_size, embedding_dim) * 0.1

        # Parameters for hidden layer
        self.H = np.random.randn(context_size * embedding_dim, hidden_dim) * 0.1
        self.b_h = np.zeros(hidden_dim)

        # Parameters for output layer
        self.U = np.random.randn(hidden_dim, vocab_size) * 0.1
        self.W = np.random.randn(context_size * embedding_dim, vocab_size) * 0.1
        self.b_o = np.zeros(vocab_size)

    def forward(self, context_words):
        """
        Forward pass of the model

        Args:
            context_words: List of word indices for the context

        Returns:
            Tuple of (probability distribution over next words,
            hidden layer activation, concatenated input embeddings)
        """
        # Lookup embeddings for context words
        embeddings = self.C[context_words]
        x = embeddings.flatten()  # Concatenate embeddings

        # Compute hidden layer activation
        h = np.tanh(np.dot(x, self.H) + self.b_h)

        # Compute output layer
        # Direct connections from input to output (shortcut connections)
        y = np.dot(h, self.U) + np.dot(x, self.W) + self.b_o

        # Apply softmax to get probabilities
        exp_y = np.exp(y - np.max(y))  # Subtract max for numerical stability
        probabilities = exp_y / np.sum(exp_y)

        return probabilities, h, x

    def compute_loss(self, probabilities, target_word):
        """
        Compute cross-entropy loss

        Args:
            probabilities: Predicted probability distribution
            target_word: Index of the target word

        Returns:
            Cross-entropy loss
        """
        return -np.log(probabilities[target_word])

    def backward(self, context_words, target_word, probabilities, h, x):
        """
        Backward pass for parameter updates

        Args:
            context_words: List of word indices for the context
            target_word: Index of the target word
            probabilities: Output probabilities from forward pass
            h: Hidden layer activation
            x: Input vector (concatenated embeddings)
        """
        # Gradient for output layer
        d_y = probabilities.copy()
        d_y[target_word] -= 1

        # Gradients for parameters
        d_U = np.outer(h, d_y)
        d_W = np.outer(x, d_y)
        d_b_o = d_y

        # Gradient for hidden layer
        d_h = np.dot(d_y, self.U.T)
        d_h_input = d_h * (1 - h**2)  # Derivative of tanh

        d_H = np.outer(x, d_h_input)
        d_b_h = d_h_input

        # Gradient for embeddings
        d_x = np.dot(d_h_input, self.H.T) + np.dot(d_y, self.W.T)
        d_C = np.zeros_like(self.C)

        # Accumulate embedding gradients for each context word
        for i, word_idx in enumerate(context_words):
            start = i * self.embedding_dim
            end = (i + 1) * self.embedding_dim
            d_C[word_idx] += d_x[start:end]

        # Update parameters
        self.U -= self.learning_rate * d_U
        self.W -= self.learning_rate * d_W
        self.b_o -= self.learning_rate * d_b_o
        self.H -= self.learning_rate * d_H
        self.b_h -= self.learning_rate * d_b_h
        self.C -= self.learning_rate * d_C

    def train_step(self, context_words, target_word):
        """
        Perform one training step

        Args:
            context_words: List of word indices for the context
            target_word: Index of the target word

        Returns:
            Loss for this example
        """
        probabilities, h, x = self.forward(context_words)
        loss = self.compute_loss(probabilities, target_word)
        self.backward(context_words, target_word, probabilities, h, x)
        return loss

    def train(self, data, n_epochs=5):
        """
        Train the model on a dataset

        Args:
            data: List of (context_words, target_word) tuples
            n_epochs: Number of training epochs

        Returns:
            List of average losses per epoch
        """
        losses = []
        for epoch in range(n_epochs):
            epoch_loss = 0
            for context_words, target_word in data:
                epoch_loss += self.train_step(context_words, target_word)
            avg_loss = epoch_loss / len(data)
            losses.append(avg_loss)
            print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")
        return losses

    def predict_next_word(self, context_words, temperature=1.0):
        """
        Predict the next word given a context with temperature sampling

        Args:
            context_words: List of word indices for the context
            temperature: Controls randomness (higher = more random,
                lower = more deterministic)
                temperature=0 is equivalent to argmax (greedy)
                temperature=1.0 keeps the original distribution

        Returns:
            Index of the sampled next word
        """
        probabilities, _, _ = self.forward(context_words)

        if temperature == 0:
            # Greedy sampling (argmax)
            return np.argmax(probabilities)

        # Apply temperature scaling
        scaled_logits = np.log(probabilities) / temperature

        # Re-normalize to get a valid probability distribution
        # (subtract max for numerical stability)
        exp_scaled = np.exp(scaled_logits - np.max(scaled_logits))
        scaled_probs = exp_scaled / np.sum(exp_scaled)

        # Sample from the scaled distribution
        return np.random.choice(len(scaled_probs), p=scaled_probs)


# Example usage
def preprocess_text(text, vocab, context_size):
    """Convert text to training examples"""
    words = text.split()
    word_to_idx = {word: idx for idx, word in enumerate(vocab)}

    # Create training examples
    examples = []
    for i in range(len(words) - context_size):
        context = [word_to_idx[words[i + j]] for j in range(context_size)]
        target = word_to_idx[words[i + context_size]]
        examples.append((context, target))
    return examples


# Small example
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("synopsis: python3 main.py <corpus_file> [context_size] [n_epochs]")
        sys.exit(1)

    with open(sys.argv[1], "r") as buf:
        text = buf.read()

    if len(sys.argv) > 2:
        n_ctx = int(sys.argv[2])
    else:
        n_ctx = 2

    if len(sys.argv) > 3:
        n_epochs = int(sys.argv[3])
    else:
        n_epochs = 10

    n_predict = 100

    words = text.split()
    vocab = sorted(set(words))
    vocab.append(" ")

    examples = preprocess_text(text, vocab, n_ctx)

    model = NeuralProbabilisticLanguageModel(
        vocab_size=len(vocab),
        embedding_dim=10,
        context_size=n_ctx,
        hidden_dim=8
    )
    losses = model.train(examples, n_epochs=n_epochs)

    # Test model prediction: seed with the first n_ctx words of the corpus
    pred = [vocab.index(w) for w in words[:n_ctx]]
    for i in range(n_predict):
        context = pred[-n_ctx:]
        predicted_idx = model.predict_next_word(context)
        pred.append(predicted_idx)

    output = [vocab[i] for i in pred]
    print(" ".join(output))
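

# A minimal in-memory smoke test (a sketch, not part of the original script:
# the toy corpus, the seed words, and the small embedding/hidden sizes below
# are illustrative assumptions). It exercises preprocess_text, train, and
# predict_next_word without needing a corpus file; uncomment to run:
#
#     corpus = "the quick brown fox jumps over the lazy dog " * 20
#     toy_vocab = sorted(set(corpus.split()))
#     toy_examples = preprocess_text(corpus, toy_vocab, 2)
#     toy_model = NeuralProbabilisticLanguageModel(
#         vocab_size=len(toy_vocab), embedding_dim=8, context_size=2, hidden_dim=6
#     )
#     toy_model.train(toy_examples, n_epochs=3)
#     seed = [toy_vocab.index("the"), toy_vocab.index("quick")]
#     print(toy_vocab[toy_model.predict_next_word(seed, temperature=0.8)])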