mirror of
https://gitlab.com/openstapps/openstapps.git
synced 2026-01-22 01:22:54 +00:00
committed by
Rainer Killinger
parent
42c7350c36
commit
8b457c9911
203
src/storage/bulk-storage.ts
Normal file
203
src/storage/bulk-storage.ts
Normal file
@@ -0,0 +1,203 @@
|
||||
/*
|
||||
* Copyright (C) 2019 StApps
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
import {SCBulkRequest, SCThingType} from '@openstapps/core';
|
||||
import {Logger} from '@openstapps/logger';
|
||||
import * as moment from 'moment';
|
||||
import * as NodeCache from 'node-cache';
|
||||
import {promisify} from 'util';
|
||||
import {v4} from 'uuid';
|
||||
import {Database} from './database';
|
||||
|
||||
/**
|
||||
* Possible operations with a bulk
|
||||
*/
|
||||
export type BulkOperation = 'create' | 'expired' | 'update';
|
||||
|
||||
/**
|
||||
* Describes an indexing process
|
||||
*/
|
||||
export class Bulk implements SCBulkRequest {
|
||||
|
||||
/**
|
||||
* Expiration of the bulk
|
||||
*
|
||||
* If the bulk is not finished before the expiration date is hit, the bulk
|
||||
* and all data associated with it will be deleted
|
||||
*/
|
||||
expiration: string;
|
||||
|
||||
/**
|
||||
* The data source of the bulk
|
||||
*
|
||||
* Bulks with same type and source will be replaced and the data will be
|
||||
* updated when the bulk is marked as done
|
||||
*/
|
||||
source: string;
|
||||
|
||||
/**
|
||||
* State of the bulk
|
||||
*
|
||||
* Data can be indexed for this bulk as long as the state is `in progress`
|
||||
* and the bulk is not expired
|
||||
*
|
||||
* When the bulk is marked as `done` it replaces the previous bulk with
|
||||
* the same source and type. The data will be availabe to the user when
|
||||
* the bulk switches to done
|
||||
*/
|
||||
state: 'in progress' | 'done';
|
||||
|
||||
/**
|
||||
* Type of data in the bulk
|
||||
*/
|
||||
type: SCThingType;
|
||||
|
||||
/**
|
||||
* Unique identifier of the bulk
|
||||
*/
|
||||
uid: string;
|
||||
|
||||
/**
|
||||
* Creates a new bulk process
|
||||
* @param request Data needed for requesting a bulk
|
||||
*/
|
||||
constructor(request: SCBulkRequest) {
|
||||
this.uid = v4();
|
||||
this.state = 'in progress';
|
||||
|
||||
if (typeof request.expiration === 'string') {
|
||||
this.expiration = request.expiration;
|
||||
} else {
|
||||
this.expiration = moment()
|
||||
.add(1, 'hour')
|
||||
.toISOString();
|
||||
}
|
||||
// when should this process be finished
|
||||
// where does the process come from
|
||||
this.source = request.source;
|
||||
// which type of data is this process about to index
|
||||
this.type = request.type;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache for bulk-processes
|
||||
*/
|
||||
export class BulkStorage {
|
||||
/**
|
||||
* Cache for temporary storage
|
||||
*/
|
||||
private readonly cache: NodeCache;
|
||||
|
||||
/**
|
||||
* Creates a new BulkStorage
|
||||
* @param database the database that is controlled by this bulk storage
|
||||
*/
|
||||
constructor(public database: Database) {
|
||||
|
||||
// a bulk lives 60 minutes if no expiration is given
|
||||
// the cache is checked every 60 seconds
|
||||
this.cache = new NodeCache({stdTTL: 3600, checkperiod: 60});
|
||||
|
||||
this.cache.on('expired', async (_key, bulk: Bulk) => {
|
||||
// if the bulk is not done
|
||||
if (bulk.state !== 'done') {
|
||||
// the database can delete the data associated with this bulk
|
||||
await this.database.bulkExpired(bulk);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Saves a bulk process and assigns to it a user-defined ttl (time-to-live)
|
||||
* @param bulk the bulk process to save
|
||||
* @returns the bulk process that was saved
|
||||
*/
|
||||
private async save(bulk: Bulk): Promise<Bulk> {
|
||||
const expirationInSeconds = moment(bulk.expiration)
|
||||
// tslint:disable-next-line: no-magic-numbers
|
||||
.diff(moment.now()) / 1000;
|
||||
Logger.info('Bulk expires in ', expirationInSeconds, 'seconds');
|
||||
|
||||
// save the item in the cache with it's expected expiration
|
||||
await promisify<string, Bulk, number>(this.cache.set)(bulk.uid, bulk, expirationInSeconds);
|
||||
|
||||
return bulk;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and save a new bulk process
|
||||
* @param bulkRequest a request for a new bulk process
|
||||
* @returns a promise that contains the new bulk process
|
||||
*/
|
||||
public async create(bulkRequest: SCBulkRequest): Promise<Bulk> {
|
||||
const bulk = new Bulk(bulkRequest);
|
||||
bulk.source = bulkRequest.source;
|
||||
bulk.type = bulkRequest.type;
|
||||
|
||||
await this.save(bulk);
|
||||
|
||||
// tell the database that the bulk was created
|
||||
await this.database.bulkCreated(bulk);
|
||||
|
||||
return bulk;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a bulk process
|
||||
* @param uid uid of the bulk process
|
||||
* @returns a promise that contains the deleted bulk process
|
||||
*/
|
||||
public async delete(uid: string): Promise<Bulk> {
|
||||
const bulk = await this.read(uid);
|
||||
|
||||
if (typeof bulk === 'undefined') {
|
||||
throw new Error(`Bulk that should be deleted was not found. UID was "${uid}"`);
|
||||
}
|
||||
|
||||
// delete the bulk process from the cache
|
||||
await promisify<string>(this.cache.del)(uid);
|
||||
|
||||
// tell the database to handle the expiration of the bulk
|
||||
await this.database.bulkExpired(bulk);
|
||||
|
||||
return bulk;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update an old bulk process (replace it with the new one)
|
||||
* @param bulk new bulk process
|
||||
* @returns an empty promise
|
||||
*/
|
||||
public async markAsDone(bulk: Bulk): Promise<void> {
|
||||
bulk.state = 'done';
|
||||
await this.save(bulk);
|
||||
|
||||
// tell the database that this is the new bulk
|
||||
await this.database.bulkUpdated(bulk);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read an existing bulk process
|
||||
* @param uid uid of the bulk process
|
||||
* @returns a promise that contains a bulk
|
||||
*/
|
||||
public async read(uid: string): Promise<Bulk | undefined> {
|
||||
return promisify<string, Bulk | undefined>(this.cache.get)(uid);
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user