fix: search uses inactive indices

This commit is contained in:
Thea Schöbl
2023-05-31 11:40:08 +00:00
parent f4b2d747a3
commit 7c3ccc35af
3 changed files with 124 additions and 162 deletions

View File

@@ -18,7 +18,6 @@ import {
AggregateName, AggregateName,
AggregationsMultiTermsBucket, AggregationsMultiTermsBucket,
IndicesGetAliasResponse, IndicesGetAliasResponse,
IndicesUpdateAliasesAction,
SearchHit, SearchHit,
SearchResponse, SearchResponse,
} from '@elastic/elasticsearch/lib/api/types'; } from '@elastic/elasticsearch/lib/api/types';
@@ -38,8 +37,13 @@ import {
ElasticsearchQueryDisMaxConfig, ElasticsearchQueryDisMaxConfig,
ElasticsearchQueryQueryStringConfig, ElasticsearchQueryQueryStringConfig,
} from './types/elasticsearch-config'; } from './types/elasticsearch-config';
import {ALL_INDICES_QUERY, getThingIndexName, parseIndexName, VALID_INDEX_REGEX} from './util'; import {
import {removeInvalidAliasChars} from './util/alias'; ACTIVE_INDICES_ALIAS,
getThingIndexName,
INACTIVE_INDICES_ALIAS,
matchIndexByType,
VALID_INDEX_REGEX,
} from './util';
import {noUndefined} from './util/no-undefined'; import {noUndefined} from './util/no-undefined';
import {retryCatch, RetryOptions} from './util/retry'; import {retryCatch, RetryOptions} from './util/retry';
@@ -47,17 +51,6 @@ import {retryCatch, RetryOptions} from './util/retry';
* A database interface for elasticsearch * A database interface for elasticsearch
*/ */
export class Elasticsearch implements Database { export class Elasticsearch implements Database {
/**
* Holds a map of all elasticsearch indices that are available to search
*/
aliasMap: {
// each scType has an alias which can contain multiple sources
[scType: string]: {
// each source is assigned an index name in elasticsearch
[source: string]: string;
};
};
/** /**
* Elasticsearch client * Elasticsearch client
*/ */
@@ -112,7 +105,6 @@ export class Elasticsearch implements Database {
} }
}); });
this.aliasMap = {};
this.ready = false; this.ready = false;
this.mailQueue = mailQueue; this.mailQueue = mailQueue;
@@ -121,7 +113,7 @@ export class Elasticsearch implements Database {
/** /**
* Gets a map which contains each alias and all indices that are associated with each alias * Gets a map which contains each alias and all indices that are associated with each alias
*/ */
private async getAliasMap(retryOptions: Partial<RetryOptions<IndicesGetAliasResponse>> = {}) { private async cleanupDeadIndices(retryOptions: Partial<RetryOptions<IndicesGetAliasResponse>> = {}) {
const aliasResponse = await retryCatch({ const aliasResponse = await retryCatch({
maxRetries: 10, maxRetries: 10,
retryInterval: 2000, retryInterval: 2000,
@@ -136,31 +128,18 @@ export class Elasticsearch implements Database {
...retryOptions, ...retryOptions,
}); });
const aliases = Object.entries(aliasResponse)
.filter(([index]) => !index.startsWith('.'))
.map(([index, alias]) => ({
index,
alias,
...parseIndexName(index),
}));
for (const {type, index, source} of aliases.filter(({type, alias}) => type in alias.aliases)) {
this.aliasMap[type] = this.aliasMap[type] || {};
this.aliasMap[type][source] = index;
}
this.ready = true; this.ready = true;
const unusedIndices = aliases.filter(({type, alias}) => !(type in alias.aliases)).map(({index}) => index); const inactiveIndices = Object.entries(aliasResponse)
if (unusedIndices.length > 0) { .filter(([_, aliases]) => Object.keys(aliases.aliases).includes(INACTIVE_INDICES_ALIAS))
await this.client.indices.delete({ .map(([indexName]) => indexName);
index: unusedIndices,
});
Logger.warn(`Deleted old indices: oldIndicesToDelete`);
}
// eslint-disable-next-line unicorn/no-null if (inactiveIndices.length > 0) {
Logger.ok(`Read alias map from elasticsearch: ${JSON.stringify(this.aliasMap, null, 2)}`); await this.client.indices.delete({
index: inactiveIndices,
});
Logger.warn(`Deleted old indices: ${inactiveIndices}`);
}
} }
/** /**
@@ -181,7 +160,8 @@ export class Elasticsearch implements Database {
}, },
}, },
from: 0, from: 0,
index: ALL_INDICES_QUERY, allow_no_indices: true,
index: ACTIVE_INDICES_ALIAS,
size: 1, size: 1,
}); });
@@ -189,17 +169,12 @@ export class Elasticsearch implements Database {
return searchResponse.hits.hits[0]; return searchResponse.hits.hits[0];
} }
private async prepareBulkWrite(bulk: Bulk): Promise<{index: string; alias: string}> { private async prepareBulkWrite(bulk: Bulk): Promise<string> {
if (!this.ready) { if (!this.ready) {
throw new Error('No connection to elasticsearch established yet.'); throw new Error('No connection to elasticsearch established yet.');
} }
const index = getThingIndexName(bulk.type, bulk.source, bulk); const index = getThingIndexName(bulk.type, bulk.source, bulk);
const alias = removeInvalidAliasChars(bulk.type, bulk.uid);
if (typeof this.aliasMap[alias] === 'undefined') {
this.aliasMap[alias] = {};
}
if (!VALID_INDEX_REGEX.test(index)) { if (!VALID_INDEX_REGEX.test(index)) {
throw new Error( throw new Error(
@@ -208,21 +183,24 @@ export class Elasticsearch implements Database {
); );
} }
return {index, alias}; return index;
} }
/** /**
* Should be called, when a new bulk was created. Creates a new index and applies the mapping to the index * Should be called when a new bulk was created. Creates a new index and applies the mapping to the index
* *
* @param bulk the bulk process that was created * @param bulk the bulk process that was created
*/ */
public async bulkCreated(bulk: Bulk): Promise<void> { public async bulkCreated(bulk: Bulk): Promise<void> {
const {index} = await this.prepareBulkWrite(bulk); const index = await this.prepareBulkWrite(bulk);
// re-apply the index template before each new bulk operation // re-apply the index template before each new bulk operation
await putTemplate(this.client, bulk.type); await putTemplate(this.client, bulk.type);
await this.client.indices.create({ await this.client.indices.create({
index, index,
aliases: {
[INACTIVE_INDICES_ALIAS]: {},
},
}); });
Logger.info('Created index', index); Logger.info('Created index', index);
@@ -234,7 +212,7 @@ export class Elasticsearch implements Database {
* @param bulk the bulk process that is expired * @param bulk the bulk process that is expired
*/ */
public async bulkExpired(bulk: Bulk): Promise<void> { public async bulkExpired(bulk: Bulk): Promise<void> {
const index: string = getThingIndexName(bulk.type, bulk.source, bulk); const index = await this.prepareBulkWrite(bulk);
Logger.info('Bulk expired. Deleting index', index); Logger.info('Bulk expired. Deleting index', index);
@@ -248,57 +226,38 @@ export class Elasticsearch implements Database {
/** /**
* Should be called when a bulk process is updated (replaced by a newer bulk). This will replace the old * Should be called when a bulk process is updated (replaced by a newer bulk). This will replace the old
* index and publish all data, that was index in the new instead * index and publish all data that was indexed in the new instead
* *
* @param bulk the new bulk process that should replace the old one with same type and source * @param bulk the new bulk process that should replace the old one with the same type and source
*/ */
public async bulkUpdated(bulk: Bulk): Promise<void> { public async bulkUpdated(bulk: Bulk): Promise<void> {
const {index, alias} = await this.prepareBulkWrite(bulk); const index = await this.prepareBulkWrite(bulk);
// create the new index if it does not exist await this.client.indices.refresh({index, allow_no_indices: false});
// eslint-disable-next-line unicorn/no-await-expression-member
if (!(await this.client.indices.exists({index}))) {
// re-apply the index template before each new bulk operation
await putTemplate(this.client, bulk.type);
await this.client.indices.create({
index,
});
}
// get the old index from our aliasMap const activeIndices = await this.client.indices
const oldIndex: string = this.aliasMap[alias][bulk.source]; .getAlias({
index: matchIndexByType(bulk.type, bulk.source),
name: ACTIVE_INDICES_ALIAS,
})
.then(it => Object.entries(it).map(([name]) => name))
.catch(() => [] as string[]);
// add our new index to the alias await this.client.indices.updateAliases({
// this was type safe with @types/elasticsearch, the new package however provides no type definitions actions: [
const actions: IndicesUpdateAliasesAction[] = [ {
{ add: {index, alias: ACTIVE_INDICES_ALIAS},
add: {index: index, alias: alias}, },
}, {
]; remove: {index, alias: INACTIVE_INDICES_ALIAS},
},
// remove our old index if it exists ...activeIndices.map(index => ({
// noinspection SuspiciousTypeOfGuard remove_index: {index},
if (typeof oldIndex === 'string') { })),
actions.push({ ],
remove: {index: oldIndex, alias: alias}, });
}); Logger.info(`Index for ${bulk.type} is now ${index}`);
} Logger.info(`Also removed obsolete indices ${activeIndices}`);
// refresh the index (fsync changes)
await this.client.indices.refresh({index});
// execute our alias actions
await this.client.indices.updateAliases({actions});
// swap the index in our aliasMap
this.aliasMap[alias][bulk.source] = index;
// noinspection SuspiciousTypeOfGuard
if (typeof oldIndex === 'string') {
// delete the old index
await this.client.indices.delete({index: oldIndex});
Logger.info('deleted old index', oldIndex);
}
Logger.info('swapped alias index alias', oldIndex, '=>', index);
} }
/** /**
@@ -332,7 +291,7 @@ export class Elasticsearch implements Database {
await Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue); await Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue);
} }
return this.getAliasMap(retryOptions); return this.cleanupDeadIndices(retryOptions);
} }
/** /**
@@ -342,35 +301,34 @@ export class Elasticsearch implements Database {
* @param bulk the bulk process which item belongs to * @param bulk the bulk process which item belongs to
*/ */
public async post(object: SCThings, bulk: Bulk): Promise<void> { public async post(object: SCThings, bulk: Bulk): Promise<void> {
const index = await this.prepareBulkWrite(bulk);
const thing: SCThings & {creation_date: string} = { const thing: SCThings & {creation_date: string} = {
...object, ...object,
creation_date: moment().format(), creation_date: moment().format(),
}; };
const item = await this.getObject(object.uid); const conflictingThing = await this.client.search({
query: {
// check that the item will get replaced if the index is rolled over (index with the same name excluding ending uid) term: {
if (typeof item !== 'undefined') { 'uid.raw': {
const indexOfNew = getThingIndexName(thing.type, bulk.source, bulk); value: thing.uid,
const oldIndex = item._index; },
},
// new item doesn't replace the old one },
if ( // matches all indices but excludes indices of the same type
oldIndex.slice(0, Math.max(0, oldIndex.lastIndexOf('_'))) !== // https://www.elastic.co/guide/en/elasticsearch/reference/7.14/multi-index.html#multi-index
indexOfNew.slice(0, Math.max(0, indexOfNew.lastIndexOf('_'))) index: ['stapps_*', `-${matchIndexByType(bulk.type, bulk.source)}`],
) { });
throw new Error( if (conflictingThing.hits.hits.length > 0) {
// eslint-disable-next-line unicorn/no-null throw new Error(
`Object "${thing.uid}" already exists. Object was: ${JSON.stringify(thing, null, 2)}`, `UID conflict: ${thing.uid}. ${index} tried to post an object that already exists but which it won't replace.`,
); );
}
} }
// regular bulk update (item gets replaced when bulk is updated)
const searchResponse = await this.client.create<SCThings>({ const searchResponse = await this.client.create<SCThings>({
document: thing, document: thing,
id: thing.uid, id: thing.uid,
index: getThingIndexName(thing.type, bulk.source, bulk), index,
timeout: '90s', timeout: '90s',
}); });
@@ -423,15 +381,15 @@ export class Elasticsearch implements Database {
| undefined, | undefined,
}; };
const query = { const response: SearchResponse<SCThings> = await this.client.search({
aggs: aggregations, aggs: aggregations,
query: buildQuery(parameters, this.config, esConfig), query: buildQuery(parameters, this.config, esConfig),
from: parameters.from, from: parameters.from,
index: ALL_INDICES_QUERY, index: ACTIVE_INDICES_ALIAS,
allow_no_indices: true,
size: parameters.size, size: parameters.size,
sort: typeof parameters.sort !== 'undefined' ? buildSort(parameters.sort) : undefined, sort: typeof parameters.sort !== 'undefined' ? buildSort(parameters.sort) : undefined,
}; });
const response: SearchResponse<SCThings> = await this.client.search(query);
return { return {
data: response.hits.hits data: response.hits.hits

View File

@@ -6,9 +6,14 @@ import {SCBulkResponse, SCThingType, SCUuid} from '@openstapps/core';
export const INDEX_UID_LENGTH = 8; export const INDEX_UID_LENGTH = 8;
/** /**
* A string which matches all indices * Matches all active indices
*/ */
export const ALL_INDICES_QUERY = 'stapps_*_*_*'; export const ACTIVE_INDICES_ALIAS = 'stapps_active';
/**
* Matches all inactive indices
*/
export const INACTIVE_INDICES_ALIAS = 'stapps_inactive';
/** /**
* Matches index names such as stapps_<type>_<source>_<random suffix> * Matches index names such as stapps_<type>_<source>_<random suffix>
@@ -45,12 +50,14 @@ export function parseIndexName(index: string): ParsedIndexName {
* @param bulk bulk process which created this index * @param bulk bulk process which created this index
*/ */
export function getThingIndexName(type: SCThingType, source: string, bulk: SCBulkResponse) { export function getThingIndexName(type: SCThingType, source: string, bulk: SCBulkResponse) {
let out = type.toLowerCase(); return `stapps_${type.replace(/\s+/g, '_')}_${source}_${getIndexUID(bulk.uid)}`;
while (out.includes(' ')) { }
out = out.replace(' ', '_');
}
return `stapps_${out}_${source}_${getIndexUID(bulk.uid)}`; /**
* Returns an index pattern that matches all indices with the specified type and source
*/
export function matchIndexByType(type: SCThingType, source: string) {
return `stapps_${type.replace(/\s+/g, '_')}_${source}_*`;
} }
/** /**

View File

@@ -37,8 +37,13 @@ import {Logger} from '@openstapps/logger';
import {SMTP} from '@openstapps/logger/lib/smtp'; import {SMTP} from '@openstapps/logger/lib/smtp';
import {expect, use} from 'chai'; import {expect, use} from 'chai';
import chaiAsPromised from 'chai-as-promised'; import chaiAsPromised from 'chai-as-promised';
import {beforeEach} from 'mocha';
import mockedEnv from 'mocked-env'; import mockedEnv from 'mocked-env';
import {ALL_INDICES_QUERY, parseIndexName} from '../../../src/storage/elasticsearch/util'; import {
ACTIVE_INDICES_ALIAS,
INACTIVE_INDICES_ALIAS,
parseIndexName,
} from '../../../src/storage/elasticsearch/util';
import * as queryModule from '../../../src/storage/elasticsearch/query/query'; import * as queryModule from '../../../src/storage/elasticsearch/query/query';
import * as sortModule from '../../../src/storage/elasticsearch/query/sort'; import * as sortModule from '../../../src/storage/elasticsearch/query/sort';
import sinon, {SinonStub} from 'sinon'; import sinon, {SinonStub} from 'sinon';
@@ -287,7 +292,6 @@ describe('Elasticsearch', function () {
let deleteStub: SinonStub; let deleteStub: SinonStub;
let refreshStub: SinonStub; let refreshStub: SinonStub;
let updateAliasesStub: SinonStub; let updateAliasesStub: SinonStub;
let existsStub: SinonStub;
const oldIndex = 'stapps_footype_foosource_oldindex'; const oldIndex = 'stapps_footype_foosource_oldindex';
beforeEach(function () { beforeEach(function () {
@@ -297,7 +301,6 @@ describe('Elasticsearch', function () {
sandbox.stub(Indices.prototype, 'putTemplate').resolves({} as any); sandbox.stub(Indices.prototype, 'putTemplate').resolves({} as any);
createStub = sandbox.stub(Indices.prototype, 'create').resolves({} as any); createStub = sandbox.stub(Indices.prototype, 'create').resolves({} as any);
deleteStub = sandbox.stub(Indices.prototype, 'delete').resolves({} as any); deleteStub = sandbox.stub(Indices.prototype, 'delete').resolves({} as any);
existsStub = sandbox.stub(Indices.prototype, 'exists').resolves({} as any);
refreshStub = sandbox.stub(Indices.prototype, 'refresh').resolves({} as any); refreshStub = sandbox.stub(Indices.prototype, 'refresh').resolves({} as any);
updateAliasesStub = sandbox.stub(Indices.prototype, 'updateAliases').resolves({} as any); updateAliasesStub = sandbox.stub(Indices.prototype, 'updateAliases').resolves({} as any);
es = new Elasticsearch(configFile); es = new Elasticsearch(configFile);
@@ -329,7 +332,7 @@ describe('Elasticsearch', function () {
await es.bulkCreated(bulk); await es.bulkCreated(bulk);
expect(putTemplateStub.called).to.be.true; expect(putTemplateStub.called).to.be.true;
expect(createStub.calledWith({index})).to.be.true; expect(createStub.calledWith({index, aliases: {[INACTIVE_INDICES_ALIAS]: {}}})).to.be.true;
}); });
}); });
@@ -341,6 +344,7 @@ describe('Elasticsearch', function () {
it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () { it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () {
sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex()); sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex());
await es.init();
await es.bulkExpired({...bulk, state: 'in progress'}); await es.bulkExpired({...bulk, state: 'in progress'});
expect(deleteStub.called).to.be.true; expect(deleteStub.called).to.be.true;
@@ -349,6 +353,7 @@ describe('Elasticsearch', function () {
it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () { it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () {
sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex()); sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex());
await es.init();
await es.bulkExpired({...bulk, state: 'done'}); await es.bulkExpired({...bulk, state: 'done'});
expect(deleteStub.called).to.be.false; expect(deleteStub.called).to.be.false;
@@ -368,44 +373,44 @@ describe('Elasticsearch', function () {
return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('Index'); return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('Index');
}); });
it("should create templates if index doesn't exist", async function () { it("should refuse to finalize bulk if index doesn't exist", async function () {
await es.init(); await es.init();
existsStub.resolves(false); refreshStub.throws();
const putTemplateSpy = sandbox.spy(templating, 'putTemplate'); await expect(es.bulkUpdated(bulk)).to.eventually.throw;
await es.bulkUpdated(bulk);
expect(createStub.called).to.be.true; expect(
expect(putTemplateSpy.called).to.be.true; refreshStub.calledWith({
index: getThingIndexName(bulk.type, bulk.source, bulk),
allow_no_indices: false,
}),
).to.be.true;
}); });
it('should create a new index', async function () { it('should create a new index', async function () {
const index = getIndex(); const index = getIndex();
const expectedRefreshActions = [ const expectedRefreshActions = [
{ {
add: {index: index, alias: SCThingType.Book}, add: {index: index, alias: ACTIVE_INDICES_ALIAS},
}, },
{ {
remove: {index: oldIndex, alias: SCThingType.Book}, remove: {index: index, alias: INACTIVE_INDICES_ALIAS},
},
{
remove_index: {index: oldIndex},
}, },
]; ];
sandbox.stub(utilModule, 'getThingIndexName').returns(index); sandbox.stub(utilModule, 'getThingIndexName').returns(index);
sandbox.stub(es, 'aliasMap').value({
[SCThingType.Book]: {
[bulk.source]: oldIndex,
},
});
sandbox.stub(templating, 'putTemplate'); sandbox.stub(templating, 'putTemplate');
await es.init(); await es.init();
await es.bulkUpdated(bulk); await es.bulkUpdated(bulk);
expect(refreshStub.calledWith({index})).to.be.true; expect(refreshStub.calledWith({index, allow_no_indices: false})).to.be.true;
expect( expect(
updateAliasesStub.calledWith({ updateAliasesStub.calledWith({
actions: expectedRefreshActions, actions: expectedRefreshActions,
}), }),
).to.be.true; ).to.be.true;
expect(deleteStub.called).to.be.true;
}); });
}); });
}); });
@@ -449,6 +454,10 @@ describe('Elasticsearch', function () {
es = new Elasticsearch(configFile); es = new Elasticsearch(configFile);
}); });
beforeEach(function () {
sandbox.stub(Indices.prototype, 'getAlias').resolves({} as any);
});
afterEach(function () { afterEach(function () {
sandbox.restore(); sandbox.restore();
}); });
@@ -459,33 +468,20 @@ describe('Elasticsearch', function () {
const object: SearchHit<SCMessage> = { const object: SearchHit<SCMessage> = {
_id: '', _id: '',
_index: oldIndex, _index: oldIndex,
_score: 0,
_source: message as SCMessage, _source: message as SCMessage,
}; };
sandbox.stub(es.client, 'search').resolves(searchResponse(object)); sandbox.stub(es.client, 'search').resolves(searchResponse<SCMessage>(object));
sandbox.stub(utilModule, 'getThingIndexName').returns(index); sandbox.stub(utilModule, 'getThingIndexName').returns(index);
return expect(es.post(object._source!, bulk)).to.rejectedWith('exist'); await es.init();
}); return expect(es.post(object._source!, bulk)).to.be.rejectedWith('UID conflict');
it('should not reject if the object already exists but in an index which will be rolled over', async function () {
const object: SearchHit<SCMessage> = {
_id: '',
_index: getIndex(),
_score: 0,
_source: message as SCMessage,
};
sandbox.stub(es.client, 'search').resolves(searchResponse(object));
// return index name with different generated UID (see getIndex method)
sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex());
return expect(es.post(object._source!, bulk)).to.not.rejectedWith('exist');
}); });
it('should reject if there is an object creation error on the elasticsearch side', async function () { it('should reject if there is an object creation error on the elasticsearch side', async function () {
sandbox.stub(es.client, 'search').resolves(searchResponse()); sandbox.stub(es.client, 'search').resolves(searchResponse());
sandbox.stub(es.client, 'create').resolves({result: 'not_found'} as CreateResponse); sandbox.stub(es.client, 'create').resolves({result: 'not_found'} as CreateResponse);
await es.init();
return expect(es.post(message as SCMessage, bulk)).to.rejectedWith('creation'); return expect(es.post(message as SCMessage, bulk)).to.rejectedWith('creation');
}); });
@@ -498,6 +494,7 @@ describe('Elasticsearch', function () {
return Promise.resolve({result: 'created'}); return Promise.resolve({result: 'created'});
}); });
await es.init();
await es.post(message as SCMessage, bulk); await es.post(message as SCMessage, bulk);
expect(createStub.called).to.be.true; expect(createStub.called).to.be.true;
@@ -684,7 +681,7 @@ describe('Elasticsearch', function () {
query: fakeResponse, query: fakeResponse,
sort: fakeBuildSortResponse, sort: fakeBuildSortResponse,
from: parameters.from, from: parameters.from,
index: ALL_INDICES_QUERY, index: ACTIVE_INDICES_ALIAS,
size: parameters.size, size: parameters.size,
}); });
}); });