From de6cc17ce7b4ed4ce12265c5b055550bbdc4fdc8 Mon Sep 17 00:00:00 2001
From: macemoth
Date: Thu, 1 May 2025 10:03:09 +0200
Subject: [PATCH] add python code

---
 bengio.py        | 248 +++++++++++++++++++++++++++++++++++++++++++++++
 requirements.txt |   1 +
 2 files changed, 249 insertions(+)
 create mode 100644 bengio.py
 create mode 100644 requirements.txt

diff --git a/bengio.py b/bengio.py
new file mode 100644
index 0000000..5eed769
--- /dev/null
+++ b/bengio.py
@@ -0,0 +1,248 @@
+import sys
+import random
+
+import numpy as np
+
+class NeuralProbabilisticLanguageModel:
+    def __init__(self, vocab_size, embedding_dim=100, context_size=3, hidden_dim=50, learning_rate=0.01):
+        """
+        Initialize Bengio's Neural Probabilistic Language Model
+
+        Args:
+            vocab_size: Size of the vocabulary
+            embedding_dim: Dimension of word embeddings
+            context_size: Number of previous words to consider
+            hidden_dim: Dimension of hidden layer
+            learning_rate: Learning rate for optimization
+        """
+        self.vocab_size = vocab_size
+        self.embedding_dim = embedding_dim
+        self.context_size = context_size
+        self.hidden_dim = hidden_dim
+        self.learning_rate = learning_rate
+
+        # Initialize model parameters
+        # C: Word embedding matrix
+        self.C = np.random.randn(vocab_size, embedding_dim) * 0.1
+
+        # Parameters for hidden layer
+        self.H = np.random.randn(context_size * embedding_dim, hidden_dim) * 0.1
+        self.b_h = np.zeros(hidden_dim)
+
+        # Parameters for output layer
+        self.U = np.random.randn(hidden_dim, vocab_size) * 0.1
+        self.W = np.random.randn(context_size * embedding_dim, vocab_size) * 0.1
+        self.b_o = np.zeros(vocab_size)
+
+    def forward(self, context_words):
+        """
+        Forward pass of the model
+
+        Args:
+            context_words: List of word indices for the context
+
+        Returns:
+            Tuple of (probability distribution over next words, hidden activation, concatenated input vector)
+        """
+        # Look up embeddings for context words
+        embeddings = self.C[context_words]
+        x = embeddings.flatten()  # Concatenate embeddings
+
+        # Compute hidden layer activation
+        h = np.tanh(np.dot(x, self.H) + self.b_h)
+
+        # Compute output layer
+        # Direct connections from input to output (shortcut connections)
+        y = np.dot(h, self.U) + np.dot(x, self.W) + self.b_o
+
+        # Apply softmax to get probabilities
+        exp_y = np.exp(y - np.max(y))  # Subtract max for numerical stability
+        probabilities = exp_y / np.sum(exp_y)
+
+        return probabilities, h, x
+
+    def compute_loss(self, probabilities, target_word):
+        """
+        Compute cross-entropy loss
+
+        Args:
+            probabilities: Predicted probability distribution
+            target_word: Index of the target word
+
+        Returns:
+            Cross-entropy loss
+        """
+        return -np.log(probabilities[target_word])
+
+    def backward(self, context_words, target_word, probabilities, h, x):
+        """
+        Backward pass for parameter updates
+
+        Args:
+            context_words: List of word indices for the context
+            target_word: Index of the target word
+            probabilities: Output probabilities from forward pass
+            h: Hidden layer activation
+            x: Input vector (concatenated embeddings)
+        """
+        # Gradient for output layer
+        d_y = probabilities.copy()
+        d_y[target_word] -= 1
+
+        # Gradients for parameters
+        d_U = np.outer(h, d_y)
+        d_W = np.outer(x, d_y)
+        d_b_o = d_y
+
+        # Gradient for hidden layer
+        d_h = np.dot(d_y, self.U.T)
+        d_h_input = d_h * (1 - h**2)  # Derivative of tanh
+
+        d_H = np.outer(x, d_h_input)
+        d_b_h = d_h_input
+
+        # Gradient for embeddings
+        d_x = np.dot(d_h_input, self.H.T) + np.dot(d_y, self.W.T)
+        d_C = np.zeros_like(self.C)
+
+        # Accumulate embedding gradients for the context words
+        for i, word_idx in enumerate(context_words):
+            start = i * self.embedding_dim
+            end = (i + 1) * self.embedding_dim
+            d_C[word_idx] += d_x[start:end]
+
+        # Update parameters
+        self.U -= self.learning_rate * d_U
+        self.W -= self.learning_rate * d_W
+        self.b_o -= self.learning_rate * d_b_o
+        self.H -= self.learning_rate * d_H
+        self.b_h -= self.learning_rate * d_b_h
+        self.C -= self.learning_rate * d_C
+
+    def train_step(self, context_words, target_word):
+        """
+        Perform one training step
+
+        Args:
+            context_words: List of word indices for the context
+            target_word: Index of the target word
+
+        Returns:
+            Loss for this example
+        """
+        probabilities, h, x = self.forward(context_words)
+        loss = self.compute_loss(probabilities, target_word)
+        self.backward(context_words, target_word, probabilities, h, x)
+        return loss
+
+    def train(self, data, n_epochs=5):
+        """
+        Train the model on a dataset
+
+        Args:
+            data: List of (context_words, target_word) tuples
+            n_epochs: Number of training epochs
+
+        Returns:
+            List of average losses per epoch
+        """
+        losses = []
+
+        for epoch in range(n_epochs):
+            epoch_loss = 0
+            for context_words, target_word in data:
+                epoch_loss += self.train_step(context_words, target_word)
+
+            avg_loss = epoch_loss / len(data)
+            losses.append(avg_loss)
+            print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")
+
+        return losses
+
+    def predict_next_word(self, context_words, temperature=1.0):
+        """
+        Predict the next word given a context with temperature sampling
+
+        Args:
+            context_words: List of word indices for the context
+            temperature: Controls randomness (higher = more random, lower = more deterministic)
+                temperature=0 is equivalent to argmax (greedy)
+                temperature=1.0 keeps the original distribution
+
+        Returns:
+            Index of the sampled next word
+        """
+        probabilities, _, _ = self.forward(context_words)
+
+        if temperature == 0:
+            # Greedy sampling (argmax)
+            return np.argmax(probabilities)
+
+        # Apply temperature scaling
+        scaled_logits = np.log(probabilities) / temperature
+
+        # Re-normalize to get a valid probability distribution
+        exp_scaled = np.exp(scaled_logits - np.max(scaled_logits))  # Subtract max for numerical stability
+        scaled_probs = exp_scaled / np.sum(exp_scaled)
+
+        # Sample from the scaled distribution
+        return np.random.choice(len(scaled_probs), p=scaled_probs)
+
+
+# Example usage
+def preprocess_text(text, vocab, context_size):
+    """Convert text to training examples"""
+    words = text.split()
+    word_to_idx = {word: idx for idx, word in enumerate(vocab)}
+
+    # Create training examples
+    examples = []
+    for i in range(len(words) - context_size):
+        context = [word_to_idx[words[i+j]] for j in range(context_size)]
+        target = word_to_idx[words[i+context_size]]
+        examples.append((context, target))
+
+    return examples
+
+# Small example
+if __name__ == "__main__":
+    if len(sys.argv) < 2:
+        print("synopsis: python3 bengio.py <textfile> [context_size] [n_epochs]")
+        exit(1)
+    with open(sys.argv[1], "r") as buf:
+        text = buf.read()
+    if len(sys.argv) > 2:
+        n_ctx = int(sys.argv[2])
+    else:
+        n_ctx = 2
+    if len(sys.argv) > 3:
+        n_epochs = int(sys.argv[3])
+    else:
+        n_epochs = 10
+
+    n_predict = 100
+
+    words = text.split()
+    vocab = sorted(set(words))
+    vocab.append(" ")
+
+    examples = preprocess_text(text, vocab, n_ctx)
+
+    model = NeuralProbabilisticLanguageModel(
+        vocab_size=len(vocab),
+        embedding_dim=10,
+        context_size=n_ctx,
+        hidden_dim=8
+    )
+
+    losses = model.train(examples, n_epochs=n_epochs)
+
+    # Test model prediction
+    pred = [vocab.index(w) for w in words[:n_ctx]]
+    for i in range(n_predict):
+        context = pred[-n_ctx:]
+        predicted_idx = model.predict_next_word(context)
+        pred.append(predicted_idx)
+
+    output = [vocab[i] for i in pred]
+    print(" ".join(output))
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..24ce15a
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+numpy
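
A minimal usage sketch (not part of the patch): assuming the bengio.py added above is on the import path, the NeuralProbabilisticLanguageModel class and the preprocess_text helper can also be driven programmatically as a quick smoke test. The toy corpus and hyperparameters below are illustrative assumptions, not values from the patch.

# demo.py -- hypothetical smoke test for the module added by this patch
from bengio import NeuralProbabilisticLanguageModel, preprocess_text

text = "the cat sat on the mat the dog sat on the rug"  # illustrative toy corpus
vocab = sorted(set(text.split()))
context_size = 2

examples = preprocess_text(text, vocab, context_size)
model = NeuralProbabilisticLanguageModel(
    vocab_size=len(vocab),
    embedding_dim=10,           # illustrative, kept small for the toy corpus
    context_size=context_size,
    hidden_dim=8,
)
model.train(examples, n_epochs=20)

# Sample a continuation of "the cat"
context = [vocab.index("the"), vocab.index("cat")]
next_idx = model.predict_next_word(context, temperature=0.5)
print(vocab[next_idx])

Because predict_next_word draws from the temperature-scaled distribution, repeated runs can produce different words; temperature=0 falls back to the argmax branch in the patch.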