/* eslint-disable @typescript-eslint/no-explicit-any,unicorn/no-null */ /* * Copyright (C) 2020 StApps * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see https://www.gnu.org/licenses/. */ import {Client, Diagnostic} from '@elastic/elasticsearch'; import Indices from '@elastic/elasticsearch/lib/api/api/indices.js'; import {CreateResponse, SearchHit, SearchResponse} from '@elastic/elasticsearch/lib/api/types.js'; import { SCBook, SCBulkResponse, SCConfigFile, SCMessage, SCSearchQuery, SCThings, SCThingType, } from '@openstapps/core'; import {Logger, SMTP} from '@openstapps/logger'; import {expect, use} from 'chai'; import chaiAsPromised from 'chai-as-promised'; import {beforeEach} from 'mocha'; import mockedEnv from 'mocked-env'; import sinon, {SinonStub} from 'sinon'; import {removeInvalidAliasChars} from '../../../src/storage/elasticsearch/util/alias.js'; import {MailQueue} from '../../../src/notification/mail-queue.js'; import {aggregations} from '../../../src/storage/elasticsearch/templating.js'; import {Elasticsearch} from '../../../src/storage/elasticsearch/elasticsearch.js'; import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, getIndex} from '../../common.js'; import fs from 'fs'; import {backendConfig} from '../../../src/config.js'; import {readFile} from 'fs/promises'; import { ACTIVE_INDICES_ALIAS, getIndexUID, getThingIndexName, INACTIVE_INDICES_ALIAS, INDEX_UID_LENGTH, parseIndexName, } from 
'../../../src/storage/elasticsearch/util/index.js';
import cron from 'node-cron';
import {query} from './query.js';

use(chaiAsPromised);

/**
 * Builds a minimal, successful Elasticsearch search response wrapping the given hits.
 *
 * @param hits the hits the fake response should contain (passing none yields an empty result)
 * @returns a search response reporting one successful shard and `took: 0`
 */
function searchResponse(...hits: SearchHit[]): SearchResponse {
  return {hits: {hits}, took: 0, timed_out: false, _shards: {total: 1, failed: 0, successful: 1}};
}

// Fixture things shipped with @openstapps/core, read once at module load via top-level await.
const message = JSON.parse(
  await readFile('node_modules/@openstapps/core/test/resources/indexable/Message.1.json', 'utf8'),
);
const book = JSON.parse(
  await readFile('node_modules/@openstapps/core/test/resources/indexable/Book.1.json', 'utf8'),
);

describe('Elasticsearch', function () {
  // increase timeout for the suite
  this.timeout(DEFAULT_TEST_TIMEOUT);
  const sandbox = sinon.createSandbox();

  before(function () {
    // synchronous file reads return an empty JSON object for the whole suite
    sandbox.stub(fs, 'readFileSync').returns('{}');
  });

  after(function () {
    sandbox.restore();
  });

  describe('getElasticsearchUrl', function () {
    it('should provide custom elasticsearch URL if defined', function () {
      const customAddress = 'http://foo-address:9200';
      // @ts-expect-error wrong type defs for some reason
      const restore = mockedEnv({
        ES_ADDR: customAddress,
      });

      try {
        expect(Elasticsearch.getElasticsearchUrl()).to.be.equal(customAddress);
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });

    it('should provide local URL as fallback', function () {
      // @ts-expect-error wrong type defs for some reason
      const restore = mockedEnv({
        ES_ADDR: undefined,
      });

      try {
        expect(Elasticsearch.getElasticsearchUrl()).to.match(/(https?:\/\/)?localhost(:\d+)?/);
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });
  });

  describe('getAliasMap', function () {
    it('should fail after retries', async function () {
      const es = new Elasticsearch(backendConfig);
      sandbox.stub(es.client.indices, 'getAlias').throws();
      await expect(es.init({maxRetries: 1, retryInterval: 10})).to.be.rejected;
    });
  });

  describe('getIndex (including getIndexUID)', function () {
    const type = 'foo bar type';
    const source = 'foo_source';
    const bulk: SCBulkResponse = {
      expiration: '',
      source: '',
      state: 'in progress',
      type: SCThingType.Semester,
      uid: 'bulk-uid-123-123-123',
    };

    it('should provide index UID from the provided UID', function () {
      const indexUID = getIndexUID(bulk.uid);

      expect(indexUID.length).to.be.equal(INDEX_UID_LENGTH);
      // test starting and ending character
      expect(indexUID[0]).to.be.equal(bulk.uid[0]);
      expect(indexUID[indexUID.length - 1]).to.be.equal(bulk.uid[INDEX_UID_LENGTH - 1]);
    });

    it('should provide index name from the provided data', function () {
      expect(getThingIndexName(type as SCThingType, source, bulk)).to.be.equal(
        `stapps_${type.split(' ').join('_')}_${source}_${getIndexUID(bulk.uid)}`,
      );
    });

    it('should reject invalid index names', function () {
      expect(() => parseIndexName(':)')).to.throw(SyntaxError);
    });
  });

  describe('removeAliasChars', function () {
    it('should remove spaces from both ends', function () {
      expect(removeInvalidAliasChars(' foobaralias ', 'bulk-uid')).to.be.equal('foobaralias');
    });

    it('should replace inner spaces with underscores', function () {
      expect(removeInvalidAliasChars('foo bar alias', 'bulk-uid')).to.be.equal('foo_bar_alias');
    });

    it('should remove invalid characters', function () {
      expect(removeInvalidAliasChars('f,o#o\\b|ar/* ', 'bulk-uid')).to.be.equal('foobaralias');
    });

    it('should remove invalid starting characters', function () {
      expect(removeInvalidAliasChars('-foobaralias', 'bulk-uid')).to.be.equal('foobaralias');
      expect(removeInvalidAliasChars('_foobaralias', 'bulk-uid')).to.be.equal('foobaralias');
      expect(removeInvalidAliasChars('+foobaralias', 'bulk-uid')).to.be.equal('foobaralias');
    });

    it('should replace with a placeholder in case of invalid alias', function () {
      expect(removeInvalidAliasChars('.', 'bulk-uid')).to.contain('placeholder');
      expect(removeInvalidAliasChars('..', 'bulk-uid')).to.contain('placeholder');
    });

    it('should work with common cases', function () {
      expect(
        removeInvalidAliasChars('the-quick-brown-fox-jumps-over-the-lazy-dog-1234567890', 'bulk-uid'),
      ).to.be.equal('the-quick-brown-fox-jumps-over-the-lazy-dog-1234567890');
      expect(removeInvalidAliasChars('THE_QUICK_BROWN_FOX_JUMPS_OVER_THE_LAZY_DOG', 'bulk-uid')).to.be.equal(
        'THE_QUICK_BROWN_FOX_JUMPS_OVER_THE_LAZY_DOG',
      );
    });

    it('should warn in case of characters that are invalid in future elasticsearch versions', function () {
      const sandbox = sinon.createSandbox();
      const loggerWarnStub = sandbox.stub(Logger, 'warn');

      try {
        expect(removeInvalidAliasChars('foo:bar:alias', 'bulk-uid')).to.contain('foo:bar:alias');
        expect(loggerWarnStub.called).to.be.true;
      } finally {
        // un-stub Logger.warn so the stub does not leak into (and break re-stubbing in) later tests
        sandbox.restore();
      }
    });
  });

  // Suite callbacks must be synchronous: Mocha ignores (and warns about) promises returned by `describe`.
  describe('constructor', function () {
    const sandbox = sinon.createSandbox();

    afterEach(function () {
      sandbox.restore();
    });

    it('should complain (throw an error) if database in config is undefined', function () {
      const config: SCConfigFile = {
        ...backendConfig,
        internal: {...backendConfig.internal, database: undefined},
      };

      expect(() => new Elasticsearch(config)).to.throw(Error);
    });

    it('should complain (throw an error) if database version is not a string', function () {
      const config: SCConfigFile = {
        ...backendConfig,
        internal: {
          ...backendConfig.internal,
          database: {
            name: 'foo',
            version: 123,
          },
        },
      };

      expect(() => new Elasticsearch(config)).to.throw(Error);
    });

    it('should log an error in case of there is one when getting response from the elasticsearch client', async function () {
      const error = new Error('Foo Error');
      const loggerErrorStub = sandbox.stub(Logger, 'error').resolves('foo');
      sandbox.stub(Diagnostic.prototype, 'on').yields(error);

      new Elasticsearch(backendConfig);

      expect(loggerErrorStub.calledWith(error)).to.be.true;
    });

    it('should log the result in the debug mode when getting response from the elasticsearch client', async function () {
      const fakeResponse = {foo: 'bar'};
      const loggerLogStub = sandbox.stub(Logger, 'log');
      sandbox.stub(Diagnostic.prototype, 'on').yields(null, fakeResponse);

      new Elasticsearch(backendConfig);
      expect(loggerLogStub.calledWith(fakeResponse)).to.be.false;

      // @ts-expect-error wrong type defs for some reason
      const restore = mockedEnv({
        ES_DEBUG: 'true',
      });

      try {
        new Elasticsearch(backendConfig);
        expect(loggerLogStub.calledWith(fakeResponse)).to.be.true;
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });

    describe('init', function () {
      const sandbox = sinon.createSandbox();

      after(function () {
        sandbox.restore();
      });

      it('should complain (throw an error) if monitoring is set but mail queue is undefined', async function () {
        const config: SCConfigFile = {
          ...backendConfig,
          internal: {
            ...backendConfig.internal,
            monitoring: {
              actions: [],
              watchers: [],
            },
          },
        };

        const es = new Elasticsearch(config);

        return expect(es.init()).to.be.rejected;
      });

      it('should setup the monitoring if there is monitoring is set and mail queue is defined', function () {
        const config: SCConfigFile = {
          ...backendConfig,
          internal: {
            ...backendConfig.internal,
            monitoring: {
              actions: [],
              watchers: [
                {
                  triggers: [{executionTime: 'daily', name: 'trigger'}],
                  name: 'watcher',
                  actions: [],
                  query: {},
                  conditions: [],
                },
              ],
            },
          },
        };
        const cronSetupStub = sandbox.stub(cron, 'schedule');

        const es = new Elasticsearch(config, new MailQueue(getTransport(false) as unknown as SMTP));
        // NOTE(review): init() is deliberately not awaited; the assertion below relies on
        // cron.schedule being called synchronously inside init() — confirm.
        es.init();

        expect(cronSetupStub.called).to.be.true;
      });
    });

    describe('Operations with bundle/index', function () {
      const sandbox = sinon.createSandbox();
      let es: Elasticsearch;
      let createStub: SinonStub;
      let deleteStub: SinonStub;
      let refreshStub: SinonStub;
      let updateAliasesStub: SinonStub;
      const oldIndex = 'stapps_footype_foosource_oldindex';

      beforeEach(function () {
        sandbox
          .stub(Indices.default.prototype, 'getAlias')
          .resolves({[oldIndex]: {aliases: {[SCThingType.Book]: {}}}} as any);
        sandbox.stub(Indices.default.prototype, 'putTemplate').resolves({} as any);
        createStub = sandbox.stub(Indices.default.prototype, 'create').resolves({} as any);
        deleteStub = sandbox.stub(Indices.default.prototype, 'delete').resolves({} as any);
        refreshStub = sandbox.stub(Indices.default.prototype, 'refresh').resolves({} as any);
        updateAliasesStub = sandbox.stub(Indices.default.prototype, 'updateAliases').resolves({} as any);
        es = new Elasticsearch(backendConfig);
      });

      afterEach(function () {
        sandbox.restore();
      });

      describe('bulkCreated', function () {
        it('should reject (throw an error) if the connection to elasticsearch is not established', async function () {
          return expect(es.bulkCreated(bulk)).to.be.rejectedWith('elasticsearch');
        });

        it('should reject (throw an error) if the index name is not valid', async function () {
          sandbox.createStubInstance(Client, {});
          const invalidBulk = {...bulk, source: '%#$^'};
          await es.init();

          return expect(es.bulkCreated(invalidBulk)).to.be.rejectedWith('Index');
        });

        it('should create a new index', async function () {
          const index = getIndex();
          sandbox.stub(es, 'prepareBulkWrite').resolves(index);
          await es.init();
          await es.bulkCreated(bulk);

          expect(createStub.calledWith({index, aliases: {[INACTIVE_INDICES_ALIAS]: {}}})).to.be.true;
        });
      });

      describe('bulkExpired', function () {
        const sandbox = sinon.createSandbox();

        afterEach(function () {
          sandbox.restore();
        });

        it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () {
          sandbox.stub(es, 'prepareBulkWrite').resolves(getIndex());
          await es.init();
          await es.bulkExpired({...bulk, state: 'in progress'});

          expect(deleteStub.called).to.be.true;
        });

        it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () {
          sandbox.stub(es, 'prepareBulkWrite').resolves(getIndex());
          await es.init();
          await es.bulkExpired({...bulk, state: 'done'});

          expect(deleteStub.called).to.be.false;
        });
      });

      describe('bulkUpdated', function () {
        it('should reject if the connection to elasticsearch is not established', async function () {
          return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('elasticsearch');
        });

        it('should reject if the index name is not valid', async function () {
          const invalidBulk = {...bulk, source: '%#$^'};
          sandbox.createStubInstance(Client, {});
          await es.init();

          return expect(es.bulkUpdated(invalidBulk)).to.be.rejectedWith('Index');
        });

        it("should refuse to finalize bulk if index doesn't exist", async function () {
          await es.init();
          refreshStub.throws();

          // `.to.eventually.throw` (never invoked) asserted nothing and left the rejection
          // unhandled; assert the rejection explicitly instead.
          await expect(es.bulkUpdated(bulk)).to.be.rejected;

          expect(
            refreshStub.calledWith({
              index: getThingIndexName(bulk.type, bulk.source, bulk),
              allow_no_indices: false,
            }),
          ).to.be.true;
        });

        it('should create a new index', async function () {
          const index = getIndex();
          const expectedRefreshActions = [
            {
              add: {index: index, alias: ACTIVE_INDICES_ALIAS},
            },
            {
              remove: {index: index, alias: INACTIVE_INDICES_ALIAS},
            },
            {
              remove_index: {index: oldIndex},
            },
          ];
          sandbox.stub(es, 'prepareBulkWrite').resolves(index);
          await es.init();
          await es.bulkUpdated(bulk);

          expect(refreshStub.calledWith({index, allow_no_indices: false})).to.be.true;
          expect(
            updateAliasesStub.calledWith({
              actions: expectedRefreshActions,
            }),
          ).to.be.true;
        });
      });
    });

    describe('get', function () {
      let es: Elasticsearch;
      const sandbox = sinon.createSandbox();

      before(function () {
        es = new Elasticsearch(backendConfig);
      });

      afterEach(function () {
        sandbox.restore();
      });

      it('should reject if object is not found', async function () {
        sandbox.stub(es.client, 'search').resolves(searchResponse());

        return expect(es.get('123')).to.rejectedWith('found');
      });

      it('should provide the thing if object is found', async function () {
        const foundObject: SearchHit = {
          _id: '',
          _index: '',
          _score: 0,
          _source: message as SCMessage,
        };
        sandbox.stub(es.client, 'search').resolves(searchResponse(foundObject));

        return expect(await es.get('123')).to.be.eql(message);
      });
    });

    describe('post', function () {
      let es: Elasticsearch;
      const sandbox = sinon.createSandbox();

      before(function () {
        es = new Elasticsearch(backendConfig);
      });

      beforeEach(function () {
        sandbox.stub(Indices.default.prototype, 'getAlias').resolves({} as any);
      });

      afterEach(function () {
        sandbox.restore();
      });

      it('should not post if the object already exists in an index which will not be rolled over', async function () {
        const index = getIndex();
        const oldIndex = index.replace('foosource', 'barsource');
        const object: SearchHit = {
          _id: '',
          _index: oldIndex,
          _source: message as SCMessage,
        };
        sandbox.stub(es.client, 'search').resolves(searchResponse(object));
        sandbox.stub(es, 'prepareBulkWrite').resolves(index);
        await es.init();

        return expect(es.post(object._source!, bulk)).to.be.rejectedWith('UID conflict');
      });

      it('should reject if there is an object creation error on the elasticsearch side', async function () {
        sandbox.stub(es.client, 'search').resolves(searchResponse());
        sandbox.stub(es.client, 'create').resolves({result: 'not_found'} as CreateResponse);
        await es.init();

        return expect(es.post(message as SCMessage, bulk)).to.rejectedWith('creation');
      });

      it('should create a new object', async function () {
        let caughtParameter: any;
        sandbox.stub(es.client, 'search').resolves(searchResponse());
        // @ts-expect-error call
        const createStub = sandbox.stub(es.client, 'create').callsFake(parameter => {
          caughtParameter = parameter;
          return Promise.resolve({result: 'created'});
        });
        await es.init();
        await es.post(message as SCMessage, bulk);

        expect(createStub.called).to.be.true;
        expect(caughtParameter.document).to.be.eql({
          ...message,
          creation_date: caughtParameter.document.creation_date,
        });
      });
    });

    describe('put', function () {
      let es: Elasticsearch;
      const sandbox = sinon.createSandbox();

      before(function () {
        es = new Elasticsearch(backendConfig);
      });

      afterEach(function () {
        sandbox.restore();
      });

      it('should reject to put if the object does not already exist', async function () {
        const object: SearchHit = {
          _id: '',
          _index: getIndex(),
          _score: 0,
          _source: message as SCMessage,
        };
        sandbox.stub(es.client, 'search').resolves(searchResponse());

        return expect(es.put(object._source!)).to.rejectedWith('exist');
      });

      // noinspection JSUnusedLocalSymbols
      it('should update the object if it already exists', async function () {
        let caughtParameter: any;
        const object: SearchHit = {
          _id: '',
          _index: getIndex(),
          _score: 0,
          _source: message as SCMessage,
        };
        sandbox.stub(es.client, 'search').resolves(searchResponse(object));
        // @ts-expect-error unused
        // eslint-disable-next-line @typescript-eslint/no-unused-vars
        const stubUpdate = sandbox.stub(es.client, 'update').callsFake(parameters => {
          caughtParameter = parameters;
          return Promise.resolve({body: {created: true}});
        });

        await es.put(object._source!);

        expect(caughtParameter.body.doc).to.be.eql(object._source);
      });
    });

    describe('search', function () {
      let es: Elasticsearch;
      const sandbox = sinon.createSandbox();
      const objectMessage: SearchHit = {
        _id: '123',
        _index: getIndex(),
        _score: 0,
        _source: message as SCMessage,
      };
      const objectBook: SearchHit = {
        _id: '321',
        _index: getIndex(),
        _score: 0,
        _source: book as SCBook,
      };
      const fakeEsAggregations = {
        '@all': {
          doc_count: 17,
          type: {
            doc_count_error_upper_bound: 0,
            sum_other_doc_count: 0,
            buckets: [
              {
                key: 'person',
                doc_count: 13,
              },
              {
                key: 'catalog',
                doc_count: 4,
              },
            ],
          },
        },
      };

      const fakeSearchResponse: SearchResponse = {
        took: 12,
        timed_out: false,
        // @ts-expect-error not assignable
        _shards: {},
        hits: {
          hits: [objectMessage, objectBook],
          total: 123,
        },
        aggregations: fakeEsAggregations,
      };
      let searchStub: SinonStub;

      before(function () {
        es = new Elasticsearch(backendConfig);
      });

      beforeEach(function () {
        searchStub = sandbox.stub(es.client, 'search').resolves(fakeSearchResponse);
      });

      afterEach(function () {
        sandbox.restore();
      });

      it('should provide appropriate data and facets', async function () {
        const fakeFacets = [
          {
            buckets: [
              {
                count: 13,
                key: 'person',
              },
              {
                count: 4,
                key: 'catalog',
              },
            ],
            field: 'type',
          },
        ];

        const {data, facets} = await es.search({});

        expect(data).to.be.eql([objectMessage._source, objectBook._source]);
        expect(facets).to.be.eql(fakeFacets);
      });

      it('should provide pagination from params', async function () {
        const from = 30;
        const {pagination} = await es.search({from});

        expect(pagination).to.be.eql({
          count: fakeSearchResponse.hits.hits.length,
          offset: from,
          total: fakeSearchResponse.hits.total,
        });
      });

      it('should have fallback to zero if from is not given through params', async function () {
        const {pagination} = await es.search({});

        expect(pagination.offset).to.be.equal(0);
      });

      it('should build the search request properly', async function () {
        const parameters: SCSearchQuery = {
          query: 'mathematics',
          from: 30,
          size: 5,
          sort: [
            {
              type: 'ducet',
              order: 'desc',
              arguments: {
                field: 'name',
              },
            },
          ],
          filter: {
            type: 'value',
            arguments: {
              field: 'type',
              value: SCThingType.AcademicEvent,
            },
          },
        };

        await es.search(parameters);

        expect(searchStub.firstCall.firstArg).to.be.deep.equal({
          aggs: aggregations,
          query,
          allow_no_indices: true,
          sort: [{'name.sort': 'desc'}],
          from: parameters.from,
          index: ACTIVE_INDICES_ALIAS,
          size: parameters.size,
        });
      });
    });
  });