fix: error thrown on consecutive connector executions

Fixes #73
This commit is contained in:
Jovan Krunić
2020-11-10 15:38:42 +01:00
committed by Rainer Killinger
parent 18359fdab3
commit 2259da317a
3 changed files with 28 additions and 15 deletions

View File

@@ -485,14 +485,15 @@ export class Elasticsearch implements Database {
const item = await this.getObject(object.uid); const item = await this.getObject(object.uid);
// we have to check that the item will get replaced if the index is rolled over // check that the item will get replaced if the index is rolled over (index with the same name excluding ending uid)
if (typeof item !== 'undefined') { if (typeof item !== 'undefined') {
const indexOfNew = Elasticsearch.getIndex(obj.type, bulk.source, bulk); const indexOfNew = Elasticsearch.getIndex(obj.type, bulk.source, bulk);
const oldIndex = item._index; const oldIndex = item._index;
// new item doesn't replace the old one // new item doesn't replace the old one
if (oldIndex.substring(0, oldIndex.length - Elasticsearch.INDEX_UID_LENGTH + 1) // tslint:disable-next-line:no-magic-numbers
!== indexOfNew.substring(0, indexOfNew.length - Elasticsearch.INDEX_UID_LENGTH + 1)) { if (oldIndex.substring(0, oldIndex.lastIndexOf('_'))
!== indexOfNew.substring(0, indexOfNew.lastIndexOf('_'))) {
throw new Error( throw new Error(
// tslint:disable-next-line: no-magic-numbers // tslint:disable-next-line: no-magic-numbers
`Object "${obj.uid}" already exists. Object was: ${JSON.stringify(obj, null, 2)}`, `Object "${obj.uid}" already exists. Object was: ${JSON.stringify(obj, null, 2)}`,

View File

@@ -24,6 +24,8 @@ import {MailQueue} from '../src/notification/mail-queue';
import {Bulk, BulkStorage} from '../src/storage/bulk-storage'; import {Bulk, BulkStorage} from '../src/storage/bulk-storage';
import getPort from 'get-port'; import getPort from 'get-port';
import {Database} from '../src/storage/database'; import {Database} from '../src/storage/database';
import {Elasticsearch} from '../src/storage/elasticsearch/elasticsearch';
import {v4} from 'uuid';
/** /**
* Adds routers and configures an (express) app * Adds routers and configures an (express) app
@@ -141,4 +143,4 @@ export const getTransport = (verified: boolean) => {
} }
} }
export const index = 'stapps_footype_foosource_foobar'; export const getIndex = (uid?: string) => `stapps_footype_foosource_${uid ? uid : Elasticsearch.getIndexUID(v4())}`;

View File

@@ -32,7 +32,7 @@ import {Elasticsearch} from '../../../src/storage/elasticsearch/elasticsearch';
import * as Monitoring from '../../../src/storage/elasticsearch/monitoring'; import * as Monitoring from '../../../src/storage/elasticsearch/monitoring';
import * as query from '../../../src/storage/elasticsearch/query'; import * as query from '../../../src/storage/elasticsearch/query';
import * as templating from '../../../src/storage/elasticsearch/templating'; import * as templating from '../../../src/storage/elasticsearch/templating';
import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, index} from '../../common'; import {bulk, DEFAULT_TEST_TIMEOUT, getTransport, getIndex} from '../../common';
import fs from 'fs'; import fs from 'fs';
use(chaiAsPromised); use(chaiAsPromised);
@@ -292,7 +292,7 @@ describe('Elasticsearch', function () {
}); });
it('should reject (throw an error) if the index name is not valid', async function () { it('should reject (throw an error) if the index name is not valid', async function () {
sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${index}`); sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${getIndex}`);
sandbox.createStubInstance(Client, {}); sandbox.createStubInstance(Client, {});
await es.init(); await es.init();
@@ -300,6 +300,7 @@ describe('Elasticsearch', function () {
}); });
it('should create a new index', async function () { it('should create a new index', async function () {
const index = getIndex();
sandbox.stub(Elasticsearch, 'getIndex').returns(index); sandbox.stub(Elasticsearch, 'getIndex').returns(index);
const putTemplateStub = sandbox.stub(templating, 'putTemplate'); const putTemplateStub = sandbox.stub(templating, 'putTemplate');
const createStub = sandbox.stub(es.client.indices, 'create'); const createStub = sandbox.stub(es.client.indices, 'create');
@@ -318,7 +319,7 @@ describe('Elasticsearch', function () {
sandbox.restore(); sandbox.restore();
}); });
it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () { it('should cleanup index in case of the expired bulk for bulk whose index is not in use', async function () {
sandbox.stub(Elasticsearch, 'getIndex').returns(index); sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex());
const clientDeleteStub = sandbox.stub(es.client.indices, 'delete'); const clientDeleteStub = sandbox.stub(es.client.indices, 'delete');
await es.bulkExpired({...bulk, state: 'in progress'}); await es.bulkExpired({...bulk, state: 'in progress'});
@@ -327,7 +328,7 @@ describe('Elasticsearch', function () {
}); });
it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () { it('should not cleanup index in case of the expired bulk for bulk whose index is in use', async function () {
sandbox.stub(Elasticsearch, 'getIndex').returns(index); sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex());
const clientDeleteStub = sandbox.stub(es.client.indices, 'delete'); const clientDeleteStub = sandbox.stub(es.client.indices, 'delete');
await es.bulkExpired({...bulk, state: 'done'}); await es.bulkExpired({...bulk, state: 'done'});
@@ -342,7 +343,7 @@ describe('Elasticsearch', function () {
}); });
it('should reject if the index name is not valid', async function () { it('should reject if the index name is not valid', async function () {
sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${index}`); sandbox.stub(Elasticsearch, 'getIndex').returns(`invalid_${getIndex()}`);
sandbox.createStubInstance(Client, {}); sandbox.createStubInstance(Client, {});
await es.init(); await es.init();
@@ -350,6 +351,7 @@ describe('Elasticsearch', function () {
}); });
it('should create a new index', async function () { it('should create a new index', async function () {
const index = getIndex();
const expectedRefreshActions = [ const expectedRefreshActions = [
{ {
add: {index: index, alias: SCThingType.Book}, add: {index: index, alias: SCThingType.Book},
@@ -423,6 +425,7 @@ describe('Elasticsearch', function () {
}); });
it('should not post if the object already exists in an index which will not be rolled over', async function () { it('should not post if the object already exists in an index which will not be rolled over', async function () {
const index = getIndex();
const oldIndex = index.replace('foosource', 'barsource'); const oldIndex = index.replace('foosource', 'barsource');
const object: ElasticsearchObject<SCMessage> = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage}; const object: ElasticsearchObject<SCMessage> = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage};
sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}}); sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}});
@@ -431,6 +434,15 @@ describe('Elasticsearch', function () {
return expect(es.post(object._source, bulk)).to.rejectedWith('exist'); return expect(es.post(object._source, bulk)).to.rejectedWith('exist');
}); });
it('should not reject if the object already exists but in an index which will be rolled over', async function () {
const object: ElasticsearchObject<SCMessage> = {_id: '', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage};
sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}});
// return index name with different generated UID (see getIndex method)
sandbox.stub(Elasticsearch, 'getIndex').returns(getIndex());
return expect(es.post(object._source, bulk)).to.not.rejectedWith('exist');
});
it('should reject if there is an object creation error on the elasticsearch side', async function () { it('should reject if there is an object creation error on the elasticsearch side', async function () {
sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: []}}}); sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: []}}});
sandbox.stub(es.client, 'create').resolves({body: {created: false}}); sandbox.stub(es.client, 'create').resolves({body: {created: false}});
@@ -465,8 +477,7 @@ describe('Elasticsearch', function () {
sandbox.restore(); sandbox.restore();
}); });
it('should reject to put if the object does not already exist', async function () { it('should reject to put if the object does not already exist', async function () {
const oldIndex = index.replace('foosource', 'barsource'); const object: ElasticsearchObject<SCMessage> = {_id: '', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage};
const object: ElasticsearchObject<SCMessage> = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage};
sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: []}}}); sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: []}}});
return expect(es.put(object._source)).to.rejectedWith('exist'); return expect(es.put(object._source)).to.rejectedWith('exist');
@@ -474,8 +485,7 @@ describe('Elasticsearch', function () {
it('should update the object if it already exists', async function () { it('should update the object if it already exists', async function () {
let caughtParam: any; let caughtParam: any;
const oldIndex = index.replace('foosource', 'barsource'); const object: ElasticsearchObject<SCMessage> = {_id: '', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage};
const object: ElasticsearchObject<SCMessage> = {_id: '', _index: oldIndex, _score: 0, _type: '', _source: message as SCMessage};
sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}}); sandbox.stub(es.client, 'search').resolves({body:{hits: { hits: [object]}}});
// @ts-ignore // @ts-ignore
const stubUpdate = sandbox.stub(es.client, 'update').callsFake((params) => { const stubUpdate = sandbox.stub(es.client, 'update').callsFake((params) => {
@@ -492,8 +502,8 @@ describe('Elasticsearch', function () {
describe('search', async function () { describe('search', async function () {
let es: Elasticsearch; let es: Elasticsearch;
const sandbox = sinon.createSandbox(); const sandbox = sinon.createSandbox();
const objectMessage: ElasticsearchObject<SCMessage> = {_id: '123', _index: index, _score: 0, _type: '', _source: message as SCMessage}; const objectMessage: ElasticsearchObject<SCMessage> = {_id: '123', _index: getIndex(), _score: 0, _type: '', _source: message as SCMessage};
const objectBook: ElasticsearchObject<SCBook> = {_id: '321', _index: index, _score: 0, _type: '', _source: book as SCBook}; const objectBook: ElasticsearchObject<SCBook> = {_id: '321', _index: getIndex(), _score: 0, _type: '', _source: book as SCBook};
const fakeEsAggregations = { const fakeEsAggregations = {
'@all': { '@all': {
doc_count: 17, doc_count: 17,