Skip to content
24 changes: 23 additions & 1 deletion InfoLogger/lib/controller/QueryController.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

const { LogManager, updateAndSendExpressResponseFromNativeError } = require('@aliceo2/web-ui');
const { AbortError, throwIfQueryAborted } = require('../utils/queryCancellation');

/**
* Gateway for all calls that are to query InfoLogger database
Expand All @@ -37,15 +38,36 @@ class QueryController {
* @returns {void}
*/
async getLogs(req, res) {
const abortController = new AbortController();
const { signal } = abortController;
try {
const { body: { criterias, options } } = req;
if (!criterias || Object.keys(criterias).length === 0) {
res.status(400).json({ error: 'Invalid query parameters provided' });
return;
}
const logs = await this._queryService.queryFromFilters(criterias, options);

let responseInProgress = true;

res.on('finish', () => {
responseInProgress = false;
});

res.on('close', () => {
if (responseInProgress) {
abortController.abort();
}
});

const logs = await this._queryService.queryFromFilters(criterias, options, signal);
throwIfQueryAborted(signal);

res.status(200).json(logs);
} catch (error) {
if (signal.aborted || error instanceof AbortError) {
this._logger.infoMessage('Query was cancelled by the client');
return;
}
this._logger.errorMessage(error.toString());
updateAndSendExpressResponseFromNativeError(res, error);
}
Expand Down
32 changes: 25 additions & 7 deletions InfoLogger/lib/services/QueryService.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const mariadb = require('mariadb');
const { LogManager, InvalidInputError } = require('@aliceo2/web-ui');
const { fromSqlToNativeError } = require('../utils/fromSqlToNativeError');
const { processPreparedSQLStatement } = require('../utils/preparedStatementParser');
const { throwIfQueryAborted, attachAbortDestroyHandler } = require('../utils/queryCancellation');

class QueryService {
/**
Expand Down Expand Up @@ -92,32 +93,49 @@ class QueryService {
* @param {object} filters - criteria like MongoDB
* @param {object} options - specific options for the query
* @param {number} options.limit - how many rows to get
* @param {AbortSignal} [signal] - optional signal to cancel the query; when aborted, the DB connection is destroyed
* @returns {Promise.<object>} - {total, more, limit, rows, count, time}
*/
async queryFromFilters(filters, options) {
async queryFromFilters(filters, options, signal = null) {
const { limit = 100000 } = options;
const { criteria, values } = this._filtersToSqlConditions(filters);
const criteriaString = this._getCriteriaAsString(criteria);

const requestRows = `SELECT * FROM \`messages\` ${criteriaString} ORDER BY \`TIMESTAMP\` LIMIT ?;`;
const queryValues = [...values, limit];
const startTime = Date.now(); // ms

this._logger.debugMessage(`SQL to execute: ${processPreparedSQLStatement(requestRows, values, limit)}`);

let rows = [];
let connection = null;
let connectionDestroyed = false;
try {
if (!this._pool) {
throw new Error('No database connection available');
}
rows = await this._pool.query(
{
sql: requestRows,
timeout: this._timeout,
connection = await this._pool.getConnection();
throwIfQueryAborted(signal);

const detachAbortHandler = attachAbortDestroyHandler(
signal,
connection,
() => {
connectionDestroyed = true;
},
[...values, limit],
);

try {
rows = await connection.query({ sql: requestRows, timeout: this._timeout }, queryValues);
} finally {
detachAbortHandler();
}
} catch (error) {
throwIfQueryAborted(signal);
fromSqlToNativeError(error);
} finally {
if (connection && !connectionDestroyed) {
connection.release();
}
}

const totalTime = Date.now() - startTime; // ms
Expand Down
67 changes: 67 additions & 0 deletions InfoLogger/lib/utils/queryCancellation.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* @license
* Copyright 2019-2020 CERN and copyright holders of ALICE O2.
* See http://alice-o2.web.cern.ch/copyright for details of the copyright holders.
* All rights not expressly granted are reserved.
*
* This software is distributed under the terms of the GNU General Public
* License v3 (GPL Version 3), copied verbatim in the file "COPYING".
*
* In applying this license CERN does not waive the privileges and immunities
* granted to it by virtue of its status as an Intergovernmental Organization
* or submit itself to any jurisdiction.
*/

/**
* Custom error class for query cancellation
*/
class AbortError extends Error {
/**
* Create an AbortError
* @param {string} message - error message
*/
constructor(message = 'Query cancelled by client') {
super(message);
this.name = 'AbortError';
this.code = 'QUERY_CANCELLED';
}
}

/**
* Throw an error if the given signal is already aborted.
* @param {AbortSignal|null} signal - optional abort signal
* @throws {AbortError} if signal is aborted
*/
const throwIfQueryAborted = (signal) => {
if (signal?.aborted) {
throw new AbortError();
}
};

/**
* Attach a one-time abort handler to the signal that destroys the connection.
* Return a cleanup function to remove the listener
* @param {AbortSignal|null} signal - optional abort signal
* @param {object} connection - mariadb connection-like object
* @param {function(): void} beforeDestroy - callback called just before destroying connection
* @returns {function(): void} cleanup callback to remove the listener
*/
const attachAbortDestroyHandler = (signal, connection, beforeDestroy) => {
if (!signal) {
return () => {};
}

const abortHandler = () => {
beforeDestroy();
connection.destroy();
};

signal.addEventListener('abort', abortHandler, { once: true });
return () => signal.removeEventListener('abort', abortHandler);
};

module.exports = {
AbortError,
throwIfQueryAborted,
attachAbortDestroyHandler,
};
91 changes: 90 additions & 1 deletion InfoLogger/test/lib/controller/mocha-query-controller.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const assert = require('assert');
const { QueryController } = require('../../../lib/controller/QueryController');
const { spy, stub } = require('sinon');
const { TimeoutError } = require('@aliceo2/web-ui');
const { AbortError } = require('../../../lib/utils/queryCancellation');

describe('QueryController test suite', () => {
describe('getQueryStats() - test suite', () => {
Expand Down Expand Up @@ -106,11 +107,21 @@ describe('QueryController test suite', () => {
criterias: { key: 'value' },
options: { },
};
const callbacks = {};
res.on = (event, fn) => {
callbacks[event] = fn;
return res;
};
queryService.queryFromFilters.resolves(logs);

await queryController.getLogs({ body }, res);

assert.ok(queryService.queryFromFilters.calledWith(body.criterias, body.options));
assert.ok(queryService.queryFromFilters.called);
const call = queryService.queryFromFilters.getCall(0);
assert.strictEqual(call.args[0], body.criterias);
assert.strictEqual(call.args[1], body.options);
assert.ok(!call.args[2].aborted); // signal should not have been aborted on success
assert.ok(call.args[2] instanceof AbortSignal);
assert.ok(res.status.calledWith(200));
assert.ok(res.json.calledWith(logs));
});
Expand All @@ -120,12 +131,90 @@ describe('QueryController test suite', () => {
criterias: { key: 'value' },
options: {},
};
const callbacks = {};
res.on = (event, fn) => {
callbacks[event] = fn;
return res;
};
queryService.queryFromFilters.rejects(new TimeoutError('QUERY TIMED OUT'));

await queryController.getLogs({ body }, res);

assert.ok(res.status.calledWith(408));
assert.ok(res.json.calledWith({ title: 'Timeout', message: 'QUERY TIMED OUT', status: 408 }));
});

it('should not send response when client closes connection before query completes', async () => {
const body = {
criterias: { key: 'value' },
options: {},
};
const callbacks = {};
res.on = (event, fn) => {
callbacks[event] = fn;
return res;
};

// Query that completes after close event
queryService.queryFromFilters.callsFake(() => new Promise((resolve) => {
setTimeout(() => {
resolve([{ id: 1, message: 'log1' }]);
}, 50);
}));

const queryPromise = queryController.getLogs({ body }, res);
// Simulate client closing connection before query completes
await new Promise((r) => setTimeout(r, 10));
callbacks.close();
await queryPromise;

// Response should not be sent
assert.ok(res.status.notCalled);
assert.ok(res.json.notCalled);
});

it('should successfully return logs even if close event fires after finish event', async () => {
const logs = [{ id: 1, message: 'log1' }];
const body = {
criterias: { key: 'value' },
options: {},
};
const callbacks = {};
res.on = (event, fn) => {
callbacks[event] = fn;
return res;
};

queryService.queryFromFilters.resolves(logs);

const queryPromise = queryController.getLogs({ body }, res);
await queryPromise;

// Simulate normal flow: finish fires, then close
callbacks.finish();
callbacks.close();

assert.ok(res.status.calledWith(200));
assert.ok(res.json.calledWith(logs));
});

it('should handle QUERY_CANCELLED error and not send response', async () => {
const body = {
criterias: { key: 'value' },
options: {},
};
const callbacks = {};
res.on = (event, fn) => {
callbacks[event] = fn;
return res;
};
queryService.queryFromFilters.rejects(new AbortError());

await queryController.getLogs({ body }, res);

// Should not send any response on cancellation
assert.ok(res.status.notCalled);
assert.ok(res.json.notCalled);
});
});
});
Loading
Loading