/* * Copyright (C) 2019 StApps * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ import {SCBulkRequest, SCThingType} from '@openstapps/core'; import {Logger} from '@openstapps/logger'; import moment from 'moment'; import NodeCache from 'node-cache'; import {v4} from 'uuid'; import {Database} from './database'; /** * Possible operations with a bulk */ export type BulkOperation = 'create' | 'expired' | 'update'; /** * Describes an indexing process */ export class Bulk implements SCBulkRequest { /** * Expiration of the bulk * * If the bulk is not finished before the expiration date is hit, the bulk * and all data associated with it will be deleted */ expiration: string; /** * The data source of the bulk * * Bulks with same type and source will be replaced and the data will be * updated when the bulk is marked as done */ source: string; /** * State of the bulk * * Data can be indexed for this bulk as long as the state is `in progress` * and the bulk is not expired * * When the bulk is marked as `done` it replaces the previous bulk with * the same source and type. The data will be availabe to the user when * the bulk switches to done */ state: 'in progress' | 'done'; /** * Type of data in the bulk */ type: SCThingType; /** * Unique identifier of the bulk */ uid: string; /** * Creates a new bulk process * * @param request Data needed for requesting a bulk */ constructor(request: SCBulkRequest) { this.uid = v4(); this.state = 'in progress'; this.expiration = typeof request.expiration === 'string' ? request.expiration : moment().add(1, 'hour').toISOString(); // when should this process be finished // where does the process come from this.source = request.source; // which type of data is this process about to index this.type = request.type; } } /** * Cache for bulk-processes */ export class BulkStorage { /** * Cache for temporary storage */ private readonly cache: NodeCache; /** * Creates a new BulkStorage * * @param database the database that is controlled by this bulk storage */ constructor(public database: Database) { // a bulk lives 60 minutes if no expiration is given // the cache is checked every 60 seconds this.cache = new NodeCache({stdTTL: 3600, checkperiod: 60}); this.cache.on('expired', async (_key, bulk: Bulk) => { // if the bulk is not done if (bulk.state !== 'done') { // the database can delete the data associated with this bulk await this.database.bulkExpired(bulk); } }); } /** * Saves a bulk process and assigns to it a user-defined ttl (time-to-live) * * @param bulk the bulk process to save * @returns the bulk process that was saved */ private save(bulk: Bulk): Bulk { const expirationInSeconds = moment(bulk.expiration).diff(moment.now()) / 1000; Logger.info('Bulk expires in ', expirationInSeconds, 'seconds'); // save the item in the cache with it's expected expiration this.cache.set(bulk.uid, bulk, expirationInSeconds); return bulk; } /** * Create and save a new bulk process * * @param bulkRequest a request for a new bulk process * @returns a promise that contains the new bulk process */ public async create(bulkRequest: SCBulkRequest): Promise { const bulk = new Bulk(bulkRequest); bulk.source = bulkRequest.source; bulk.type = bulkRequest.type; this.save(bulk); // tell the database that the bulk was created await this.database.bulkCreated(bulk); return bulk; } /** * Delete a bulk process * * @param uid uid of the bulk process * @returns a promise that contains the deleted bulk process */ public async delete(uid: string): Promise { const bulk = this.read(uid); if (typeof bulk === 'undefined') { throw new TypeError(`Bulk that should be deleted was not found. UID was "${uid}"`); } // delete the bulk process from the cache this.cache.del(uid); // tell the database to handle the expiration of the bulk await this.database.bulkExpired(bulk); return bulk; } /** * Update an old bulk process (replace it with the new one) * * @param bulk new bulk process * @returns an empty promise */ public async markAsDone(bulk: Bulk): Promise { bulk.state = 'done'; this.save(bulk); // tell the database that this is the new bulk await this.database.bulkUpdated(bulk); return; } /** * Read an existing bulk process * * @param uid uid of the bulk process * @returns a promise that contains a bulk */ public read(uid: string): Bulk | undefined { return this.cache.get(uid); } }