Blog
This is some text inside of a div block.

Building scalable RAG pipelines with Neum AI framework  -  Part 1

David de Matheu
November 22, 2023
15
min read

This week we released a major release for Neum AI that introduced a new open-source framework for Python. This framework allows you to build and run data pipelines in your local environment that generate vector embeddings. It provides a collection of data source, embedding and vector database connectors to pick from. Built-ins are pre-processing transformations like loading and chunking, as well as support for selectors to augment the vector embeddings with metadata.

In this blog, we will showcase the process of getting started with creating a basic pipeline that runs locally in our environment. In sub-sequent blogs, we will go deeper into scaling the pipelines.

Brief overview of RAG

If you are new to RAG, welcome! RAG or Retrieval Augmented Generation is a technique to contextualize Large Language Models with data. Imagine you ask an LLM about last week’s meeting. It probably won’t know what you are talking about because it hasn’t been trained with the transcript or data from the meeting. By using RAG, you can take the transcript and data from the meeting and make it searchable so that the model can get context. So when you ask, what did John say during the meeting, the model can search the data and find what John said and use it to answer your question.

The goal with RAG is to do this same process but across large datasets (think all the docs a company has). To do this, the data must be ingested into indexes. To translate the data into a format that we can index, we use vector embeddings. (i.e. numerical representation of the semantic meaning of the data). The vector can be augmented with metadata like the title of the meeting or the time and day to help reduce the search space and improve the quality of the retrieved information for the model.

Building RAG with Neum AI

Neum AI is built as a highly distributable and modular data extraction framework that supports RAG. It provides:

  • Data connectors that extract data from structured and unstructured sources
  • Embed connectors that convert data into vector embeddings
  • Sink connectors that ingest vector embeddings into Vector Databases to be indexed

In addition to the main connectors, Loaders, Chunkers and selectors are available to pre-process data.

  • Loaders help to translate data types and formats into text
  • Chunkers breakdown large text documents into smaller pieces
  • Selectors capture metadata from connectors and loaders to augments vector embeddings.

With this structure, Neum AI can help connect your data to large language models and optimize the process of searching for specific context.

Getting started

To get started with Neum AI, we will install the neumai package:


pip install neumai

In addition to a Neum AI package, we will use:

  • Open AI embeddings model for which you will need an Open AI API Key. To get an API Key visit OpenAI. Make sure you have configured billing for the account.
  • Weaviate vector database for which you will need a Weaviate Cloud Service URL and API Key. To get a URL and API Key visit Weaviate Cloud Service.

For this example, we will be configuring a simple Website connector that will extract data from a

website. We will use one of our previous blogs as an example. (Blog: Neum AI — Retrieval Augmented Generation at scale — Building a distributed system for synchronizing and ingesting billions of text embeddings)

Configure the website connector

We will start by configuring the WebsiteConnector . The connector takes a url property. It also supports a Selector to define what information from the connector should be attached to the vector as metadata. In this case we will pick the url


from neumai.DataConnectors import WebsiteConnector
from neumai.Shared import Selector

website_connector =  WebsiteConnector(
    url = "https://www.neum.ai/post/retrieval-augmented-generation-at-scale",
    selector = Selector(
        to_metadata=['url']
    )
)

To provide a bit more context, the resulting vectors that will be extracted from this connector will look something like this:


{
	"vector" : [......], // List of floats
	"metadata" : {
		"text" : "Extracted text from the website",
		"url" : "URL from the website"
	}
}

Next, we will choose a loader and chunker to be used to pre-process the data extracted from the source. For the Website connector, we will use an HTML Loader as we are extracting HTML code and will use the Recursive Chunker to split up the text. We will configure the Data Connector, Loader and Chunker into a SourceConnector.


from neumai.Loaders.HTMLLoader import HTMLLoader
from neumai.Chunkers.RecursiveChunker import RecursiveChunker
from neumai.Sources import SourceConnector

source = SourceConnector(
  data_connector=website_connector,
  loader=HTMLLoader(),
  chunker=RecursiveChunker()
)

The source connector will take care of extracting, pre-processing and readying the data to be embedded. A single Neum AI pipeline can support multiple source connectors, each with their bespoke configuration.

The source connector has several built-in methods to extract and process the data. The methods can used independently to test sources, but are all integrated upstream. It includes:

  • list_files: This method will extract the the identifiers from the data source. This can include download links for hosted files or raw data for structured data sources like Postgres. There are two flavors of this method, full and delta. Full will extract everything vs Delta will only extract data since a given date. It outputs a CloudFile object.
  • download_files: This method downloads to local storage / memory the data so that it can be processed. It outputs a LocalFile object.
  • load_data: Takes the LocalFile objects and loads them into text. Outputs a NeumDocument objects.
  • chunk_data:  Takes the NeumDocument objects and splits them into smaller documents. Outputs NeumDocument objects as well.

Configure embed connector

Once we have our data ready to be transformed into vector embeddings, we can now configure the embedding service. For this example, we will use the Open AI connector. This connector uses text-ada-002 the most popular embedding model in the market to generate vector embeddings.


from neumai.EmbedConnectors import OpenAIEmbed

openai_embed = OpenAIEmbed(
    api_key = "OPEN AI KEY",
)

This connector has a single method: embed which takes NeumDocument objects and outputs vector embeddings which are represented as lists of floats. This vectors have to be matched back to the NeumDocuments to add text and metadata. Neum AI has a built-in capability to this using NeumVector constructs.

Configure sink connector

The final connector to configure is the SinkConnector. For this example, we will use the Weaviate connector. Weaviate is a popular open-source vector database.

To configure the Weaviate connector, you will need a connection parameters including: url and api_key. Other parameters are available to further configure the connector. For example, we will use class_name to define a name for the index we are creating.

The sink connectors provide several methods to ingest the data, search its contents and monitor what is inside.

  • store: Takes NeumVectors as input and returns the number for vectors ingested.
  • search: Takes a vector embedding which can be generated using the embed.embed_query() method and returns NeumSearchResult objects that contain the retrieved vector, metadata and similarity score.
  • info: Provides a NeumSinkInfo object that contains the total number of vectors stored in the sink.

Now that we have configured all the pieces, we can now use them together to extract, pre-process, transform and ingest data into a vector database.

Run the pipeline

To put all the connectors together, we provide a Pipeline construct. Alternatively, you can use the set of methods provided to push the data through the different stages. We will configure a pipeline object to help us.


from neumai.Pipelines import Pipeline

pipeline = Pipeline(
  sources=[source],
  embed=openai_embed ,
  sink=weaviate_sink 
)

Then we will run the pipeline locally with the provided built-in methods:


print(f"Vector written: {pipeline.run()}")

Output:


Vector written: 44

The run method is not recommended for production use cases as it is quite inefficient. It is simply provided as a means to test configurations and make sure things are working. The reason it is inefficient is because of how its designed:


def run(self) -> int:
        try:
            self.config_validation()
        except Exception as e:
            raise e
        
        try:
            total_vectors_stored = 0
            for source in self.sources:
                for cloudFile in source.list_files_full():
                    for localFile in source.download_files(cloudFile=cloudFile):
                        for document in source.load_data(file=localFile):
                            for chunks in source.chunk_data(document=document):
                                embeddings, embeddings_info = self.embed.embed(documents=chunks)
                                vectors_to_store = [NeumVector(id=str(uuid4()), vector=embeddings[i], metadata=chunks[i].metadata) for i in range(0,len(embeddings))]
                                total_vectors_stored += self.sink.store(vectors_to_store=vectors_to_store, pipeline_id=self.id)
            return total_vectors_stored
        except Exception as e:
            raise e

It is simply nested for-loops working through the different stages. The important thing to mention here, is how the methods used are architected. All the methods leverage Python yield and Generator concepts. This means that the methods output data as it is ready allowing to parallelize the next steps in the process. For example as files are listed, I can parallelize the process of downloading them. Once they are downloaded I can parallelize the process of pre-processing them and generating embeddings. It also means that I can batch things so that I can distribute the load across workers.

We won’t go into those details in this blog, but will be the core of what we will talk about in Part 2.

Search the pipeline

To close up this blog, we will test the data that we extracted, processed and ingested into the vector database. We will use the built-in search method in our Pipeline object. This method takes care of generating a vector embedding from the query that we can use to search our vector collection.


results = pipeline.search(
  query="What are the challenges with scaling RAG?", 
  number_of_results=3
)

for result in results:
    print(result.metadata)

Conclusions

In this blog, we introduced the Neum AI framework for building RAG data pipelines. We showed how the framework can be used alongside existing data sources, embedding services and vector embeddings. We also showed how the framework can be customized with pre-processing steps like chunking and loading, as well as augmented with Selectors to add metadata to the vector embeddings.

Check out our latest post

Follows us on social for additional content

Configuring RAG pipelines requires iteration across different parameters ranging from pre-processing loaders and chunkers, to the actual embedding model being used. To assist in testing different configurations, Neum AI provides several tools to test, evaluate and compare pipelines.
David de Matheu
December 6, 2023
10
min read
Real-time synchronization of embeddings into vector databases is now trivial! Learn how to create a real-time Retrieval Augmented Generation pipeline with Neum and Supabase.
Kevin Cohen
November 25, 2023
8
min read
Following the release of Neum AI framework, an open-source project to build large scale RAG pipelines, we explore how to get started building with the framework in a multi-part series.
David de Matheu
November 22, 2023
15
min read

Ready to start scaling your RAG solution?

We are here to help. Get started today with our SDK and Cloud offerings.