diff --git a/.gitignore b/.gitignore index b6de5e8..3d91656 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ lerna-debug.log* # env .env + +/file-storage diff --git a/package.json b/package.json index e2a380e..c194899 100644 --- a/package.json +++ b/package.json @@ -26,12 +26,15 @@ "@nestjs/config": "^2.2.0", "@nestjs/core": "^9.0.0", "@nestjs/platform-express": "^9.0.0", + "ellipsize": "^0.5.1", "eslint-import-resolver-typescript": "^3.2.5", "joi": "^17.6.0", + "lodash": "^4.17.21", "markdown-escape": "^1.1.0", "nestjs-telegraf": "^2.5.0", "reflect-metadata": "^0.1.13", "rimraf": "^3.0.2", + "rss-parser": "^3.12.0", "rxjs": "^7.2.0", "telegraf": "^4.8.5" }, @@ -43,13 +46,15 @@ "@nestjs/testing": "^9.0.0", "@types/express": "^4.17.13", "@types/jest": "28.1.4", + "@types/lodash": "^4.14.182", "@types/node": "^16.0.0", "@types/supertest": "^2.0.11", - "eslint-import-resolver-typescript": "^3.2.5", + "@types/ellipsize": "^0.1.1", "@typescript-eslint/eslint-plugin": "^5.0.0", "@typescript-eslint/parser": "^5.0.0", "eslint": "^8.0.1", "eslint-config-prettier": "^8.3.0", + "eslint-import-resolver-typescript": "^3.2.5", "eslint-plugin-import": "^2.26.0", "eslint-plugin-prettier": "^4.0.0", "husky": "^8.0.0", diff --git a/src/app.module.ts b/src/app.module.ts index c6a09f3..a5848ab 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -3,10 +3,9 @@ import { Module } from '@nestjs/common'; import { BotModule } from './bot/bot.module'; import { ChannelModule } from './channel/channel.module'; import { CoreModule } from './core/core.module'; -import { RssModule } from './rss/rss.module'; @Module({ - imports: [CoreModule, ChannelModule, BotModule, RssModule], + imports: [CoreModule, ChannelModule, BotModule], controllers: [], providers: [], }) diff --git a/src/bot/bot.update.ts b/src/bot/bot.update.ts index 9072f99..7e25209 100644 --- a/src/bot/bot.update.ts +++ b/src/bot/bot.update.ts @@ -1,11 +1,9 @@ -import { Command, Ctx, Start, Update } from 'nestjs-telegraf'; +import { Ctx, Start, Update } from 'nestjs-telegraf'; import { Context } from 'telegraf'; -import { RssManagerService } from './../rss/rss-manager.service'; - @Update() export class BotUpdate { - constructor(private rssManager: RssManagerService) {} + // constructor() {} @Start() async start(@Ctx() ctx: Context) { @@ -17,79 +15,79 @@ export class BotUpdate { ); } - @Command('add') - async add(@Ctx() ctx: Context) { - if (!('text' in ctx.message)) { - await ctx.reply('Cannot find text in ctx'); - return; - } - - const [name, href, ...rest] = ctx.message.text.split(' ').slice(1); - - if (!name || !href || rest.length) { - await ctx.reply('Incorrect command'); - return; - } - - const rssList = this.rssManager.list(); - - if (Object.keys(rssList).includes(name)) { - await ctx.reply('Name already exists. Choose another one'); - return; - } - - if (Object.values(rssList).includes(href)) { - await ctx.reply('This rss link already exists.'); - return; - } - - try { - await this.rssManager.add(name, href); - await ctx.reply(`Added ${name} - ${href}`); - } catch (error) { - await ctx.reply("Error occured, can't add rss link"); - } - } - - @Command('remove') - async remove(@Ctx() ctx: Context) { - if (!('text' in ctx.message)) { - await ctx.reply('Cannot find text in ctx'); - return; - } - - const [name, ...rest] = ctx.message.text.split(' ').slice(1); - - if (!name || rest.length) { - await ctx.reply('Incorrect command'); - return; - } - - const rssList = this.rssManager.list(); - - if (!Object.keys(rssList).includes(name)) { - await ctx.reply("Rss link with such name doesn't exist"); - return; - } - - this.rssManager.remove(name); - await ctx.reply(`Removed ${name}`); - } - - @Command('list') - async list(@Ctx() ctx: Context) { - const rssList = this.rssManager.list(); - - if (!Object.keys(rssList).length) { - await ctx.reply('Empty list'); - return; - } - - let formattedMessage = 'List:\n'; - for (const name in rssList) { - formattedMessage += `${name}: ${rssList[name]}\n`; - } - - await ctx.reply(formattedMessage); - } + // @Command('add') + // async add(@Ctx() ctx: Context) { + // if (!('text' in ctx.message)) { + // await ctx.reply('Cannot find text in ctx'); + // return; + // } + + // const [name, href, ...rest] = ctx.message.text.split(' ').slice(1); + + // if (!name || !href || rest.length) { + // await ctx.reply('Incorrect command'); + // return; + // } + + // const rssList = this.rssManager.list(); + + // if (Object.keys(rssList).includes(name)) { + // await ctx.reply('Name already exists. Choose another one'); + // return; + // } + + // if (Object.values(rssList).includes(href)) { + // await ctx.reply('This rss link already exists.'); + // return; + // } + + // try { + // await this.rssManager.add(name, href); + // await ctx.reply(`Added ${name} - ${href}`); + // } catch (error) { + // await ctx.reply("Error occured, can't add rss link"); + // } + // } + + // @Command('remove') + // async remove(@Ctx() ctx: Context) { + // if (!('text' in ctx.message)) { + // await ctx.reply('Cannot find text in ctx'); + // return; + // } + + // const [name, ...rest] = ctx.message.text.split(' ').slice(1); + + // if (!name || rest.length) { + // await ctx.reply('Incorrect command'); + // return; + // } + + // const rssList = this.rssManager.list(); + + // if (!Object.keys(rssList).includes(name)) { + // await ctx.reply("Rss link with such name doesn't exist"); + // return; + // } + + // this.rssManager.remove(name); + // await ctx.reply(`Removed ${name}`); + // } + + // @Command('list') + // async list(@Ctx() ctx: Context) { + // const rssList = this.rssManager.list(); + + // if (!Object.keys(rssList).length) { + // await ctx.reply('Empty list'); + // return; + // } + + // let formattedMessage = 'List:\n'; + // for (const name in rssList) { + // formattedMessage += `${name}: ${rssList[name]}\n`; + // } + + // await ctx.reply(formattedMessage); + // } } diff --git a/src/channel/channel.module.ts b/src/channel/channel.module.ts index 85e12af..2258d6d 100644 --- a/src/channel/channel.module.ts +++ b/src/channel/channel.module.ts @@ -1,8 +1,10 @@ import { Module } from '@nestjs/common'; +import { RssModule } from 'src/rss/rss.module'; import { ChannelService } from './channel.service'; @Module({ + imports: [RssModule], providers: [ChannelService], exports: [ChannelService], }) diff --git a/src/channel/channel.service.ts b/src/channel/channel.service.ts index 5be314c..220da64 100644 --- a/src/channel/channel.service.ts +++ b/src/channel/channel.service.ts @@ -1,19 +1,40 @@ import { Inject, Injectable } from '@nestjs/common'; import { ConfigType } from '@nestjs/config'; -import mdEscape from 'markdown-escape'; +import ellipsize from 'ellipsize'; import { InjectBot } from 'nestjs-telegraf'; +import { concatMap, ignoreElements, startWith, timer } from 'rxjs'; import { Context, Telegraf } from 'telegraf'; import telegramConfig from 'src/config/telegram.config'; +import { RssService } from './../rss/rss.service'; import { PostMessage } from './models/post-message.model'; @Injectable() export class ChannelService { + private readonly MESSAGE_DELAY = 3500; + constructor( @InjectBot() private bot: Telegraf, @Inject(telegramConfig.KEY) private tgConfig: ConfigType, - ) {} + private rssService: RssService, + ) { + this.rssService.posts$ + .pipe( + concatMap((value) => + timer(this.MESSAGE_DELAY).pipe(ignoreElements(), startWith(value)), + ), + ) + .subscribe((post) => { + this.postMessage({ + title: post.title, + text: ellipsize(post.contentSnippet, 500), + author: post.creator, + date: new Date(post.pubDate), + href: post.link, + }); + }); + } async postMessage(post: PostMessage) { await this.bot.telegram.sendMessage( @@ -25,9 +46,9 @@ export class ChannelService { private formatMessage({ title, text, author, date, href }: PostMessage) { return ( - `*${mdEscape(title)}*\n` + - `${mdEscape(text)}\n` + - `_${mdEscape(author)} - ${mdEscape(date.toUTCString())}_\n\n` + + `*${title}*\n` + + `${text}\n` + + `_${author} - ${date.toUTCString()}_\n\n` + `${href}` ); } diff --git a/src/core/core.module.ts b/src/core/core.module.ts index 21bd62b..cccee70 100644 --- a/src/core/core.module.ts +++ b/src/core/core.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { Global, Module } from '@nestjs/common'; import { ConfigModule, ConfigType } from '@nestjs/config'; import * as Joi from 'joi'; import { TelegrafModule } from 'nestjs-telegraf'; @@ -6,7 +6,9 @@ import { TelegrafModule } from 'nestjs-telegraf'; import telegramConfig, { telegramValidationSchema, } from 'src/config/telegram.config'; +import { FileStorageService } from './file-storage.service'; +@Global() @Module({ imports: [ ConfigModule.forRoot({ @@ -23,5 +25,7 @@ import telegramConfig, { }), }), ], + providers: [FileStorageService], + exports: [FileStorageService], }) export class CoreModule {} diff --git a/src/core/file-storage.service.ts b/src/core/file-storage.service.ts new file mode 100644 index 0000000..5da9907 --- /dev/null +++ b/src/core/file-storage.service.ts @@ -0,0 +1,38 @@ +import fs from 'fs'; +import path from 'path'; + +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class FileStorageService { + private readonly STORAGE_PATH = './file-storage'; + + constructor() { + if (!fs.existsSync(this.STORAGE_PATH)) { + fs.mkdirSync(this.STORAGE_PATH); + } + } + + async get(fileName: string): Promise { + const pathname = path.join(this.STORAGE_PATH, `${fileName}.json`); + + if (!fs.existsSync(pathname)) { + return null; + } + + const fileData = await fs.promises.readFile(pathname, { encoding: 'utf8' }); + + return JSON.parse(fileData) as T; + } + + async save(fileName: string, data: T) { + const pathname = path.join(this.STORAGE_PATH, `${fileName}.json`); + const json = JSON.stringify(data); + + try { + await fs.promises.writeFile(pathname, json); + } catch { + console.error(`File with "${pathname}" name wasn't saved`); + } + } +} diff --git a/src/rss/rss-manager.service.ts b/src/rss/rss-manager.service.ts deleted file mode 100644 index ed3494c..0000000 --- a/src/rss/rss-manager.service.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -@Injectable() -export class RssManagerService { - private rssList: Record = {}; - - async add(name: string, href: string) { - this.rssList[name] = href; - } - - remove(name: string) { - delete this.rssList[name]; - } - - list() { - return this.rssList; - } -} diff --git a/src/rss/rss.module.ts b/src/rss/rss.module.ts index adb0e7f..1e2ce8e 100644 --- a/src/rss/rss.module.ts +++ b/src/rss/rss.module.ts @@ -1,9 +1,9 @@ import { Module } from '@nestjs/common'; -import { RssManagerService } from './rss-manager.service'; +import { RssService } from './rss.service'; @Module({ - providers: [RssManagerService], - exports: [RssManagerService], + providers: [RssService], + exports: [RssService], }) export class RssModule {} diff --git a/src/rss/rss.service.ts b/src/rss/rss.service.ts new file mode 100644 index 0000000..475a761 --- /dev/null +++ b/src/rss/rss.service.ts @@ -0,0 +1,142 @@ +import { Injectable } from '@nestjs/common'; +import { filter, some } from 'lodash'; +import Parser, { Item } from 'rss-parser'; +import { Observable, Subject } from 'rxjs'; + +import { FileStorageService } from './../core/file-storage.service'; + +export const PARSE_INTERVAL = 60 * 1000; +export const RETRY_INTERVAL = 2 * 60 * 1000; + +export type ParseResult = { + [key: string]: any; +} & Parser.Output<{ + [key: string]: any; +}>; + +interface RssListItem { + name: string; + link: string; + failed: boolean; +} + +@Injectable() +export class RssService { + get posts$(): Observable { + return this.posts.asObservable(); + } + + private posts = new Subject(); + private rssFeedList: Array = []; + + constructor(private fileStorage: FileStorageService) { + this.subscribe( + 'Whitehouse', + 'https://www.whitehouse.gov/briefing-room/feed/', + ); + + this.subscribe('Kremlin', 'http://kremlin.ru/events/all/feed'); + + this.subscribe( + 'UN', + 'https://news.un.org/feed/subscribe/ru/news/region/europe/feed/rss.xml', + ); + + this.subscribe('Гос дума', 'http://duma.gov.ru/news/duma/feed/'); + + this.subscribe( + 'Презедент Украины', + 'https://www.president.gov.ua/ru/rss/news/all.rss', + ); + } + + subscribe(name: string, link: string) { + if (some(this.rssFeedList, { name })) { + throw new Error(`Rss with "${name}" name already exists`); + } + + if (some(this.rssFeedList, { link })) { + throw new Error(`Rss with "${name}" link already exists`); + } + + const rssFeed = { name, link, failed: false }; + + this.rssFeedList.push(rssFeed); + this.startParsingWithInterval(rssFeed); + + console.log(`Subscribed to ${name}: ${link}`); + } + + unsubscribe(name: string) { + this.rssFeedList = filter(this.rssFeedList, { name }); + } + + getSubscriptions() { + return this.rssFeedList; + } + + private startParsingWithInterval(rssFeed: RssListItem, retry = 1) { + if (retry > 3) return; + + if (!some(this.rssFeedList, rssFeed)) return; + + const { name, link } = rssFeed; + + setTimeout(async () => { + try { + const posts = await this.fetchNewPosts(name, link); + posts.forEach((post) => this.posts.next(post)); + this.startParsingWithInterval(rssFeed, 1); + } catch (error) { + rssFeed.failed = true; + console.error(error); + console.log(this.rssFeedList); + + setTimeout(() => { + this.startParsingWithInterval(rssFeed, retry + 1); + }, retry * RETRY_INTERVAL); + } + }, PARSE_INTERVAL); + } + + // TODO: Refactor any type to post model + private async fetchNewPosts(name: string, link: string): Promise> { + console.log(`Start fetching new rss: ${name}`); + + const parser = new Parser({ + timeout: PARSE_INTERVAL, + requestOptions: { + headers: { + Connection: 'keep-alive', + }, + }, + }); + const rss = await parser.parseURL(link); + const previousRss = await this.fileStorage.get(name); + + if (!previousRss) { + console.log(`Previous file with rss feed wasn't found: ${name}`); + await this.fileStorage.save(name, rss); + return rss.items; + } + + const newPosts = this.filterNewPosts(rss, previousRss); + await this.fileStorage.save(name, rss); + + console.log( + newPosts.length + ? `${newPosts.length} were found: ${name}` + : `New posts were not found: ${name}`, + ); + console.log(`End fetch new rss: ${name}`); + + return newPosts; + } + + private filterNewPosts(rss, previousRss) { + return rss.items.filter( + (newItem) => + !previousRss.items.find((item) => item.title === newItem.title), + ); + } +} diff --git a/yarn.lock b/yarn.lock index 09202c9..5321ef5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1029,6 +1029,11 @@ resolved "https://registry.npmjs.org/@types/cookiejar/-/cookiejar-2.1.2.tgz" integrity sha512-t73xJJrvdTjXrn4jLS9VSGRbz0nUY3cl2DMGDU48lKl+HR9dbbjW2A9r3g40VA++mQpy6uuHg33gy7du2BKpog== +"@types/ellipsize@^0.1.1": + version "0.1.1" + resolved "https://registry.yarnpkg.com/@types/ellipsize/-/ellipsize-0.1.1.tgz#e9ba8d19f46d97ae6e6d25a786cc397efde81ed9" + integrity sha512-MIOU8SHM4/q4Jjdy3FJS/ynSeBT4qb+aAn72ixOM1lkSDn+QXQKwrcgJFRJzq+AWRX/7I/LVRlo0b2DgNK6wLQ== + "@types/eslint-scope@^3.7.3": version "3.7.4" resolved "https://registry.npmjs.org/@types/eslint-scope/-/eslint-scope-3.7.4.tgz" @@ -1118,6 +1123,11 @@ resolved "https://registry.npmjs.org/@types/json5/-/json5-0.0.29.tgz" integrity sha512-dRLjCWHYg4oaA77cxO64oO+7JwCwnIzkZPdrrC71jQmQtlhM556pwKo5bUzqvZndkVbeFLIIi+9TC40JNF5hNQ== +"@types/lodash@^4.14.182": + version "4.14.182" + resolved "https://registry.yarnpkg.com/@types/lodash/-/lodash-4.14.182.tgz#05301a4d5e62963227eaafe0ce04dd77c54ea5c2" + integrity sha512-/THyiqyQAP9AfARo4pF+aCGcyiQ94tX/Is2I7HofNRqoYLgN1PBoOWu2/zTA5zMxzP5EFutMtWtGAFRKUe961Q== + "@types/mime@^1": version "1.3.2" resolved "https://registry.npmjs.org/@types/mime/-/mime-1.3.2.tgz" @@ -2344,6 +2354,11 @@ electron-to-chromium@^1.4.172: resolved "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.186.tgz" integrity sha512-YoVeFrGd/7ROjz4R9uPoND1K/hSRC/xADy9639ZmIZeJSaBnKdYx3I6LMPsY7CXLpK7JFgKQVzeZ/dk2br6Eaw== +ellipsize@^0.5.1: + version "0.5.1" + resolved "https://registry.yarnpkg.com/ellipsize/-/ellipsize-0.5.1.tgz#2ac8bba69b2190d629cc5065a9c2bf28d99f237e" + integrity sha512-0jEAyuIRU6U8MN0S5yUqIrkK/AQWkChh642N3zQuGV57s9bsUWYLc0jJOoDIUkZ2sbEL3ySq8xfq71BvG4q3hw== + emittery@^0.10.2: version "0.10.2" resolved "https://registry.npmjs.org/emittery/-/emittery-0.10.2.tgz" @@ -2379,6 +2394,11 @@ enhanced-resolve@^5.0.0, enhanced-resolve@^5.10.0, enhanced-resolve@^5.7.0, enha graceful-fs "^4.2.4" tapable "^2.2.0" +entities@^2.0.3: + version "2.2.0" + resolved "https://registry.yarnpkg.com/entities/-/entities-2.2.0.tgz#098dc90ebb83d8dffa089d55256b351d34c4da55" + integrity sha512-p92if5Nz619I0w+akJrLZH0MX0Pb5DX39XOwQTtXSdQQOaYH03S1uIQp4mhOZtAXrxq4ViO67YTiLBo2638o9A== + error-ex@^1.3.1: version "1.3.2" resolved "https://registry.npmjs.org/error-ex/-/error-ex-1.3.2.tgz" @@ -5008,6 +5028,14 @@ rimraf@3.0.2, rimraf@^3.0.0, rimraf@^3.0.2: dependencies: glob "^7.1.3" +rss-parser@^3.12.0: + version "3.12.0" + resolved "https://registry.yarnpkg.com/rss-parser/-/rss-parser-3.12.0.tgz#b8888699ea46304a74363fbd8144671b2997984c" + integrity sha512-aqD3E8iavcCdkhVxNDIdg1nkBI17jgqF+9OqPS1orwNaOgySdpvq6B+DoONLhzjzwV8mWg37sb60e4bmLK117A== + dependencies: + entities "^2.0.3" + xml2js "^0.4.19" + run-async@^2.4.0: version "2.4.1" resolved "https://registry.npmjs.org/run-async/-/run-async-2.4.1.tgz" @@ -5061,6 +5089,11 @@ sandwich-stream@^2.0.2: resolved "https://registry.yarnpkg.com/sandwich-stream/-/sandwich-stream-2.0.2.tgz#6d1feb6cf7e9fe9fadb41513459a72c2e84000fa" integrity sha512-jLYV0DORrzY3xaz/S9ydJL6Iz7essZeAfnAavsJ+zsJGZ1MOnsS52yRjU3uF3pJa/lla7+wisp//fxOwOH8SKQ== +sax@>=0.6.0: + version "1.2.4" + resolved "https://registry.yarnpkg.com/sax/-/sax-1.2.4.tgz#2816234e2378bddc4e5354fab5caa895df7100d9" + integrity sha512-NqVDv9TpANUjFm0N8uM5GxL36UgKi9/atZw+x7YFnQ8ckwFGKrl4xX4yWtrey3UJm5nP1kUbnYgLopqWNSRhWw== + schema-utils@^3.1.0, schema-utils@^3.1.1: version "3.1.1" resolved "https://registry.npmjs.org/schema-utils/-/schema-utils-3.1.1.tgz" @@ -5974,6 +6007,19 @@ write-file-atomic@^4.0.1: imurmurhash "^0.1.4" signal-exit "^3.0.7" +xml2js@^0.4.19: + version "0.4.23" + resolved "https://registry.yarnpkg.com/xml2js/-/xml2js-0.4.23.tgz#a0c69516752421eb2ac758ee4d4ccf58843eac66" + integrity sha512-ySPiMjM0+pLDftHgXY4By0uswI3SPKLDw/i3UXbnO8M/p28zqexCUoPmQFrYD+/1BzhGJSs2i1ERWKJAtiLrug== + dependencies: + sax ">=0.6.0" + xmlbuilder "~11.0.0" + +xmlbuilder@~11.0.0: + version "11.0.1" + resolved "https://registry.yarnpkg.com/xmlbuilder/-/xmlbuilder-11.0.1.tgz#be9bae1c8a046e76b31127726347d0ad7002beb3" + integrity sha512-fDlsI/kFEx7gLvbecc0/ohLG50fugQp8ryHzMTuW9vSa1GJ0XYWKnhsUx7oie3G98+r56aTQIUB4kht42R3JvA== + xtend@^4.0.0: version "4.0.2" resolved "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz"