Skip to content
This repository was archived by the owner on Mar 23, 2026. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1566,37 +1566,46 @@ private Void innerCall() throws Exception {
doQuery(cxn, new NullOutputStream());
// success = true;
} else {
doQuery(cxn, os);
try {
doQuery(cxn, os);
// success = true;
/*
* GROUP_COMMIT: For mutation requests, calling flush() on the
* output stream unblocks the client and allows it to proceed
* BEFORE the write set of a mutation has been melded into a
* group commit. This is only a problem for UPDATE requests.
*
* The correct way to handle this is to allow the servlet
* container to close the output stream. That way the close
* occurs only after the group commit and when the control has
* been returned to the servlet container layer.
*
* There are some REST API methods (DELETE-WITH-QUERY,
* UPDATE-WITH-QUERY) that reenter the API using a
* PipedInputStream / PipedOutputStream to run a query (against
* the last commit time) and pipe the results into a parser that
* then executes a mutation without requiring the results to be
* fully buffered. In order for those operations to not deadlock
* we MUST flush() and close() the PipedOutputStream here (at
* last for now - it looks like we probably need to execute those
* REST API methods differently in order to support group commit
* since reading from the lastCommitTime does NOT provide the
* proper visibility guarantees when there could already be
* multiple write sets buffered for the necessary indices by
* other mutation tasks within the current commit group.)
*/
if (os instanceof PipedOutputStream) {
os.flush();
os.close();
}
}
catch(Exception e){
log.error(e);
throw e;
}
finally {
/*
* GROUP_COMMIT: For mutation requests, calling flush() on the
* output stream unblocks the client and allows it to proceed
* BEFORE the write set of a mutation has been melded into a
* group commit. This is only a problem for UPDATE requests.
*
* The correct way to handle this is to allow the servlet
* container to close the output stream. That way the close
* occurs only after the group commit and when the control has
* been returned to the servlet container layer.
*
* There are some REST API methods (DELETE-WITH-QUERY,
* UPDATE-WITH-QUERY) that reenter the API using a
* PipedInputStream / PipedOutputStream to run a query (against
* the last commit time) and pipe the results into a parser that
* then executes a mutation without requiring the results to be
* fully buffered. In order for those operations to not deadlock
* we MUST flush() and close() the PipedOutputStream here (at
* last for now - it looks like we probably need to execute those
* REST API methods differently in order to support group commit
* since reading from the lastCommitTime does NOT provide the
* proper visibility guarantees when there could already be
* multiple write sets buffered for the necessary indices by
* other mutation tasks within the current commit group.)
*/

if (os instanceof PipedOutputStream) {
os.flush();
os.close();
}
}
}
if (log.isTraceEnabled())
log.trace("Query done.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,12 @@ public Void call() throws Exception {
final FutureTask<Void> ft = new FutureTask<Void>(
queryTask);

// Submit query for evaluation.
context.queryService.execute(ft);

// Reads on the statements produced by the query.
final InputStream is = newPipedInputStream(os);

// Submit query for evaluation.
context.queryService.execute(ft);

// Run parser : visited statements will be deleted.
rdfParser.parse(is, baseURI);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,12 @@ public Void call() throws Exception {
final FutureTask<Void> ft = new FutureTask<Void>(
queryTask);

// Submit query for evaluation.
context.queryService.execute(ft);

// Reads on the statements produced by the query.
final InputStream is = newPipedInputStream(os);

// Submit query for evaluation.
context.queryService.execute(ft);

// Run parser : visited statements will be deleted.
rdfParser.parse(is, baseURI);

Expand Down