From d8abbc0552ee7a57da12db2a1905f1acbfe1e3eb Mon Sep 17 00:00:00 2001
From: Debanjum Singh Solanky
Date: Wed, 15 Sep 2021 22:54:03 -0700
Subject: [PATCH] Use XMP metadata in images to improve image search

- Details
  - The CLIP model can represent images and text in the same vector space

  - Enhance CLIP's image understanding by augmenting the plain image with its
    text-based metadata. Specifically, with any subject and description XMP
    tags on the image

  - Improve results by combining the plain image similarity score with the
    metadata similarity scores of the highest ranked images

- Minor Fixes
  - Convert verbose from bool to integer in image_search. It's already
    passed as an integer from the main program entrypoint

  - Process images with ".jpeg" extensions too
---
 src/main.py                     |   7 +-
 src/search_type/image_search.py | 106 ++++++++---
 src/utils/exiftool.py           | 325 ++++++++++++++++++++++++++++++++
 3 files changed, 405 insertions(+), 33 deletions(-)
 create mode 100644 src/utils/exiftool.py

diff --git a/src/main.py b/src/main.py
index 258a0237..a38336ce 100644
--- a/src/main.py
+++ b/src/main.py
@@ -68,6 +68,7 @@ def search(q: str, n: Optional[int] = 5, t: Optional[str] = None):
         hits = image_search.query_images(
             user_query,
             image_embeddings,
+            image_metadata_embeddings,
             image_encoder,
             results_count,
             args.verbose)
@@ -124,8 +125,10 @@ def regenerate(t: Optional[str] = None):
     if (t == 'image' or t == None) and image_search_enabled:
         # Extract Images, Generate Embeddings
         global image_embeddings
+        global image_metadata_embeddings
         global image_names
-        image_names, image_embeddings, _ = image_search.setup(
+
+        image_names, image_embeddings, image_metadata_embeddings, _ = image_search.setup(
             pathlib.Path(image_config['input-directory']),
             pathlib.Path(image_config['embeddings-file']),
             regenerate=True,
@@ -181,7 +184,7 @@ if __name__ == '__main__':
     image_search_enabled = False
     if image_config and 'input-directory' in image_config:
         image_search_enabled = True
-        image_names, image_embeddings, image_encoder = image_search.setup(
+        image_names, image_embeddings, image_metadata_embeddings, image_encoder = image_search.setup(
             pathlib.Path(image_config['input-directory']),
             pathlib.Path(image_config['embeddings-file']),
             args.regenerate,
diff --git a/src/search_type/image_search.py b/src/search_type/image_search.py
index 7cad8624..fe936559 100644
--- a/src/search_type/image_search.py
+++ b/src/search_type/image_search.py
@@ -10,7 +10,7 @@ import torch
 
 # Internal Packages
 from utils.helpers import get_absolute_path, resolve_absolute_path
-
+import utils.exiftool as exiftool
 
 def initialize_model():
     # Initialize Model
@@ -19,59 +19,103 @@ def initialize_model():
     return model
 
 
-def extract_entries(image_directory, verbose=False):
+def extract_entries(image_directory, verbose=0):
     image_directory = resolve_absolute_path(image_directory, strict=True)
     image_names = list(image_directory.glob('*.jpg'))
-    if verbose:
+    image_names.extend(list(image_directory.glob('*.jpeg')))
+
+    if verbose > 0:
         print(f'Found {len(image_names)} images in {image_directory}')
     return image_names
 
 
-def compute_embeddings(image_names, model, embeddings_file, regenerate=False, verbose=False):
+def compute_embeddings(image_names, model, embeddings_file, regenerate=False, verbose=0):
     "Compute (and Save) Embeddings or Load Pre-Computed Embeddings"
     image_embeddings = None
+    image_metadata_embeddings = None
 
-    # Load pre-computed embeddings from file if exists
+    # Load pre-computed image embeddings from file if exists
    if resolve_absolute_path(embeddings_file).exists() and not regenerate:
         image_embeddings = torch.load(embeddings_file)
-        if verbose:
+        if verbose > 0:
             print(f"Loaded pre-computed embeddings from {embeddings_file}")
 
-    else:  # Else compute the image_embeddings from scratch, which can take a while
-        images = []
-        if verbose:
+    # Load pre-computed image metadata embeddings from file if exists
+    if resolve_absolute_path(f"{embeddings_file}_metadata").exists() and not regenerate:
+        image_metadata_embeddings = torch.load(f"{embeddings_file}_metadata")
+        if verbose > 0:
+            print(f"Loaded pre-computed embeddings from {embeddings_file}_metadata")
+
+    if image_embeddings is None or image_metadata_embeddings is None:  # Compute the missing embeddings from scratch, which can take a while
+        if verbose > 0:
             print(f"Loading the {len(image_names)} images into memory")
-        for image_name in image_names:
-            images.append(copy.deepcopy(Image.open(image_name)))
-        if len(images) > 0:
-            image_embeddings = model.encode(images, batch_size=128, convert_to_tensor=True, show_progress_bar=True)
-            torch.save(image_embeddings, embeddings_file)
-            if verbose:
-                print(f"Saved computed embeddings to {embeddings_file}")
+    if image_embeddings is None:
+        image_embeddings = model.encode(
+            [Image.open(image_name).copy() for image_name in image_names],
+            batch_size=128, convert_to_tensor=True, show_progress_bar=verbose > 0)
 
-    return image_embeddings
+        torch.save(image_embeddings, embeddings_file)
+
+        if verbose > 0:
+            print(f"Saved computed embeddings to {embeddings_file}")
+
+    if image_metadata_embeddings is None:
+        image_metadata_embeddings = model.encode(
+            [extract_metadata(image_name, verbose) for image_name in image_names],
+            batch_size=128, convert_to_tensor=True, show_progress_bar=verbose > 0)
+
+        torch.save(image_metadata_embeddings, f"{embeddings_file}_metadata")
+
+        if verbose > 0:
+            print(f"Saved computed metadata embeddings to {embeddings_file}_metadata")
+
+    return image_embeddings, image_metadata_embeddings
 
 
-def query_images(query, image_embeddings, model, count=3, verbose=False):
+def extract_metadata(image_name, verbose=0):
+    with exiftool.ExifTool() as et:
+        image_metadata = et.get_tags(["XMP:Subject", "XMP:Description"], str(image_name))
+        image_metadata_subjects = set([subject.split(":")[1] for subject in image_metadata.get("XMP:Subject", "") if ":" in subject])
+    image_processed_metadata = image_metadata.get("XMP:Description", "") + ". " + ", ".join(image_metadata_subjects)
+    if verbose > 1:
+        print(f"{image_name}:\t{image_processed_metadata}")
+    return image_processed_metadata
+
+
+def query_images(query, image_embeddings, image_metadata_embeddings, model, count=3, verbose=0):
     # Set query to image content if query is a filepath
     if pathlib.Path(query).is_file():
         query_imagepath = resolve_absolute_path(pathlib.Path(query), strict=True)
         query = copy.deepcopy(Image.open(query_imagepath))
-        if verbose:
+        if verbose > 0:
             print(f"Find Images similar to Image at {query_imagepath}")
     else:
-        print(f"Find Images by Text: {query}")
+        if verbose > 0:
+            print(f"Find Images by Text: {query}")
 
     # Now we encode the query (which can either be an image or a text string)
     query_embedding = model.encode([query], convert_to_tensor=True, show_progress_bar=False)
 
-    # Then, we use the util.semantic_search function, which computes the cosine-similarity
-    # between the query embedding and all image embeddings.
-    # It then returns the top_k highest ranked images, which we output
-    hits = util.semantic_search(query_embedding, image_embeddings, top_k=count)[0]
+    # Compute top_k ranked images based on cosine-similarity between the query and all image embeddings.
+    image_hits = {result['corpus_id']: result['score']
+                  for result
+                  in util.semantic_search(query_embedding, image_embeddings, top_k=count)[0]}
 
-    return hits
+    # Compute top_k ranked images based on cosine-similarity between the query and all image metadata embeddings.
+    metadata_hits = {result['corpus_id']: result['score']
+                     for result
+                     in util.semantic_search(query_embedding, image_metadata_embeddings, top_k=count)[0]}
+
+    # Sum the metadata and image scores of the highest ranked images
+    for corpus_id, score in metadata_hits.items():
+        image_hits[corpus_id] = image_hits.get(corpus_id, 0) + score
+
+    # Reformat results into the original form returned by sentence transformer's semantic_search()
+    hits = [{'corpus_id': corpus_id, 'score': score} for corpus_id, score in image_hits.items()]
+
+    # Sort the images based on their combined metadata and image scores
+    return sorted(hits, key=lambda hit: hit["score"], reverse=True)
 
 
 def render_results(hits, image_names, image_directory, count):
@@ -95,7 +139,7 @@ def collate_results(hits, image_names, image_directory, count=5):
             in hits[0:count]]
 
 
-def setup(image_directory, embeddings_file, regenerate=False, verbose=False):
+def setup(image_directory, embeddings_file, regenerate=False, verbose=0):
     # Initialize Model
     model = initialize_model()
 
@@ -105,9 +149,9 @@ def setup(image_directory, embeddings_file, regenerate=False, verbose=False):
 
     # Compute or Load Embeddings
     embeddings_file = resolve_absolute_path(embeddings_file)
-    image_embeddings = compute_embeddings(image_names, model, embeddings_file, regenerate=regenerate, verbose=verbose)
+    image_embeddings, image_metadata_embeddings = compute_embeddings(image_names, model, embeddings_file, regenerate=regenerate, verbose=verbose)
 
-    return image_names, image_embeddings, model
+    return image_names, image_embeddings, image_metadata_embeddings, model
 
 
 if __name__ == '__main__':
@@ -118,10 +162,10 @@ if __name__ == '__main__':
     parser.add_argument('--regenerate', action='store_true', default=False, help="Regenerate embeddings of Images in Image Directory . Default: false")
     parser.add_argument('--results-count', '-n', default=5, type=int, help="Number of results to render. Default: 5")
     parser.add_argument('--interactive', action='store_true', default=False, help="Interactive mode allows user to run queries on the model. Default: true")
-    parser.add_argument('--verbose', action='store_true', default=False, help="Show verbose conversion logs. Default: false")
+    parser.add_argument('--verbose', action='count', default=0, help="Show verbose conversion logs. Default: 0")
     args = parser.parse_args()
 
-    image_names, image_embeddings, model = setup(args.image_directory, args.embeddings_file, regenerate=args.regenerate)
+    image_names, image_embeddings, image_metadata_embeddings, model = setup(args.image_directory, args.embeddings_file, regenerate=args.regenerate)
 
     # Run User Queries on Entries in Interactive Mode
     while args.interactive:
@@ -131,7 +175,7 @@ if __name__ == '__main__':
             exit(0)
 
         # query images
-        hits = query_images(user_query, image_embeddings, model, args.results_count, args.verbose)
+        hits = query_images(user_query, image_embeddings, image_metadata_embeddings, model, args.results_count, args.verbose)
 
         # render results
         render_results(hits, image_names, args.image_directory, count=args.results_count)
diff --git a/src/utils/exiftool.py b/src/utils/exiftool.py
new file mode 100644
index 00000000..8a11daa1
--- /dev/null
+++ b/src/utils/exiftool.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+# PyExifTool
+# Copyright 2012 Sven Marnach
+
+# This file is part of PyExifTool.
+#
+# PyExifTool is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the licence, or
+# (at your option) any later version, or the BSD licence.
+#
+# PyExifTool is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See COPYING.GPL or COPYING.BSD for more details.
+
+"""
+PyExifTool is a Python library to communicate with an instance of Phil
+Harvey's excellent ExifTool_ command-line application. The library
+provides the class :py:class:`ExifTool` that runs the command-line
+tool in batch mode and features methods to send commands to that
+program, including methods to extract meta-information from one or
+more image files. Since ``exiftool`` is run in batch mode, only a
+single instance needs to be launched and can be reused for many
+queries. This is much more efficient than launching a separate
+process for every single query.
+
+.. _ExifTool: http://www.sno.phy.queensu.ca/~phil/exiftool/
+
+The source code can be checked out from the github repository with
+
+::
+
+    git clone git://github.com/smarnach/pyexiftool.git
+
+Alternatively, you can download a tarball_. There haven't been any
+releases yet.
+
+.. _tarball: https://github.com/smarnach/pyexiftool/tarball/master
+
+PyExifTool is licenced under GNU GPL version 3 or later.
+
+Example usage::
+
+    import exiftool
+
+    files = ["a.jpg", "b.png", "c.tif"]
+    with exiftool.ExifTool() as et:
+        metadata = et.get_metadata_batch(files)
+    for d in metadata:
+        print("{:20.20} {:20.20}".format(d["SourceFile"],
+                                         d["EXIF:DateTimeOriginal"]))
+"""
+
+from __future__ import unicode_literals
+
+import sys
+import subprocess
+import os
+import json
+import warnings
+import codecs
+
+try:        # Py3k compatibility
+    basestring
+except NameError:
+    basestring = (bytes, str)
+
+executable = "exiftool"
+"""The name of the executable to run.
+
+If the executable is not located in one of the paths listed in the
+``PATH`` environment variable, the full path should be given here.
+"""
+
+# Sentinel indicating the end of the output of a sequence of commands.
+# The standard value should be fine.
+sentinel = b"{ready}"
+
+# The block size when reading from exiftool. The standard value
+# should be fine, though other values might give better performance in
+# some cases.
+block_size = 4096
+
+# This code has been adapted from Lib/os.py in the Python source tree
+# (sha1 265e36e277f3)
+def _fscodec():
+    encoding = sys.getfilesystemencoding()
+    errors = "strict"
+    if encoding != "mbcs":
+        try:
+            codecs.lookup_error("surrogateescape")
+        except LookupError:
+            pass
+        else:
+            errors = "surrogateescape"
+
+    def fsencode(filename):
+        """
+        Encode filename to the filesystem encoding with 'surrogateescape' error
+        handler, return bytes unchanged. On Windows, use 'strict' error handler if
+        the file system encoding is 'mbcs' (which is the default encoding).
+        """
+        if isinstance(filename, bytes):
+            return filename
+        else:
+            return filename.encode(encoding, errors)
+
+    return fsencode
+
+fsencode = _fscodec()
+del _fscodec
+
+class ExifTool(object):
+    """Run the `exiftool` command-line tool and communicate to it.
+
+    You can pass the file name of the ``exiftool`` executable as an
+    argument to the constructor. The default value ``exiftool`` will
+    only work if the executable is in your ``PATH``.
+
+    Most methods of this class are only available after calling
+    :py:meth:`start()`, which will actually launch the subprocess. To
+    avoid leaving the subprocess running, make sure to call the
+    :py:meth:`terminate()` method when finished using the instance.
+    This method will also be implicitly called when the instance is
+    garbage collected, but there are circumstances when this won't ever
+    happen, so you should not rely on the implicit process
+    termination. Subprocesses won't be automatically terminated if
+    the parent process exits, so a leaked subprocess will stay around
+    until manually killed.
+
+    A convenient way to make sure that the subprocess is terminated is
+    to use the :py:class:`ExifTool` instance as a context manager::
+
+        with ExifTool() as et:
+            ...
+
+    .. warning:: Note that there is no error handling. Nonsensical
+       options will be silently ignored by exiftool, so there's not
+       much that can be done in that regard. You should avoid passing
+       non-existent files to any of the methods, since this will lead
+       to undefined behaviour.
+
+    .. py:attribute:: running
+
+       A Boolean value indicating whether this instance is currently
+       associated with a running subprocess.
+    """
+
+    def __init__(self, executable_=None):
+        if executable_ is None:
+            self.executable = executable
+        else:
+            self.executable = executable_
+        self.running = False
+
+    def start(self):
+        """Start an ``exiftool`` process in batch mode for this instance.
+
+        This method will issue a ``UserWarning`` if the subprocess is
+        already running. The process is started with ``-G`` and
+        ``-n`` as common arguments, which are automatically included
+        in every command you run with :py:meth:`execute()`.
+        """
+        if self.running:
+            warnings.warn("ExifTool already running; doing nothing.")
+            return
+        with open(os.devnull, "w") as devnull:
+            self._process = subprocess.Popen(
+                [self.executable, "-stay_open", "True", "-@", "-",
+                 "-common_args", "-G", "-n"],
+                stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+                stderr=devnull)
+        self.running = True
+
+    def terminate(self):
+        """Terminate the ``exiftool`` process of this instance.
+
+        If the subprocess isn't running, this method will do nothing.
+        """
+        if not self.running:
+            return
+        self._process.stdin.write(b"-stay_open\nFalse\n")
+        self._process.stdin.flush()
+        self._process.communicate()
+        del self._process
+        self.running = False
+
+    def __enter__(self):
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.terminate()
+
+    def __del__(self):
+        self.terminate()
+
+    def execute(self, *params):
+        """Execute the given batch of parameters with ``exiftool``.
+
+        This method accepts any number of parameters and sends them to
+        the attached ``exiftool`` process. The process must be
+        running, otherwise ``ValueError`` is raised. The final
+        ``-execute`` necessary to actually run the batch is appended
+        automatically; see the documentation of :py:meth:`start()` for
+        the common options. The ``exiftool`` output is read up to the
+        end-of-output sentinel and returned as a raw ``bytes`` object,
+        excluding the sentinel.
+
+        The parameters must also be raw ``bytes``, in whatever
+        encoding exiftool accepts. For filenames, this should be the
+        system's filesystem encoding.
+
+        .. note:: This is considered a low-level method, and should
+           rarely be needed by application developers.
+        """
+        if not self.running:
+            raise ValueError("ExifTool instance not running.")
+        self._process.stdin.write(b"\n".join(params + (b"-execute\n",)))
+        self._process.stdin.flush()
+        output = b""
+        fd = self._process.stdout.fileno()
+        while not output[-32:].strip().endswith(sentinel):
+            output += os.read(fd, block_size)
+        return output.strip()[:-len(sentinel)]
+
+    def execute_json(self, *params):
+        """Execute the given batch of parameters and parse the JSON output.
+
+        This method is similar to :py:meth:`execute()`. It
+        automatically adds the parameter ``-j`` to request JSON output
+        from ``exiftool`` and parses the output. The return value is
+        a list of dictionaries, mapping tag names to the corresponding
+        values. All keys are Unicode strings with the tag names
+        including the ExifTool group name in the format <group>:<tag>.
+        The values can have multiple types. All strings occurring as
+        values will be Unicode strings. Each dictionary contains the
+        name of the file it corresponds to in the key ``"SourceFile"``.
+
+        The parameters to this function must be either raw strings
+        (type ``str`` in Python 2.x, type ``bytes`` in Python 3.x) or
+        Unicode strings (type ``unicode`` in Python 2.x, type ``str``
+        in Python 3.x). Unicode strings will be encoded using
+        system's filesystem encoding. This behaviour means you can
+        pass in filenames according to the convention of the
+        respective Python version – as raw strings in Python 2.x and
+        as Unicode strings in Python 3.x.
+        """
+        params = map(fsencode, params)
+        return json.loads(self.execute(b"-j", *params).decode("utf-8"))
+
+    def get_metadata_batch(self, filenames):
+        """Return all meta-data for the given files.
+
+        The return value will have the format described in the
+        documentation of :py:meth:`execute_json()`.
+        """
+        return self.execute_json(*filenames)
+
+    def get_metadata(self, filename):
+        """Return meta-data for a single file.
+
+        The returned dictionary has the format described in the
+        documentation of :py:meth:`execute_json()`.
+        """
+        return self.execute_json(filename)[0]
+
+    def get_tags_batch(self, tags, filenames):
+        """Return only specified tags for the given files.
+
+        The first argument is an iterable of tags. The tag names may
+        include group names, as usual in the format <group>:<tag>.
+
+        The second argument is an iterable of file names.
+
+        The format of the return value is the same as for
+        :py:meth:`execute_json()`.
+        """
+        # Explicitly ruling out strings here because passing in a
+        # string would lead to strange and hard-to-find errors
+        if isinstance(tags, basestring):
+            raise TypeError("The argument 'tags' must be "
+                            "an iterable of strings")
+        if isinstance(filenames, basestring):
+            raise TypeError("The argument 'filenames' must be "
+                            "an iterable of strings")
+        params = ["-" + t for t in tags]
+        params.extend(filenames)
+        return self.execute_json(*params)
+
+    def get_tags(self, tags, filename):
+        """Return only specified tags for a single file.
+
+        The returned dictionary has the format described in the
+        documentation of :py:meth:`execute_json()`.
+        """
+        return self.get_tags_batch(tags, [filename])[0]
+
+    def get_tag_batch(self, tag, filenames):
+        """Extract a single tag from the given files.
+
+        The first argument is a single tag name, as usual in the
+        format <group>:<tag>.
+
+        The second argument is an iterable of file names.
+
+        The return value is a list of tag values or ``None`` for
+        non-existent tags, in the same order as ``filenames``.
+        """
+        data = self.get_tags_batch([tag], filenames)
+        result = []
+        for d in data:
+            d.pop("SourceFile")
+            result.append(next(iter(d.values()), None))
+        return result
+
+    def get_tag(self, tag, filename):
+        """Extract a single tag from a single file.
+
+        The return value is the value of the specified tag, or
+        ``None`` if this tag was not found in the file.
+        """
+        return self.get_tag_batch(tag, [filename])[0]
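
For reviewers, a minimal usage sketch of the image search API after this patch. The
function names and signatures are taken from the diff above; the import path, image
directory, embeddings file, and query string are hypothetical placeholders:

    import pathlib
    from search_type import image_search  # hypothetical import path; assumes src/ is on PYTHONPATH

    # setup() now also returns embeddings of each image's XMP subject/description metadata
    image_names, image_embeddings, image_metadata_embeddings, model = image_search.setup(
        pathlib.Path("tests/data/images"),             # hypothetical image directory
        pathlib.Path("tests/data/.image_embeddings"),  # hypothetical embeddings file
        regenerate=False,
        verbose=1)

    # query_images() ranks images by the sum of their image and metadata similarity scores
    hits = image_search.query_images(
        "dog playing in the snow",                     # hypothetical text query; an image filepath also works
        image_embeddings,
        image_metadata_embeddings,
        model,
        count=5,
        verbose=1)

    image_search.render_results(hits, image_names, "tests/data/images", count=5)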