Files
openstapps/backend/backend/src/storage/elasticsearch/elasticsearch.ts

455 lines
14 KiB
TypeScript

/*
* Copyright (C) 2022 StApps
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {Client, events} from '@elastic/elasticsearch';
import {
AggregateName,
AggregationsMultiTermsBucket,
IndicesGetAliasResponse,
SearchHit,
SearchResponse,
} from '@elastic/elasticsearch/lib/api/types.js';
import {SCConfigFile, SCSearchQuery, SCSearchResponse, SCThings, SCUuid} from '@openstapps/core';
import {Logger} from '@openstapps/logger';
import moment from 'moment';
import {MailQueue} from '../../notification/mail-queue.js';
import {Bulk} from '../bulk-storage.js';
import {Database, SupplementaryGeoJSON, SupplementaryGeoJSONThing} from '../database.js';
import {parseAggregations} from './aggregations.js';
import * as Monitoring from './monitoring.js';
import {buildQuery} from './query/query.js';
import {buildSort} from './query/sort.js';
import {aggregations, putTemplate} from './templating.js';
import {
ElasticsearchConfig,
ElasticsearchQueryDisMaxConfig,
ElasticsearchQueryQueryStringConfig,
} from './types/elasticsearch-config.js';
import {
ACTIVE_INDICES_ALIAS,
getThingIndexName,
INACTIVE_INDICES_ALIAS,
matchIndexByType,
VALID_INDEX_REGEX,
} from './util/index.js';
import {noUndefined} from './util/no-undefined.js';
import {retryCatch, RetryOptions} from './util/retry.js';
import {Feature, Point, Polygon} from 'geojson';
/**
* A database interface for elasticsearch
*/
export class Elasticsearch implements Database {
/**
* Elasticsearch client
*/
client: Client;
/**
* Queue of mails to be sent
*/
mailQueue: MailQueue | undefined;
/**
* Stores information if elasticsearch is ready (connection to it has been established)
*/
ready: boolean;
/**
* Get the url of elasticsearch
*/
static getElasticsearchUrl(): string {
// check if we have a docker link
if (process.env.ES_ADDR !== undefined) {
return process.env.ES_ADDR;
}
// default
return 'http://localhost:9200';
}
/**
* Create a new interface for elasticsearch
* @param config an assembled config file
* @param mailQueue a mail queue for monitoring
*/
constructor(
private readonly config: SCConfigFile,
mailQueue?: MailQueue,
) {
if (config.internal.database === undefined || typeof config.internal.database.version !== 'string') {
throw new TypeError('Database version is undefined. Check your config file');
}
this.client = new Client({
node: Elasticsearch.getElasticsearchUrl(),
});
this.client.diagnostic.on(events.REQUEST, async (error: Error | null, result: unknown) => {
if (error !== null) {
await Logger.error(error);
}
if (process.env.ES_DEBUG === 'true') {
Logger.log(result);
}
});
this.ready = false;
this.mailQueue = mailQueue;
}
/**
* Gets a map which contains each alias and all indices that are associated with each alias
*/
private async cleanupDeadIndices(retryOptions: Partial<RetryOptions<IndicesGetAliasResponse>> = {}) {
const aliasResponse = await retryCatch({
maxRetries: 10,
retryInterval: 2000,
doAction: () => this.client.indices.getAlias(),
onFailedAttempt: (attempt, error, {maxRetries, retryInterval}) => {
Logger.warn('Failed getting alias map:', error);
Logger.warn(`Retrying in ${retryInterval} milliseconds. (${attempt} of ${maxRetries})`);
},
onFail: ({maxRetries}) => {
throw new TypeError(`Failed to retrieve alias map after ${maxRetries} attempts!`);
},
...retryOptions,
});
this.ready = true;
const inactiveIndices = Object.entries(aliasResponse)
.filter(([_, aliases]) => Object.keys(aliases.aliases).includes(INACTIVE_INDICES_ALIAS))
.map(([indexName]) => indexName);
if (inactiveIndices.length > 0) {
await this.client.indices.delete({
index: inactiveIndices,
});
Logger.warn(`Deleted old indices: ${inactiveIndices}`);
}
}
/**
* Provides an elasticsearch object using containing thing's UID
* @param uid an UID to use for the search
* @returns an elasticsearch object containing the thing
*/
private async getObject(uid: SCUuid): Promise<SearchHit<SCThings> | undefined> {
const searchResponse = await this.client.search<SCThings>({
body: {
query: {
term: {
'uid.raw': {
value: uid,
},
},
},
},
from: 0,
allow_no_indices: true,
index: ACTIVE_INDICES_ALIAS,
size: 1,
});
// return data from response
return searchResponse.hits.hits[0];
}
async prepareBulkWrite(bulk: Bulk): Promise<string> {
if (!this.ready) {
throw new Error('No connection to elasticsearch established yet.');
}
const index = getThingIndexName(bulk.type, bulk.source, bulk);
if (!VALID_INDEX_REGEX.test(index)) {
throw new Error(
`Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.
Make sure to set the bulk "source" and "type" to names consisting of the characters above.`,
);
}
return index;
}
/**
* Should be called when a new bulk was created. Creates a new index and applies the mapping to the index
* @param bulk the bulk process that was created
*/
public async bulkCreated(bulk: Bulk): Promise<void> {
const index = await this.prepareBulkWrite(bulk);
// re-apply the index template before each new bulk operation
await putTemplate(this.client, bulk.type);
await this.client.indices.create({
index,
aliases: {
[INACTIVE_INDICES_ALIAS]: {},
},
});
Logger.info('Created index', index);
}
/**
* Should be called when a bulk process is expired. The index that was created with this bulk gets deleted
* @param bulk the bulk process that is expired
*/
public async bulkExpired(bulk: Bulk): Promise<void> {
const index = await this.prepareBulkWrite(bulk);
Logger.info('Bulk expired. Deleting index', index);
// don't delete indices that are in use already
if (bulk.state !== 'done') {
Logger.info('deleting obsolete index', index);
await this.client.indices.delete({index});
}
}
/**
* Should be called when a bulk process is updated (replaced by a newer bulk). This will replace the old
* index and publish all data that was indexed in the new instead
* @param bulk the new bulk process that should replace the old one with the same type and source
*/
public async bulkUpdated(bulk: Bulk): Promise<void> {
const index = await this.prepareBulkWrite(bulk);
await this.client.indices.refresh({index, allow_no_indices: false});
const activeIndices = await this.client.indices
.getAlias({
index: matchIndexByType(bulk.type, bulk.source),
name: ACTIVE_INDICES_ALIAS,
})
.then(it => Object.entries(it).map(([name]) => name))
.catch(() => [] as string[]);
await this.client.indices.updateAliases({
actions: [
{
add: {index, alias: ACTIVE_INDICES_ALIAS},
},
{
remove: {index, alias: INACTIVE_INDICES_ALIAS},
},
...activeIndices.map(index => ({
remove_index: {index},
})),
],
});
Logger.info(`Index for ${bulk.type} is now ${index}`);
Logger.info(`Also removed obsolete indices ${activeIndices}`);
}
/**
* Gets an SCThing from all indexed data
* @param uid uid of an SCThing
*/
public async get(uid: SCUuid): Promise<SCThings> {
const object = await this.getObject(uid);
if (object?._source === undefined) {
throw new TypeError('Item not found.');
}
return object._source;
}
/**
* Initialize the elasticsearch database (call all needed methods)
*/
public async init(retryOptions: Partial<RetryOptions<IndicesGetAliasResponse>> = {}): Promise<void> {
const monitoringConfiguration = this.config.internal.monitoring;
if (monitoringConfiguration !== undefined) {
if (this.mailQueue === undefined) {
throw new TypeError(
'Monitoring is defined, but MailQueue is undefined. A MailQueue is obligatory for monitoring.',
);
}
// read all watches and schedule searches on the client
await Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue);
}
return this.cleanupDeadIndices(retryOptions);
}
/**
* Add an item to an index
* @param object the SCThing to add to the index
* @param bulk the bulk process which item belongs to
*/
public async post(object: SCThings, bulk: Bulk): Promise<void> {
const index = await this.prepareBulkWrite(bulk);
const thing: SCThings & {creation_date: string} = {
...object,
creation_date: moment().format(),
};
const conflictingThing = await this.client.search({
query: {
term: {
'uid.raw': {
value: thing.uid,
},
},
},
// matches all indices but excludes indices of the same type
// https://www.elastic.co/guide/en/elasticsearch/reference/7.14/multi-index.html#multi-index
index: ['stapps_*', `-${matchIndexByType(bulk.type, bulk.source)}`],
});
if (conflictingThing.hits.hits.length > 0) {
throw new Error(
`UID conflict: ${thing.uid}. ${index} tried to post an object that already exists but which it won't replace.`,
);
}
const searchResponse = await this.client.create<SCThings>({
document: thing,
id: thing.uid,
index,
timeout: '90s',
});
if (searchResponse.result !== 'created') {
throw new Error(
`Object creation Error (${searchResponse.result}: Instance was: ${JSON.stringify(thing)}`,
);
}
}
/**
* Put (update) an existing item
* @param object SCThing to put
*/
public async put(object: SCThings): Promise<void> {
const item = await this.getObject(object.uid);
if (item !== undefined) {
await this.client.update({
body: {
doc: object,
},
id: object.uid,
index: item._index,
});
return;
}
throw new Error('You tried to PUT an non-existing object. PUT is only supported on existing objects.');
}
/**
* Search all indexed data
* @param parameters search query
*/
public async search(parameters: SCSearchQuery): Promise<SCSearchResponse> {
if (this.config.internal.database === undefined) {
throw new TypeError('Database is undefined. You have to configure the query build');
}
const esConfig: ElasticsearchConfig = {
name: this.config.internal.database.name as 'elasticsearch',
version: this.config.internal.database.version as string,
query: this.config.internal.database.query as
| ElasticsearchQueryDisMaxConfig
| ElasticsearchQueryQueryStringConfig
| undefined,
};
const response: SearchResponse<SCThings> = await this.client.search({
aggs: aggregations,
query: buildQuery(parameters, this.config, esConfig),
from: parameters.from,
index: ACTIVE_INDICES_ALIAS,
allow_no_indices: true,
size: parameters.size,
sort: parameters.sort === undefined ? undefined : buildSort(parameters.sort),
});
return {
data: response.hits.hits
.map(hit => {
// we only directly return the _source documents
// elasticsearch provides much more information, the user shouldn't see
return hit._source;
})
.filter(noUndefined),
facets:
response.aggregations === undefined
? []
: parseAggregations(response.aggregations as Record<AggregateName, AggregationsMultiTermsBucket>),
pagination: {
count: response.hits.hits.length,
offset: typeof parameters.from === 'number' ? parameters.from : 0,
total:
typeof response.hits.total === 'number' ? response.hits.total : response.hits.total?.value ?? 0,
},
stats: {
time: response.took,
},
};
}
async geo(): Promise<SupplementaryGeoJSON> {
const searchResponse = await this.client.search<Extract<SCThings, {geo: unknown}>>({
body: {
query: {
exists: {
field: 'geo',
},
},
},
from: 0,
allow_no_indices: true,
index: ACTIVE_INDICES_ALIAS,
size: 1,
});
return {
type: 'FeatureCollection',
features: searchResponse.hits.hits
.map(thing => {
return thing._source?.geo
? ({
id: Number(thing._source.identifiers?.['OSM']) || undefined,
type: 'Feature',
geometry: thing._source.geo.polygon ?? thing._source.geo.point,
properties: {
name: thing._source.name,
sameAs: thing._source.sameAs,
image: thing._source.image,
alternateNames: thing._source.alternateNames,
description: thing._source.description,
identifiers: thing._source.identifiers,
categories: thing._source.categories,
categorySpecificValues: thing._source.categorySpecificValues,
openingHours: thing._source.openingHours,
address: thing._source.address,
uid: thing._source.uid,
type: thing._source.type,
},
} satisfies Feature<Polygon | Point, SupplementaryGeoJSONThing>)
: undefined;
})
.filter(noUndefined),
};
}
}