feat: use new Elasticsearch package

This commit is contained in:
Wieland Schöbl
2020-01-21 11:10:29 +01:00
committed by Rainer Killinger
parent 54301ae8fb
commit 1bad092185
8 changed files with 368 additions and 286 deletions

View File

@@ -14,10 +14,10 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {SCFacet, SCThingType} from '@openstapps/core';
import {AggregationSchema} from '@openstapps/core-tools/lib/mappings/aggregation-definitions';
import {readFileSync} from 'fs';
import {
AggregationResponse,
AggregationSchema,
isBucketAggregation,
isESAggMatchAllFilter,
isESNestedAggregation,

View File

@@ -13,14 +13,16 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {SCThingType} from '@openstapps/core';
import {SCThings, SCThingType} from '@openstapps/core';
import {SCThing} from '@openstapps/core';
import {
ESAggMatchAllFilter,
ESAggTypeFilter,
ESNestedAggregation,
ESAggTypeFilter, ESNestedAggregation,
ESTermsFilter,
} from '@openstapps/core-tools/lib/mappings/aggregation-definitions';
// we only use the @types package because some type definitions are still missing from the official
// @elastic/elasticsearch package
// tslint:disable-next-line:no-implicit-dependencies
import {NameList} from 'elasticsearch';
/**
@@ -71,6 +73,21 @@ export function isBucketAggregation(agg: BucketAggregation | number): agg is Buc
return typeof agg !== 'number';
}
/**
* Result of an existence check for an item
*
* Carries a flag that states whether the item exists and, optionally, the Elasticsearch object itself
*/
export interface ItemExistsResponse {
/**
* Whether the item exists
*/
exists: boolean;
/**
* The Elasticsearch object that wraps the item; optional, may be left out
*/
object?: ElasticsearchObject<SCThings>;
}
/**
* An aggregation that contains more aggregations nested inside
*/
@@ -94,6 +111,14 @@ export function isNestedAggregation(agg: BucketAggregation | NestedAggregation):
return typeof (agg as BucketAggregation).buckets === 'undefined';
}
/**
* A collection of elasticsearch bucket aggregation definitions, keyed by aggregation name
*
* Each entry is either an `ESTermsFilter` or an `ESNestedAggregation`.
* @see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-aggregations-bucket.html
*/
export interface AggregationSchema {
// maps the name of an aggregation to its definition
[aggregationName: string]: ESTermsFilter | ESNestedAggregation;
}
/**
* A configuration for using the Dis Max Query
*

View File

@@ -13,6 +13,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {ApiResponse, Client, events, RequestParams} from '@elastic/elasticsearch';
import {
SCBulkResponse,
SCConfigFile,
@@ -23,19 +24,23 @@ import {
SCThingType,
SCUuid,
} from '@openstapps/core';
import {AggregationSchema} from '@openstapps/core-tools/lib/mappings/aggregation-definitions';
import {Logger} from '@openstapps/logger';
import * as ES from 'elasticsearch';
// we only use the @types package because some type definitions are still missing from the official
// @elastic/elasticsearch package
// tslint:disable-next-line:no-implicit-dependencies
import {IndicesUpdateAliasesParamsAction, SearchResponse} from 'elasticsearch';
import * as moment from 'moment';
import {MailQueue} from '../../notification/mail-queue';
import {Bulk} from '../bulk-storage';
import {Database} from '../database';
import {buildAggregations, parseAggregations} from './aggregations';
import {
AggregationResponse,
AggregationSchema,
ElasticsearchConfig,
ElasticsearchObject,
ElasticsearchQueryDisMaxConfig,
ElasticsearchQueryQueryStringConfig,
ItemExistsResponse,
} from './common';
import * as Monitoring from './monitoring';
import {buildQuery, buildSort} from './query';
@@ -75,7 +80,7 @@ export class Elasticsearch implements Database {
/**
* Elasticsearch client
*/
client: ES.Client;
client: Client;
/**
* Queue of mails to be sent
@@ -92,12 +97,12 @@ export class Elasticsearch implements Database {
*/
static getElasticsearchUrl(): string {
// check if we have a docker link
if (process.env.ES_PORT_9200_TCP_ADDR !== undefined && process.env.ES_PORT_9200_TCP_PORT !== undefined) {
return `${process.env.ES_PORT_9200_TCP_ADDR}:${process.env.ES_PORT_9200_TCP_PORT}`;
if (process.env.ES_ADDR !== undefined ) {
return process.env.ES_ADDR;
}
// default
return 'localhost:9200';
return 'http://localhost:9200';
}
/**
@@ -146,28 +151,28 @@ export class Elasticsearch implements Database {
['\\', '/', '*', '?', '"', '<', '>', '|', ',', '#'].forEach((value) => {
if (formattedAlias.includes(value)) {
formattedAlias = formattedAlias.replace(value, '');
Logger.warn(`Type of the bulk ${uid} contains an invalid character '${value}'. This can lead to two bulks `
+ `having the same alias despite having different types, as invalid characters are removed automatically. ` +
`New alias name is "${formattedAlias}."`);
Logger.warn(`Type of the bulk ${uid} contains an invalid character '${value}'. This can lead to two bulks
having the same alias despite having different types, as invalid characters are removed automatically.
New alias name is "${formattedAlias}."`);
}
});
['-', '_', '+'].forEach((value) => {
if (formattedAlias.charAt(0) === value) {
formattedAlias = formattedAlias.substring(1);
Logger.warn(`Type of the bulk ${uid} begins with '${value}'. This can lead to two bulks `
+ `having the same alias despite having different types, as invalid characters are removed automatically. ` +
`New alias name is "${formattedAlias}."`);
Logger.warn(`Type of the bulk ${uid} begins with '${value}'. This can lead to two bulks having the same
alias despite having different types, as invalid characters are removed automatically.
New alias name is "${formattedAlias}."`);
}
});
if (formattedAlias === '.' || formattedAlias === '..') {
Logger.warn(`Type of the bulk ${uid} is ${formattedAlias}. This is an invalid name, please consider using ` +
`another one, as it will be replaced with 'alias_placeholder', which can lead to strange errors.`);
Logger.warn(`Type of the bulk ${uid} is ${formattedAlias}. This is an invalid name, please consider using
another one, as it will be replaced with 'alias_placeholder', which can lead to strange errors.`);
return 'alias_placeholder';
}
if (formattedAlias.includes(':')) {
Logger.warn(`Type of the bulk ${uid} contains a ':'. This isn't an issue now, but will be in future ` +
`Elasticsearch versions!`);
Logger.warn(`Type of the bulk ${uid} contains a ':'. This isn't an issue now, but will be in future
Elasticsearch versions!`);
}
return formattedAlias;
@@ -185,28 +190,24 @@ export class Elasticsearch implements Database {
throw new Error('Database version is undefined. Check your config file');
}
const options: ES.ConfigOptions = {
apiVersion: config.internal.database.version,
host: Elasticsearch.getElasticsearchUrl(),
// enable verbose logging for all request to elasticsearch
log: process.env.ES_DEBUG === 'true' ? 'trace' : 'error',
};
this.client = new Client({
node: Elasticsearch.getElasticsearchUrl(),
});
this.client.on(events.REQUEST, async (err: Error | null, result: ApiResponse<unknown>) => {
if (err !== null) {
await Logger.error(err);
}
if (process.env.ES_DEBUG === 'true') {
Logger.log(result);
}
});
this.client = new ES.Client(options);
this.aliasMap = {};
this.ready = false;
checkESTemplate(typeof process.env.ES_FORCE_MAPPING_UPDATE !== 'undefined' ?
process.env.ES_FORCE_MAPPING_UPDATE === 'true' : false);
this.aggregationsSchema = buildAggregations();
this.mailQueue = mailQueue;
/*refreshAllTemplates(this.client)
.then(() => {
// noop
});*/
}
/**
@@ -214,9 +215,8 @@ export class Elasticsearch implements Database {
*
* Returns Elasticsearch Object if it exists
*/
// tslint:disable-next-line: completed-docs
private async doesItemExist(object: SCThings): Promise<{exists: boolean; object?: ElasticsearchObject<SCThings>; }> {
const searchResponse = await this.client.search<SCThings>({
private async doesItemExist(object: SCThings): Promise<ItemExistsResponse> {
const searchResponse: ApiResponse<SearchResponse<SCThings>> = await this.client.search({
body: {
query: {
term: {
@@ -231,10 +231,10 @@ export class Elasticsearch implements Database {
size: 1,
});
if (searchResponse.hits.total > 1) {
if (searchResponse.body.hits.total > 1) {
return {
exists: true,
object: searchResponse.hits.hits[0],
object: searchResponse.body.hits.hits[0],
};
}
@@ -264,7 +264,7 @@ export class Elasticsearch implements Database {
};
try {
aliases = await this.client.indices.getAlias({});
aliases = (await this.client.indices.getAlias({})).body;
} catch (error) {
await Logger.error('Failed getting alias map:', error);
setTimeout(async () => {
@@ -335,7 +335,7 @@ export class Elasticsearch implements Database {
if (!indexRegex.test(index)) {
throw new Error(
`Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.
`Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.
Make sure to set the bulk "source" and "type" to names consisting of the characters above.`,
);
}
@@ -363,7 +363,7 @@ export class Elasticsearch implements Database {
if (bulk.state !== 'done') {
Logger.info('deleting obsolete index', index);
return this.client.indices.delete({index});
await this.client.indices.delete({index});
}
}
@@ -390,13 +390,13 @@ export class Elasticsearch implements Database {
if (!indexRegex.test(index)) {
throw new Error(
`Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.
`Index names can only consist of lowercase letters from a-z, "-", "_" and integer numbers.
Make sure to set the bulk "source" and "type" to names consisting of the characters above.`,
);
}
// create the new index if it does not exist
if (!(await this.client.indices.exists({index}))) {
if (!(await this.client.indices.exists({index})).body) {
// re-apply the index template before each new bulk operation
await putTemplate(this.client, bulk.type);
await this.client.indices.create({
@@ -408,7 +408,8 @@ export class Elasticsearch implements Database {
const oldIndex: string = this.aliasMap[alias][bulk.source];
// add our new index to the alias
const actions: ES.IndicesUpdateAliasesParamsAction[] = [
// this was type safe with @types/elasticsearch; the new package, however, provides no type definitions
const actions: IndicesUpdateAliasesParamsAction[] = [
{
add: {index: index, alias: alias},
},
@@ -423,7 +424,7 @@ export class Elasticsearch implements Database {
// refresh the index (fsync changes)
await this.client.indices.refresh({
index,
index: index,
});
// execute our alias actions
@@ -448,7 +449,7 @@ export class Elasticsearch implements Database {
* @param uid uid of an SCThing
*/
public async get(uid: SCUuid): Promise<SCThings> {
const searchResponse = await this.client.search({
const searchResponse: ApiResponse<SearchResponse<SCThings>> = await this.client.search({
body: {
query: {
term: {
@@ -460,13 +461,13 @@ export class Elasticsearch implements Database {
});
// get data from response
const hits = searchResponse.hits.hits;
const hits = searchResponse.body.hits.hits;
if (hits.length !== 1) {
throw new Error('No unique item found.');
}
return hits[0]._source as SCThings;
return hits[0]._source;
}
/**
@@ -483,6 +484,9 @@ export class Elasticsearch implements Database {
Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue);
}
checkESTemplate(typeof process.env.ES_FORCE_MAPPING_UPDATE !== 'undefined' ?
process.env.ES_FORCE_MAPPING_UPDATE === 'true' : false);
return this.getAliasMap();
}
@@ -494,7 +498,7 @@ export class Elasticsearch implements Database {
public async post(object: SCThings, bulk: Bulk): Promise<void> {
// tslint:disable-next-line: completed-docs
const obj: SCThings & {creation_date: string; } = {
const obj: SCThings & { creation_date: string; } = {
...object,
creation_date: moment()
.format(),
@@ -510,10 +514,10 @@ export class Elasticsearch implements Database {
// new item doesn't replace the old one
if (oldIndex.substring(0, oldIndex.length - Elasticsearch.INDEX_UID_LENGTH + 1)
!== indexOfNew.substring(0, indexOfNew.length - Elasticsearch.INDEX_UID_LENGTH + 1)) {
throw new Error(
// tslint:disable-next-line: no-magic-numbers
`Object "${obj.uid}" already exists. Object was: ${JSON.stringify(obj, null, 2)}`,
);
throw new Error(
// tslint:disable-next-line: no-magic-numbers
`Object "${obj.uid}" already exists. Object was: ${JSON.stringify(obj, null, 2)}`,
);
}
}
@@ -526,7 +530,7 @@ export class Elasticsearch implements Database {
type: obj.type,
});
if (!searchResponse.created) {
if (!searchResponse.body.created) {
throw new Error(`Object creation Error: Instance was: ${JSON.stringify(obj)}`);
}
}
@@ -535,12 +539,12 @@ export class Elasticsearch implements Database {
* Put (update) an existing item
* @param object SCThing to put
*/
public async put(object: SCThings) {
public async put(object: SCThings): Promise<void> {
const itemMeta = await this.doesItemExist(object);
if (itemMeta.exists && typeof itemMeta.object !== 'undefined') {
return this.client.update({
await this.client.update({
body: {
doc: object,
},
@@ -548,6 +552,8 @@ export class Elasticsearch implements Database {
index: itemMeta.object._index,
type: object.type.toLowerCase(),
});
return;
}
throw new Error('You tried to PUT an non-existing object. PUT is only supported on existing objects.');
@@ -581,7 +587,7 @@ export class Elasticsearch implements Database {
.query as ElasticsearchQueryDisMaxConfig | ElasticsearchQueryQueryStringConfig;
}
const searchRequest: ES.SearchParams = {
const searchRequest: RequestParams.Search = {
body: {
aggs: this.aggregationsSchema, // use cached version of aggregations (they only change if config changes)
query: buildQuery(params, this.config, esConfig),
@@ -596,31 +602,31 @@ export class Elasticsearch implements Database {
}
// perform the search against elasticsearch
const response = await this.client.search<SCThings>(searchRequest);
const response: ApiResponse<SearchResponse<SCThings>> = await this.client.search(searchRequest);
// gather pagination information
const pagination = {
count: response.hits.hits.length,
count: response.body.hits.hits.length,
offset: (typeof params.from === 'number') ? params.from : 0,
total: response.hits.total,
total: response.body.hits.total,
};
// gather statistics about this search
const stats = {
time: response.took,
time: response.body.took,
};
// we only directly return the _source documents
// elasticsearch provides much more information that the user shouldn't see
const data = response.hits.hits.map((hit) => {
const data = response.body.hits.hits.map((hit) => {
return hit._source; // SCThing
});
let facets: SCFacet[] = [];
// read the aggregations from elasticsearch and parse them to facets by our configuration
if (typeof response.aggregations !== 'undefined') {
facets = parseAggregations(this.aggregationsSchema, response.aggregations);
if (typeof response.body.aggregations !== 'undefined') {
facets = parseAggregations(this.aggregationsSchema, response.body.aggregations as AggregationResponse);
}
return {

View File

@@ -13,15 +13,20 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {ApiResponse, Client, RequestParams} from '@elastic/elasticsearch';
import {
SCMonitoringConfiguration,
SCMonitoringLogAction,
SCMonitoringMailAction,
SCMonitoringMaximumLengthCondition,
SCMonitoringMinimumLengthCondition,
SCThings,
} from '@openstapps/core';
import {Logger} from '@openstapps/logger';
import * as ES from 'elasticsearch';
// we only use the @types package because some type definitions are still missing from the official
// @elastic/elasticsearch package
// tslint:disable-next-line:no-implicit-dependencies
import {SearchResponse} from 'elasticsearch';
import * as cron from 'node-cron';
import {MailQueue} from '../../notification/mail-queue';
@@ -47,7 +52,7 @@ function conditionFails(
* @param total Number of results
*/
function minConditionFails(minimumLength: number, total: number) {
return typeof minimumLength === 'number' && minimumLength > total;
return minimumLength > total;
}
/**
@@ -99,7 +104,7 @@ export function runActions(
* @param esClient elasticsearch client
* @param mailQueue mailQueue for mail actions
*/
export function setUp(monitoringConfig: SCMonitoringConfiguration, esClient: ES.Client, mailQueue: MailQueue) {
export function setUp(monitoringConfig: SCMonitoringConfiguration, esClient: Client, mailQueue: MailQueue) {
// set up Watches
monitoringConfig.watchers.forEach((watcher) => {
@@ -122,10 +127,11 @@ export function setUp(monitoringConfig: SCMonitoringConfiguration, esClient: ES.
cron.schedule(trigger.executionTime, async () => {
// execute watch (search->condition->action)
const result = await esClient.search(watcher.query as ES.SearchParams);
const result: ApiResponse<SearchResponse<SCThings>> =
await esClient.search(watcher.query as RequestParams.Search);
// check conditions
const total = result.hits.total;
const total = result.body.hits.total;
watcher.conditions.forEach((condition) => {
if (conditionFails(condition, total)) {

View File

@@ -13,11 +13,11 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import {Client} from '@elastic/elasticsearch';
import {SCThingType} from '@openstapps/core';
import {getProjectReflection} from '@openstapps/core-tools/lib/common';
import {generateTemplate} from '@openstapps/core-tools/lib/mapping';
import {Logger} from '@openstapps/logger';
import {Client} from 'elasticsearch';
import {existsSync, writeFileSync} from 'fs';
import {readFile} from 'fs-extra';
import {resolve} from 'path';