Aleph

How To Extend the Ingest Pipeline

By default, Aleph’s ingest pipeline already handles tasks such as text recognition and named entity extraction. If you need something that Aleph doesn’t currently support, you can extend the ingest pipeline with custom processing stages. This guide explains how to implement one.

This guide may be outdated. If you’re following this guide and something isn’t working, please let us know by opening an issue on GitHub.

Conceptual overview

All processing in Aleph is coordinated via a set of queues kept in Redis, an in-memory data store. When a document needs to be processed, a task describing the work is posted to one of these queues, which are read by multiple independent services. Each service can subscribe to one or more stages in order to receive tasks, e.g. the ingest stage (which extracts text from a file), the analyze stage (which performs named entity recognition and language detection), and finally the index stage (which adds the document to Aleph’s search index).
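The flow described above can be illustrated with a small in-memory model. This is only a sketch of the idea, not Aleph’s code: plain Python deques stand in for the Redis queues, and the names queues, NEXT_STAGE, handle and run_service are made up for this example.

```python
from collections import deque

# One FIFO queue per stage. In Aleph these live in Redis; here they are
# plain in-memory deques (an assumption made to keep the sketch runnable).
queues = {"ingest": deque(), "analyze": deque(), "index": deque()}

# Order in which a document moves through the stages named in this guide.
NEXT_STAGE = {"ingest": "analyze", "analyze": "index", "index": None}


def handle(stage, task):
    """Pretend to do the stage's work and record that it happened."""
    return {**task, "done": task["done"] + [stage]}


def run_service(subscribed_stages):
    """Process all pending tasks on the stages this service subscribes to."""
    finished = []
    for stage in subscribed_stages:
        while queues[stage]:
            task = handle(stage, queues[stage].popleft())
            following = NEXT_STAGE[stage]
            if following is None:
                finished.append(task)  # last stage: nothing left to queue
            else:
                queues[following].append(task)  # hand over to the next stage
    return finished


# Post a task for a document, then let two hypothetical services pick it up.
queues["ingest"].append({"document": "report.pdf", "done": []})
run_service(["ingest"])                     # a service subscribed to one stage
result = run_service(["analyze", "index"])  # a service subscribed to two stages
print(result)
```

The point of the model is that a service never calls the next one directly. It only takes tasks from the stages it subscribes to and posts follow-up tasks, which is what makes it possible to slot in an additional stage.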

A new stage can be added to this pipeline at runtime in one of three ways: by extending the code of the worker service, by extending the ingest-file service, or by adding a new service that stands on its own. The last option is the cleanest, and can be based on servicelayer, the Python package that implements the queue and worker code.

Developing the service

An example implementation of a servicelayer-based processing service is available in this repository. The code should be short enough to study in full, especially the worker module.

The translation service that is provided is a tech demo. Don’t use it in production.

To run the example service, you will need to create a service account JSON key with permission to use the Google Cloud Translation API, and configure both the project ID and the path for the service account file.
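Before starting the service, it can help to check that both settings are actually present. The following is a minimal sketch of such a check. GOOGLE_APPLICATION_CREDENTIALS is the variable Google Cloud client libraries conventionally read for the key file path; GOOGLE_PROJECT_ID is a hypothetical name used here for illustration, so check the example repository for the names it really expects.

```python
import os


def missing_settings(env):
    """Return a list of human-readable problems with the given environment."""
    problems = []

    # Hypothetical variable name for the project ID (an assumption).
    if not env.get("GOOGLE_PROJECT_ID"):
        problems.append("GOOGLE_PROJECT_ID is not set")

    # Conventional variable for the service account JSON key path.
    key_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not key_path:
        problems.append("GOOGLE_APPLICATION_CREDENTIALS is not set")
    elif not os.path.isfile(key_path):
        problems.append("service account key file not found: " + key_path)

    return problems


for problem in missing_settings(os.environ):
    print(problem)
```

The function takes the environment as an argument rather than reading os.environ directly, so it can be tried out with a plain dictionary.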

Once you have added the service to the Docker Compose configuration, you can include the new stage in the pipeline by adjusting the ALEPH_INGEST_PIPELINE configuration option:

ALEPH_INGEST_PIPELINE=analyze:translate

You may need to restart Aleph in order for the changes to take effect.
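The ALEPH_INGEST_PIPELINE value shown above reads as a colon-separated list of stage names, applied in the order given (analyze first, then translate). Assuming that reading is correct, the sketch below shows how such a value breaks down into stages. The parse_pipeline helper is hypothetical and is not part of Aleph, and the fallback value is only there so the example runs without any configuration.

```python
import os


def parse_pipeline(value):
    """Split a colon-separated pipeline setting into an ordered list of stages."""
    return [stage.strip() for stage in value.split(":") if stage.strip()]


# Fall back to the value from this guide when the variable is not set.
setting = os.environ.get("ALEPH_INGEST_PIPELINE", "analyze:translate")
print(parse_pipeline(setting))
```

Printing the parsed list is a quick way to spot a typo in a stage name or a stray separator before restarting the services.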