Use XMP metadata in images to improve image search

- Details
  - The CLIP model can represent images, text in the same vector space

  - Enhance CLIP's image understanding by augmenting the plain image
    with it's text based metadata.
    Specifically with any subject, description XMP tags on the image

  - Improve results by combining plain image similarity score with
    metadata similarity scores for the highest ranked images

- Minor Fixes
  - Convert verbose to integer from bool in image_search.
    It's already passed as integer from the main program entrypoint

  - Process images with ".jpeg" extensions too
This commit is contained in:
Debanjum Singh Solanky 2021-09-15 22:54:03 -07:00
parent 0e34c8f493
commit d8abbc0552
3 changed files with 405 additions and 33 deletions

View file

@ -68,6 +68,7 @@ def search(q: str, n: Optional[int] = 5, t: Optional[str] = None):
hits = image_search.query_images( hits = image_search.query_images(
user_query, user_query,
image_embeddings, image_embeddings,
image_metadata_embeddings,
image_encoder, image_encoder,
results_count, results_count,
args.verbose) args.verbose)
@ -124,8 +125,10 @@ def regenerate(t: Optional[str] = None):
if (t == 'image' or t == None) and image_search_enabled: if (t == 'image' or t == None) and image_search_enabled:
# Extract Images, Generate Embeddings # Extract Images, Generate Embeddings
global image_embeddings global image_embeddings
global image_metadata_embeddings
global image_names global image_names
image_names, image_embeddings, _ = image_search.setup(
image_names, image_embeddings, image_metadata_embeddings, _ = image_search.setup(
pathlib.Path(image_config['input-directory']), pathlib.Path(image_config['input-directory']),
pathlib.Path(image_config['embeddings-file']), pathlib.Path(image_config['embeddings-file']),
regenerate=True, regenerate=True,
@ -181,7 +184,7 @@ if __name__ == '__main__':
image_search_enabled = False image_search_enabled = False
if image_config and 'input-directory' in image_config: if image_config and 'input-directory' in image_config:
image_search_enabled = True image_search_enabled = True
image_names, image_embeddings, image_encoder = image_search.setup( image_names, image_embeddings, image_metadata_embeddings, image_encoder = image_search.setup(
pathlib.Path(image_config['input-directory']), pathlib.Path(image_config['input-directory']),
pathlib.Path(image_config['embeddings-file']), pathlib.Path(image_config['embeddings-file']),
args.regenerate, args.regenerate,

View file

@ -10,7 +10,7 @@ import torch
# Internal Packages # Internal Packages
from utils.helpers import get_absolute_path, resolve_absolute_path from utils.helpers import get_absolute_path, resolve_absolute_path
import utils.exiftool as exiftool
def initialize_model(): def initialize_model():
# Initialize Model # Initialize Model
@ -19,59 +19,103 @@ def initialize_model():
return model return model
def extract_entries(image_directory, verbose=False): def extract_entries(image_directory, verbose=0):
image_directory = resolve_absolute_path(image_directory, strict=True) image_directory = resolve_absolute_path(image_directory, strict=True)
image_names = list(image_directory.glob('*.jpg')) image_names = list(image_directory.glob('*.jpg'))
if verbose: image_names.extend(list(image_directory.glob('*.jpeg')))
if verbose > 0:
print(f'Found {len(image_names)} images in {image_directory}') print(f'Found {len(image_names)} images in {image_directory}')
return image_names return image_names
def compute_embeddings(image_names, model, embeddings_file, regenerate=False, verbose=False): def compute_embeddings(image_names, model, embeddings_file, regenerate=False, verbose=0):
"Compute (and Save) Embeddings or Load Pre-Computed Embeddings" "Compute (and Save) Embeddings or Load Pre-Computed Embeddings"
image_embeddings = None image_embeddings = None
image_metadata_embeddings = None
# Load pre-computed embeddings from file if exists # Load pre-computed image embeddings from file if exists
if resolve_absolute_path(embeddings_file).exists() and not regenerate: if resolve_absolute_path(embeddings_file).exists() and not regenerate:
image_embeddings = torch.load(embeddings_file) image_embeddings = torch.load(embeddings_file)
if verbose: if verbose > 0:
print(f"Loaded pre-computed embeddings from {embeddings_file}") print(f"Loaded pre-computed embeddings from {embeddings_file}")
else: # Else compute the image_embeddings from scratch, which can take a while # load pre-computed image metadata embedding file if exists
images = [] if resolve_absolute_path(f"{embeddings_file}_metadata").exists() and not regenerate:
if verbose: image_metadata_embeddings = torch.load(f"{embeddings_file}_metadata")
if verbose > 0:
print(f"Loaded pre-computed embeddings from {embeddings_file}_metadata")
if image_embeddings is None or image_metadata_embeddings is None: # Else compute the image_embeddings from scratch, which can take a while
if verbose > 0:
print(f"Loading the {len(image_names)} images into memory") print(f"Loading the {len(image_names)} images into memory")
for image_name in image_names:
images.append(copy.deepcopy(Image.open(image_name)))
if len(images) > 0: if image_embeddings is None:
image_embeddings = model.encode(images, batch_size=128, convert_to_tensor=True, show_progress_bar=True) image_embeddings = model.encode(
torch.save(image_embeddings, embeddings_file) [Image.open(image_name).copy() for image_name in image_names],
if verbose: batch_size=128, convert_to_tensor=True, show_progress_bar=verbose > 0)
print(f"Saved computed embeddings to {embeddings_file}")
return image_embeddings torch.save(image_embeddings, embeddings_file)
if verbose > 0:
print(f"Saved computed embeddings to {embeddings_file}")
if image_metadata_embeddings is None:
image_metadata_embeddings = model.encode(
[extract_metadata(image_name, verbose) for image_name in image_names],
batch_size=128, convert_to_tensor=True, show_progress_bar=verbose > 0)
torch.save(image_metadata_embeddings, f"{embeddings_file}_metadata")
if verbose > 0:
print(f"Saved computed metadata embeddings to {embeddings_file}_metadata")
return image_embeddings, image_metadata_embeddings
def query_images(query, image_embeddings, model, count=3, verbose=False): def extract_metadata(image_name, verbose=0):
with exiftool.ExifTool() as et:
image_metadata = et.get_tags(["XMP:Subject", "XMP:Description"], str(image_name))
image_metadata_subjects = set([subject.split(":")[1] for subject in image_metadata.get("XMP:Subject", "") if ":" in subject])
image_processed_metadata = image_metadata.get("XMP:Description", "") + ". " + ", ".join(image_metadata_subjects)
if verbose > 1:
print(f"{image_name}:\t{image_processed_metadata}")
return image_processed_metadata
def query_images(query, image_embeddings, image_metadata_embeddings, model, count=3, verbose=0):
# Set query to image content if query is a filepath # Set query to image content if query is a filepath
if pathlib.Path(query).is_file(): if pathlib.Path(query).is_file():
query_imagepath = resolve_absolute_path(pathlib.Path(query), strict=True) query_imagepath = resolve_absolute_path(pathlib.Path(query), strict=True)
query = copy.deepcopy(Image.open(query_imagepath)) query = copy.deepcopy(Image.open(query_imagepath))
if verbose: if verbose > 0:
print(f"Find Images similar to Image at {query_imagepath}") print(f"Find Images similar to Image at {query_imagepath}")
else: else:
print(f"Find Images by Text: {query}") if verbose > 0:
print(f"Find Images by Text: {query}")
# Now we encode the query (which can either be an image or a text string) # Now we encode the query (which can either be an image or a text string)
query_embedding = model.encode([query], convert_to_tensor=True, show_progress_bar=False) query_embedding = model.encode([query], convert_to_tensor=True, show_progress_bar=False)
# Then, we use the util.semantic_search function, which computes the cosine-similarity # Compute top_k ranked images based on cosine-similarity b/w query and all image embeddings.
# between the query embedding and all image embeddings. image_hits = {result['corpus_id']: result['score']
# It then returns the top_k highest ranked images, which we output for result
hits = util.semantic_search(query_embedding, image_embeddings, top_k=count)[0] in util.semantic_search(query_embedding, image_embeddings, top_k=count)[0]}
return hits # Compute top_k ranked images based on cosine-similarity b/w query and all image metadata embeddings.
metadata_hits = {result['corpus_id']: result['score']
for result
in util.semantic_search(query_embedding, image_metadata_embeddings, top_k=count)[0]}
# Sum metadata, image scores of the highest ranked images
for corpus_id, score in metadata_hits.items():
image_hits[corpus_id] = image_hits.get(corpus_id, 0) + score
# Reformat results in original form from sentence transformer semantic_search()
hits = [{'corpus_id': corpus_id, 'score': score} for corpus_id, score in image_hits.items()]
# Sort the images based on their combined metadata, image scores
return sorted(hits, key=lambda hit: hit["score"], reverse=True)
def render_results(hits, image_names, image_directory, count): def render_results(hits, image_names, image_directory, count):
@ -95,7 +139,7 @@ def collate_results(hits, image_names, image_directory, count=5):
in hits[0:count]] in hits[0:count]]
def setup(image_directory, embeddings_file, regenerate=False, verbose=False): def setup(image_directory, embeddings_file, regenerate=False, verbose=0):
# Initialize Model # Initialize Model
model = initialize_model() model = initialize_model()
@ -105,9 +149,9 @@ def setup(image_directory, embeddings_file, regenerate=False, verbose=False):
# Compute or Load Embeddings # Compute or Load Embeddings
embeddings_file = resolve_absolute_path(embeddings_file) embeddings_file = resolve_absolute_path(embeddings_file)
image_embeddings = compute_embeddings(image_names, model, embeddings_file, regenerate=regenerate, verbose=verbose) image_embeddings, image_metadata_embeddings = compute_embeddings(image_names, model, embeddings_file, regenerate=regenerate, verbose=verbose)
return image_names, image_embeddings, model return image_names, image_embeddings, image_metadata_embeddings, model
if __name__ == '__main__': if __name__ == '__main__':
@ -118,10 +162,10 @@ if __name__ == '__main__':
parser.add_argument('--regenerate', action='store_true', default=False, help="Regenerate embeddings of Images in Image Directory . Default: false") parser.add_argument('--regenerate', action='store_true', default=False, help="Regenerate embeddings of Images in Image Directory . Default: false")
parser.add_argument('--results-count', '-n', default=5, type=int, help="Number of results to render. Default: 5") parser.add_argument('--results-count', '-n', default=5, type=int, help="Number of results to render. Default: 5")
parser.add_argument('--interactive', action='store_true', default=False, help="Interactive mode allows user to run queries on the model. Default: true") parser.add_argument('--interactive', action='store_true', default=False, help="Interactive mode allows user to run queries on the model. Default: true")
parser.add_argument('--verbose', action='store_true', default=False, help="Show verbose conversion logs. Default: false") parser.add_argument('--verbose', action='count', default=0, help="Show verbose conversion logs. Default: 0")
args = parser.parse_args() args = parser.parse_args()
image_names, image_embeddings, model = setup(args.image_directory, args.embeddings_file, regenerate=args.regenerate) image_names, image_embeddings, image_metadata_embeddings, model = setup(args.image_directory, args.embeddings_file, regenerate=args.regenerate)
# Run User Queries on Entries in Interactive Mode # Run User Queries on Entries in Interactive Mode
while args.interactive: while args.interactive:
@ -131,7 +175,7 @@ if __name__ == '__main__':
exit(0) exit(0)
# query images # query images
hits = query_images(user_query, image_embeddings, model, args.results_count, args.verbose) hits = query_images(user_query, image_embeddings, image_metadata_embeddings, model, args.results_count, args.verbose)
# render results # render results
render_results(hits, image_names, args.image_directory, count=args.results_count) render_results(hits, image_names, args.image_directory, count=args.results_count)

325
src/utils/exiftool.py Normal file
View file

@ -0,0 +1,325 @@
# -*- coding: utf-8 -*-
# PyExifTool <http://github.com/smarnach/pyexiftool>
# Copyright 2012 Sven Marnach
# This file is part of PyExifTool.
#
# PyExifTool is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the licence, or
# (at your option) any later version, or the BSD licence.
#
# PyExifTool is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See COPYING.GPL or COPYING.BSD for more details.
"""
PyExifTool is a Python library to communicate with an instance of Phil
Harvey's excellent ExifTool_ command-line application. The library
provides the class :py:class:`ExifTool` that runs the command-line
tool in batch mode and features methods to send commands to that
program, including methods to extract meta-information from one or
more image files. Since ``exiftool`` is run in batch mode, only a
single instance needs to be launched and can be reused for many
queries. This is much more efficient than launching a separate
process for every single query.
.. _ExifTool: http://www.sno.phy.queensu.ca/~phil/exiftool/
The source code can be checked out from the github repository with
::
git clone git://github.com/smarnach/pyexiftool.git
Alternatively, you can download a tarball_. There haven't been any
releases yet.
.. _tarball: https://github.com/smarnach/pyexiftool/tarball/master
PyExifTool is licenced under GNU GPL version 3 or later.
Example usage::
import exiftool
files = ["a.jpg", "b.png", "c.tif"]
with exiftool.ExifTool() as et:
metadata = et.get_metadata_batch(files)
for d in metadata:
print("{:20.20} {:20.20}".format(d["SourceFile"],
d["EXIF:DateTimeOriginal"]))
"""
from __future__ import unicode_literals
import sys
import subprocess
import os
import json
import warnings
import codecs
try: # Py3k compatibility
basestring
except NameError:
basestring = (bytes, str)
executable = "exiftool"
"""The name of the executable to run.
If the executable is not located in one of the paths listed in the
``PATH`` environment variable, the full path should be given here.
"""
# Sentinel indicating the end of the output of a sequence of commands.
# The standard value should be fine.
sentinel = b"{ready}"
# The block size when reading from exiftool. The standard value
# should be fine, though other values might give better performance in
# some cases.
block_size = 4096
# This code has been adapted from Lib/os.py in the Python source tree
# (sha1 265e36e277f3)
def _fscodec():
encoding = sys.getfilesystemencoding()
errors = "strict"
if encoding != "mbcs":
try:
codecs.lookup_error("surrogateescape")
except LookupError:
pass
else:
errors = "surrogateescape"
def fsencode(filename):
"""
Encode filename to the filesystem encoding with 'surrogateescape' error
handler, return bytes unchanged. On Windows, use 'strict' error handler if
the file system encoding is 'mbcs' (which is the default encoding).
"""
if isinstance(filename, bytes):
return filename
else:
return filename.encode(encoding, errors)
return fsencode
fsencode = _fscodec()
del _fscodec
class ExifTool(object):
"""Run the `exiftool` command-line tool and communicate to it.
You can pass the file name of the ``exiftool`` executable as an
argument to the constructor. The default value ``exiftool`` will
only work if the executable is in your ``PATH``.
Most methods of this class are only available after calling
:py:meth:`start()`, which will actually launch the subprocess. To
avoid leaving the subprocess running, make sure to call
:py:meth:`terminate()` method when finished using the instance.
This method will also be implicitly called when the instance is
garbage collected, but there are circumstance when this won't ever
happen, so you should not rely on the implicit process
termination. Subprocesses won't be automatically terminated if
the parent process exits, so a leaked subprocess will stay around
until manually killed.
A convenient way to make sure that the subprocess is terminated is
to use the :py:class:`ExifTool` instance as a context manager::
with ExifTool() as et:
...
.. warning:: Note that there is no error handling. Nonsensical
options will be silently ignored by exiftool, so there's not
much that can be done in that regard. You should avoid passing
non-existent files to any of the methods, since this will lead
to undefied behaviour.
.. py:attribute:: running
A Boolean value indicating whether this instance is currently
associated with a running subprocess.
"""
def __init__(self, executable_=None):
if executable_ is None:
self.executable = executable
else:
self.executable = executable_
self.running = False
def start(self):
"""Start an ``exiftool`` process in batch mode for this instance.
This method will issue a ``UserWarning`` if the subprocess is
already running. The process is started with the ``-G`` and
``-n`` as common arguments, which are automatically included
in every command you run with :py:meth:`execute()`.
"""
if self.running:
warnings.warn("ExifTool already running; doing nothing.")
return
with open(os.devnull, "w") as devnull:
self._process = subprocess.Popen(
[self.executable, "-stay_open", "True", "-@", "-",
"-common_args", "-G", "-n"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=devnull)
self.running = True
def terminate(self):
"""Terminate the ``exiftool`` process of this instance.
If the subprocess isn't running, this method will do nothing.
"""
if not self.running:
return
self._process.stdin.write(b"-stay_open\nFalse\n")
self._process.stdin.flush()
self._process.communicate()
del self._process
self.running = False
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.terminate()
def __del__(self):
self.terminate()
def execute(self, *params):
"""Execute the given batch of parameters with ``exiftool``.
This method accepts any number of parameters and sends them to
the attached ``exiftool`` process. The process must be
running, otherwise ``ValueError`` is raised. The final
``-execute`` necessary to actually run the batch is appended
automatically; see the documentation of :py:meth:`start()` for
the common options. The ``exiftool`` output is read up to the
end-of-output sentinel and returned as a raw ``bytes`` object,
excluding the sentinel.
The parameters must also be raw ``bytes``, in whatever
encoding exiftool accepts. For filenames, this should be the
system's filesystem encoding.
.. note:: This is considered a low-level method, and should
rarely be needed by application developers.
"""
if not self.running:
raise ValueError("ExifTool instance not running.")
self._process.stdin.write(b"\n".join(params + (b"-execute\n",)))
self._process.stdin.flush()
output = b""
fd = self._process.stdout.fileno()
while not output[-32:].strip().endswith(sentinel):
output += os.read(fd, block_size)
return output.strip()[:-len(sentinel)]
def execute_json(self, *params):
"""Execute the given batch of parameters and parse the JSON output.
This method is similar to :py:meth:`execute()`. It
automatically adds the parameter ``-j`` to request JSON output
from ``exiftool`` and parses the output. The return value is
a list of dictionaries, mapping tag names to the corresponding
values. All keys are Unicode strings with the tag names
including the ExifTool group name in the format <group>:<tag>.
The values can have multiple types. All strings occurring as
values will be Unicode strings. Each dictionary contains the
name of the file it corresponds to in the key ``"SourceFile"``.
The parameters to this function must be either raw strings
(type ``str`` in Python 2.x, type ``bytes`` in Python 3.x) or
Unicode strings (type ``unicode`` in Python 2.x, type ``str``
in Python 3.x). Unicode strings will be encoded using
system's filesystem encoding. This behaviour means you can
pass in filenames according to the convention of the
respective Python version as raw strings in Python 2.x and
as Unicode strings in Python 3.x.
"""
params = map(fsencode, params)
return json.loads(self.execute(b"-j", *params).decode("utf-8"))
def get_metadata_batch(self, filenames):
"""Return all meta-data for the given files.
The return value will have the format described in the
documentation of :py:meth:`execute_json()`.
"""
return self.execute_json(*filenames)
def get_metadata(self, filename):
"""Return meta-data for a single file.
The returned dictionary has the format described in the
documentation of :py:meth:`execute_json()`.
"""
return self.execute_json(filename)[0]
def get_tags_batch(self, tags, filenames):
"""Return only specified tags for the given files.
The first argument is an iterable of tags. The tag names may
include group names, as usual in the format <group>:<tag>.
The second argument is an iterable of file names.
The format of the return value is the same as for
:py:meth:`execute_json()`.
"""
# Explicitly ruling out strings here because passing in a
# string would lead to strange and hard-to-find errors
if isinstance(tags, basestring):
raise TypeError("The argument 'tags' must be "
"an iterable of strings")
if isinstance(filenames, basestring):
raise TypeError("The argument 'filenames' must be "
"an iterable of strings")
params = ["-" + t for t in tags]
params.extend(filenames)
return self.execute_json(*params)
def get_tags(self, tags, filename):
"""Return only specified tags for a single file.
The returned dictionary has the format described in the
documentation of :py:meth:`execute_json()`.
"""
return self.get_tags_batch(tags, [filename])[0]
def get_tag_batch(self, tag, filenames):
"""Extract a single tag from the given files.
The first argument is a single tag name, as usual in the
format <group>:<tag>.
The second argument is an iterable of file names.
The return value is a list of tag values or ``None`` for
non-existent tags, in the same order as ``filenames``.
"""
data = self.get_tags_batch([tag], filenames)
result = []
for d in data:
d.pop("SourceFile")
result.append(next(iter(d.values()), None))
return result
def get_tag(self, tag, filename):
"""Extract a single tag from a single file.
The return value is the value of the specified tag, or
``None`` if this tag was not found in the file.
"""
return self.get_tag_batch(tag, [filename])[0]