Merge remote-tracking branch 'backend/master'

This commit is contained in:
2023-05-31 14:00:29 +02:00
4 changed files with 149 additions and 162 deletions

View File

@@ -18,7 +18,6 @@ import {
AggregateName,
AggregationsMultiTermsBucket,
IndicesGetAliasResponse,
IndicesUpdateAliasesAction,
SearchHit,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/types';
@@ -38,8 +37,13 @@ import {
ElasticsearchQueryDisMaxConfig,
ElasticsearchQueryQueryStringConfig,
} from './types/elasticsearch-config';
import {ALL_INDICES_QUERY, getThingIndexName, parseIndexName, VALID_INDEX_REGEX} from './util';
import {removeInvalidAliasChars} from './util/alias';
import {
ACTIVE_INDICES_ALIAS,
getThingIndexName,
INACTIVE_INDICES_ALIAS,
matchIndexByType,
VALID_INDEX_REGEX,
} from './util';
import {noUndefined} from './util/no-undefined';
import {retryCatch, RetryOptions} from './util/retry';
@@ -47,17 +51,6 @@ import {retryCatch, RetryOptions} from './util/retry';
* A database interface for elasticsearch
*/
export class Elasticsearch implements Database {
/**
* Holds a map of all elasticsearch indices that are available to search
*/
aliasMap: {
// each scType has an alias which can contain multiple sources
[scType: string]: {
// each source is assigned an index name in elasticsearch
[source: string]: string;
};
};
/**
* Elasticsearch client
*/
@@ -112,7 +105,6 @@ export class Elasticsearch implements Database {
}
});
this.aliasMap = {};
this.ready = false;
this.mailQueue = mailQueue;
@@ -121,7 +113,7 @@ export class Elasticsearch implements Database {
/**
* Gets a map which contains each alias and all indices that are associated with each alias
*/
private async getAliasMap(retryOptions: Partial<RetryOptions<IndicesGetAliasResponse>> = {}) {
private async cleanupDeadIndices(retryOptions: Partial<RetryOptions<IndicesGetAliasResponse>> = {}) {
const aliasResponse = await retryCatch({
maxRetries: 10,
retryInterval: 2000,
@@ -136,31 +128,18 @@ export class Elasticsearch implements Database {
...retryOptions,
});
const aliases = Object.entries(aliasResponse)
.filter(([index]) => !index.startsWith('.'))
.map(([index, alias]) => ({
index,
alias,
...parseIndexName(index),
}));
for (const {type, index, source} of aliases.filter(({type, alias}) => type in alias.aliases)) {
this.aliasMap[type] = this.aliasMap[type] || {};
this.aliasMap[type][source] = index;
}
this.ready = true;
const unusedIndices = aliases.filter(({type, alias}) => !(type in alias.aliases)).map(({index}) => index);
if (unusedIndices.length > 0) {
await this.client.indices.delete({
index: unusedIndices,
});
Logger.warn(`Deleted old indices: oldIndicesToDelete`);
}
const inactiveIndices = Object.entries(aliasResponse)
.filter(([_, aliases]) => Object.keys(aliases.aliases).includes(INACTIVE_INDICES_ALIAS))
.map(([indexName]) => indexName);
// eslint-disable-next-line unicorn/no-null
Logger.ok(`Read alias map from elasticsearch: ${JSON.stringify(this.aliasMap, null, 2)}`);
if (inactiveIndices.length > 0) {
await this.client.indices.delete({
index: inactiveIndices,
});
Logger.warn(`Deleted old indices: ${inactiveIndices}`);
}
}
/**
@@ -181,7 +160,8 @@ export class Elasticsearch implements Database {
},
},
from: 0,
index: ALL_INDICES_QUERY,
allow_no_indices: true,
index: ACTIVE_INDICES_ALIAS,
size: 1,
});
@@ -189,17 +169,12 @@ export class Elasticsearch implements Database {
return searchResponse.hits.hits[0];
}
private async prepareBulkWrite(bulk: Bulk): Promise<{index: string; alias: string}> {
private async prepareBulkWrite(bulk: Bulk): Promise<string> {
if (!this.ready) {
throw new Error('No connection to elasticsearch established yet.');
}
const index = getThingIndexName(bulk.type, bulk.source, bulk);
const alias = removeInvalidAliasChars(bulk.type, bulk.uid);
if (typeof this.aliasMap[alias] === 'undefined') {
this.aliasMap[alias] = {};
}
if (!VALID_INDEX_REGEX.test(index)) {
throw new Error(
@@ -208,21 +183,24 @@ export class Elasticsearch implements Database {
);
}
return {index, alias};
return index;
}
/**
* Should be called, when a new bulk was created. Creates a new index and applies the mapping to the index
* Should be called when a new bulk was created. Creates a new index and applies the mapping to the index
*
* @param bulk the bulk process that was created
*/
public async bulkCreated(bulk: Bulk): Promise<void> {
const {index} = await this.prepareBulkWrite(bulk);
const index = await this.prepareBulkWrite(bulk);
// re-apply the index template before each new bulk operation
await putTemplate(this.client, bulk.type);
await this.client.indices.create({
index,
aliases: {
[INACTIVE_INDICES_ALIAS]: {},
},
});
Logger.info('Created index', index);
@@ -234,7 +212,7 @@ export class Elasticsearch implements Database {
* @param bulk the bulk process that is expired
*/
public async bulkExpired(bulk: Bulk): Promise<void> {
const index: string = getThingIndexName(bulk.type, bulk.source, bulk);
const index = await this.prepareBulkWrite(bulk);
Logger.info('Bulk expired. Deleting index', index);
@@ -248,57 +226,38 @@ export class Elasticsearch implements Database {
/**
* Should be called when a bulk process is updated (replaced by a newer bulk). This will replace the old
* index and publish all data, that was index in the new instead
* index and publish all data that was indexed in the new instead
*
* @param bulk the new bulk process that should replace the old one with same type and source
* @param bulk the new bulk process that should replace the old one with the same type and source
*/
public async bulkUpdated(bulk: Bulk): Promise<void> {
const {index, alias} = await this.prepareBulkWrite(bulk);
const index = await this.prepareBulkWrite(bulk);
// create the new index if it does not exist
// eslint-disable-next-line unicorn/no-await-expression-member
if (!(await this.client.indices.exists({index}))) {
// re-apply the index template before each new bulk operation
await putTemplate(this.client, bulk.type);
await this.client.indices.create({
index,
});
}
await this.client.indices.refresh({index, allow_no_indices: false});
// get the old index from our aliasMap
const oldIndex: string = this.aliasMap[alias][bulk.source];
const activeIndices = await this.client.indices
.getAlias({
index: matchIndexByType(bulk.type, bulk.source),
name: ACTIVE_INDICES_ALIAS,
})
.then(it => Object.entries(it).map(([name]) => name))
.catch(() => [] as string[]);
// add our new index to the alias
// this was type safe with @types/elasticsearch, the new package however provides no type definitions
const actions: IndicesUpdateAliasesAction[] = [
{
add: {index: index, alias: alias},
},
];
// remove our old index if it exists
// noinspection SuspiciousTypeOfGuard
if (typeof oldIndex === 'string') {
actions.push({
remove: {index: oldIndex, alias: alias},
});
}
// refresh the index (fsync changes)
await this.client.indices.refresh({index});
// execute our alias actions
await this.client.indices.updateAliases({actions});
// swap the index in our aliasMap
this.aliasMap[alias][bulk.source] = index;
// noinspection SuspiciousTypeOfGuard
if (typeof oldIndex === 'string') {
// delete the old index
await this.client.indices.delete({index: oldIndex});
Logger.info('deleted old index', oldIndex);
}
Logger.info('swapped alias index alias', oldIndex, '=>', index);
await this.client.indices.updateAliases({
actions: [
{
add: {index, alias: ACTIVE_INDICES_ALIAS},
},
{
remove: {index, alias: INACTIVE_INDICES_ALIAS},
},
...activeIndices.map(index => ({
remove_index: {index},
})),
],
});
Logger.info(`Index for ${bulk.type} is now ${index}`);
Logger.info(`Also removed obsolete indices ${activeIndices}`);
}
/**
@@ -332,7 +291,7 @@ export class Elasticsearch implements Database {
await Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue);
}
return this.getAliasMap(retryOptions);
return this.cleanupDeadIndices(retryOptions);
}
/**
@@ -342,35 +301,34 @@ export class Elasticsearch implements Database {
* @param bulk the bulk process which item belongs to
*/
public async post(object: SCThings, bulk: Bulk): Promise<void> {
const index = await this.prepareBulkWrite(bulk);
const thing: SCThings & {creation_date: string} = {
...object,
creation_date: moment().format(),
};
const item = await this.getObject(object.uid);
// check that the item will get replaced if the index is rolled over (index with the same name excluding ending uid)
if (typeof item !== 'undefined') {
const indexOfNew = getThingIndexName(thing.type, bulk.source, bulk);
const oldIndex = item._index;
// new item doesn't replace the old one
if (
oldIndex.slice(0, Math.max(0, oldIndex.lastIndexOf('_'))) !==
indexOfNew.slice(0, Math.max(0, indexOfNew.lastIndexOf('_')))
) {
throw new Error(
// eslint-disable-next-line unicorn/no-null
`Object "${thing.uid}" already exists. Object was: ${JSON.stringify(thing, null, 2)}`,
);
}
const conflictingThing = await this.client.search({
query: {
term: {
'uid.raw': {
value: thing.uid,
},
},
},
// matches all indices but excludes indices of the same type
// https://www.elastic.co/guide/en/elasticsearch/reference/7.14/multi-index.html#multi-index
index: ['stapps_*', `-${matchIndexByType(bulk.type, bulk.source)}`],
});
if (conflictingThing.hits.hits.length > 0) {
throw new Error(
`UID conflict: ${thing.uid}. ${index} tried to post an object that already exists but which it won't replace.`,
);
}
// regular bulk update (item gets replaced when bulk is updated)
const searchResponse = await this.client.create<SCThings>({
document: thing,
id: thing.uid,
index: getThingIndexName(thing.type, bulk.source, bulk),
index,
timeout: '90s',
});
@@ -423,15 +381,15 @@ export class Elasticsearch implements Database {
| undefined,
};
const query = {
const response: SearchResponse<SCThings> = await this.client.search({
aggs: aggregations,
query: buildQuery(parameters, this.config, esConfig),
from: parameters.from,
index: ALL_INDICES_QUERY,
index: ACTIVE_INDICES_ALIAS,
allow_no_indices: true,
size: parameters.size,
sort: typeof parameters.sort !== 'undefined' ? buildSort(parameters.sort) : undefined,
};
const response: SearchResponse<SCThings> = await this.client.search(query);
});
return {
data: response.hits.hits

View File

@@ -0,0 +1,25 @@
/*
* Copyright (C) 2022 StApps
* This program is free software: you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation, version 3.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along with
* this program. If not, see <https://www.gnu.org/licenses/>.
*/
/**
*
*/
export function hashStringToInt(string_: string): number {
return [...string_].reduce(
(accumulator, current) =>
(current.codePointAt(0) ?? 0) + (accumulator << 6) + (accumulator << 16) - accumulator,
0,
);
}

View File

@@ -6,9 +6,14 @@ import {SCBulkResponse, SCThingType, SCUuid} from '@openstapps/core';
export const INDEX_UID_LENGTH = 8;
/**
* A string which matches all indices
* Matches all active indices
*/
export const ALL_INDICES_QUERY = 'stapps_*_*_*';
export const ACTIVE_INDICES_ALIAS = 'stapps_active';
/**
* Matches all inactive aliases
*/
export const INACTIVE_INDICES_ALIAS = 'stapps_inactive';
/**
* Matches index names such as stapps_<type>_<source>_<random suffix>
@@ -45,12 +50,14 @@ export function parseIndexName(index: string): ParsedIndexName {
* @param bulk bulk process which created this index
*/
export function getThingIndexName(type: SCThingType, source: string, bulk: SCBulkResponse) {
let out = type.toLowerCase();
while (out.includes(' ')) {
out = out.replace(' ', '_');
}
return `stapps_${type.replace(/\s+/g, '_')}_${source}_${getIndexUID(bulk.uid)}`;
}
return `stapps_${out}_${source}_${getIndexUID(bulk.uid)}`;
/**
* Returns an index that matches all indices with the specified type
*/
export function matchIndexByType(type: SCThingType, source: string) {
return `stapps_${type.replace(/\s+/g, '_')}_${source}_*`;
}
/**

View File

@@ -37,8 +37,13 @@ import {Logger} from '@openstapps/logger';
import {SMTP} from '@openstapps/logger/lib/smtp';
import {expect, use} from 'chai';
import chaiAsPromised from 'chai-as-promised';
import {beforeEach} from 'mocha';
import mockedEnv from 'mocked-env';
import {ALL_INDICES_QUERY, parseIndexName} from '../../../src/storage/elasticsearch/util';
import {
ACTIVE_INDICES_ALIAS,
INACTIVE_INDICES_ALIAS,
parseIndexName,
} from '../../../src/storage/elasticsearch/util';
import * as queryModule from '../../../src/storage/elasticsearch/query/query';
import * as sortModule from '../../../src/storage/elasticsearch/query/sort';
import sinon, {SinonStub} from 'sinon';
@@ -287,7 +292,6 @@ describe('Elasticsearch', function () {
let deleteStub: SinonStub;
let refreshStub: SinonStub;
let updateAliasesStub: SinonStub;
let existsStub: SinonStub;
const oldIndex = 'stapps_footype_foosource_oldindex';
beforeEach(function () {
@@ -297,7 +301,6 @@ describe('Elasticsearch', function () {
sandbox.stub(Indices.prototype, 'putTemplate').resolves({} as any);
createStub = sandbox.stub(Indices.prototype, 'create').resolves({} as any);
deleteStub = sandbox.stub(Indices.prototype, 'delete').resolves({} as any);
existsStub = sandbox.stub(Indices.prototype, 'exists').resolves({} as any);
refreshStub = sandbox.stub(Indices.prototype, 'refresh').resolves({} as any);
updateAliasesStub = sandbox.stub(Indices.prototype, 'updateAliases').resolves({} as any);
es = new Elasticsearch(configFile);
@@ -329,7 +332,7 @@ describe('Elasticsearch', function () {
await es.bulkCreated(bulk);
expect(putTemplateStub.called).to.be.true;
expect(createStub.calledWith({index})).to.be.true;
expect(createStub.calledWith({index, aliases: {[INACTIVE_INDICES_ALIAS]: {}}})).to.be.true;
});
});
@@ -341,6 +344,7 @@ describe('Elasticsearch', function () {
it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () {
sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex());
await es.init();
await es.bulkExpired({...bulk, state: 'in progress'});
expect(deleteStub.called).to.be.true;
@@ -349,6 +353,7 @@ describe('Elasticsearch', function () {
it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () {
sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex());
await es.init();
await es.bulkExpired({...bulk, state: 'done'});
expect(deleteStub.called).to.be.false;
@@ -368,44 +373,44 @@ describe('Elasticsearch', function () {
return expect(es.bulkUpdated(bulk)).to.be.rejectedWith('Index');
});
it("should create templates if index doesn't exist", async function () {
it("should refuse to finalize bulk if index doesn't exist", async function () {
await es.init();
existsStub.resolves(false);
const putTemplateSpy = sandbox.spy(templating, 'putTemplate');
await es.bulkUpdated(bulk);
refreshStub.throws();
await expect(es.bulkUpdated(bulk)).to.eventually.throw;
expect(createStub.called).to.be.true;
expect(putTemplateSpy.called).to.be.true;
expect(
refreshStub.calledWith({
index: getThingIndexName(bulk.type, bulk.source, bulk),
allow_no_indices: false,
}),
).to.be.true;
});
it('should create a new index', async function () {
const index = getIndex();
const expectedRefreshActions = [
{
add: {index: index, alias: SCThingType.Book},
add: {index: index, alias: ACTIVE_INDICES_ALIAS},
},
{
remove: {index: oldIndex, alias: SCThingType.Book},
remove: {index: index, alias: INACTIVE_INDICES_ALIAS},
},
{
remove_index: {index: oldIndex},
},
];
sandbox.stub(utilModule, 'getThingIndexName').returns(index);
sandbox.stub(es, 'aliasMap').value({
[SCThingType.Book]: {
[bulk.source]: oldIndex,
},
});
sandbox.stub(templating, 'putTemplate');
await es.init();
await es.bulkUpdated(bulk);
expect(refreshStub.calledWith({index})).to.be.true;
expect(refreshStub.calledWith({index, allow_no_indices: false})).to.be.true;
expect(
updateAliasesStub.calledWith({
actions: expectedRefreshActions,
}),
).to.be.true;
expect(deleteStub.called).to.be.true;
});
});
});
@@ -449,6 +454,10 @@ describe('Elasticsearch', function () {
es = new Elasticsearch(configFile);
});
beforeEach(function () {
sandbox.stub(Indices.prototype, 'getAlias').resolves({} as any);
});
afterEach(function () {
sandbox.restore();
});
@@ -459,33 +468,20 @@ describe('Elasticsearch', function () {
const object: SearchHit<SCMessage> = {
_id: '',
_index: oldIndex,
_score: 0,
_source: message as SCMessage,
};
sandbox.stub(es.client, 'search').resolves(searchResponse(object));
sandbox.stub(es.client, 'search').resolves(searchResponse<SCMessage>(object));
sandbox.stub(utilModule, 'getThingIndexName').returns(index);
return expect(es.post(object._source!, bulk)).to.rejectedWith('exist');
});
it('should not reject if the object already exists but in an index which will be rolled over', async function () {
const object: SearchHit<SCMessage> = {
_id: '',
_index: getIndex(),
_score: 0,
_source: message as SCMessage,
};
sandbox.stub(es.client, 'search').resolves(searchResponse(object));
// return index name with different generated UID (see getIndex method)
sandbox.stub(utilModule, 'getThingIndexName').returns(getIndex());
return expect(es.post(object._source!, bulk)).to.not.rejectedWith('exist');
await es.init();
return expect(es.post(object._source!, bulk)).to.be.rejectedWith('UID conflict');
});
it('should reject if there is an object creation error on the elasticsearch side', async function () {
sandbox.stub(es.client, 'search').resolves(searchResponse());
sandbox.stub(es.client, 'create').resolves({result: 'not_found'} as CreateResponse);
await es.init();
return expect(es.post(message as SCMessage, bulk)).to.rejectedWith('creation');
});
@@ -498,6 +494,7 @@ describe('Elasticsearch', function () {
return Promise.resolve({result: 'created'});
});
await es.init();
await es.post(message as SCMessage, bulk);
expect(createStub.called).to.be.true;
@@ -684,7 +681,7 @@ describe('Elasticsearch', function () {
query: fakeResponse,
sort: fakeBuildSortResponse,
from: parameters.from,
index: ALL_INDICES_QUERY,
index: ACTIVE_INDICES_ALIAS,
size: parameters.size,
});
});