add python code
bengio.py (new file, 248 lines)
@@ -0,0 +1,248 @@
import sys

import numpy as np
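# Architecture note (Bengio, Ducharme, Vincent & Jauvin, 2003, "A Neural
# Probabilistic Language Model"): the next-word scores are computed as
#     y = b + W x + U tanh(d + H x)
# where x is the concatenation of the embeddings of the context words.
# In the code below, b_o corresponds to b, b_h to d, and W provides the
# direct ("shortcut") connections from the input to the output layer.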
class NeuralProbabilisticLanguageModel:
    def __init__(self, vocab_size, embedding_dim=100, context_size=3, hidden_dim=50, learning_rate=0.01):
        """
        Initialize Bengio's Neural Probabilistic Language Model

        Args:
            vocab_size: Size of the vocabulary
            embedding_dim: Dimension of word embeddings
            context_size: Number of previous words to consider
            hidden_dim: Dimension of hidden layer
            learning_rate: Learning rate for optimization
        """
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.context_size = context_size
        self.hidden_dim = hidden_dim
        self.learning_rate = learning_rate

        # Initialize model parameters
        # C: Word embedding matrix
        self.C = np.random.randn(vocab_size, embedding_dim) * 0.1

        # Parameters for hidden layer
        self.H = np.random.randn(context_size * embedding_dim, hidden_dim) * 0.1
        self.b_h = np.zeros(hidden_dim)

        # Parameters for output layer
        self.U = np.random.randn(hidden_dim, vocab_size) * 0.1
        self.W = np.random.randn(context_size * embedding_dim, vocab_size) * 0.1
        self.b_o = np.zeros(vocab_size)
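    # Parameter shapes used throughout:
    #   C:   (vocab_size, embedding_dim)                 word embedding table
    #   H:   (context_size * embedding_dim, hidden_dim)  input-to-hidden weights
    #   b_h: (hidden_dim,)                               hidden-layer bias
    #   U:   (hidden_dim, vocab_size)                    hidden-to-output weights
    #   W:   (context_size * embedding_dim, vocab_size)  direct input-to-output weights
    #   b_o: (vocab_size,)                               output bias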
    def forward(self, context_words):
        """
        Forward pass of the model

        Args:
            context_words: List of word indices for the context

        Returns:
            Probability distribution over next words
        """
        # Look up embeddings for context words
        embeddings = self.C[context_words]
        x = embeddings.flatten()  # Concatenate embeddings

        # Compute hidden layer activation
        h = np.tanh(np.dot(x, self.H) + self.b_h)

        # Compute output layer
        # Direct connections from input to output (shortcut connections)
        y = np.dot(h, self.U) + np.dot(x, self.W) + self.b_o

        # Apply softmax to get probabilities
        exp_y = np.exp(y - np.max(y))  # Subtract max for numerical stability
        probabilities = exp_y / np.sum(exp_y)

        return probabilities, h, x
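    # Example shapes for the default sizes (embedding_dim=100, context_size=3,
    # hidden_dim=50): x has 3 * 100 = 300 entries, h has 50, and probabilities
    # has vocab_size entries summing to 1.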
    def compute_loss(self, probabilities, target_word):
        """
        Compute cross-entropy loss

        Args:
            probabilities: Predicted probability distribution
            target_word: Index of the target word

        Returns:
            Cross-entropy loss
        """
        return -np.log(probabilities[target_word])
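    # Example: if the model assigns probability 0.25 to the correct next word,
    # the loss for that example is -ln(0.25) ≈ 1.386.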
    def backward(self, context_words, target_word, probabilities, h, x):
        """
        Backward pass for parameter updates

        Args:
            context_words: List of word indices for the context
            target_word: Index of the target word
            probabilities: Output probabilities from forward pass
            h: Hidden layer activation
            x: Input vector (concatenated embeddings)
        """
        # Gradient for output layer
        d_y = probabilities.copy()
        d_y[target_word] -= 1

        # Gradients for parameters
        d_U = np.outer(h, d_y)
        d_W = np.outer(x, d_y)
        d_b_o = d_y

        # Gradient for hidden layer
        d_h = np.dot(d_y, self.U.T)
        d_h_input = d_h * (1 - h**2)  # Derivative of tanh

        d_H = np.outer(x, d_h_input)
        d_b_h = d_h_input

        # Gradient for embeddings
        d_x = np.dot(d_h_input, self.H.T) + np.dot(d_y, self.W.T)
        d_C = np.zeros_like(self.C)

        # Accumulate embedding gradients for the context words
        for i, word_idx in enumerate(context_words):
            start = i * self.embedding_dim
            end = (i + 1) * self.embedding_dim
            d_C[word_idx] += d_x[start:end]

        # Update parameters
        self.U -= self.learning_rate * d_U
        self.W -= self.learning_rate * d_W
        self.b_o -= self.learning_rate * d_b_o
        self.H -= self.learning_rate * d_H
        self.b_h -= self.learning_rate * d_b_h
        self.C -= self.learning_rate * d_C
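    # Note on the output gradient above: d_y = probabilities - one_hot(target_word)
    # is the standard combined gradient of a softmax output layer followed by
    # cross-entropy loss.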
    def train_step(self, context_words, target_word):
        """
        Perform one training step

        Args:
            context_words: List of word indices for the context
            target_word: Index of the target word

        Returns:
            Loss for this example
        """
        probabilities, h, x = self.forward(context_words)
        loss = self.compute_loss(probabilities, target_word)
        self.backward(context_words, target_word, probabilities, h, x)
        return loss
    def train(self, data, n_epochs=5):
        """
        Train the model on a dataset

        Args:
            data: List of (context_words, target_word) tuples
            n_epochs: Number of training epochs

        Returns:
            List of average losses per epoch
        """
        losses = []

        for epoch in range(n_epochs):
            epoch_loss = 0
            for context_words, target_word in data:
                epoch_loss += self.train_step(context_words, target_word)

            avg_loss = epoch_loss / len(data)
            losses.append(avg_loss)
            print(f"Epoch {epoch+1}/{n_epochs}, Loss: {avg_loss:.4f}")

        return losses
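    # Training is plain per-example stochastic gradient descent (no mini-batches).
    # Since the reported loss is the average cross-entropy in nats, exp(loss)
    # corresponds to the model's perplexity on the training data.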
    def predict_next_word(self, context_words, temperature=1.0):
        """
        Predict the next word given a context with temperature sampling

        Args:
            context_words: List of word indices for the context
            temperature: Controls randomness (higher = more random, lower = more deterministic)
                temperature=0 is equivalent to argmax (greedy)
                temperature=1.0 keeps the original distribution

        Returns:
            Index of the sampled next word
        """
        probabilities, _, _ = self.forward(context_words)

        if temperature == 0:
            # Greedy sampling (argmax)
            return np.argmax(probabilities)

        # Apply temperature scaling
        scaled_logits = np.log(probabilities) / temperature

        # Re-normalize to get a valid probability distribution
        exp_scaled = np.exp(scaled_logits - np.max(scaled_logits))  # Subtract max for numerical stability
        scaled_probs = exp_scaled / np.sum(exp_scaled)

        # Sample from the scaled distribution
        return np.random.choice(len(scaled_probs), p=scaled_probs)
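# Temperature example for predict_next_word(): with probabilities [0.7, 0.3],
# temperature=0.5 sharpens the distribution to roughly [0.84, 0.16], while
# temperature=2.0 flattens it to roughly [0.60, 0.40].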
# Example usage
def preprocess_text(text, vocab, context_size):
    """Convert text to training examples"""
    words = text.split()
    word_to_idx = {word: idx for idx, word in enumerate(vocab)}

    # Create training examples
    examples = []
    for i in range(len(words) - context_size):
        context = [word_to_idx[words[i + j]] for j in range(context_size)]
        target = word_to_idx[words[i + context_size]]
        examples.append((context, target))

    return examples
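# Example: preprocess_text("the cat sat on the mat", vocab, context_size=2)
# yields pairs such as (["the", "cat"] -> "sat") and (["cat", "sat"] -> "on"),
# with every word replaced by its index in vocab.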
# Small example
if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("synopsis: python3 bengio.py <file> <n_ctx> <n_epochs>")
        sys.exit(1)
    with open(sys.argv[1], "r") as buf:
        text = buf.read()
    if len(sys.argv) > 2:
        n_ctx = int(sys.argv[2])
    else:
        n_ctx = 2
    if len(sys.argv) > 3:
        n_epochs = int(sys.argv[3])
    else:
        n_epochs = 10

    n_predict = 100

    words = text.split()
    vocab = sorted(set(words))
    vocab.append(" ")

    examples = preprocess_text(text, vocab, n_ctx)

    model = NeuralProbabilisticLanguageModel(
        vocab_size=len(vocab),
        embedding_dim=10,
        context_size=n_ctx,
        hidden_dim=8
    )

    losses = model.train(examples, n_epochs=n_epochs)

    # Test model prediction
    pred = [vocab.index(w) for w in words[:n_ctx]]
    for i in range(n_predict):
        context = pred[-n_ctx:]
        predicted_idx = model.predict_next_word(context)
        pred.append(predicted_idx)

    output = [vocab[i] for i in pred]
    print(" ".join(output))