Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src_cpp/include/node_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ namespace main {
class ConnectionExecuteAsyncWorker : public Napi::AsyncWorker {
public:
ConnectionExecuteAsyncWorker(Napi::Function& callback, std::shared_ptr<Connection>& connection,
std::shared_ptr<Database>& database,
std::shared_ptr<PreparedStatement> preparedStatement, NodeQueryResult* nodeQueryResult,
std::unordered_map<std::string, std::unique_ptr<Value>> params,
Napi::Value progressCallback)
: Napi::AsyncWorker(callback), connection(connection),
: Napi::AsyncWorker(callback), connection(connection), database(database),
preparedStatement(std::move(preparedStatement)), nodeQueryResult(nodeQueryResult),
params(std::move(params)) {
if (progressCallback.IsFunction()) {
Expand All @@ -100,12 +101,11 @@ class ConnectionExecuteAsyncWorker : public Napi::AsyncWorker {
auto result =
connection
->executeWithParamsWithID(preparedStatement.get(), std::move(params), queryID);
auto* resultRaw = result.get();
nodeQueryResult->AdoptQueryResult(std::move(result));
if (!resultRaw->isSuccess()) {
SetError(resultRaw->getErrorMessage());
if (!result->isSuccess()) {
SetError(result->getErrorMessage());
return;
}
nodeQueryResult->AdoptQueryResult(std::move(result), database);
} catch (const std::exception& exc) {
SetError(std::string(exc.what()));
}
Expand All @@ -122,6 +122,7 @@ class ConnectionExecuteAsyncWorker : public Napi::AsyncWorker {

private:
std::shared_ptr<Connection> connection;
std::shared_ptr<Database> database;
std::shared_ptr<PreparedStatement> preparedStatement;
NodeQueryResult* nodeQueryResult;
std::unordered_map<std::string, std::unique_ptr<Value>> params;
Expand All @@ -131,9 +132,10 @@ class ConnectionExecuteAsyncWorker : public Napi::AsyncWorker {
class ConnectionQueryAsyncWorker : public Napi::AsyncWorker {
public:
ConnectionQueryAsyncWorker(Napi::Function& callback, std::shared_ptr<Connection>& connection,
std::shared_ptr<Database>& database,
std::string statement, NodeQueryResult* nodeQueryResult, Napi::Value progressCallback)
: Napi::AsyncWorker(callback), connection(connection), statement(std::move(statement)),
nodeQueryResult(nodeQueryResult) {
: Napi::AsyncWorker(callback), connection(connection), database(database),
statement(std::move(statement)), nodeQueryResult(nodeQueryResult) {
if (progressCallback.IsFunction()) {
this->progressCallback = Napi::ThreadSafeFunction::New(Env(),
progressCallback.As<Napi::Function>(), "ProgressCallback", 0, 1);
Expand All @@ -156,11 +158,11 @@ class ConnectionQueryAsyncWorker : public Napi::AsyncWorker {
}
try {
auto result = connection->queryWithID(statement, queryID);
auto* resultRaw = result.get();
nodeQueryResult->AdoptQueryResult(std::move(result));
if (!resultRaw->isSuccess()) {
SetError(resultRaw->getErrorMessage());
if (!result->isSuccess()) {
SetError(result->getErrorMessage());
return;
}
nodeQueryResult->AdoptQueryResult(std::move(result), database);
} catch (const std::exception& exc) {
SetError(std::string(exc.what()));
}
Expand All @@ -177,6 +179,7 @@ class ConnectionQueryAsyncWorker : public Napi::AsyncWorker {

private:
std::shared_ptr<Connection> connection;
std::shared_ptr<Database> database;
std::string statement;
NodeQueryResult* nodeQueryResult;
std::optional<Napi::ThreadSafeFunction> progressCallback;
Expand Down
9 changes: 6 additions & 3 deletions src_cpp/include/node_query_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class NodeQueryResult : public Napi::ObjectWrap<NodeQueryResult> {

public:
static Napi::Object Init(Napi::Env env, Napi::Object exports);
static Napi::Object NewInstance(Napi::Env env, std::unique_ptr<QueryResult> queryResult);
static Napi::Object NewInstance(Napi::Env env, std::unique_ptr<QueryResult> queryResult,
std::shared_ptr<Database> db);
explicit NodeQueryResult(const Napi::CallbackInfo& info);
void AdoptQueryResult(std::unique_ptr<QueryResult> queryResult);
void AdoptQueryResult(std::unique_ptr<QueryResult> queryResult, std::shared_ptr<Database> db);
std::unique_ptr<QueryResult> DetachNextQueryResult();
~NodeQueryResult() override;

Expand Down Expand Up @@ -52,6 +53,7 @@ class NodeQueryResult : public Napi::ObjectWrap<NodeQueryResult> {
private:
static Napi::FunctionReference constructor;
std::unique_ptr<QueryResult> ownedQueryResult = nullptr;
std::shared_ptr<Database> database = nullptr;
std::unique_ptr<std::vector<std::string>> columnNames = nullptr;
std::atomic<uint32_t> activeAsyncUses = 0;
};
Expand Down Expand Up @@ -202,7 +204,8 @@ class NodeQueryResultGetNextQueryResultAsyncWorker : public Napi::AsyncWorker {
Callback().Call({env.Null(), env.Undefined()});
return;
}
Callback().Call({env.Null(), NodeQueryResult::NewInstance(env, std::move(nextOwnedResult))});
Callback().Call({env.Null(), NodeQueryResult::NewInstance(env, std::move(nextOwnedResult),
currQueryResult->database)});
}

void OnError(Napi::Error const& error) override {
Expand Down
21 changes: 9 additions & 12 deletions src_cpp/node_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ void NodeConnection::InitCppConnection() {
this->connection = std::make_shared<Connection>(database.get());
ProgressBar::Get(*connection->getClientContext())
->setDisplay(std::make_shared<NodeProgressBarDisplay>());
// After the connection is initialized, we do not need to hold a reference to the database.
database.reset();
}

void NodeConnection::SetMaxNumThreadForExec(const Napi::CallbackInfo& info) {
Expand Down Expand Up @@ -87,6 +85,7 @@ void NodeConnection::Close(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
this->connection.reset();
this->database.reset();
}

Napi::Value NodeConnection::ExecuteAsync(const Napi::CallbackInfo& info) {
Expand All @@ -98,7 +97,7 @@ Napi::Value NodeConnection::ExecuteAsync(const Napi::CallbackInfo& info) {
auto callback = info[3].As<Napi::Function>();
try {
auto params = Util::TransformParametersForExec(info[2].As<Napi::Array>());
auto asyncWorker = new ConnectionExecuteAsyncWorker(callback, connection,
auto asyncWorker = new ConnectionExecuteAsyncWorker(callback, connection, database,
nodePreparedStatement->preparedStatement, nodeQueryResult, std::move(params), info[4]);
asyncWorker->Queue();
} catch (const std::exception& exc) {
Expand All @@ -114,11 +113,10 @@ Napi::Value NodeConnection::QuerySync(const Napi::CallbackInfo& info) {
auto nodeQueryResult = Napi::ObjectWrap<NodeQueryResult>::Unwrap(info[1].As<Napi::Object>());
try {
auto result = connection->query(statement);
auto* resultRaw = result.get();
nodeQueryResult->AdoptQueryResult(std::move(result));
if (!resultRaw->isSuccess()) {
Napi::Error::New(env, resultRaw->getErrorMessage()).ThrowAsJavaScriptException();
if (!result->isSuccess()) {
Napi::Error::New(env, result->getErrorMessage()).ThrowAsJavaScriptException();
}
nodeQueryResult->AdoptQueryResult(std::move(result), database);
} catch (const std::exception& exc) {
Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException();
}
Expand All @@ -135,11 +133,10 @@ Napi::Value NodeConnection::ExecuteSync(const Napi::CallbackInfo& info) {
auto params = Util::TransformParametersForExec(info[2].As<Napi::Array>());
auto result = connection->executeWithParams(nodePreparedStatement->preparedStatement.get(),
std::move(params));
auto* resultRaw = result.get();
nodeQueryResult->AdoptQueryResult(std::move(result));
if (!resultRaw->isSuccess()) {
Napi::Error::New(env, resultRaw->getErrorMessage()).ThrowAsJavaScriptException();
if (!result->isSuccess()) {
Napi::Error::New(env, result->getErrorMessage()).ThrowAsJavaScriptException();
}
nodeQueryResult->AdoptQueryResult(std::move(result), database);
} catch (const std::exception& exc) {
Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException();
}
Expand All @@ -153,7 +150,7 @@ Napi::Value NodeConnection::QueryAsync(const Napi::CallbackInfo& info) {
auto nodeQueryResult = Napi::ObjectWrap<NodeQueryResult>::Unwrap(info[1].As<Napi::Object>());
auto callback = info[2].As<Napi::Function>();
auto asyncWorker =
new ConnectionQueryAsyncWorker(callback, connection, statement, nodeQueryResult, info[3]);
new ConnectionQueryAsyncWorker(callback, connection, database, statement, nodeQueryResult, info[3]);
asyncWorker->Queue();
return info.Env().Undefined();
}
11 changes: 7 additions & 4 deletions src_cpp/node_query_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ Napi::Object NodeQueryResult::Init(Napi::Env env, Napi::Object exports) {
}

Napi::Object NodeQueryResult::NewInstance(
Napi::Env /*env*/, std::unique_ptr<QueryResult> queryResult) {
Napi::Env /*env*/, std::unique_ptr<QueryResult> queryResult, std::shared_ptr<Database> db) {
auto obj = constructor.New({});
auto* nodeQueryResult = Napi::ObjectWrap<NodeQueryResult>::Unwrap(obj);
nodeQueryResult->AdoptQueryResult(std::move(queryResult));
nodeQueryResult->AdoptQueryResult(std::move(queryResult), std::move(db));
return obj;
}

Expand All @@ -51,10 +51,12 @@ NodeQueryResult::~NodeQueryResult() {
this->Close();
}

void NodeQueryResult::AdoptQueryResult(std::unique_ptr<QueryResult> queryResult) {
void NodeQueryResult::AdoptQueryResult(
std::unique_ptr<QueryResult> queryResult, std::shared_ptr<Database> db) {
ThrowIfAsyncOperationInFlight("replace");
columnNames.reset();
ownedQueryResult = std::move(queryResult);
database = std::move(db);
}

std::unique_ptr<QueryResult> NodeQueryResult::DetachNextQueryResult() {
Expand Down Expand Up @@ -140,7 +142,7 @@ Napi::Value NodeQueryResult::GetNextQueryResultSync(const Napi::CallbackInfo& in
.ThrowAsJavaScriptException();
return env.Undefined();
}
return NewInstance(env, std::move(nextOwnedResult));
return NewInstance(env, std::move(nextOwnedResult), database);
} catch (const std::exception& exc) {
Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException();
}
Expand Down Expand Up @@ -286,4 +288,5 @@ void NodeQueryResult::Close(const Napi::CallbackInfo& info) {
void NodeQueryResult::Close() {
columnNames.reset();
ownedQueryResult.reset();
database.reset();
}
28 changes: 26 additions & 2 deletions test/test_database.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,11 +505,35 @@ describe("Database close", function () {
assert.equal(res.getNumTuples(), 1);
const tuple = await res.getNext();
assert.deepEqual(tuple, { "+(1,1)": 2 });
// Close in reverse order: db first, then conn, then result. None should crash.
testDb.closeSync();
assert.isTrue(testDb._isClosed);
assert.throws(() => conn.querySync("RETURN 1+1"), Error, "Runtime exception: The current operation is not allowed because the parent database is closed.");
conn.closeSync();
assert.isTrue(conn._isClosed);
assert.throws(() => res.resetIterator(), Error, "Runtime exception: The current operation is not allowed because the parent database is closed.");
res.close();
});

it("should not crash when discarded query results are GC'd after database is closed", async function () {
// Regression test for a double-free bug: NodeQueryResult holds a
// MaterializedQueryResult whose FactorizedTable destructor accesses
// database-owned memory. If the Database is destroyed before the GC
// finalizer for NodeQueryResult runs, that destructor crashes. The fix
// is for NodeQueryResult to hold a shared_ptr<Database> so the Database
// cannot be freed until every result that references it is gone.
//
// The key pattern being tested is: query results are *not stored*, making
// them immediately eligible for GC. conn.closeSync() and db.closeSync()
// are then called before GC has had a chance to collect them. When the GC
// finalizer eventually runs (possibly later in this mocha process), it must
// not crash.
const testDb = new lbug.Database();
const conn = new lbug.Connection(testDb);
await conn.query("CREATE NODE TABLE T(id STRING PRIMARY KEY)");
await conn.query(`CREATE (:T {id: 'test-${Date.now()}'})`);
await conn.query("MATCH (t:T) RETURN t.id");
conn.closeSync();
testDb.closeSync();
assert.isTrue(conn._isClosed);
assert.isTrue(testDb._isClosed);
});
});
Loading