captcha_solver/main.py

483 lines
15 KiB
Python

"""
Title: OCR model for reading Captchas
Author: [A_K_Nain](https://twitter.com/A_K_Nain)
Date created: 2020/06/14
Last modified: 2024/03/13
Description: How to implement an OCR model using CNNs, RNNs and CTC loss.
Accelerator: GPU
Converted to Keras 3 by: [Sitam Meur](https://github.com/sitamgithub-MSIT)
"""
"""
## Introduction
This example demonstrates a simple OCR model built with the Functional API. Apart from
combining CNN and RNN, it also illustrates how you can instantiate a new layer
and use it as an "Endpoint layer" for implementing CTC loss. For a detailed
guide to layer subclassing, please check out
[this page](https://keras.io/guides/making_new_layers_and_models_via_subclassing/)
in the developer guides.
"""
"""
## Setup
"""
import re
from base64 import b64decode
from os import makedirs, walk, environ, path, listdir
from shutil import move

import requests
from dotenv import load_dotenv
load_dotenv()


def _require_env(name):
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty, so a missing
            setting is reported by name instead of surfacing later as an
            opaque ``TypeError`` from ``int(None)`` or ``makedirs(None)``.
    """
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Required environment variable {name!r} is not set")
    return value


# Constants
IMAGE_HEIGHT = 70
IMAGE_WIDTH = 200
# Working directories of the download/sort pipeline, read from the environment.
DOWNLOAD_PATH = _require_env("DOWNLOAD_PATH")
TESTING_PATH = _require_env("TESTING_PATH")
TRAINING_PATH = _require_env("TRAINING_PATH")
# Percentage of newly downloaded captchas that is moved to the testing set.
PERCENT_OF_TESTING = int(_require_env("PERCENT_OF_TESTING"))
# Selects the Keras backend; set here, ahead of the `import keras` below.
environ["KERAS_BACKEND"] = "tensorflow"
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import tensorflow as tf
import keras
from keras import ops
from keras import layers
def prepare_dirs():
    """Ensure the download, testing and training directories all exist."""
    for directory in (DOWNLOAD_PATH, TESTING_PATH, TRAINING_PATH):
        makedirs(directory, exist_ok=True)
def fetch_captcha(id):
    """Fetch one captcha by its ID and save its image into DOWNLOAD_PATH.

    The image is written as ``<hash>_<solution>.jpeg``. Any failure (network
    error, HTTP error status, malformed payload) is reported on stdout and
    the captcha is skipped, so one bad item does not abort a bulk download.

    Args:
        id: Identifier of the captcha on the aggregator API.
    """
    url = f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/{id}"
    try:
        # Without a timeout, requests may block forever on a stalled connection.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        captcha = response.json()["captcha"]
        file_name = f"{captcha['hash']}_{captcha['solution']}.jpeg"
        # Decode BEFORE opening the file: a payload that fails to decode must
        # not leave an empty .jpeg behind, which would later be treated as an
        # already-downloaded captcha.
        image_bytes = b64decode(captcha["image"])
    except (requests.RequestException, KeyError, TypeError, ValueError) as e:
        # KeyError/TypeError: missing or mistyped field; ValueError: bad base64.
        print(f"Error fetching captcha {id}: {e}")
        return

    with open(path.join(DOWNLOAD_PATH, file_name), "wb") as captcha_file:
        captcha_file.write(image_bytes)
def search_saved_captcha(hash, path):
    """Return True if a saved captcha with the given hash exists under ``path``.

    Saved captchas are named ``<hash>_<solution>.jpeg`` where the solution is
    six word characters. The directory tree below ``path`` is searched
    recursively.

    Args:
        hash: Captcha hash; it is matched literally.
        path: Root directory of the search.

    Returns:
        True when a matching file name is found, False otherwise.
    """
    # Escape the hash so regex metacharacters in it cannot match other names,
    # and require a full match so names with trailing text (for example
    # "x_abcdef.jpeg.part") are not mistaken for a saved captcha.
    pattern = re.compile(rf"{re.escape(str(hash))}_\w{{6}}\.jpeg")
    return any(
        pattern.fullmatch(file_name)
        for _, _, files in walk(path)
        for file_name in files
    )
def search_and_download_new(captchas):
    """Download every listed captcha that is not stored locally yet.

    A captcha counts as stored when a file for its hash is found in the
    training, testing or download directory (checked in that order).

    Args:
        captchas: Iterable of mappings with the keys "id" and "hash".
    """
    known_locations = (TRAINING_PATH, TESTING_PATH, DOWNLOAD_PATH)
    for entry in captchas:
        captcha_id = entry["id"]
        captcha_hash = entry["hash"]
        already_saved = any(
            search_saved_captcha(captcha_hash, location)
            for location in known_locations
        )
        if already_saved:
            continue
        fetch_captcha(captcha_id)
def sort_datasets():
    """Distribute downloaded captchas between the testing and training sets.

    The first ``PERCENT_OF_TESTING`` percent (rounded) of the regular files in
    DOWNLOAD_PATH are moved to TESTING_PATH, the remaining ones to
    TRAINING_PATH.
    """
    # Take ONE snapshot of the directory and keep only regular files, so the
    # quota and the moved entries come from the same list and subdirectories
    # are neither moved nor counted. Sorting makes the split reproducible,
    # because listdir() returns entries in arbitrary order.
    files = sorted(
        name
        for name in listdir(DOWNLOAD_PATH)
        if path.isfile(path.join(DOWNLOAD_PATH, name))
    )
    amount_to_send_to_test = round(len(files) * (PERCENT_OF_TESTING / 100))
    for index, name in enumerate(files):
        if index < amount_to_send_to_test:
            destination = TESTING_PATH
        else:
            destination = TRAINING_PATH
        move(path.join(DOWNLOAD_PATH, name), destination)
def download_dataset():
    """Download all captchas not stored locally yet and sort them into datasets.

    Creates the working directories, asks the aggregator API for the list of
    all captchas, downloads the new ones and distributes them between the
    training and testing sets. If the listing request fails, the error is
    printed and nothing is downloaded or sorted.
    """
    prepare_dirs()
    try:
        # Without a timeout, requests may block forever on a stalled connection.
        response = requests.get(
            f"{environ.get('CAPTCHA_AGGREGATOR_API')}/captcha/all", timeout=30
        )
        response.raise_for_status()
        captchas = response.json()["captchas"]
    except requests.RequestException as e:
        print(f"Error downloading dataset: {e}")
        return

    search_and_download_new(captchas)
    sort_datasets()
download_dataset()

# Path to the data directory: read the training images from the same
# directory the downloader sorts them into, not from a hard-coded path.
data_dir = Path(TRAINING_PATH)

# Get list of all the images
images = sorted(str(image_path) for image_path in data_dir.glob("*.jpeg"))
# File names look like "<hash>_<solution>.jpeg"; the upper-cased solution
# part is the label.
labels = [Path(img).stem.split("_")[1].upper() for img in images]
characters = sorted({char for label in labels for char in label})

print("Number of images found: ", len(images))
print("Number of labels found: ", len(labels))
print("Number of unique characters: ", len(characters))
print("Characters present: ", characters)

# Batch size for training and validation
batch_size = 16

# Desired image dimensions (single source of truth: the constants above)
img_width = IMAGE_WIDTH
img_height = IMAGE_HEIGHT

# Factor by which the image is going to be downsampled by the convolutional
# blocks. We use two convolution blocks and each block has a pooling layer
# which downsamples the features by a factor of 2, hence the total
# downsampling factor is 4.
downsample_factor = 4

# Maximum length of any captcha in the dataset
if not labels:
    # max() on an empty sequence raises a ValueError with a cryptic message;
    # raise the same exception type with a message that names the cause.
    raise ValueError(f"No '*.jpeg' captcha images found in {data_dir}")
max_length = max(len(label) for label in labels)
"""
## Preprocessing
"""
# Forward lookup: character -> integer id
char_to_num = layers.StringLookup(
    vocabulary=list(characters),
    mask_token=None,
)

# Inverse lookup: integer id -> character, built from the same vocabulary
num_to_char = layers.StringLookup(
    invert=True,
    mask_token=None,
    vocabulary=char_to_num.get_vocabulary(),
)
def split_data(images, labels, train_size=0.9, shuffle=True):
    """Split parallel arrays of images and labels into training and validation parts.

    Args:
        images: Array of samples; it is indexed with an index tensor, so it
            must support that (the caller in this file passes NumPy arrays).
        labels: Array of labels, parallel to ``images``.
        train_size: Fraction of the samples assigned to the training part.
        shuffle: Whether to shuffle the sample order before splitting.

    Returns:
        The tuple ``(x_train, x_valid, y_train, y_valid)``.
    """
    total = len(images)

    # Sample order, optionally shuffled
    order = ops.arange(total)
    if shuffle:
        order = keras.random.shuffle(order)

    # The first `cut` positions of the order form the training part
    cut = int(total * train_size)
    train_idx, valid_idx = order[:cut], order[cut:]

    return images[train_idx], images[valid_idx], labels[train_idx], labels[valid_idx]
# Hold out part of the samples for validation
x_train, x_valid, y_train, y_valid = split_data(
    images=np.array(images),
    labels=np.array(labels),
)
def encode_single_sample(img_path, label):
    """Load one captcha image and encode its label for the model.

    Args:
        img_path: Path of a JPEG image file.
        label: Label text of that image.

    Returns:
        A dict with the keys "image" and "label", matching the names of the
        two model inputs.
    """
    raw_bytes = tf.io.read_file(img_path)
    # Decode as a single-channel (grayscale) image
    pixels = tf.io.decode_jpeg(raw_bytes, channels=1)
    # Convert to float32 in [0, 1] range
    pixels = tf.image.convert_image_dtype(pixels, tf.float32)
    pixels = ops.image.resize(pixels, [img_height, img_width])
    # Swap the first two axes so the time dimension corresponds to the
    # width of the image
    pixels = ops.transpose(pixels, axes=[1, 0, 2])

    # Split the label into characters and map each one to its integer id
    chars = tf.strings.unicode_split(label, input_encoding="UTF-8")
    encoded_label = char_to_num(chars)

    return {"image": pixels, "label": encoded_label}
"""
## Create `Dataset` objects
"""
def _build_dataset(paths, texts):
    """Build a batched, prefetched dataset of encoded samples from paths and labels."""
    dataset = tf.data.Dataset.from_tensor_slices((paths, texts))
    dataset = dataset.map(encode_single_sample, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size)
    return dataset.prefetch(buffer_size=tf.data.AUTOTUNE)


train_dataset = _build_dataset(x_train, y_train)
validation_dataset = _build_dataset(x_valid, y_valid)
"""
## Visualize the data
"""
# Show up to 16 samples of the first training batch on a 4x4 grid
_, ax = plt.subplots(4, 4, figsize=(10, 5))
for batch in train_dataset.take(1):
    # NOTE: this rebinds the module-level `images` / `labels` names (the file
    # lists built earlier) to the tensors of this batch.
    images = batch["image"]
    labels = batch["label"]
    # A small dataset yields a first batch with fewer than 16 samples, so
    # never index past the actual batch size.
    for i in range(min(16, int(images.shape[0]))):
        img = (images[i] * 255).numpy().astype("uint8")
        label = tf.strings.reduce_join(num_to_char(labels[i])).numpy().decode("utf-8")
        # Samples are stored width-major; transpose back for display
        ax[i // 4, i % 4].imshow(img[:, :, 0].T, cmap="gray")
        ax[i // 4, i % 4].set_title(label)
        ax[i // 4, i % 4].axis("off")
plt.show()
"""
## Model
"""
def ctc_batch_cost(y_true, y_pred, input_length, label_length):
    """Compute the CTC loss of every sample in a batch.

    Args:
        y_true: Dense label tensor.
        y_pred: Model output probabilities; assumed to be laid out as
            (batch, time, classes) - inferred from the transpose below,
            confirm against the caller.
        input_length: Per-sample prediction lengths with a trailing axis of
            size 1 (it is squeezed away here).
        label_length: Per-sample label lengths, same layout as ``input_length``.

    Returns:
        The per-sample loss with an extra axis appended at position 1.
    """
    label_lens = ops.cast(ops.squeeze(label_length, axis=-1), dtype="int32")
    input_lens = ops.cast(ops.squeeze(input_length, axis=-1), dtype="int32")

    sparse_targets = ctc_label_dense_to_sparse(y_true, label_lens)
    sparse_targets = ops.cast(sparse_targets, dtype="int32")

    # Swap the first two axes and take the log; epsilon keeps log() away
    # from zero.
    swapped = ops.transpose(y_pred, axes=[1, 0, 2])
    log_probs = ops.log(swapped + keras.backend.epsilon())

    per_sample_loss = tf.compat.v1.nn.ctc_loss(
        inputs=log_probs, labels=sparse_targets, sequence_length=input_lens
    )
    return ops.expand_dims(per_sample_loss, 1)
def ctc_label_dense_to_sparse(labels, label_lengths):
    """Convert a dense 2-D label tensor into a ``tf.SparseTensor``.

    Only the first ``label_lengths[b]`` entries of row ``b`` are kept.

    Args:
        labels: Dense label tensor; indexed as (batch, position).
        label_lengths: Number of valid entries of each row.

    Returns:
        A ``tf.SparseTensor`` with the kept values and the dense shape of
        ``labels``.
    """
    dense_shape = ops.shape(labels)
    num_rows_vec = ops.stack([dense_shape[0]])
    max_len_vec = ops.stack([dense_shape[1]])

    def positions_below(previous_mask, length):
        # One mask row: True at every position smaller than `length`
        positions = ops.expand_dims(ops.arange(ops.shape(previous_mask)[1]), 0)
        return positions < tf.fill(max_len_vec, length)

    # Build the boolean keep-mask row by row with a scan over the lengths
    initial_mask = ops.cast(tf.fill([1, dense_shape[1]], 0), dtype="bool")
    keep_mask = tf.compat.v1.scan(
        positions_below,
        label_lengths,
        initializer=initial_mask,
        parallel_iterations=1,
    )
    keep_mask = keep_mask[:, 0, :]

    # Column index of every kept entry
    column_grid = ops.reshape(
        ops.tile(ops.arange(0, dense_shape[1]), num_rows_vec), dense_shape
    )
    kept_columns = tf.compat.v1.boolean_mask(column_grid, keep_mask)

    # Row (batch) index of every kept entry
    row_grid = ops.transpose(
        ops.reshape(
            ops.tile(ops.arange(0, dense_shape[0]), max_len_vec),
            tf.reverse(dense_shape, [0]),
        )
    )
    kept_rows = tf.compat.v1.boolean_mask(row_grid, keep_mask)

    # Pair the indices up as (row, column) and gather the matching values
    sparse_indices = ops.transpose(
        ops.reshape(ops.concatenate([kept_rows, kept_columns], axis=0), [2, -1])
    )
    sparse_values = tf.compat.v1.gather_nd(labels, sparse_indices)

    return tf.SparseTensor(
        ops.cast(sparse_indices, dtype="int64"),
        sparse_values,
        ops.cast(dense_shape, dtype="int64"),
    )
class CTCLayer(layers.Layer):
    """Endpoint layer that attaches the CTC loss to the model.

    The layer receives the labels and the predictions, registers the CTC
    loss through ``add_loss`` and returns the predictions unchanged.
    """

    def __init__(self, name=None, **kwargs):
        # Forward the standard Layer keyword arguments (such as `trainable`
        # and `dtype`) to the base class; a layer config contains them, so
        # rejecting them prevents re-creating the layer from its config.
        super().__init__(name=name, **kwargs)
        self.loss_fn = ctc_batch_cost

    def call(self, y_true, y_pred):
        # Compute the training-time loss value and add it
        # to the layer using `self.add_loss()`.
        batch_len = ops.cast(ops.shape(y_true)[0], dtype="int64")
        input_length = ops.cast(ops.shape(y_pred)[1], dtype="int64")
        label_length = ops.cast(ops.shape(y_true)[1], dtype="int64")

        # Every sample of the batch gets the same (full) length: one column
        # vector per batch for the predictions and one for the labels.
        input_length = input_length * ops.ones(shape=(batch_len, 1), dtype="int64")
        label_length = label_length * ops.ones(shape=(batch_len, 1), dtype="int64")

        loss = self.loss_fn(y_true, y_pred, input_length, label_length)
        self.add_loss(loss)

        # At test time, just return the computed predictions
        return y_pred
def build_model():
    """Build and compile the CNN + bidirectional-LSTM captcha model.

    Returns:
        A compiled model named "captcha_solver" with the inputs "image" and
        "label"; its loss is registered by the ``CTCLayer`` endpoint, so
        ``compile`` only receives the optimizer.
    """
    # Model inputs: the (width, height, 1) image and its encoded label
    image_input = layers.Input(
        shape=(img_width, img_height, 1), name="image", dtype="float32"
    )
    label_input = layers.Input(name="label", shape=(None,), dtype="float32")

    # Two convolution blocks, each followed by 2x2 max pooling
    features = image_input
    for filters, conv_name, pool_name in (
        (32, "Conv1", "pool1"),
        (64, "Conv2", "pool2"),
    ):
        features = layers.Conv2D(
            filters,
            (3, 3),
            activation="relu",
            kernel_initializer="he_normal",
            padding="same",
            name=conv_name,
        )(features)
        features = layers.MaxPooling2D((2, 2), name=pool_name)(features)

    # The two 2x2 max-pooling layers make the feature maps 4x smaller in
    # both spatial dimensions, and the last convolution has 64 filters.
    # Flatten height and channels so each width step becomes one time step
    # for the RNN part of the model.
    sequence_shape = ((img_width // 4), (img_height // 4) * 64)
    sequence = layers.Reshape(target_shape=sequence_shape, name="reshape")(features)
    sequence = layers.Dense(64, activation="relu", name="dense1")(sequence)
    sequence = layers.Dropout(0.2)(sequence)

    # RNNs
    for units in (128, 64):
        sequence = layers.Bidirectional(
            layers.LSTM(units, return_sequences=True, dropout=0.25)
        )(sequence)

    # Output layer: one class per vocabulary entry plus one extra class
    # (presumably the CTC blank - confirm against the loss implementation)
    num_classes = len(char_to_num.get_vocabulary()) + 1
    probabilities = layers.Dense(
        num_classes, activation="softmax", name="dense2"
    )(sequence)

    # CTC endpoint layer computing the loss at each step
    output = CTCLayer(name="ctc_loss")(label_input, probabilities)

    model = keras.models.Model(
        inputs=[image_input, label_input], outputs=output, name="captcha_solver"
    )
    model.compile(optimizer=keras.optimizers.Adam())
    return model
# Build the model and print its architecture
model = build_model()
model.summary()

"""
## Training
"""

# TODO restore epoch count.
epochs = 100
early_stopping_patience = 10

# Stop once `val_loss` has not improved for `early_stopping_patience` epochs
# and restore the best weights seen so far
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=early_stopping_patience,
    restore_best_weights=True,
)

# Train the model
history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=validation_dataset,
    callbacks=[early_stopping],
)
"""
## Inference
You can use the trained model hosted on [Hugging Face Hub](https://huggingface.co/keras-io/ocr-for-captcha)
and try the demo on [Hugging Face Spaces](https://huggingface.co/spaces/keras-io/ocr-for-captcha).
"""
def ctc_decode(y_pred, input_length, greedy=True, beam_width=100, top_paths=1):
    """Decode network output with CTC greedy or beam search.

    Args:
        y_pred: Model output probabilities; assumed to be laid out as
            (batch, time, classes) - inferred from the transpose below.
        input_length: Per-sample sequence lengths.
        greedy: Use the greedy decoder when true, beam search otherwise.
        beam_width: Beam width of the beam search (ignored when greedy).
        top_paths: Number of paths returned by the beam search (ignored
            when greedy).

    Returns:
        A tuple ``(decoded_dense, log_prob)``: the decoded paths as a list of
        dense tensors padded with -1, and the decoder's log probabilities.
    """
    pred_shape = ops.shape(y_pred)
    batch, steps = pred_shape[0], pred_shape[1]

    # Swap the first two axes and take the log; epsilon keeps log() away
    # from zero.
    log_probs = ops.log(
        ops.transpose(y_pred, axes=[1, 0, 2]) + keras.backend.epsilon()
    )
    seq_lengths = ops.cast(input_length, dtype="int32")

    if not greedy:
        decoded, log_prob = tf.compat.v1.nn.ctc_beam_search_decoder(
            inputs=log_probs,
            sequence_length=seq_lengths,
            beam_width=beam_width,
            top_paths=top_paths,
        )
    else:
        decoded, log_prob = tf.nn.ctc_greedy_decoder(
            inputs=log_probs, sequence_length=seq_lengths
        )

    # Densify every sparse path to shape (batch, steps), padding with -1
    dense_paths = [
        tf.sparse.to_dense(
            sp_input=tf.SparseTensor(
                sparse_path.indices, sparse_path.values, (batch, steps)
            ),
            default_value=-1,
        )
        for sparse_path in decoded
    ]
    return (dense_paths, log_prob)
# Inference model: from the image input up to the output of the "dense2"
# layer, leaving out the CTC loss layer
prediction_model = keras.models.Model(
    inputs=model.input[0],
    outputs=model.get_layer(name="dense2").output,
)
prediction_model.summary()
# A utility function to decode the output of the network
def decode_batch_predictions(pred):
    """Turn a batch of network outputs into a list of predicted strings.

    Args:
        pred: Batch of predictions; its first two axes are used as the batch
            size and the number of time steps.

    Returns:
        A list with one decoded text per sample.
    """
    batch, steps = pred.shape[0], pred.shape[1]
    # Every sample uses the full number of time steps
    input_len = np.ones(batch) * steps

    # Use greedy search. For complex tasks, you can use beam search
    decoded_paths, _ = ctc_decode(pred, input_length=input_len, greedy=True)
    # Keep the single best path, cut to the longest label in the dataset
    best_path = decoded_paths[0][:, :max_length]

    # Map the ids of every row back to characters and join them into text
    return [
        tf.strings.reduce_join(num_to_char(row)).numpy().decode("utf-8")
        for row in best_path
    ]
# Decode one validation batch and show every image with its predicted text
for batch in validation_dataset.take(1):
    batch_images, batch_labels = batch["image"], batch["label"]

    preds = prediction_model.predict(batch_images)
    pred_texts = decode_batch_predictions(preds)

    # Ground-truth texts of the batch
    orig_texts = []
    for label in batch_labels:
        label = tf.strings.reduce_join(num_to_char(label)).numpy().decode("utf-8")
        orig_texts.append(label)

    _, ax = plt.subplots(4, 4, figsize=(15, 5))
    for i, predicted in enumerate(pred_texts):
        # Samples are stored width-major; transpose back for display
        img = (batch_images[i, :, :, 0] * 255).numpy().astype(np.uint8)
        img = img.T
        title = f"Prediction: {predicted}"
        cell = ax[i // 4, i % 4]
        cell.imshow(img, cmap="gray")
        cell.set_title(title)
        cell.axis("off")
plt.show()