/* * Copyright (C) 2019 StApps * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ import { SCBulkResponse, SCConfigFile, SCFacet, SCSearchQuery, SCSearchResponse, SCThings, SCThingTypes, SCUuid, } from '@openstapps/core'; import * as ES from 'elasticsearch'; import * as moment from 'moment'; import { logger } from '../../common'; import { MailQueue } from '../../notification/MailQueue'; import { Bulk } from '../BulkStorage'; import { Database } from '../Database'; import { buildAggregations, parseAggregations } from './aggregations'; import { AggregationSchema, ElasticsearchConfig, ElasticsearchObject } from './common'; import * as Monitoring from './monitoring'; import { buildQuery, buildSort } from './query'; import { putTemplate } from './templating'; // this will match index names such as stapps___ const indexRegex = /^stapps_([A-z0-9_]+)_([a-z0-9-_]+)_([-a-z0-9^_]+)$/; /** * A database interface for elasticsearch */ export class Elasticsearch implements Database { aggregationsSchema: AggregationSchema; /** * Holds a map of all elasticsearch indices that are available to search */ aliasMap: { // each scType has a alias which can contain multiple sources [scType: string]: { // each source is assigned a index name in elasticsearch [source: string]: string; }, }; client: ES.Client; ready: boolean; /** * Create a new interface for elasticsearch * @param config an assembled config file * @param mailQueue a mailqueue for monitoring */ constructor(private config: SCConfigFile, mailQueue?: MailQueue) { if (!config.internal.database || typeof config.internal.database.version === 'undefined') { throw new Error('Database version is undefined. Check you config file'); } const options = { apiVersion: config.internal.database.version, host: this.getElasticsearchUrl(), log: 'error', }; // enable verbose logging for all request to elasticsearch if (process.env.ES_DEBUG === 'true') { options.log = 'trace'; } this.client = new ES.Client(options); this.aliasMap = {}; this.ready = false; this.aggregationsSchema = buildAggregations(this.config.internal.aggregations); this.getAliasMap(); const monitoringConfiguration = this.config.internal.monitoring; if (typeof monitoringConfiguration !== 'undefined') { if (typeof mailQueue === 'undefined') { throw new Error('Monitoring is defined, but MailQueue is undefined. A MailQueue is obligatory for monitoring.'); } // read all watches and schedule searches on the client Monitoring.setUp(monitoringConfiguration, this.client, mailQueue); } } /** * Tests if an object already exists * * Returns Elasticsearch Object if it exists */ private async doesItemExist(object: SCThings): Promise<{exists: boolean; object?: ElasticsearchObject}> { const searchResponse = await this.client.search({ body: { query: { term: { 'uid.raw': { value: object.uid, }, }, }, }, from: 0, index: this.getListOfAllIndices(), size: 1, }); if (searchResponse.hits.total > 1) { return { exists: true, object: searchResponse.hits.hits[0], }; } return { exists: false, }; } /** * Gets a map which contains each alias and all indices that are associated with each alias */ private async getAliasMap() { // create a list of old indices that are not in use const oldIndicesToDelete: string[] = []; let aliases: { [index: string]: { aliases: { [K in SCThingTypes]: any }, }, }; try { aliases = await this.client.indices.getAlias({}); } catch (error) { logger.error('Failed getting alias map:', error); setTimeout(() => { this.getAliasMap(); }, 5000); // retry in 5 seconds return; } for (const index in aliases) { if (aliases.hasOwnProperty(index)) { const matches = indexRegex.exec(index); if (matches !== null) { const type = matches[1]; const source = matches[2]; // check if there is an alias for the current index // check that alias equals type const hasAlias = type in aliases[index].aliases; if (hasAlias) { if (typeof this.aliasMap[type] === 'undefined') { this.aliasMap[type] = {}; } this.aliasMap[type][source] = index; } else { oldIndicesToDelete.push(index); } } } } this.ready = true; // delete old indices that are not used in any alias if (oldIndicesToDelete.length > 0) { await this.client.indices.delete({ index: oldIndicesToDelete, }); logger.warn('Deleted old indices: ' + oldIndicesToDelete); } logger.ok('Read alias map from elasticsearch: ' + JSON.stringify(this.aliasMap, null, 2)); } /** * Get the url of elasticsearch */ private getElasticsearchUrl(): string { // check if we have a docker link if (process.env.ES_PORT_9200_TCP_ADDR !== undefined && process.env.ES_PORT_9200_TCP_PORT !== undefined) { return process.env.ES_PORT_9200_TCP_ADDR + ':' + process.env.ES_PORT_9200_TCP_PORT; } // default return 'localhost:9200'; } /** * Gets the index name in elasticsearch for one SCThingType * @param type SCThingType of data in the index * @param source source of data in the index * @param bulk bulk process which created this index */ private getIndex(type: SCThingTypes, source: string, bulk: SCBulkResponse) { return `stapps_${type.toLowerCase().replace(' ', '_')}_${source}_${bulk.uid.substring(0, 8)}`; } /** * Generates a string which matches all indices */ private getListOfAllIndices(): string { // map each SC type in upper camel case return 'stapps_*_*_*'; } /** * Should be called, when a new bulk was created. Creates a new index and applies a the mapping to the index * @param bulk the bulk process that was created */ public async bulkCreated(bulk: Bulk): Promise { // if our es instance is not ready yet, we cannot serve this request if (!this.ready) { throw new Error('No connection to elasticsearch established yet.'); } // index name for elasticsearch const index: string = this.getIndex(bulk.type, bulk.source, bulk); // there already is an index with this type and source. We will index the new one and switch the alias to it // the old one is deleted const alias = bulk.type; if (typeof this.aliasMap[alias] === 'undefined') { this.aliasMap[alias] = {}; } if (!indexRegex.test(index)) { throw new Error( 'Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.\n' + 'Make sure to set the bulk "source" and "type" to names consisting of the characters above.', ); } // re-apply the index template before each new bulk operation await putTemplate(this.client); await this.client.indices.create({ index, }); logger.info('Created index', index); } /** * Should be called when a bulk process is expired. The index that was created with this bulk gets deleted * @param bulk the bulk process that is expired */ public async bulkExpired(bulk: Bulk): Promise { // index name for elasticsearch const index: string = this.getIndex(bulk.type, bulk.source, bulk); logger.info('Bulk expired. Deleting index', index); // don't delete indices that are in use already if (bulk.state !== 'done') { logger.info('deleting obsolete index', index); return await this.client.indices.delete({ index }); } } /** * Should be called when a bulk process is updated (replaced by a newer bulk). This will replace the old * index and publish all data, that was index in the new instead * @param bulk the new bulk process that should replace the old one with same type and source */ public async bulkUpdated(bulk: Bulk): Promise { // if our es instance is not ready yet, we cannot serve this request if (!this.ready) { throw new Error('Elasticsearch not ready'); } // index name for elasticsearch const index: string = this.getIndex(bulk.type, bulk.source, bulk); // alias for the indices const alias = bulk.type; if (typeof this.aliasMap[alias] === 'undefined') { this.aliasMap[alias] = {}; } if (!indexRegex.test(index)) { throw new Error( 'Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.\n' + 'Make sure to set the bulk "source" and "type" to names consisting of the characters above.', ); } // create the new index if it does not exists if (!(await this.client.indices.exists({ index }))) { // re-apply the index template before each new bulk operation await putTemplate(this.client); await this.client.indices.create({ index, }); } // get the old index from our aliasMap const oldIndex: string = this.aliasMap[alias][bulk.source]; // add our new index to the alias const actions: ES.IndicesUpdateAliasesParamsAction[] = [ { add: { index: index, alias: alias }, }, ]; // remove our old index if it exists if (typeof oldIndex === 'string') { actions.push({ remove: { index: oldIndex, alias: alias }, }); } // refresh the index (fsync changes) await this.client.indices.refresh({ index, }); // execute our alias actions await this.client.indices.updateAliases({ body: { actions, }, }); // swap the index in our aliasMap this.aliasMap[alias][bulk.source] = index; if (typeof oldIndex === 'string') { // delete the old index await this.client.indices.delete({ index: oldIndex }); logger.info('deleted old index', oldIndex); } logger.info('swapped alias index alias', oldIndex, '=>', index); } /** * Gets an SCThing from all indexed data * @param uid uid of an SCThing */ public async get(uid: SCUuid): Promise { const searchResponse = await this.client.search({ body: { query: { term: { uid, }, }, }, index: this.getListOfAllIndices(), }); // get data from response const hits = searchResponse.hits.hits; if (hits.length !== 1) { throw new Error('No unique item found.'); } else { return hits[0]._source as SCThings; } } /** * Add an item to an index * @param object the SCThing to add to the index * @param bulk the bulk process which item belongs to */ public async post(object: SCThings, bulk: Bulk): Promise { const obj: SCThings & {creation_date: string} = { ...object, creation_date: moment().format(), }; const itemMeta = await this.doesItemExist(obj); // we have to check that the item will get replaced if the index is rolled over if (itemMeta.exists && typeof itemMeta.object !== 'undefined') { const indexOfNew = this.getIndex(obj.type, bulk.source, bulk); const oldIndex = itemMeta.object._index; // new item doesn't replace the old one if (oldIndex.substring(0, oldIndex.length - 9) !== indexOfNew.substring(0, indexOfNew.length - 9)) { throw new Error( 'Object \"' + obj.uid + '\" already exists. Object was: ' + JSON.stringify(obj, null, 2), ); } } // regular bulk update (item gets replaced when bulk is updated) const searchResponse = await this.client.create({ body: obj, id: obj.uid, index: this.getIndex(obj.type, bulk.source, bulk), timeout: '90s', type: obj.type, }); if (!searchResponse.created) { throw new Error('Object creation Error: Instance was: ' + JSON.stringify(obj)); } } /** * Put (update) an existing item * @param object SCThing to put */ public async put(object: SCThings) { const itemMeta = await this.doesItemExist(object); if (itemMeta.exists && typeof itemMeta.object !== 'undefined') { return await this.client.update({ body: { doc: object, }, id: object.uid, index: itemMeta.object._index, type: object.type.toLowerCase(), }); } throw new Error('You tried to PUT an non-existing object. PUT is only supported on existing objects.'); } /** * Search all indexed data * @param params search query */ public async search(params: SCSearchQuery): Promise { if (typeof this.config.internal.database === 'undefined') { throw new Error('Database is undefined. You have to configure the query build'); } const searchRequest: ES.SearchParams = { body: { aggs: this.aggregationsSchema, // use cached version of aggregations (they only change if config changes) query: buildQuery(params, this.config, this.config.internal.database as ElasticsearchConfig), }, from: params.from, index: this.getListOfAllIndices(), size: params.size, }; if (typeof params.sort !== 'undefined') { searchRequest.body.sort = buildSort(params.sort); } // perform the search against elasticsearch const response = await this.client.search(searchRequest); // gather pagination information const pagination = { count: response.hits.hits.length, offset: (typeof params.from === 'number') ? params.from : 0, total: response.hits.total, }; // gather statistics about this search const stats = { time: response.took, }; // we only directly return the _source documents // elasticsearch provides much more information, the user shouldn't see const data = response.hits.hits.map((hit) => { return hit._source; // SCThing }); let facets: SCFacet[] = []; // read the aggregations from elasticsearch and parse them to facets by our configuration if (typeof response.aggregations !== 'undefined') { facets = parseAggregations(this.aggregationsSchema, response.aggregations); } return { data, facets, pagination, stats, }; } }