mirror of
https://gitlab.com/openstapps/openstapps.git
synced 2026-01-21 09:03:02 +00:00
fix: potential fixes
This commit is contained in:
@@ -40,7 +40,7 @@
|
||||
"start:debug": "cross-env STAPPS_LOG_LEVEL=31 NODE_CONFIG_ENV=elasticsearch ALLOW_NO_TRANSPORT=true node app.js",
|
||||
"test": "pnpm run test:unit",
|
||||
"test:integration": "sh integration-test.sh",
|
||||
"test:unit": "cross-env NODE_CONFIG_ENV=elasticsearch ALLOW_NO_TRANSPORT=true STAPPS_LOG_LEVEL=0 mocha --exit"
|
||||
"test:unit": "cross-env NODE_CONFIG_ENV=elasticsearch ALLOW_NO_TRANSPORT=true STAPPS_LOG_LEVEL=0 c8 mocha"
|
||||
},
|
||||
"dependencies": {
|
||||
"@elastic/elasticsearch": "8.4.0",
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
*/
|
||||
import {Logger, SMTP} from '@openstapps/logger';
|
||||
import {MailOptions} from 'nodemailer/lib/sendmail-transport';
|
||||
import Queue from 'promise-queue';
|
||||
|
||||
/**
|
||||
* A queue that can send mails in serial
|
||||
*/
|
||||
@@ -32,80 +32,45 @@ export class MailQueue {
|
||||
static readonly VERIFICATION_TIMEOUT = 5000;
|
||||
|
||||
/**
|
||||
* A queue that saves mails, before the transport is ready. When
|
||||
* the transport gets ready this mails are getting pushed in to
|
||||
* the normal queue.
|
||||
* A promise that resolves when the last mail was sent
|
||||
*/
|
||||
dryQueue: MailOptions[];
|
||||
|
||||
/**
|
||||
* A queue that saves mails, that are being sent in series
|
||||
*/
|
||||
queue: Queue;
|
||||
|
||||
/**
|
||||
* A counter for the number of verifications that failed
|
||||
*/
|
||||
verificationCounter: number;
|
||||
last?: Promise<string>;
|
||||
|
||||
/**
|
||||
* Creates a mail queue
|
||||
* @param transport Transport which is used for sending mails
|
||||
*/
|
||||
constructor(private readonly transport: SMTP) {
|
||||
this.queue = new Queue(1);
|
||||
|
||||
// this queue saves all request when the transport is not ready yet
|
||||
this.dryQueue = [];
|
||||
|
||||
this.verificationCounter = 0;
|
||||
|
||||
// if the transport can be verified it should check if it was done...
|
||||
this.checkForVerification();
|
||||
}
|
||||
constructor(private readonly transport: SMTP) {}
|
||||
|
||||
/**
|
||||
* Adds a mail into the queue so it gets send when the queue is ready
|
||||
* @param mail Information required for sending a mail
|
||||
* Wait for the transport to be verified
|
||||
*/
|
||||
private async addToQueue(mail: MailOptions) {
|
||||
return this.queue.add<string>(() => this.transport.sendMail(mail));
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify the given transport
|
||||
*/
|
||||
private checkForVerification() {
|
||||
if (this.verificationCounter >= MailQueue.MAX_VERIFICATION_ATTEMPTS) {
|
||||
throw new Error('Failed to initialize the SMTP transport for the mail queue');
|
||||
}
|
||||
|
||||
if (this.transport.isVerified()) {
|
||||
Logger.ok('Transport for mail queue was verified. We can send mails now');
|
||||
// if the transport finally was verified send all our mails from the dry queue
|
||||
for (const mail of this.dryQueue) {
|
||||
void this.addToQueue(mail);
|
||||
private async waitForVerification() {
|
||||
for (let i = 0; i < MailQueue.MAX_VERIFICATION_ATTEMPTS; i++) {
|
||||
if (this.transport.isVerified()) {
|
||||
Logger.ok('Transport for mail queue was verified. We can send mails now');
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
this.verificationCounter++;
|
||||
setTimeout(() => {
|
||||
Logger.warn('Transport not verified yet. Trying to send mails here...');
|
||||
this.checkForVerification();
|
||||
}, MailQueue.VERIFICATION_TIMEOUT);
|
||||
await new Promise(resolve => setTimeout(resolve, MailQueue.VERIFICATION_TIMEOUT));
|
||||
Logger.warn('Transport not verified yet. Trying to send mails here...');
|
||||
}
|
||||
throw new Error('Failed to initialize the SMTP transport for the mail queue');
|
||||
}
|
||||
|
||||
/**
|
||||
* Push a mail into the queue so it gets send when the queue is ready
|
||||
* @param mail Information required for sending a mail
|
||||
*/
|
||||
public async push(mail: MailOptions) {
|
||||
if (this.transport.isVerified()) {
|
||||
await this.addToQueue(mail);
|
||||
} else {
|
||||
// the transport has verification, but is not verified yet
|
||||
// push to a dry queue which gets pushed to the real queue when the transport is verified
|
||||
this.dryQueue.push(mail);
|
||||
}
|
||||
public async push(mail: MailOptions): Promise<string> {
|
||||
const previousQueue = this.last ?? this.waitForVerification();
|
||||
this.last = previousQueue.then(() =>
|
||||
Promise.race([
|
||||
this.transport.sendMail(mail),
|
||||
new Promise<string>((_, reject) =>
|
||||
setTimeout(() => reject(new Error('Timeout')), MailQueue.VERIFICATION_TIMEOUT),
|
||||
),
|
||||
]),
|
||||
);
|
||||
return this.last;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,7 @@ describe('MailQueue', async function () {
|
||||
clock.tick(MailQueue.VERIFICATION_TIMEOUT * (MailQueue.MAX_VERIFICATION_ATTEMPTS + 1));
|
||||
};
|
||||
|
||||
expect(() => test()).to.throw();
|
||||
expect(test).to.throw();
|
||||
expect(loggerStub.callCount).to.be.equal(MailQueue.MAX_VERIFICATION_ATTEMPTS);
|
||||
});
|
||||
|
||||
|
||||
@@ -39,7 +39,6 @@ import {Elasticsearch} from '../../../src/storage/elasticsearch/elasticsearch.js
|
||||
import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, getIndex} from '../../common.js';
|
||||
import fs from 'fs';
|
||||
import {backendConfig} from '../../../src/config.js';
|
||||
import {readFile} from 'fs/promises';
|
||||
import {
|
||||
ACTIVE_INDICES_ALIAS,
|
||||
getIndexUID,
|
||||
@@ -50,6 +49,8 @@ import {
|
||||
} from '../../../src/storage/elasticsearch/util/index.js';
|
||||
import cron from 'node-cron';
|
||||
import {query} from './query.js';
|
||||
import message from '@openstapps/core/test/resources/indexable/Message.1.json' assert {type: 'json'};
|
||||
import book from '@openstapps/core/test/resources/indexable/Book.1.json' assert {type: 'json'};
|
||||
|
||||
use(chaiAsPromised);
|
||||
|
||||
@@ -60,21 +61,12 @@ function searchResponse<T>(...hits: SearchHit<T>[]): SearchResponse<T> {
|
||||
return {hits: {hits}, took: 0, timed_out: false, _shards: {total: 1, failed: 0, successful: 1}};
|
||||
}
|
||||
|
||||
const message = JSON.parse(
|
||||
await readFile('node_modules/@openstapps/core/test/resources/indexable/Message.1.json', 'utf8'),
|
||||
);
|
||||
const book = JSON.parse(
|
||||
await readFile('node_modules/@openstapps/core/test/resources/indexable/Book.1.json', 'utf8'),
|
||||
);
|
||||
|
||||
describe('Elasticsearch', function () {
|
||||
// increase timeout for the suite
|
||||
this.timeout(DEFAULT_TEST_TIMEOUT);
|
||||
const sandbox = sinon.createSandbox();
|
||||
|
||||
before(function () {
|
||||
// eslint-disable-next-line no-console
|
||||
console.log('before');
|
||||
sandbox.stub(fs, 'readFileSync').returns('{}');
|
||||
});
|
||||
after(function () {
|
||||
@@ -269,7 +261,7 @@ describe('Elasticsearch', function () {
|
||||
return expect(es.init()).to.be.rejected;
|
||||
});
|
||||
|
||||
it('should setup the monitoring if there is monitoring is set and mail queue is defined', function () {
|
||||
it('should setup the monitoring if there is monitoring is set and mail queue is defined', async function () {
|
||||
const config: SCConfigFile = {
|
||||
...backendConfig,
|
||||
internal: {
|
||||
@@ -291,7 +283,7 @@ describe('Elasticsearch', function () {
|
||||
const cronSetupStub = sandbox.stub(cron, 'schedule');
|
||||
|
||||
const es = new Elasticsearch(config, new MailQueue(getTransport(false) as unknown as SMTP));
|
||||
es.init();
|
||||
await es.init();
|
||||
|
||||
expect(cronSetupStub.called).to.be.true;
|
||||
});
|
||||
@@ -445,7 +437,7 @@ describe('Elasticsearch', function () {
|
||||
_id: '',
|
||||
_index: '',
|
||||
_score: 0,
|
||||
_source: message as SCMessage,
|
||||
_source: message.instance as SCMessage,
|
||||
};
|
||||
sandbox.stub(es.client, 'search').resolves(searchResponse(foundObject));
|
||||
|
||||
@@ -475,7 +467,7 @@ describe('Elasticsearch', function () {
|
||||
const object: SearchHit<SCMessage> = {
|
||||
_id: '',
|
||||
_index: oldIndex,
|
||||
_source: message as SCMessage,
|
||||
_source: message.instance as SCMessage,
|
||||
};
|
||||
sandbox.stub(es.client, 'search').resolves(searchResponse<SCMessage>(object));
|
||||
sandbox.stub(es, 'prepareBulkWrite').resolves(index);
|
||||
@@ -489,7 +481,7 @@ describe('Elasticsearch', function () {
|
||||
sandbox.stub(es.client, 'create').resolves({result: 'not_found'} as CreateResponse);
|
||||
|
||||
await es.init();
|
||||
return expect(es.post(message as SCMessage, bulk)).to.rejectedWith('creation');
|
||||
return expect(es.post(message.instance as SCMessage, bulk)).to.rejectedWith('creation');
|
||||
});
|
||||
|
||||
it('should create a new object', async function () {
|
||||
@@ -502,11 +494,11 @@ describe('Elasticsearch', function () {
|
||||
});
|
||||
|
||||
await es.init();
|
||||
await es.post(message as SCMessage, bulk);
|
||||
await es.post(message.instance as SCMessage, bulk);
|
||||
|
||||
expect(createStub.called).to.be.true;
|
||||
expect(caughtParameter.document).to.be.eql({
|
||||
...message,
|
||||
...message.instance,
|
||||
creation_date: caughtParameter.document.creation_date,
|
||||
});
|
||||
});
|
||||
@@ -527,7 +519,7 @@ describe('Elasticsearch', function () {
|
||||
_id: '',
|
||||
_index: getIndex(),
|
||||
_score: 0,
|
||||
_source: message as SCMessage,
|
||||
_source: message.instance as SCMessage,
|
||||
};
|
||||
sandbox.stub(es.client, 'search').resolves(searchResponse());
|
||||
|
||||
@@ -541,7 +533,7 @@ describe('Elasticsearch', function () {
|
||||
_id: '',
|
||||
_index: getIndex(),
|
||||
_score: 0,
|
||||
_source: message as SCMessage,
|
||||
_source: message.instance as SCMessage,
|
||||
};
|
||||
sandbox.stub(es.client, 'search').resolves(searchResponse(object));
|
||||
// @ts-expect-error unused
|
||||
@@ -564,13 +556,13 @@ describe('Elasticsearch', function () {
|
||||
_id: '123',
|
||||
_index: getIndex(),
|
||||
_score: 0,
|
||||
_source: message as SCMessage,
|
||||
_source: message.instance as SCMessage,
|
||||
};
|
||||
const objectBook: SearchHit<SCBook> = {
|
||||
_id: '321',
|
||||
_index: getIndex(),
|
||||
_score: 0,
|
||||
_source: book as SCBook,
|
||||
_source: book.instance as SCBook,
|
||||
};
|
||||
const fakeEsAggregations = {
|
||||
'@all': {
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
nginx &
|
||||
node ./lib/cli.js
|
||||
node ./app.js
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// ESM is not supported, and cts is not detected, so we use type-checked cjs instead.
|
||||
/** @type {import('../src/common').ConfigFile} */
|
||||
const configFile = {
|
||||
activeVersions: ['1\\.0\\.\\d+', '2\\.0\\.\\d+'],
|
||||
@@ -46,7 +46,6 @@
|
||||
"@types/dockerode": "3.3.17",
|
||||
"@types/node": "18.15.3",
|
||||
"@types/sha1": "1.1.3",
|
||||
"config": "3.3.9",
|
||||
"dockerode": "3.3.5",
|
||||
"is-cidr": "4.0.2",
|
||||
"mustache": "4.2.0",
|
||||
@@ -59,7 +58,6 @@
|
||||
"@openstapps/prettier-config": "workspace:*",
|
||||
"@openstapps/tsconfig": "workspace:*",
|
||||
"@types/chai": "4.3.5",
|
||||
"@types/config": "3.3.0",
|
||||
"@types/dockerode": "3.3.17",
|
||||
"@types/mocha": "10.0.1",
|
||||
"@types/mustache": "4.2.2",
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
import {Logger, SMTP} from '@openstapps/logger';
|
||||
import config from 'config';
|
||||
import {existsSync} from 'fs';
|
||||
|
||||
// set transport on logger
|
||||
@@ -163,7 +162,7 @@ ssl_stapling_verify on;`;
|
||||
/**
|
||||
* Config file
|
||||
*/
|
||||
export const configFile: ConfigFile = config.util.toObject();
|
||||
export const configFile: ConfigFile = await import('../config/default.js').then(it => it.default);
|
||||
|
||||
/**
|
||||
* Check if path is a specific file type
|
||||
|
||||
Reference in New Issue
Block a user