
Building scalable RAG pipelines with Neum AI framework - Part 2

David de Matheu
November 23, 2023
15 min read

Note: This is part 2 of the blog series Building scalable RAG pipelines with Neum AI framework, in which we discuss building RAG data pipelines using Neum AI.

This week we shipped a major release of Neum AI that introduces a new open-source framework for Python. The framework allows you to build and run data pipelines in your local environment that generate vector embeddings. It provides a collection of data source, embedding, and vector database connectors to pick from. Pre-processing transformations like loading and chunking are built in, along with support for selectors to augment the vector embeddings with metadata.

In this blog, we will go deeper into using the Neum AI framework to build a scalable RAG solution that can tackle large datasets. To do this, we will focus on leveraging the process parallelization constructs provided by the framework alongside a worker-based architecture built with Celery and Redis queues.

If you prefer to see code directly, find the completed code in GitHub.

Architecture overview

We have a separate write up that goes deep into the reasoning for the architecture. For this blog, we will focus on highlighting some of the core pieces.

To get started, the framework uses yield and generators across most major methods. These constructs let us iterate efficiently over large datasets while maintaining the function's state. In practice, this means that as results become available (files being downloaded, files being processed, vector embeddings being generated), we can push tasks to queues where workers pick them up and execute them.

For example, let's say we are processing a thousand PDF documents. The first thing we do is download the documents locally so that we can process them. Instead of waiting for all of them to download, we can take each document as soon as it finishes downloading and pass it to a queue so that a worker can start processing it while the others download. The same applies once a document has been processed: we don't have to wait for the rest, we can start generating embeddings for the document that is ready while the others are still being downloaded and processed. This buys a lot of efficiency because different tasks run at the same time.
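
This is the core pattern the rest of the post builds on. As a minimal, self-contained sketch (the function and file paths here are illustrative, not part of the Neum AI API), a generator hands each result to its consumer as soon as it is ready:

from typing import Iterator
import time

def download_documents(urls: list[str]) -> Iterator[str]:
    """Yield a local file path as soon as each (simulated) download finishes."""
    for i, url in enumerate(urls):
        time.sleep(0.1)                 # stand-in for the real download
        yield f"/tmp/document_{i}.pdf"  # hand the result back immediately

# The consumer can act on each file without waiting for the whole batch.
for path in download_documents(["https://example.com/a.pdf", "https://example.com/b.pdf"]):
    print(f"ready to enqueue processing for {path}")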

Based on our testing, we get the most efficiency by parallelizing the architecture into three main processes, each of which we assign workers to.

  • Data extraction: Connecting to data sources and querying the data. This async operation can take time depending on the source and often requires pagination, which makes it a great candidate for parallelization.
  • Data pre-processing: Taking the raw data and processing it. This includes loading it based on file extensions or types and chunking it into smaller pieces. Additional custom processing logic can be inserted at this stage.
  • Data embed and ingestion: Transforming data into vector embeddings and ingesting them into a vector database.
  • Note: This last step can be broken down into two separate workers. That said, we found it simpler to leverage the batch ingestion capabilities that vector databases offer, where they handle the parallelization for us. For example, Weaviate provides a num_workers parameter that takes care of this.

Now that we have an idea of what we will be building, let's get down to code.

Getting started

We will start by installing some dependencies:

  • Neum AI framework

pip install neumai
  • Celery

pip install celery
  • Redis

pip install redis

In addition:

  • Install Redis locally so you can run it with the redis CLI.
  • You will need an OpenAI API key for the OpenAI embedding model. To get an API key, visit OpenAI. Make sure you have configured billing for the account.
  • You will need a Weaviate Cloud Services URL and API key for the Weaviate vector database. To get a URL and API key, visit Weaviate Cloud Services.

Next, we will set up our project. Starting with creating a new folder to contain everything:


mkdir neum-at-scale && cd neum-at-scale

In that folder, we will create two files: main.py and tasks.py.

Set up our tasks

In the tasks.py file, we will start by creating a Celery setup with some skeleton tasks that we will fill in later. We will configure our Celery app to use a local Redis broker, which we will set up shortly.


from celery import Celery
from neumai.Pipelines.Pipeline import Pipeline
from neumai.Pipelines.TriggerSyncTypeEnum import TriggerSyncTypeEnum
from neumai.Shared.CloudFile import CloudFile
from neumai.Shared.NeumDocument import NeumDocument
from neumai.Sources.SourceConnector import SourceConnector
from datetime import datetime
from typing import List
import redis

app = Celery('tasks', broker="redis://localhost:6379/0")

# Data Extraction Task
@app.task
def data_extraction(pipeline_model:dict, extract_type:TriggerSyncTypeEnum, last_extraction:datetime = None):
	"""
	Extract data with 
	pipeline.source.list_files_full
	pipeline.source.list_files_delta
	"""

# Data Processing Task
@app.task
def data_processing(pipeline_model:dict, source_model: dict, cloudFile_model:dict):
	"""
	Process data with 
	pipeline.source.download_files, 
	pipeline.source.load_data,
	pipeline.source.chunk_data
	"""

# Data Embed and Ingest Task
@app.task
def data_embed_ingest(pipeline_model:dict, chunks:List[dict]):
	"""
	Embed and Ingest data with 
	pipeline.embed.embed 
	pipeline.sink.store
	"""

As detailed in our architecture overview, we are setting up three tasks that we will attach workers to. The tasks are connected through a broker, where we drop task requests and where workers pick them up. In the next sections, we will fill out the contents of each task and build our main.py, where we will trigger the flows.
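As a side note, instead of passing queue= on every apply_async call as we do below, Celery also lets you declare the routing once on the app. A small optional sketch, assuming the tasks live in tasks.py as above:

# Route each task name to its own queue so dedicated workers can consume them.
app.conf.task_routes = {
    "tasks.data_extraction": {"queue": "data_extraction"},
    "tasks.data_processing": {"queue": "data_processing"},
    "tasks.data_embed_ingest": {"queue": "data_embed_ingest"},
}

With routing declared this way, the explicit queue arguments in the apply_async calls below become optional.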

Data extraction task

The data extraction task is simple in nature: it concentrates on using the Neum AI SourceConnector to extract data from a DataConnector. The framework provides a variety of data connectors, including services like Postgres, Supabase Storage, and SharePoint. All connectors support two extraction methods: list_files_full and list_files_delta. These methods define whether we pull everything from the connector or only the data added since the last extraction (the cutoff can also be customized to any given datetime).

Let's modify our tasks.py file as follows:


# Data Extraction Task
@app.task
def data_extraction(pipeline_model:dict, extract_type:TriggerSyncTypeEnum, last_extraction:datetime = None):
    """
    Extract data with
    pipeline.source.list_files_full
    pipeline.source.list_files_delta
    """
    pipeline = Pipeline(**pipeline_model)

    for source in pipeline.sources:
        if extract_type == TriggerSyncTypeEnum.full:
            for file in source.list_files_full():
                print(f"Sending file: {file.id} to data_processing")
                data_processing.apply_async(
                    kwargs={"pipeline_model":pipeline_model, "source_model": source.as_json(), "cloudFile_model": file.toJson()},
                    queue="data_processing"
                )
        elif extract_type == TriggerSyncTypeEnum.delta:
            for file in source.list_files_delta(last_run = last_extraction):
                print(f"Sending file: {file.id} to data_processing")
                data_processing.apply_async(
                    kwargs={"pipeline_model":pipeline_model, "source_model": source.as_json(), "cloudFile_model": file.toJson()},
                    queue="data_processing"
                )

The function supports both types of sync and enqueues an async data_processing task on our broker for each extracted file. Both list_files_full and list_files_delta return generators, so files are sent out to tasks as they are extracted. In practice, this means that even while we continue to extract data, we can already start processing it in parallel.

Data processing task

Now that we have a task ready to extract data and pass it on, let's configure the processing task.

The data processing will be done using SourceConnector methods to download, load, and chunk the data. The framework provides a variety of pre-processing transformations through Loaders and Chunkers. Additional logic can be included at this level to process the data further.
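
As a purely illustrative example of such custom logic (this helper is not part of the framework, and it assumes NeumDocument exposes content and metadata fields), you could drop near-empty documents and stamp extra metadata before chunking:

from datetime import datetime
from typing import List
from neumai.Shared.NeumDocument import NeumDocument

def filter_and_tag(documents: List[NeumDocument]) -> List[NeumDocument]:
    """Drop near-empty documents and tag the rest with a processing timestamp."""
    kept = []
    for document in documents:
        if len(document.content.strip()) < 50:  # skip boilerplate or empty pages
            continue
        document.metadata["processed_at"] = datetime.utcnow().isoformat()
        kept.append(document)
    return kept

A helper like this would slot in between load_data and chunk_data inside the task below.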

Let's modify tasks.py as follows:


# Data Processing Task
@app.task
def data_processing(pipeline_model:dict, source_model: dict, cloudFile_model:dict):
    """
    Process data with
    pipeline.source.download_files,
    pipeline.source.load_data,
    pipeline.source.chunk_data
    """
    source = SourceConnector(**source_model)
    cloudFile = CloudFile.as_file(cloudFile_model)

    batch_number = 0
    batched_chunks:List[NeumDocument] = []
    for localFile in source.download_files(cloudFile=cloudFile):
        for document in source.load_data(file=localFile):
            for chunks in source.chunk_data(document=document):
                batched_chunks.extend(chunks)
                # If we have enough chunks, send to embed and ingest
                if len(batched_chunks) > 200:
                    print(f"Sending batch # {batch_number} for file: {localFile.id} to data_embed_ingest")
                    data_embed_ingest.apply_async(
                        kwargs={"pipeline_model":pipeline_model, "chunks":[chunk.toJson() for chunk in batched_chunks]},
                        queue="data_embed_ingest"
                    )
                    batched_chunks = []
                    batch_number += 1
    # If anything is left over, send it as a final batch
    if len(batched_chunks) > 0:
        print(f"Sending batch # {batch_number} for file: {localFile.id} to data_embed_ingest")
        data_embed_ingest.apply_async(
            kwargs={"pipeline_model":pipeline_model, "chunks":[chunk.toJson() for chunk in batched_chunks]},
            queue="data_embed_ingest"
        )

At this level, it might seem a bit inefficient to have nested for-loops. That said, we have found that most transformations like loading and chunking are lightweight enough to handle this way. We do anticipate that as more complex transformations are added, including some that might require an LLM, additional parallelization could be introduced here.

In this task, we also take the opportunity to batch chunks into sub-groups. Depending on the file, there might be hundreds or thousands of chunks, so we want to parallelize the generation of embeddings. We have it set to an arbitrary 200 chunks per batch, but you can configure that further.
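
One simple way to make the batch size configurable (a sketch using an environment variable name of our own choosing) is to read it once at module level in tasks.py and use it in place of the hard-coded 200:

import os

# Tune via the environment without touching the task code; defaults to 200.
EMBED_BATCH_SIZE = int(os.environ.get("EMBED_BATCH_SIZE", "200"))

# Inside data_processing, the check then becomes:
# if len(batched_chunks) >= EMBED_BATCH_SIZE: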

Data embed and ingest

Now that we are ready to embed and ingest data, we can configure our next task.

The data embed and ingest task will leverage both our EmbedConnector and SinkConnector methods to generate embeddings and ingest them into a vector database.

Let's modify tasks.py as follows:


# Data Embed and Ingest Task
@app.task
def data_embed_ingest(pipeline_model:dict, chunks:List[dict]):
    """
    Embed and Ingest data with
    pipeline.embed.embed
    pipeline.sink.store
    """
    from neumai.Shared.NeumVector import NeumVector
    pipeline = Pipeline(**pipeline_model)
    documents: List[NeumDocument] = [NeumDocument.as_file(chunk) for chunk in chunks]

    vector_embeddings, embeddings_info = pipeline.embed.embed(documents=documents)
    vectors_to_store = [
        NeumVector(id=documents[i].id, vector=vector_embeddings[i], metadata=documents[i].metadata)
        for i in range(0, len(vector_embeddings))
    ]
    vectors_written = pipeline.sink.store(
        vectors_to_store=vectors_to_store,
        pipeline_id=pipeline.id,
    )
    print(f"Finished embedding and storing {vectors_written} vectors")

This task is straightforward: we generate embeddings from the documents we processed and pass them to the vector database for storage. In between, we align everything into NeumVector objects, each of which contains a vector embedding, an id, and metadata. This step is crucial to ensure we have unique identifiers for each vector, which is key for future ingestions.
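
If you ever construct documents yourself, a deterministic id keeps re-ingestion idempotent: the same content maps to the same vector id, so a re-run overwrites instead of duplicating. A purely illustrative sketch:

import uuid

def chunk_id(file_id: str, chunk_index: int) -> str:
    """Deterministic id: the same file and chunk position always yield the same vector id."""
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{file_id}#{chunk_index}"))

print(chunk_id("report.pdf", 0))  # stable across runs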

Triggering our tasks

Now that we have configured all three tasks, we can go back to our main.py and create a trigger that pushes work onto our broker so that it is picked up by the workers running the tasks. We will start with a sample pipeline similar to the one we built in part 1 of the blog series and add it to main.py. You will need to configure the connectors for OpenAI and Weaviate.


from neumai.Pipelines import Pipeline
from neumai.DataConnectors import WebsiteConnector
from neumai.Shared import Selector
from neumai.Loaders.HTMLLoader import HTMLLoader
from neumai.Chunkers.RecursiveChunker import RecursiveChunker
from neumai.Sources import SourceConnector
from neumai.EmbedConnectors import OpenAIEmbed
from neumai.SinkConnectors import WeaviateSink

website_connector =  WebsiteConnector(
    url = "https://www.neum.ai/post/retrieval-augmented-generation-at-scale",
    selector = Selector(
        to_metadata=['url']
    )
)

source = SourceConnector(
  data_connector=website_connector,
  loader=HTMLLoader(),
  chunker=RecursiveChunker()
)

openai_embed = OpenAIEmbed(
    api_key = "",
)

weaviate_sink = WeaviateSink(
  url = "your-weaviate-url",
  api_key = "your-api-key",
  class_name = "your-class-name",
)

pipeline = Pipeline(
  sources=[source],
  embed=openai_embed,
  sink=weaviate_sink
)
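
Rather than hard-coding credentials, you could read them from environment variables. A small convenience sketch (the variable names here are our own choice, not required by the framework):

import os

openai_embed = OpenAIEmbed(
    api_key=os.environ["OPENAI_API_KEY"],       # assumed variable name
)

weaviate_sink = WeaviateSink(
    url=os.environ["WEAVIATE_URL"],             # assumed variable name
    api_key=os.environ["WEAVIATE_API_KEY"],     # assumed variable name
    class_name="your-class-name",
)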

Next, we will add a trigger that pushes our pipeline to the data_extraction task, which in turn triggers the rest of the tasks. Let's modify main.py:


from tasks import data_extraction
from neumai.Pipelines.TriggerSyncTypeEnum import TriggerSyncTypeEnum

# Serialize the pipeline to a dict so the worker can rebuild it with Pipeline(**pipeline_model).
# (as_json() is assumed to mirror SourceConnector.as_json() used in tasks.py.)
data_extraction.apply_async(
    kwargs={"pipeline_model": pipeline.as_json(), "extract_type": TriggerSyncTypeEnum.full},
    queue="data_extraction"
)
# You can update the kwargs to do a delta trigger
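
For example, a delta sync that only pulls data added since a given point in time could look like this (the cutoff date is illustrative; note that with Celery's default JSON serializer you may need to pass the timestamp as an ISO string instead of a datetime object):

from datetime import datetime

data_extraction.apply_async(
    kwargs={
        "pipeline_model": pipeline.as_json(),
        "extract_type": TriggerSyncTypeEnum.delta,
        "last_extraction": datetime(2023, 11, 1),  # illustrative cutoff
    },
    queue="data_extraction"
)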

This code takes our defined pipeline and adds it to our data_extraction queue for processing.

Let's run it

To get everything ready to run our solution, we first need to get our Redis queues running. To do this, start the Redis server locally:


sudo service redis-server start

Once we have the Redis queues running, we can start our Celery-based workers, each in its own terminal window.

data_extraction worker


celery -A tasks worker --concurrency 1 -Q data_extraction

data_processing worker


celery -A tasks worker --concurrency 1 -Q data_processing

data_embed_ingest worker


celery -A tasks worker --concurrency 1 -Q data_embed_ingest

Once everything is running, we can trigger our pipeline. This will distribute its tasks across the different queues as the data is processed.


python main.py

Conclusions

This is just one sample architecture you can leverage with the Neum AI framework. The number of queues can be increased or reduced depending on your needs, and you can swap out the broker and worker library you want to use. The framework provides that versatility so you can support your workloads no matter their size or your target architecture.
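
For instance, switching brokers is a one-line change in tasks.py. A sketch pointing Celery at a local RabbitMQ instance instead of Redis (the URL shown is RabbitMQ's default):

from celery import Celery

# Same tasks and queues, different broker: RabbitMQ instead of Redis.
app = Celery('tasks', broker="amqp://guest:guest@localhost:5672//")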

Find the completed code we walked through in GitHub.


Ready to start scaling your RAG solution?

We are here to help. Get started today with our SDK and Cloud offerings.