/* * Copyright (C) 2020 StApps * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <https://www.gnu.org/licenses/>. */ import {ApiResponse, Client} from '@elastic/elasticsearch'; import {SCBook, SCBulkResponse, SCConfigFile, SCMessage, SCSearchQuery, SCThings, SCThingType} from '@openstapps/core'; import {instance as book} from '@openstapps/core/test/resources/Book.1.json'; import {instance as message} from '@openstapps/core/test/resources/Message.1.json'; import {Logger} from '@openstapps/logger'; import {SMTP} from '@openstapps/logger/lib/smtp'; import {expect, use} from 'chai'; import chaiAsPromised from 'chai-as-promised'; import {SearchResponse} from 'elasticsearch'; import mockedEnv from 'mocked-env'; import sinon from 'sinon'; import {configFile} from '../../../src/common'; import {MailQueue} from '../../../src/notification/mail-queue'; import * as aggregations from '../../../src/storage/elasticsearch/aggregations'; import {ElasticsearchObject} from '../../../src/storage/elasticsearch/common'; import {Elasticsearch} from '../../../src/storage/elasticsearch/elasticsearch'; import * as Monitoring from '../../../src/storage/elasticsearch/monitoring'; import * as query from '../../../src/storage/elasticsearch/query'; import * as templating from '../../../src/storage/elasticsearch/templating'; import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, getIndex} from '../../common'; import fs from 'fs'; use(chaiAsPromised);
/**
 * Unit tests for the `Elasticsearch` storage class.
 *
 * The elasticsearch client, the logger, the templating helpers and the file system are replaced by
 * sinon stubs in the individual tests, so the assertions only look at how those collaborators are called.
 */
describe('Elasticsearch', function () {
  // increase timeout for the suite
  this.timeout(DEFAULT_TEST_TIMEOUT);

  // suite-wide sandbox; nested suites create their own sandboxes for stubs they restore after each test
  const sandbox = sinon.createSandbox();
  // stubbed once for the whole suite; its call history is reset where a test asserts on it
  const checkESTemplateStub: sinon.SinonStub = sandbox.stub(templating, 'checkESTemplate');

  before(function () {
    // every file read during the suite yields an empty JSON document
    sandbox.stub(fs, 'readFileSync').returns('{}');
  });

  after(function () {
    sandbox.restore();
  });

  describe('getElasticsearchUrl', function () {
    it('should provide custom elasticsearch URL if defined', function () {
      const customAddress = 'http://foo-address:9200';
      const restore = mockedEnv({
        ES_ADDR: customAddress
      });

      try {
        expect(Elasticsearch.getElasticsearchUrl()).to.be.equal(customAddress);
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });

    it('should provide local URL as fallback', function () {
      const restore = mockedEnv({
        ES_ADDR: undefined
      });

      try {
        expect(Elasticsearch.getElasticsearchUrl()).to.match(/(https?:\/\/)?localhost(:\d+)?/);
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });
  });

  describe('getIndex (including getIndexUID)', function () {
    const type = 'foo bar type';
    const source = 'foo_source';
    // local fixture; intentionally shadows the `bulk` imported from the common test helpers
    const bulk: SCBulkResponse = {
      expiration: '',
      source: '',
      state: 'in progress',
      type: SCThingType.Semester,
      uid: 'bulk-uid-123-123-123'
    };

    it('should provide index UID from the provided UID', function () {
      const indexUID = Elasticsearch.getIndexUID(bulk.uid);

      expect(indexUID.length).to.be.equal(Elasticsearch.INDEX_UID_LENGTH);
      // test starting and ending character
      expect(indexUID[0]).to.be.equal(bulk.uid[0]);
      expect(indexUID[indexUID.length - 1]).to.be.equal(bulk.uid[Elasticsearch.INDEX_UID_LENGTH - 1]);
    });

    it('should provide index name from the provided data', function () {
      expect(Elasticsearch.getIndex(type as SCThingType, source, bulk))
        .to.be.equal(`stapps_${type.split(' ').join('_')}_${source}_${Elasticsearch.getIndexUID(bulk.uid)}`);
    });
  });

  describe('removeAliasChars', function () {
    it('should remove invalid characters', function () {
      // everything except the letters of 'foobaralias' is expected to be stripped from the input
      expect(Elasticsearch.removeAliasChars('f,o#o\\ b|ar/ <?alias>* ', 'bulk-uid')).to.be.equal('foobaralias');
    });

    it('should remove invalid starting characters', function () {
      expect(Elasticsearch.removeAliasChars('-foobaralias', 'bulk-uid')).to.be.equal('foobaralias');
      expect(Elasticsearch.removeAliasChars('_foobaralias', 'bulk-uid')).to.be.equal('foobaralias');
      expect(Elasticsearch.removeAliasChars('+foobaralias', 'bulk-uid')).to.be.equal('foobaralias');
    });

    it('should replace with a placeholder in case of invalid alias', function () {
      expect(Elasticsearch.removeAliasChars('.', 'bulk-uid')).to.contain('placeholder');
      expect(Elasticsearch.removeAliasChars('..', 'bulk-uid')).to.contain('placeholder');
    });

    it('should work with common cases', function () {
      expect(Elasticsearch.removeAliasChars('the-quick-brown-fox-jumps-over-the-lazy-dog-1234567890', 'bulk-uid'))
        .to.be.equal('the-quick-brown-fox-jumps-over-the-lazy-dog-1234567890');
      expect(Elasticsearch.removeAliasChars('THE_QUICK_BROWN_FOX_JUMPS_OVER_THE_LAZY_DOG', 'bulk-uid'))
        .to.be.equal('THE_QUICK_BROWN_FOX_JUMPS_OVER_THE_LAZY_DOG');
    });

    it('should warn in case of characters that are invalid in future elasticsearch versions', function () {
      const sandbox = sinon.createSandbox();
      const loggerWarnStub = sandbox.stub(Logger, 'warn');

      try {
        expect(Elasticsearch.removeAliasChars('foo:bar:alias', 'bulk-uid')).to.contain('foo:bar:alias');
        expect(loggerWarnStub.called).to.be.true;
      } finally {
        // do not leak the Logger.warn stub into the following tests
        sandbox.restore();
      }
    });
  });

  describe('constructor', function () {
    const sandbox = sinon.createSandbox();

    beforeEach(function () {
      // forget calls made by instances constructed in earlier tests, so that
      // the `calledWith` assertions below only see calls made by the current test
      checkESTemplateStub.resetHistory();
    });

    afterEach(function () {
      sandbox.restore();
    });

    it('should complain (throw an error) if database in config is undefined', function () {
      const config: SCConfigFile = {...configFile, internal: {...configFile.internal, database: undefined}};

      expect(() => new Elasticsearch(config)).to.throw(Error);
    });

    it('should complain (throw an error) if database version is not a string', function () {
      const config: SCConfigFile = {
        ...configFile,
        internal: {
          ...configFile.internal,
          database: {
            name: 'foo',
            version: 123
          }
        }
      };

      expect(() => new Elasticsearch(config)).to.throw(Error);
    });

    it('should log an error in case of there is one when getting response from the elasticsearch client', async function () {
      const error = Error('Foo Error');
      const loggerErrorStub = sandbox.stub(Logger, 'error').resolves('foo');
      // make every listener registered on the client fire immediately with an error
      sandbox.stub(Client.prototype, 'on').yields(error);

      new Elasticsearch(configFile);

      expect(loggerErrorStub.calledWith(error)).to.be.true;
    });

    it('should log the result in the debug mode when getting response from the elasticsearch client', async function () {
      const fakeResponse = {foo: 'bar'};
      const loggerLogStub = sandbox.stub(Logger, 'log');
      // make every listener registered on the client fire immediately with a response
      sandbox.stub(Client.prototype, 'on').yields(null, fakeResponse);

      new Elasticsearch(configFile);
      expect(loggerLogStub.calledWith(fakeResponse)).to.be.false;

      const restore = mockedEnv({
        ES_DEBUG: 'true'
      });
      try {
        new Elasticsearch(configFile);
        expect(loggerLogStub.calledWith(fakeResponse)).to.be.true;
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });

    it('should not force mapping update if related process env variable is not set', async function () {
      const restore = mockedEnv({
        ES_FORCE_MAPPING_UPDATE: undefined
      });

      try {
        new Elasticsearch(configFile);
        expect(checkESTemplateStub.calledWith(false)).to.be.true;
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });

    it('should force mapping update if related process env variable is set', async function () {
      const restore = mockedEnv({
        ES_FORCE_MAPPING_UPDATE: 'true'
      });

      try {
        new Elasticsearch(configFile);
        expect(checkESTemplateStub.calledWith(true)).to.be.true;
      } finally {
        // restore env variables even if the assertion fails
        restore();
      }
    });
  });

  describe('init', function () {
    const sandbox = sinon.createSandbox();

    afterEach(function () {
      sandbox.restore();
    });

    it('should complain (throw an error) if monitoring is set but mail queue is undefined', async function () {
      const config: SCConfigFile = {
        ...configFile,
        internal: {
          ...configFile.internal,
          monitoring: {
            actions: [],
            watchers: []
          }
        }
      };
      const es = new Elasticsearch(config);

      return expect(es.init()).to.be.rejected;
    });

    it('should setup the monitoring if there is monitoring is set and mail queue is defined', function () {
      const config: SCConfigFile = {
        ...configFile,
        internal: {
          ...configFile.internal,
          monitoring: {
            actions: [],
            watchers: []
          }
        }
      };
      const monitoringSetUpStub = sandbox.stub(Monitoring, 'setUp');
      const es = new Elasticsearch(config, new MailQueue(getTransport(false) as unknown as SMTP));

      // NOTE(review): the returned promise is deliberately left un-awaited here, as in the original test;
      // the assertion below relies on Monitoring.setUp being reached synchronously - confirm against init()
      es.init();

      expect(monitoringSetUpStub.called).to.be.true;
    });
  });

  describe('Operations with bundle/index', function () {
    const sandbox = sinon.createSandbox();
    let es: Elasticsearch;
    const oldIndex = 'stapps_footype_foosource_oldindex';

    beforeEach(function () {
      es = new Elasticsearch(configFile);
      // replace the indices API of the client with inert fakes that resolve immediately
      // @ts-ignore
      es.client.indices = {
        // @ts-ignore
        getAlias: () => Promise.resolve({body: [{[oldIndex]: {aliases: {[SCThingType.Book]: {}}}}]}),
        // @ts-ignore
        putTemplate: () => Promise.resolve({}),
        // @ts-ignore
        create: () => Promise.resolve({}),
        // @ts-ignore
        delete: () => Promise.resolve({}),
        // @ts-ignore
        exists: () => Promise.resolve({}),
        // @ts-ignore
        refresh: () => Promise.resolve({}),
        // @ts-ignore
        updateAliases: () => Promise.resolve({})
      };
    });

    afterEach(function () {
      sandbox.restore();
    });

    describe('bulkCreated', function () {
      it('should reject (throw an error) if the connection to elasticsearch is not established', async function () {
        // init() has not been called on this instance
        return expect(es.bulkCreated(bulk)).to.be.rejectedWith('elasticsearch');
      });

      it('should reject (throw an error) if the index name is not valid', async function () {
        sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${getIndex()}`);
        sandbox.createStubInstance(Client, {});
        await es.init();

        return expect(es.bulkCreated(bulk)).to.be.rejectedWith('Index');
      });

      it('should create a new index', async function () {
        const index = getIndex();
        sandbox.stub(Elasticsearch, 'getIndex').returns(index);
        const putTemplateStub = sandbox.stub(templating, 'putTemplate');
        const createStub = sandbox.stub(es.client.indices, 'create');
        await es.init();

        await es.bulkCreated(bulk);

        expect(putTemplateStub.called).to.be.true;
        expect(createStub.calledWith({index})).to.be.true;
      });
    });

    describe('bulkExpired', function () {
      const sandbox = sinon.createSandbox();

      afterEach(function () {
        sandbox.restore();
      });

      it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () {
        sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex());
        const clientDeleteStub = sandbox.stub(es.client.indices, 'delete');

        await es.bulkExpired({...bulk, state: 'in progress'});

        expect(clientDeleteStub.called).to.be.true;
      });

      it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () {
        sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex());
        const clientDeleteStub = sandbox.stub(es.client.indices, 'delete');

        await es.bulkExpired({...bulk, state: 'done'});

        expect(clientDeleteStub.called).to.be.false;
      });
    });

    describe('bulkUpdated', function () {
      it('should reject if the connection to elasticsearch is not established', async function () {
        // init() has not been called on this instance
        return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('elasticsearch');
      });

      it('should reject if the index name is not valid', async function () {
        sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${getIndex()}`);
        sandbox.createStubInstance(Client, {});
        await es.init();

        return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('Index');
      });

      it('should refresh the new index, switch the alias to it and trigger an index deletion', async function () {
        const index = getIndex();
        const expectedRefreshActions = [
          {
            add: {index: index, alias: SCThingType.Book}
          },
          {
            remove: {index: oldIndex, alias: SCThingType.Book}
          }
        ];
        sandbox.stub(Elasticsearch, 'getIndex').returns(index);
        // pretend the alias currently points to the old index
        sandbox.stub(es, 'aliasMap').value({
          [SCThingType.Book]: {
            [bulk.source]: oldIndex
          }
        });
        const refreshStub = sandbox.stub(es.client.indices, 'refresh');
        const updateAliasesStub = sandbox.stub(es.client.indices, 'updateAliases');
        const deleteStub = sandbox.stub(es.client.indices, 'delete');
        sandbox.stub(templating, 'putTemplate');
        await es.init();

        await es.bulkUpdated(bulk);

        expect(refreshStub.calledWith({index})).to.be.true;
        expect(updateAliasesStub.calledWith({
            body: {
              actions: expectedRefreshActions
            }
          })
        ).to.be.true;
        expect(deleteStub.called).to.be.true;
      });
    });
  });

  describe('get', function () {
    let es: Elasticsearch;
    const sandbox = sinon.createSandbox();

    before(function () {
      es = new Elasticsearch(configFile);
    });

    afterEach(function () {
      sandbox.restore();
    });

    it('should reject if object is not found', async function () {
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: []}}});

      return expect(es.get('123')).to.be.rejectedWith('found');
    });

    it('should provide the thing if object is found', async function () {
      const foundObject: ElasticsearchObject<SCMessage> = {
        _id: '',
        _index: '',
        _score: 0,
        _type: '',
        _source: message as SCMessage
      };
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: [foundObject]}}});

      return expect(await es.get('123')).to.be.eql(message);
    });
  });

  describe('post', function () {
    let es: Elasticsearch;
    const sandbox = sinon.createSandbox();

    before(function () {
      es = new Elasticsearch(configFile);
    });

    afterEach(function () {
      sandbox.restore();
    });

    it('should not post if the object already exists in an index which will not be rolled over', async function () {
      const index = getIndex();
      // same name as `index` except for the source part
      const oldIndex = index.replace('foosource', 'barsource');
      const object: ElasticsearchObject<SCMessage> = {
        _id: '',
        _index: oldIndex,
        _score: 0,
        _type: '',
        _source: message as SCMessage
      };
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: [object]}}});
      sandbox.stub(Elasticsearch, 'getIndex').returns(index);

      return expect(es.post(object._source, bulk)).to.be.rejectedWith('exist');
    });

    it('should not reject if the object already exists but in an index which will be rolled over', async function () {
      const object: ElasticsearchObject<SCMessage> = {
        _id: '',
        _index: getIndex(),
        _score: 0,
        _type: '',
        _source: message as SCMessage
      };
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: [object]}}});
      // return index name with different generated UID (see getIndex method)
      sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex());

      return expect(es.post(object._source, bulk)).to.not.be.rejectedWith('exist');
    });

    it('should reject if there is an object creation error on the elasticsearch side', async function () {
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: []}}});
      sandbox.stub(es.client, 'create').resolves({body: {created: false}});

      return expect(es.post(message as SCMessage, bulk)).to.be.rejectedWith('creation');
    });

    it('should create a new object', async function () {
      // parameter the fake `create` was called with
      let caughtParam: any;
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: []}}});
      // @ts-ignore
      const createStub = sandbox.stub(es.client, 'create').callsFake((param) => {
        caughtParam = param;

        return Promise.resolve({body: {created: true}});
      });

      await es.post(message as SCMessage, bulk);

      expect(createStub.called).to.be.true;
      // creation_date is taken from the captured body, so only the remaining fields are compared to the fixture
      expect(caughtParam.body).to.be.eql({...message, creation_date: caughtParam.body.creation_date});
    });
  });

  describe('put', function () {
    let es: Elasticsearch;
    const sandbox = sinon.createSandbox();

    before(function () {
      es = new Elasticsearch(configFile);
    });

    afterEach(function () {
      sandbox.restore();
    });

    it('should reject to put if the object does not already exist', async function () {
      const object: ElasticsearchObject<SCMessage> = {
        _id: '',
        _index: getIndex(),
        _score: 0,
        _type: '',
        _source: message as SCMessage
      };
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: []}}});

      return expect(es.put(object._source)).to.be.rejectedWith('exist');
    });

    it('should update the object if it already exists', async function () {
      // parameter the fake `update` was called with
      let caughtParam: any;
      const object: ElasticsearchObject<SCMessage> = {
        _id: '',
        _index: getIndex(),
        _score: 0,
        _type: '',
        _source: message as SCMessage
      };
      sandbox.stub(es.client, 'search').resolves({body: {hits: {hits: [object]}}});
      // @ts-ignore
      sandbox.stub(es.client, 'update').callsFake((params) => {
        caughtParam = params;

        return Promise.resolve({body: {created: true}});
      });

      await es.put(object._source);

      expect(caughtParam.body.doc).to.be.eql(object._source);
    });
  });

  describe('search', function () {
    let es: Elasticsearch;
    const sandbox = sinon.createSandbox();
    const objectMessage: ElasticsearchObject<SCMessage> = {
      _id: '123',
      _index: getIndex(),
      _score: 0,
      _type: '',
      _source: message as SCMessage
    };
    const objectBook: ElasticsearchObject<SCBook> = {
      _id: '321',
      _index: getIndex(),
      _score: 0,
      _type: '',
      _source: book as SCBook
    };
    const fakeEsAggregations = {
      '@all': {
        doc_count: 17,
        type: {
          doc_count_error_upper_bound: 0,
          sum_other_doc_count: 0,
          buckets: [
            {
              key: 'person',
              doc_count: 13
            },
            {
              key: 'catalog',
              doc_count: 4
            }
          ]
        }
      }
    };
    // canned client response returned by the stubbed `search` in every test of this suite
    const fakeSearchResponse: Partial<ApiResponse<SearchResponse<SCThings>>> = {
      // @ts-ignore
      body: {
        took: 12,
        timed_out: false,
        // @ts-ignore
        _shards: {},
        // @ts-ignore
        hits: {
          hits: [
            objectMessage,
            objectBook
          ],
          total: 123
        },
        aggregations: fakeEsAggregations
      },
      headers: {},
      // @ts-ignore
      meta: {},
      // @ts-ignore
      statusCode: {},
      // @ts-ignore
      warnings: {}
    };
    let searchStub: sinon.SinonStub;

    before(function () {
      es = new Elasticsearch(configFile);
    });

    beforeEach(function () {
      searchStub = sandbox.stub(es.client, 'search').resolves(fakeSearchResponse);
    });

    afterEach(function () {
      sandbox.restore();
    });

    it('should provide appropriate data and facets', async function () {
      const fakeFacets = [
        {
          buckets: [
            {
              count: 1,
              key: 'foo'
            },
            {
              count: 1,
              key: 'bar'
            }
          ],
          field: 'type'
        }
      ];
      const parseAggregationsStub = sandbox.stub(aggregations, 'parseAggregations').returns(fakeFacets);

      const {data, facets} = await es.search({});

      expect(data).to.be.eql([objectMessage._source, objectBook._source]);
      expect(facets).to.be.eql(fakeFacets);
      expect(parseAggregationsStub.calledWith(sinon.match.any, fakeEsAggregations)).to.be.true;
    });

    it('should provide pagination from params', async function () {
      const from = 30;

      const {pagination} = await es.search({from});

      expect(pagination).to.be.eql({
        count: fakeSearchResponse.body!.hits.hits.length,
        offset: from,
        total: fakeSearchResponse.body!.hits.total
      });
    });

    it('should have fallback to zero if from is not given through params', async function () {
      const {pagination} = await es.search({});

      expect(pagination.offset).to.be.equal(0);
    });

    it('should build the search request properly', async function () {
      const params: SCSearchQuery = {
        query: 'mathematics',
        from: 30,
        size: 5,
        sort: [
          {
            type: 'ducet',
            order: 'desc',
            arguments: {
              field: 'name'
            }
          }
        ],
        filter: {
          type: 'value',
          arguments: {
            field: 'type',
            value: SCThingType.AcademicEvent
          }
        }
      };
      const fakeResponse = {foo: 'bar'};
      const fakeBuildSortResponse = [fakeResponse];
      // @ts-ignore
      sandbox.stub(query, 'buildQuery').returns(fakeResponse);
      // @ts-ignore
      sandbox.stub(query, 'buildSort').returns(fakeBuildSortResponse);

      await es.search(params);

      sandbox.assert
        .calledWithMatch(searchStub,
          {
            body: {
              aggs: es.aggregationsSchema,
              query: fakeResponse,
              sort: fakeBuildSortResponse
            },
            from: params.from,
            index: Elasticsearch.getListOfAllIndices(),
            size: params.size
          }
        );
    });
  });
});