From 7c3ccc35af5914adb86427f2279445156603920b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thea=20Sch=C3=B6bl?= Date: Wed, 31 May 2023 11:40:08 +0000 Subject: [PATCH] fix: search uses inactive indices --- src/storage/elasticsearch/elasticsearch.ts | 194 +++++++----------- src/storage/elasticsearch/util/index.ts | 21 +- .../elasticsearch/elasticsearch.spec.ts | 71 +++---- 3 files changed, 124 insertions(+), 162 deletions(-) diff --git a/src/storage/elasticsearch/elasticsearch.ts b/src/storage/elasticsearch/elasticsearch.ts index 02ca1e2e..f3c52d41 100644 --- a/src/storage/elasticsearch/elasticsearch.ts +++ b/src/storage/elasticsearch/elasticsearch.ts @@ -18,7 +18,6 @@ import { AggregateName, AggregationsMultiTermsBucket, IndicesGetAliasResponse, - IndicesUpdateAliasesAction, SearchHit, SearchResponse, } from '@elastic/elasticsearch/lib/api/types'; @@ -38,8 +37,13 @@ import { ElasticsearchQueryDisMaxConfig, ElasticsearchQueryQueryStringConfig, } from './types/elasticsearch-config'; -import {ALL_INDICES_QUERY, getThingIndexName, parseIndexName, VALID_INDEX_REGEX} from './util'; -import {removeInvalidAliasChars} from './util/alias'; +import { + ACTIVE_INDICES_ALIAS, + getThingIndexName, + INACTIVE_INDICES_ALIAS, + matchIndexByType, + VALID_INDEX_REGEX, +} from './util'; import {noUndefined} from './util/no-undefined'; import {retryCatch, RetryOptions} from './util/retry'; @@ -47,17 +51,6 @@ import {retryCatch, RetryOptions} from './util/retry'; * A database interface for elasticsearch */ export class Elasticsearch implements Database { - /** - * Holds a map of all elasticsearch indices that are available to search - */ - aliasMap: { - // each scType has an alias which can contain multiple sources - [scType: string]: { - // each source is assigned an index name in elasticsearch - [source: string]: string; - }; - }; - /** * Elasticsearch client */ @@ -112,7 +105,6 @@ export class Elasticsearch implements Database { } }); - this.aliasMap = {}; this.ready = false; this.mailQueue = mailQueue; @@ -121,7 +113,7 @@ export class Elasticsearch implements Database { /** * Gets a map which contains each alias and all indices that are associated with each alias */ - private async getAliasMap(retryOptions: Partial> = {}) { + private async cleanupDeadIndices(retryOptions: Partial> = {}) { const aliasResponse = await retryCatch({ maxRetries: 10, retryInterval: 2000, @@ -136,31 +128,18 @@ export class Elasticsearch implements Database { ...retryOptions, }); - const aliases = Object.entries(aliasResponse) - .filter(([index]) => !index.startsWith('.')) - .map(([index, alias]) => ({ - index, - alias, - ...parseIndexName(index), - })); - - for (const {type, index, source} of aliases.filter(({type, alias}) => type in alias.aliases)) { - this.aliasMap[type] = this.aliasMap[type] || {}; - this.aliasMap[type][source] = index; - } - this.ready = true; - const unusedIndices = aliases.filter(({type, alias}) => !(type in alias.aliases)).map(({index}) => index); - if (unusedIndices.length > 0) { - await this.client.indices.delete({ - index: unusedIndices, - }); - Logger.warn(`Deleted old indices: oldIndicesToDelete`); - } + const inactiveIndices = Object.entries(aliasResponse) + .filter(([_, aliases]) => Object.keys(aliases.aliases).includes(INACTIVE_INDICES_ALIAS)) + .map(([indexName]) => indexName); - // eslint-disable-next-line unicorn/no-null - Logger.ok(`Read alias map from elasticsearch: ${JSON.stringify(this.aliasMap, null, 2)}`); + if (inactiveIndices.length > 0) { + await this.client.indices.delete({ + index: inactiveIndices, + }); + Logger.warn(`Deleted old indices: ${inactiveIndices}`); + } } /** @@ -181,7 +160,8 @@ export class Elasticsearch implements Database { }, }, from: 0, - index: ALL_INDICES_QUERY, + allow_no_indices: true, + index: ACTIVE_INDICES_ALIAS, size: 1, }); @@ -189,17 +169,12 @@ export class Elasticsearch implements Database { return searchResponse.hits.hits[0]; } - private async prepareBulkWrite(bulk: Bulk): Promise<{index: string; alias: string}> { + private async prepareBulkWrite(bulk: Bulk): Promise { if (!this.ready) { throw new Error('No connection to elasticsearch established yet.'); } const index = getThingIndexName(bulk.type, bulk.source, bulk); - const alias = removeInvalidAliasChars(bulk.type, bulk.uid); - - if (typeof this.aliasMap[alias] === 'undefined') { - this.aliasMap[alias] = {}; - } if (!VALID_INDEX_REGEX.test(index)) { throw new Error( @@ -208,21 +183,24 @@ export class Elasticsearch implements Database { ); } - return {index, alias}; + return index; } /** - * Should be called, when a new bulk was created. Creates a new index and applies the mapping to the index + * Should be called when a new bulk was created. Creates a new index and applies the mapping to the index * * @param bulk the bulk process that was created */ public async bulkCreated(bulk: Bulk): Promise { - const {index} = await this.prepareBulkWrite(bulk); + const index = await this.prepareBulkWrite(bulk); // re-apply the index template before each new bulk operation await putTemplate(this.client, bulk.type); await this.client.indices.create({ index, + aliases: { + [INACTIVE_INDICES_ALIAS]: {}, + }, }); Logger.info('Created index', index); @@ -234,7 +212,7 @@ export class Elasticsearch implements Database { * @param bulk the bulk process that is expired */ public async bulkExpired(bulk: Bulk): Promise { - const index: string = getThingIndexName(bulk.type, bulk.source, bulk); + const index = await this.prepareBulkWrite(bulk); Logger.info('Bulk expired. Deleting index', index); @@ -248,57 +226,38 @@ export class Elasticsearch implements Database { /** * Should be called when a bulk process is updated (replaced by a newer bulk). This will replace the old - * index and publish all data, that was index in the new instead + * index and publish all data that was indexed in the new instead * - * @param bulk the new bulk process that should replace the old one with same type and source + * @param bulk the new bulk process that should replace the old one with the same type and source */ public async bulkUpdated(bulk: Bulk): Promise { - const {index, alias} = await this.prepareBulkWrite(bulk); + const index = await this.prepareBulkWrite(bulk); - // create the new index if it does not exist - // eslint-disable-next-line unicorn/no-await-expression-member - if (!(await this.client.indices.exists({index}))) { - // re-apply the index template before each new bulk operation - await putTemplate(this.client, bulk.type); - await this.client.indices.create({ - index, - }); - } + await this.client.indices.refresh({index, allow_no_indices: false}); - // get the old index from our aliasMap - const oldIndex: string = this.aliasMap[alias][bulk.source]; + const activeIndices = await this.client.indices + .getAlias({ + index: matchIndexByType(bulk.type, bulk.source), + name: ACTIVE_INDICES_ALIAS, + }) + .then(it => Object.entries(it).map(([name]) => name)) + .catch(() => [] as string[]); - // add our new index to the alias - // this was type safe with @types/elasticsearch, the new package however provides no type definitions - const actions: IndicesUpdateAliasesAction[] = [ - { - add: {index: index, alias: alias}, - }, - ]; - - // remove our old index if it exists - // noinspection SuspiciousTypeOfGuard - if (typeof oldIndex === 'string') { - actions.push({ - remove: {index: oldIndex, alias: alias}, - }); - } - - // refresh the index (fsync changes) - await this.client.indices.refresh({index}); - - // execute our alias actions - await this.client.indices.updateAliases({actions}); - - // swap the index in our aliasMap - this.aliasMap[alias][bulk.source] = index; - // noinspection SuspiciousTypeOfGuard - if (typeof oldIndex === 'string') { - // delete the old index - await this.client.indices.delete({index: oldIndex}); - Logger.info('deleted old index', oldIndex); - } - Logger.info('swapped alias index alias', oldIndex, '=>', index); + await this.client.indices.updateAliases({ + actions: [ + { + add: {index, alias: ACTIVE_INDICES_ALIAS}, + }, + { + remove: {index, alias: INACTIVE_INDICES_ALIAS}, + }, + ...activeIndices.map(index => ({ + remove_index: {index}, + })), + ], + }); + Logger.info(`Index for ${bulk.type} is now ${index}`); + Logger.info(`Also removed obsolete indices ${activeIndices}`); } /** @@ -332,7 +291,7 @@ export class Elasticsearch implements Database { await Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue); } - return this.getAliasMap(retryOptions); + return this.cleanupDeadIndices(retryOptions); } /** @@ -342,35 +301,34 @@ export class Elasticsearch implements Database { * @param bulk the bulk process which item belongs to */ public async post(object: SCThings, bulk: Bulk): Promise { + const index = await this.prepareBulkWrite(bulk); const thing: SCThings & {creation_date: string} = { ...object, creation_date: moment().format(), }; - const item = await this.getObject(object.uid); - - // check that the item will get replaced if the index is rolled over (index with the same name excluding ending uid) - if (typeof item !== 'undefined') { - const indexOfNew = getThingIndexName(thing.type, bulk.source, bulk); - const oldIndex = item._index; - - // new item doesn't replace the old one - if ( - oldIndex.slice(0, Math.max(0, oldIndex.lastIndexOf('_'))) !== - indexOfNew.slice(0, Math.max(0, indexOfNew.lastIndexOf('_'))) - ) { - throw new Error( - // eslint-disable-next-line unicorn/no-null - `Object "${thing.uid}" already exists. Object was: ${JSON.stringify(thing, null, 2)}`, - ); - } + const conflictingThing = await this.client.search({ + query: { + term: { + 'uid.raw': { + value: thing.uid, + }, + }, + }, + // matches all indices but excludes indices of the same type + // https://www.elastic.co/guide/en/elasticsearch/reference/7.14/multi-index.html#multi-index + index: ['stapps_*', `-${matchIndexByType(bulk.type, bulk.source)}`], + }); + if (conflictingThing.hits.hits.length > 0) { + throw new Error( + `UID conflict: ${thing.uid}. ${index} tried to post an object that already exists but which it won't replace.`, + ); } - // regular bulk update (item gets replaced when bulk is updated) const searchResponse = await this.client.create({ document: thing, id: thing.uid, - index: getThingIndexName(thing.type, bulk.source, bulk), + index, timeout: '90s', }); @@ -423,15 +381,15 @@ export class Elasticsearch implements Database { | undefined, }; - const query = { + const response: SearchResponse = await this.client.search({ aggs: aggregations, query: buildQuery(parameters, this.config, esConfig), from: parameters.from, - index: ALL_INDICES_QUERY, + index: ACTIVE_INDICES_ALIAS, + allow_no_indices: true, size: parameters.size, sort: typeof parameters.sort !== 'undefined' ? buildSort(parameters.sort) : undefined, - }; - const response: SearchResponse = await this.client.search(query); + }); return { data: response.hits.hits diff --git a/src/storage/elasticsearch/util/index.ts b/src/storage/elasticsearch/util/index.ts index 24ba5999..4b037891 100644 --- a/src/storage/elasticsearch/util/index.ts +++ b/src/storage/elasticsearch/util/index.ts @@ -6,9 +6,14 @@ import {SCBulkResponse, SCThingType, SCUuid} from '@openstapps/core'; export const INDEX_UID_LENGTH = 8; /** - * A string which matches all indices + * Matches all active indices */ -export const ALL_INDICES_QUERY = 'stapps_*_*_*'; +export const ACTIVE_INDICES_ALIAS = 'stapps_active'; + +/** + * Matches all inactive aliases + */ +export const INACTIVE_INDICES_ALIAS = 'stapps_inactive'; /** * Matches index names such as stapps___ @@ -45,12 +50,14 @@ export function parseIndexName(index: string): ParsedIndexName { * @param bulk bulk process which created this index */ export function getThingIndexName(type: SCThingType, source: string, bulk: SCBulkResponse) { - let out = type.toLowerCase(); - while (out.includes(' ')) { - out = out.replace(' ', '_'); - } + return `stapps_${type.replace(/\s+/g, '_')}_${source}_${getIndexUID(bulk.uid)}`; +} - return `stapps_${out}_${source}_${getIndexUID(bulk.uid)}`; +/** + * Returns an index that matches all indices with the specified type + */ +export function matchIndexByType(type: SCThingType, source: string) { + return `stapps_${type.replace(/\s+/g, '_')}_${source}_*`; } /** diff --git a/test/storage/elasticsearch/elasticsearch.spec.ts b/test/storage/elasticsearch/elasticsearch.spec.ts index 1b736c22..ab981df7 100644 --- a/test/storage/elasticsearch/elasticsearch.spec.ts +++ b/test/storage/elasticsearch/elasticsearch.spec.ts @@ -37,8 +37,13 @@ import {Logger} from '@openstapps/logger'; import {SMTP} from '@openstapps/logger/lib/smtp'; import {expect, use} from 'chai'; import chaiAsPromised from 'chai-as-promised'; +import {beforeEach} from 'mocha'; import mockedEnv from 'mocked-env'; -import {ALL_INDICES_QUERY, parseIndexName} from '../../../src/storage/elasticsearch/util'; +import { + ACTIVE_INDICES_ALIAS, + INACTIVE_INDICES_ALIAS, + parseIndexName, +} from '../../../src/storage/elasticsearch/util'; import * as queryModule from '../../../src/storage/elasticsearch/query/query'; import * as sortModule from '../../../src/storage/elasticsearch/query/sort'; import sinon, {SinonStub} from 'sinon'; @@ -287,7 +292,6 @@ describe('Elasticsearch', function () { let deleteStub: SinonStub; let refreshStub: SinonStub; let updateAliasesStub: SinonStub; - let existsStub: SinonStub; const oldIndex = 'stapps_footype_foosource_oldindex'; beforeEach(function () { @@ -297,7 +301,6 @@ describe('Elasticsearch', function () { sandbox.stub(Indices.prototype, 'putTemplate').resolves({} as any); createStub = sandbox.stub(Indices.prototype, 'create').resolves({} as any); deleteStub = sandbox.stub(Indices.prototype, 'delete').resolves({} as any); - existsStub = sandbox.stub(Indices.prototype, 'exists').resolves({} as any); refreshStub = sandbox.stub(Indices.prototype, 'refresh').resolves({} as any); updateAliasesStub = sandbox.stub(Indices.prototype, 'updateAliases').resolves({} as any); es = new Elasticsearch(configFile); @@ -329,7 +332,7 @@ describe('Elasticsearch', function () { await es.bulkCreated(bulk); expect(putTemplateStub.called).to.be.true; - expect(createStub.calledWith({index})).to.be.true; + expect(createStub.calledWith({index, aliases: {[INACTIVE_INDICES_ALIAS]: {}}})).to.be.true; }); }); @@ -341,6 +344,7 @@ describe('Elasticsearch', function () { it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () { sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex()); + await es.init(); await es.bulkExpired({...bulk, state: 'in progress'}); expect(deleteStub.called).to.be.true; @@ -349,6 +353,7 @@ describe('Elasticsearch', function () { it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () { sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex()); + await es.init(); await es.bulkExpired({...bulk, state: 'done'}); expect(deleteStub.called).to.be.false; @@ -368,44 +373,44 @@ describe('Elasticsearch', function () { return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('Index'); }); - it("should create templates if index doesn't exist", async function () { + it("should refuse to finalize bulk if index doesn't exist", async function () { await es.init(); - existsStub.resolves(false); - const putTemplateSpy = sandbox.spy(templating, 'putTemplate'); - await es.bulkUpdated(bulk); + refreshStub.throws(); + await expect(es.bulkUpdated(bulk)).to.eventually.throw; - expect(createStub.called).to.be.true; - expect(putTemplateSpy.called).to.be.true; + expect( + refreshStub.calledWith({ + index: getThingIndexName(bulk.type, bulk.source, bulk), + allow_no_indices: false, + }), + ).to.be.true; }); it('should create a new index', async function () { const index = getIndex(); const expectedRefreshActions = [ { - add: {index: index, alias: SCThingType.Book}, + add: {index: index, alias: ACTIVE_INDICES_ALIAS}, }, { - remove: {index: oldIndex, alias: SCThingType.Book}, + remove: {index: index, alias: INACTIVE_INDICES_ALIAS}, + }, + { + remove_index: {index: oldIndex}, }, ]; sandbox.stub(utilModule, 'getThingIndexName').returns(index); - sandbox.stub(es, 'aliasMap').value({ - [SCThingType.Book]: { - [bulk.source]: oldIndex, - }, - }); sandbox.stub(templating, 'putTemplate'); await es.init(); await es.bulkUpdated(bulk); - expect(refreshStub.calledWith({index})).to.be.true; + expect(refreshStub.calledWith({index, allow_no_indices: false})).to.be.true; expect( updateAliasesStub.calledWith({ actions: expectedRefreshActions, }), ).to.be.true; - expect(deleteStub.called).to.be.true; }); }); }); @@ -449,6 +454,10 @@ describe('Elasticsearch', function () { es = new Elasticsearch(configFile); }); + beforeEach(function () { + sandbox.stub(Indices.prototype, 'getAlias').resolves({} as any); + }); + afterEach(function () { sandbox.restore(); }); @@ -459,33 +468,20 @@ describe('Elasticsearch', function () { const object: SearchHit = { _id: '', _index: oldIndex, - _score: 0, _source: message as SCMessage, }; - sandbox.stub(es.client, 'search').resolves(searchResponse(object)); + sandbox.stub(es.client, 'search').resolves(searchResponse(object)); sandbox.stub(utilModule, 'getThingIndexName').returns(index); - return expect(es.post(object._source!, bulk)).to.rejectedWith('exist'); - }); - - it('should not reject if the object already exists but in an index which will be rolled over', async function () { - const object: SearchHit = { - _id: '', - _index: getIndex(), - _score: 0, - _source: message as SCMessage, - }; - sandbox.stub(es.client, 'search').resolves(searchResponse(object)); - // return index name with different generated UID (see getIndex method) - sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex()); - - return expect(es.post(object._source!, bulk)).to.not.rejectedWith('exist'); + await es.init(); + return expect(es.post(object._source!, bulk)).to.be.rejectedWith('UID conflict'); }); it('should reject if there is an object creation error on the elasticsearch side', async function () { sandbox.stub(es.client, 'search').resolves(searchResponse()); sandbox.stub(es.client, 'create').resolves({result: 'not_found'} as CreateResponse); + await es.init(); return expect(es.post(message as SCMessage, bulk)).to.rejectedWith('creation'); }); @@ -498,6 +494,7 @@ describe('Elasticsearch', function () { return Promise.resolve({result: 'created'}); }); + await es.init(); await es.post(message as SCMessage, bulk); expect(createStub.called).to.be.true; @@ -684,7 +681,7 @@ describe('Elasticsearch', function () { query: fakeResponse, sort: fakeBuildSortResponse, from: parameters.from, - index: ALL_INDICES_QUERY, + index: ACTIVE_INDICES_ALIAS, size: parameters.size, }); });