mirror of
https://gitlab.com/openstapps/openstapps.git
synced 2026-01-20 00:23:03 +00:00
feat: make backend work with automatically generated aggregations
This commit is contained in:
committed by
Rainer Killinger
parent
496e6c5bd0
commit
ba2c6f655c
@@ -13,56 +13,25 @@
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
import {SCBackendAggregationConfiguration, SCFacet} from '@openstapps/core';
|
||||
import {SCFacet, SCThingType} from '@openstapps/core';
|
||||
import {AggregationSchema} from '@openstapps/core-tools/lib/mappings/aggregation-definitions';
|
||||
import {readFileSync} from 'fs';
|
||||
import {
|
||||
AggregationResponse,
|
||||
AggregationSchema,
|
||||
ESNestedAggregation,
|
||||
isBucketAggregation,
|
||||
isESAggMatchAllFilter,
|
||||
isESNestedAggregation,
|
||||
isESTermsFilter, isNestedAggregation,
|
||||
isESTermsFilter,
|
||||
isNestedAggregation,
|
||||
} from './common';
|
||||
import {aggregationsPath} from './templating';
|
||||
|
||||
/**
|
||||
* Builds the aggregation
|
||||
* @returns a schema to tell elasticsearch which aggregations to collect
|
||||
*/
|
||||
export function buildAggregations(aggsConfig: SCBackendAggregationConfiguration[]): AggregationSchema {
|
||||
|
||||
const result: AggregationSchema = {};
|
||||
|
||||
for (const aggregation of aggsConfig) {
|
||||
if (typeof aggregation.onlyOnTypes !== 'undefined') {
|
||||
for (const type of aggregation.onlyOnTypes) {
|
||||
if (typeof result[type] === 'undefined') {
|
||||
result[type] = {
|
||||
aggs: {},
|
||||
filter: {
|
||||
type: {
|
||||
value: type,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
(result[type] as ESNestedAggregation).aggs[aggregation.fieldName] = {
|
||||
terms: {
|
||||
field: `${aggregation.fieldName}.keyword`,
|
||||
size: 1000,
|
||||
},
|
||||
};
|
||||
}
|
||||
} else {
|
||||
result[aggregation.fieldName] = {
|
||||
terms: {
|
||||
field: `${aggregation.fieldName}.keyword`,
|
||||
size: 1000,
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
export function buildAggregations(): AggregationSchema {
|
||||
return JSON.parse((readFileSync(aggregationsPath, 'utf8')).toString());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -103,7 +72,8 @@ export function parseAggregations(
|
||||
};
|
||||
}),
|
||||
field: fieldName,
|
||||
onlyOnType: type.filter.type.value,
|
||||
onlyOnType: isESAggMatchAllFilter(type.filter)
|
||||
? undefined : type.filter.type.value as SCThingType,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,6 +15,12 @@
|
||||
*/
|
||||
import {SCThingType} from '@openstapps/core';
|
||||
import {SCThing} from '@openstapps/core';
|
||||
import {
|
||||
ESAggMatchAllFilter,
|
||||
ESAggTypeFilter,
|
||||
ESNestedAggregation,
|
||||
ESTermsFilter,
|
||||
} from '@openstapps/core-tools/lib/mappings/aggregation-definitions';
|
||||
import {NameList} from 'elasticsearch';
|
||||
|
||||
/**
|
||||
@@ -88,14 +94,6 @@ export function isNestedAggregation(agg: BucketAggregation | NestedAggregation):
|
||||
return typeof (agg as BucketAggregation).buckets === 'undefined';
|
||||
}
|
||||
|
||||
/**
|
||||
* An elasticsearch bucket aggregation
|
||||
* @see https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-aggregations-bucket.html
|
||||
*/
|
||||
export interface AggregationSchema {
|
||||
[aggregationName: string]: ESTermsFilter | ESNestedAggregation;
|
||||
}
|
||||
|
||||
/**
|
||||
* A configuration for using the Dis Max Query
|
||||
*
|
||||
@@ -275,26 +273,6 @@ export interface ESTermFilter {
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* An elasticsearch terms filter
|
||||
*/
|
||||
export interface ESTermsFilter {
|
||||
/**
|
||||
* Terms filter definition
|
||||
*/
|
||||
terms: {
|
||||
/**
|
||||
* Field to apply filter to
|
||||
*/
|
||||
field: string;
|
||||
|
||||
/**
|
||||
* Number of results
|
||||
*/
|
||||
size?: number;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the parameter is of type ESTermsFilter
|
||||
* @param agg the value to check
|
||||
@@ -303,30 +281,6 @@ export function isESTermsFilter(agg: ESTermsFilter | ESNestedAggregation): agg i
|
||||
return typeof (agg as ESTermsFilter).terms !== 'undefined';
|
||||
}
|
||||
|
||||
/**
|
||||
* For nested aggregations
|
||||
*/
|
||||
export interface ESNestedAggregation {
|
||||
/**
|
||||
* Possible nested Aggregations
|
||||
*/
|
||||
aggs: AggregationSchema;
|
||||
/**
|
||||
* Possible filter for types
|
||||
*/
|
||||
filter: {
|
||||
/**
|
||||
* The type of the object to find
|
||||
*/
|
||||
type: {
|
||||
/**
|
||||
* The name of the type
|
||||
*/
|
||||
value: SCThingType;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the parameter is of type ESTermsFilter
|
||||
* @param agg the value to check
|
||||
@@ -335,6 +289,15 @@ export function isESNestedAggregation(agg: ESTermsFilter | ESNestedAggregation):
|
||||
return typeof (agg as ESNestedAggregation).aggs !== 'undefined';
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the parameter is of type
|
||||
*
|
||||
* @param filter the filter to narrow the type of
|
||||
*/
|
||||
export function isESAggMatchAllFilter(filter: ESAggTypeFilter | ESAggMatchAllFilter): filter is ESAggMatchAllFilter {
|
||||
return filter.hasOwnProperty('match_all');
|
||||
}
|
||||
|
||||
/**
|
||||
* An elasticsearch type filter
|
||||
*/
|
||||
@@ -358,6 +321,7 @@ export interface ESGeoDistanceFilterArguments {
|
||||
* The radius of the circle centred on the specified location
|
||||
*/
|
||||
distance: string;
|
||||
|
||||
[fieldName: string]: {
|
||||
/**
|
||||
* Latitute
|
||||
@@ -412,9 +376,9 @@ export interface ESBooleanFilterArguments<T> {
|
||||
* An elasticsearch boolean filter
|
||||
*/
|
||||
export interface ESBooleanFilter<T> {
|
||||
/**
|
||||
* @see ESBooleanFilterArguments
|
||||
*/
|
||||
/**
|
||||
* @see ESBooleanFilterArguments
|
||||
*/
|
||||
bool: ESBooleanFilterArguments<T>;
|
||||
}
|
||||
|
||||
@@ -485,6 +449,7 @@ export interface ESGeoDistanceSortArguments {
|
||||
* Value unit
|
||||
*/
|
||||
unit: 'm';
|
||||
|
||||
[field: string]: {
|
||||
/**
|
||||
* Latitute
|
||||
@@ -512,9 +477,9 @@ export interface ESGeoDistanceSort {
|
||||
* An elasticsearch script sort
|
||||
*/
|
||||
export interface ScriptSort {
|
||||
/**
|
||||
* A script
|
||||
*/
|
||||
/**
|
||||
* A script
|
||||
*/
|
||||
_script: {
|
||||
/**
|
||||
* Order
|
||||
|
||||
@@ -23,6 +23,7 @@ import {
|
||||
SCThingType,
|
||||
SCUuid,
|
||||
} from '@openstapps/core';
|
||||
import {AggregationSchema} from '@openstapps/core-tools/lib/mappings/aggregation-definitions';
|
||||
import {Logger} from '@openstapps/logger';
|
||||
import * as ES from 'elasticsearch';
|
||||
import * as moment from 'moment';
|
||||
@@ -31,7 +32,6 @@ import {Bulk} from '../bulk-storage';
|
||||
import {Database} from '../database';
|
||||
import {buildAggregations, parseAggregations} from './aggregations';
|
||||
import {
|
||||
AggregationSchema,
|
||||
ElasticsearchConfig,
|
||||
ElasticsearchObject,
|
||||
ElasticsearchQueryDisMaxConfig,
|
||||
@@ -196,9 +196,17 @@ export class Elasticsearch implements Database {
|
||||
this.aliasMap = {};
|
||||
this.ready = false;
|
||||
|
||||
this.aggregationsSchema = buildAggregations(this.config.internal.aggregations);
|
||||
checkESTemplate(typeof process.env.ES_FORCE_MAPPING_UPDATE !== 'undefined' ?
|
||||
process.env.ES_FORCE_MAPPING_UPDATE === 'true' : false);
|
||||
|
||||
this.aggregationsSchema = buildAggregations();
|
||||
|
||||
this.mailQueue = mailQueue;
|
||||
|
||||
/*refreshAllTemplates(this.client)
|
||||
.then(() => {
|
||||
// noop
|
||||
});*/
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -333,7 +341,7 @@ export class Elasticsearch implements Database {
|
||||
}
|
||||
|
||||
// re-apply the index template before each new bulk operation
|
||||
await putTemplate(this.client);
|
||||
await putTemplate(this.client, bulk.type);
|
||||
await this.client.indices.create({
|
||||
index,
|
||||
});
|
||||
@@ -390,7 +398,7 @@ export class Elasticsearch implements Database {
|
||||
// create the new index if it does not exists
|
||||
if (!(await this.client.indices.exists({index}))) {
|
||||
// re-apply the index template before each new bulk operation
|
||||
await putTemplate(this.client);
|
||||
await putTemplate(this.client, bulk.type);
|
||||
await this.client.indices.create({
|
||||
index,
|
||||
});
|
||||
@@ -475,9 +483,6 @@ export class Elasticsearch implements Database {
|
||||
Monitoring.setUp(monitoringConfiguration, this.client, this.mailQueue);
|
||||
}
|
||||
|
||||
checkESTemplate(typeof process.env.ES_FORCE_MAPPING_UPDATE !== 'undefined' ?
|
||||
process.env.ES_FORCE_MAPPING_UPDATE === 'true' : false);
|
||||
|
||||
return this.getAliasMap();
|
||||
}
|
||||
|
||||
|
||||
@@ -246,6 +246,7 @@ function buildFunctionsForBoostingTypes(
|
||||
|
||||
return functions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds body for Elasticsearch requests
|
||||
* @param params Parameters for querying the backend
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
import {SCThingType} from '@openstapps/core';
|
||||
import {getProjectReflection} from '@openstapps/core-tools/lib/common';
|
||||
import {generateTemplate} from '@openstapps/core-tools/lib/mapping';
|
||||
import {Logger} from '@openstapps/logger';
|
||||
@@ -24,51 +25,95 @@ import sanitize = require('sanitize-filename');
|
||||
import {configFile, coreVersion} from '../../common';
|
||||
|
||||
const dirPath = resolve('src', 'storage', 'elasticsearch', 'templates');
|
||||
const templatePath = resolve(dirPath, sanitize(`template_${coreVersion}.json`, {replacement: '-'}));
|
||||
const errorPath = resolve(dirPath, sanitize(`failed_template_${coreVersion}.json`, {replacement: '-'}));
|
||||
const errorReportPath = resolve(dirPath, sanitize(`error_report_${coreVersion}.txt`, {replacement: '-'}));
|
||||
export const aggregationsPath = resolve(dirPath, sanitize(`${coreVersion}-aggregations.json`, {replacement: '-'}));
|
||||
const templateErrorPath = resolve(dirPath, sanitize(`${coreVersion}-template-[type].error.json`, {replacement: '-'}));
|
||||
const aggregationsErrorPath = resolve(dirPath, sanitize(`${coreVersion}-aggregations.error.json`, {replacement: '-'}));
|
||||
const errorReportPath = resolve(dirPath, sanitize(`${coreVersion}-error-report.txt`, {replacement: '-'}));
|
||||
|
||||
/**
|
||||
* Check if the correct template exists
|
||||
*/
|
||||
export function checkESTemplate(forceUpdate: boolean) {
|
||||
// as the forced mapping update is only meant for development, print a warning if it is enabled
|
||||
if (forceUpdate) {
|
||||
Logger.warn('CAUTION: Force update of the mapping files is enabled. This causes the backend to ignore' +
|
||||
' existing mapping files on start.');
|
||||
}
|
||||
if (!existsSync(templatePath) || forceUpdate) {
|
||||
// we don't exactly know which files are there, so we just check if the aggregations exist
|
||||
// for the current core version
|
||||
if (forceUpdate || !existsSync(aggregationsPath)) {
|
||||
Logger.info(`No mapping for Core version ${coreVersion} found, starting automatic mapping generation. ` +
|
||||
`This may take a while.`);
|
||||
const map = generateTemplate(getProjectReflection(resolve('node_modules', '@openstapps', 'core', 'src')),
|
||||
configFile.backend.mappingIgnoredTags, false);
|
||||
|
||||
if (map.errors.length > 0) {
|
||||
for (const type of Object.keys(map.mappings)) {
|
||||
writeFileSync(getTemplatePath(Object.keys(map.mappings[type].mappings)[0] as SCThingType, true),
|
||||
// tslint:disable-next-line:no-magic-numbers
|
||||
JSON.stringify(map.mappings[type], null, 2));
|
||||
}
|
||||
// tslint:disable-next-line:no-magic-numbers
|
||||
writeFileSync(errorPath, JSON.stringify(map.template, null, 2));
|
||||
writeFileSync(aggregationsErrorPath, JSON.stringify(map.aggregations, null, 2));
|
||||
|
||||
writeFileSync(errorReportPath, `ERROR REPORT FOR CORE VERSION ${coreVersion}\n${map.errors.join('\n')}`);
|
||||
|
||||
// tslint:disable-next-line:no-floating-promises
|
||||
Logger.error(`There were errors while generating the template, and the backend cannot continue. A list of all ` +
|
||||
`errors can be found at ${errorReportPath}. To resolve this` +
|
||||
` issue by hand you can go to "${errorPath}" and correct the issues manually, then move it to ${templatePath}.`);
|
||||
void Logger.error(`There were errors while generating the template, and the backend cannot continue. A list of ` +
|
||||
`all errors can be found at ${errorReportPath}. To resolve this` +
|
||||
` issue by hand you can go to "${templateErrorPath}" and "${aggregationsErrorPath}", then correct the issues` +
|
||||
` manually and move the files to the template paths and "${aggregationsPath}" respectively.`);
|
||||
process.exit(1);
|
||||
} else {
|
||||
Logger.ok('Mapping files were generated successfully.');
|
||||
writeFileSync(templatePath, JSON.stringify(map.template));
|
||||
for (const type of Object.keys(map.mappings)) {
|
||||
writeFileSync(getTemplatePath(Object.keys(map.mappings[type].mappings)[0] as SCThingType, false),
|
||||
// tslint:disable-next-line:no-magic-numbers
|
||||
JSON.stringify(map.mappings[type], null, 2));
|
||||
}
|
||||
writeFileSync(aggregationsPath, JSON.stringify(map.aggregations));
|
||||
}
|
||||
} else {
|
||||
Logger.info(`Using existing mapping at "${templatePath}"`);
|
||||
Logger.info(`Using existing mappings for core version ${coreVersion}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Puts a new global template
|
||||
* Generates the path to the template of an SCThingType
|
||||
*
|
||||
* @param type the type for the path
|
||||
* @param error whether an error occurred in the file
|
||||
*/
|
||||
function getTemplatePath(type: SCThingType, error = false): string {
|
||||
return resolve(dirPath, sanitize(`${coreVersion}-template-${type}${error ? '.error' : ''}.json`, {replacement: '-'}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Re-applies all interfaces for every type
|
||||
*
|
||||
* @param client An elasticsearch client to use
|
||||
*/
|
||||
export async function putTemplate(client: Client): Promise<void> {
|
||||
export async function refreshAllTemplates(client: Client) {
|
||||
for (const type of Object.values(SCThingType)) {
|
||||
await putTemplate(client, type as SCThingType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepares all indices
|
||||
*
|
||||
* This includes applying the mapping, settings
|
||||
*
|
||||
* @param type the SCThingType of which the template should be set
|
||||
* @param client An elasticsearch client to use
|
||||
*/
|
||||
export async function putTemplate(client: Client, type: SCThingType) {
|
||||
let out = type.toLowerCase();
|
||||
while (out.includes(' ')) {
|
||||
out = out.replace(' ', '_');
|
||||
}
|
||||
|
||||
return client.indices.putTemplate({
|
||||
body: JSON.parse((await readFile(templatePath, 'utf8')).toString()),
|
||||
name: 'global',
|
||||
body: JSON.parse((await readFile(getTemplatePath(type), 'utf8')).toString()),
|
||||
name: `template_${out}`,
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user