diff options
Diffstat (limited to 'contrib/postgres_fdw/connection.c')
| -rw-r--r-- | contrib/postgres_fdw/connection.c | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ee0b4acf0ba..54ab8edfab6 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -62,6 +62,7 @@ typedef struct ConnCacheEntry Oid serverid; /* foreign server OID used to get server name */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid); * will_prep_stmt must be true if caller intends to create any prepared * statements. Since those don't go away automatically at transaction end * (not even on error), we need this flag to cue manual cleanup. + * + * If state is not NULL, *state receives the per-connection state associated + * with the PGconn. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; bool retry = false; @@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt) */ PG_TRY(); { + /* Process a pending asynchronous request if any. */ + if (entry->state.pendingAreq) + process_pending_request(entry->state.pendingAreq); /* Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); } @@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. */ + if (state) + *state = &entry->state; + return entry->conn; } @@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); @@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn) * Caller is responsible for the error handling on the result. */ PGresult * -pgfdw_exec_query(PGconn *conn, const char *query) +pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) { + /* First, process a pending asynchronous request, if any. */ + if (state && state->pendingAreq) + process_pending_request(state->pendingAreq); + /* * Submit a query. Since we don't use non-blocking mode, this also can * block. But its risk is relatively small, so we ignore that for now. @@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg) { entry->have_prep_stmt = false; entry->have_error = false; + /* Also reset per-connection state */ + memset(&entry->state, 0, sizeof(entry->state)); } /* Disarm changing_xact_state if it all worked. */ @@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) * Cancel the currently-in-progress query (whose query text we do not have) * and ignore the result. Returns true if we successfully cancel the query * and discard any pending result, and false if not. + * + * XXX: if the query was one sent by fetch_more_data_begin(), we could get the + * query text from the pendingAreq saved in the per-connection state, then + * report the query using it. */ static bool pgfdw_cancel_query(PGconn *conn) |
