Store initialization race condition - Table 'langchain_pg_collection' is already defined for this MetaData instance #165

Open
MartinGotelli opened this issue Feb 26, 2025 · 4 comments

Comments

@MartinGotelli
Contributor

MartinGotelli commented Feb 26, 2025

sqlalchemy.exc.InvalidRequestError: Table 'langchain_pg_collection' is already defined for this MetaData instance

This error occurs when two PGVector instances are created at the same time. The models (CollectionStore and EmbeddingStore) are defined dynamically, so two threads can end up defining them concurrently. Because SQLAlchemy registers every table in a shared MetaData instance, the second definition finds the table already registered and raises, which makes this a race condition. This is the SQLAlchemy check that raises:

        must_exist = kw.pop("must_exist", kw.pop("mustexist", False))
        key = _get_table_key(name, schema)
        if key in metadata.tables:
            if not keep_existing and not extend_existing and bool(args):
                raise exc.InvalidRequestError(
                    f"Table '{key}' is already defined for this MetaData "
                    "instance.  Specify 'extend_existing=True' "
                    "to redefine "
                    "options and columns on an "
                    "existing Table object."
                )

I have a few suggestions. One is a simple mutex around the _get_embedding_collection_store method. Another is defining the models with extend_existing or keep_existing in their table args. A third is accepting table args as a parameter and passing them through to the models.

What do you think? I can create the PR, but I want to know what you prefer.

I would go with adding table args with "extend_existing" as True
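
For illustration, here is a minimal sketch of that behaviour in plain SQLAlchemy Core, outside of langchain-postgres. The columns are simplified placeholders and `define_collection_table` is a hypothetical helper, not the library's model code. It shows that a second definition against the same MetaData raises, while passing `extend_existing=True` returns the already registered Table instead.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.exc import InvalidRequestError

metadata = MetaData()


def define_collection_table(**table_kwargs):
    """Hypothetical helper: define the table against the shared MetaData."""
    return Table(
        "langchain_pg_collection",
        metadata,
        Column("id", Integer, primary_key=True),  # placeholder columns
        Column("name", String),
        **table_kwargs,
    )


first = define_collection_table()

# A second plain definition hits the "already defined" check.
try:
    define_collection_table()
    raised = False
except InvalidRequestError:
    raised = True

# With extend_existing=True the existing Table is updated and returned.
second = define_collection_table(extend_existing=True)
```

Note that this only suppresses the error on redefinition. It does not serialize the two definitions, so it is a different trade-off from the mutex option.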

import pytest


@pytest.mark.parametrize("execution_number", range(10))
def test_race_condition(execution_number):
    from threading import Thread

    from langchain_openai import OpenAIEmbeddings
    from langchain_postgres.vectorstores import PGVector

    def store():
        PGVector(
            connection="postgresql+psycopg://postgres:postgres@localhost:5432/embeddings",
            embeddings=OpenAIEmbeddings(),
            collection_name="test_collection",
        )

    # Construct one store in a background thread and one in the main thread
    # so that both initializations overlap.
    thread = Thread(target=store)
    thread.start()
    store()
    thread.join()

EDIT: There is also a race condition in the table creation; a mutex makes sense there.
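
A rough sketch of the mutex idea, using only the standard library. `_define_models` is a hypothetical stand-in for the dynamic model definition (or the table creation), and `get_models` is an illustrative name, not an existing function in the library. The lock plus a second check inside it ensures the expensive step runs once even when several threads arrive together.

```python
import threading

_models_lock = threading.Lock()
_models = None  # cached result of the one-time definition
build_calls = 0  # counts how often the definition actually ran


def _define_models():
    """Stand-in for the dynamic model definition; must run only once."""
    global build_calls
    build_calls += 1
    return ("EmbeddingStore", "CollectionStore")


def get_models():
    """Return the cached models, defining them under a lock on first use."""
    global _models
    if _models is None:  # fast path once initialised
        with _models_lock:
            if _models is None:  # re-check after acquiring the lock
                _models = _define_models()
    return _models


# Several threads request the models at the same time.
threads = [threading.Thread(target=get_models) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```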

@CJ-Lab7

CJ-Lab7 commented Mar 12, 2025

I'm having the same issue. This seems related to langchain-ai/langchain#14699

[2025-03-11 21:21:00,989: ERROR/ForkPoolWorker-8] Task esp_ai_ext.tasks.task_retriever[378edf3a-ad12-4b1b-a669-c5f93b4d46b7] raised unexpected: InvalidRequestError("Table 'langchain_pg_collection' is already defined for this MetaData instance. Specify 'extend_existing=True' to redefine options and columns on an existing Table object.")
Traceback (most recent call last):
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/celery/app/trace.py", line 453, in trace_task
R = retval = fun(*args, **kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/celery/app/trace.py", line 736, in protected_call
return self.run(*args, **kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/celery/app/autoretry.py", line 38, in run
return task._orig_run(*args, **kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/esp_ai_ext/tasks.py", line 65, in task_retriever
results = loop.run_until_complete(
File "/usr/local/Cellar/python@3.9/3.9.21/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/pregel/__init__.py", line 1989, in ainvoke
async for chunk in self.astream(
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/pregel/__init__.py", line 1874, in astream
async for _ in runner.atick(
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/pregel/runner.py", line 444, in atick
_panic_or_proceed(
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/pregel/runner.py", line 539, in _panic_or_proceed
raise exc
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/pregel/retry.py", line 132, in arun_with_retry
return await task.proc.ainvoke(task.input, config)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/utils/runnable.py", line 445, in ainvoke
input = await step.ainvoke(input, config, **kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langgraph/utils/runnable.py", line 238, in ainvoke
ret = await self.afunc(input, **kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langchain_core/runnables/config.py", line 588, in run_in_executor
return await asyncio.get_running_loop().run_in_executor(
File "/usr/local/Cellar/python@3.9/3.9.21/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langchain_core/runnables/config.py", line 579, in wrapper
return func(*args, **kwargs)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/esp_ai_ext/retrieval_graph/graph.py", line 60, in retrieve_documents
vs = get_vectorstore(RAG_DOC_NAMESPACE)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/esp_ai_ext/vector_store/retriever.py", line 9, in get_vectorstore
return PGVector(
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langchain_postgres/vectorstores.py", line 463, in __init__
self.__post_init__()
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langchain_postgres/vectorstores.py", line 472, in __post_init__
EmbeddingStore, CollectionStore = _get_embedding_collection_store(
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/langchain_postgres/vectorstores.py", line 108, in _get_embedding_collection_store
class CollectionStore(Base):
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/orm/decl_api.py", line 198, in __init__
_as_declarative(reg, cls, dict_)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/orm/decl_base.py", line 244, in _as_declarative
return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/orm/decl_base.py", line 325, in setup_mapping
return _ClassScanMapperConfig(
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/orm/decl_base.py", line 576, in __init__
self._setup_table(table)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/orm/decl_base.py", line 1757, in _setup_table
table_cls(
File "<string>", line 2, in __new__
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/util/deprecations.py", line 281, in warned
return fn(*args, **kwargs) # type: ignore[no-any-return]
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 430, in __new__
return cls._new(*args, **kw)
File "/Users/cj/bitrepos/esp3/esp/core/Data/extensions/isolated/venvs/esp-ai-ext/lib/python3.9/site-packages/sqlalchemy/sql/schema.py", line 462, in _new
raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Table 'langchain_pg_collection' is already defined for this MetaData instance. Specify 'extend_existing=True' to redefine options and columns on an existing Table object.

@MartinGotelli
Contributor Author

Hey @CJ-Lab7, I have more context about this issue.
The "MetaData instance" problem can only happen once per run, while the models are being defined for the first time. Once the service has started up, it stops happening, so make sure you instantiate PGVector once at application (or process) start-up.
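
As a rough illustration of the "instantiate once at start-up" advice, here is a standard-library sketch. `FakeVectorStore` is a hypothetical stand-in for PGVector so the example runs without a database, and `retrieve_documents` is an illustrative worker function. The store is built before any worker thread starts, and the workers only reuse it.

```python
import threading


class FakeVectorStore:
    """Stand-in for PGVector; counts how many times it is constructed."""

    instances = 0

    def __init__(self, collection_name):
        type(self).instances += 1
        self.collection_name = collection_name


# Build the store once at start-up, before any worker threads exist.
VECTOR_STORE = FakeVectorStore("test_collection")


def retrieve_documents(query, results):
    # Workers reuse the shared instance instead of constructing their own.
    results.append((query, VECTOR_STORE.collection_name))


results = []
threads = [
    threading.Thread(target=retrieve_documents, args=(f"q{i}", results))
    for i in range(4)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```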

If you enable table or collection creation, you can hit another race condition that is more likely. We are doing retries for now, but we plan to implement a mutex around the vector store creation.
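
A retry wrapper along those lines might look like the following sketch. `call_with_retries` and `flaky_create` are hypothetical names, and the failure is simulated with a `RuntimeError` rather than a real database error.

```python
import time


def call_with_retries(func, attempts=3, delay=0.01, retry_on=(Exception,)):
    """Call func, retrying on the given exceptions with a growing delay."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            time.sleep(delay * attempt)


calls = {"n": 0}


def flaky_create():
    """Simulated store creation that fails on the first two calls."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated conflict during table creation")
    return "created"


result = call_with_retries(flaky_create, retry_on=(RuntimeError,))
```

Retries paper over the conflict rather than prevent it, which is why a mutex is the longer-term plan.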

I hope this helps. Let me know if you want to know more.

@CJ-Lab7

CJ-Lab7 commented Mar 12, 2025

Thanks @MartinGotelli, that helped.

I was instantiating the vector store in LangGraph nodes running in parallel, which caused the issue.

@0xrushi

0xrushi commented May 9, 2025

Looks like this won't happen in the new v2.

If you still need v1, #209 should do it.
