Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Сдача проектной работы 9 спринта

## Задание 1. Повышение безопасности системы

1. [диаграмма](https://drive.google.com/file/d/1bP4TGxW8FGkh9_jxoARsXJKgMDPVWP05/view?usp=sharing "диаграмма")

## Задача 2. Улучшите безопасность существующего приложения, заменив Code Grant на PKCE

Clickhouse слишком хлопотно, как и выгрузка за конкретные даты, нужно интерфейс писать. Реализовал OLAP на PostgreSQL

1. [диаграмма](https://drive.google.com/file/d/1X3OlEfDEQUajJNN5xkmB_01UY0uVSaeD/view?usp=sharing "диаграмма")
2. Код Airflow в папке проекта **airflow**
3. API в папке **backend**
4. user1 не имеет доступа, получит ошибку **403: Insufficient role**. prothetic1 имеет доступ и получит свой отчёт
5. [UI кнопка для отчёта](https://disk.yandex.ru/i/-kQhnAhK2RM9VA "UI кнопка")

## Как запустить
1. UP docker-compose.yaml
2. Открываем Airflow UI http://localhost:8081/home, убеждаемся, что пайплайны подняты: **init_data** и **olap_pipeline**
3. **init_data** срабатывает единожды
4. **olap_pipeline** срабатывает каждые 5 минут
5. http://localhost:3000/ логинимся под **prothetic[1..3]**, нажимаем кнопку **Download Report**
6. Выгружаются данные только пользователей **prothetic[1..3]**
6 changes: 6 additions & 0 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
FROM apache/airflow:2.9.1-python3.9

# Copy the dependency list into the image (COPY always runs as root;
# the file stays world-readable).
COPY ./airflow/requirements.txt .

# Per the official apache/airflow image docs, PyPI packages must be
# installed as the "airflow" user, not root — otherwise they land outside
# the airflow user's site-packages and the container is left running as
# root, which the image's entrypoint is not designed for.
USER airflow
RUN pip3 install --upgrade pip
RUN pip3 install --no-cache-dir -r requirements.txt
Binary file added airflow/dags/__pycache__/dag_olap.cpython-39.pyc
Binary file not shown.
Binary file not shown.
178 changes: 178 additions & 0 deletions airflow/dags/dag_olap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
import logging
import pandas as pd

# Defaults applied to every task in this DAG: one retry after a one-minute
# delay, no e-mail notifications, and runs do not depend on prior runs.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

def extract_data():
    """Pull per-user, per-day prosthesis usage aggregates from Postgres.

    Joins CRM records with telemetry and aggregates telemetry metrics per
    (user, usage_date). Returns a pandas DataFrame, which Airflow pushes to
    XCom for the downstream 'transform_data' task.

    Raises:
        Exception: re-raised after logging if the extraction fails.
    """
    try:
        pg_hook = PostgresHook(postgres_conn_id='write_to_pg')

        # NOTE(review): COUNT(t.session_count) / COUNT(t.error_codes) count
        # non-NULL telemetry rows, not summed values. If true session/error
        # totals are intended, SUM() is likely the right aggregate — confirm
        # before changing, as downstream consumers may expect row counts.
        query = """
            SELECT
                c.user_id,
                c.user_name,
                c.email,
                c.prosthesis_model,
                c.prosthesis_serial,
                c.installation_date,
                t.usage_date,
                COUNT(t.session_count) as total_sessions,
                SUM(t.total_usage_minutes) as total_usage_minutes,
                AVG(t.max_force_application) as avg_max_force,
                AVG(t.avg_battery_level) as avg_battery_level,
                COUNT(t.error_codes) as total_error_count,
                MAX(t.last_active) as last_active
            FROM crm_table c
            LEFT JOIN telemetry_table t ON c.user_id = t.user_id
            GROUP BY
                c.user_id, c.user_name, c.email, c.prosthesis_model,
                c.prosthesis_serial, c.installation_date, t.usage_date
        """

        connection = pg_hook.get_conn()
        try:
            cursor = connection.cursor()
            try:
                cursor.execute(query)
                columns = [desc[0] for desc in cursor.description]
                results = cursor.fetchall()
            finally:
                cursor.close()
        finally:
            # Close even when the query fails so connections are not leaked
            # every 5-minute schedule tick.
            connection.close()

        return pd.DataFrame(results, columns=columns)

    except Exception as e:
        logging.error(f"Error extracting data: {str(e)}")
        raise

def transform_data(**kwargs):
    """Normalise the extracted frame for loading.

    Parses the date-like columns to datetimes and zero-fills missing
    aggregate metrics. Returns the cleaned DataFrame, or an empty one
    when the upstream extract produced no rows.
    """
    task_instance = kwargs['ti']
    frame = task_instance.xcom_pull(task_ids='extract_data')

    if frame.empty:
        logging.info("No data to transform")
        return pd.DataFrame()

    # Convert every date-like column to proper datetime values.
    for column in ('usage_date', 'installation_date', 'last_active'):
        frame[column] = pd.to_datetime(frame[column])

    # A missing aggregate means "no telemetry for that day" — treat as zero.
    zero_defaults = {
        'total_sessions': 0,
        'total_usage_minutes': 0,
        'avg_max_force': 0,
        'avg_battery_level': 0,
        'total_error_count': 0,
    }
    frame = frame.fillna(zero_defaults)

    return frame

def load_to_olap(**kwargs):
    """Bulk-insert the transformed DataFrame into olap_table.

    Pulls the frame produced by 'transform_data' from XCom and inserts it
    with a single executemany call. No-op when the frame is empty.

    Raises:
        Exception: re-raised after logging if the load fails.
    """
    ti = kwargs['ti']
    df = ti.xcom_pull(task_ids='transform_data')

    if df.empty:
        logging.info("No data to load")
        return

    insert_sql = """
        INSERT INTO olap_table (
            user_id, user_name, email, prosthesis_model, prosthesis_serial,
            installation_date, usage_date, total_sessions, total_usage_minutes,
            avg_max_force, avg_battery_level, total_error_count, last_active
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    """

    # Column order must match the INSERT column list above. itertuples is
    # much faster than iterrows and yields plain tuples with name=None.
    columns = ['user_id', 'user_name', 'email', 'prosthesis_model',
               'prosthesis_serial', 'installation_date', 'usage_date',
               'total_sessions', 'total_usage_minutes', 'avg_max_force',
               'avg_battery_level', 'total_error_count', 'last_active']
    records = list(df[columns].itertuples(index=False, name=None))

    try:
        pg_hook = PostgresHook(postgres_conn_id='write_to_pg')
        connection = pg_hook.get_conn()
        try:
            cursor = connection.cursor()
            try:
                cursor.executemany(insert_sql, records)
                connection.commit()
                logging.info(f"Successfully loaded {len(records)} records to OLAP table")
            finally:
                cursor.close()
        finally:
            # Always release the connection, even when the insert fails
            # (the original leaked it on every error).
            connection.close()

    except Exception as e:
        logging.error(f"Error loading data to OLAP: {str(e)}")
        raise

# ETL pipeline: recreate the OLAP target table, extract joined CRM/telemetry
# aggregates, clean them, and load them. Runs every 5 minutes; the DROP +
# CREATE makes each run a full refresh, so the load never accumulates rows.
with DAG('olap_pipeline',
         default_args=default_args,
         schedule_interval='*/5 * * * *',
         tags=['olap', 'prosthesis', 'etl'],
         catchup=False) as dag:

    create_olap_table = PostgresOperator(
        task_id='create_olap_table',
        postgres_conn_id='write_to_pg',
        sql="""
            DROP TABLE IF EXISTS olap_table;
            CREATE TABLE olap_table (
                id SERIAL PRIMARY KEY,
                user_id UUID NOT NULL,
                user_name VARCHAR(100) NOT NULL,
                email VARCHAR(150) NOT NULL,
                prosthesis_model VARCHAR(100) NOT NULL,
                prosthesis_serial VARCHAR(50) NOT NULL,
                installation_date DATE NOT NULL,
                usage_date DATE NOT NULL,
                total_sessions INTEGER NOT NULL DEFAULT 0,
                total_usage_minutes INTEGER NOT NULL DEFAULT 0,
                avg_max_force FLOAT NOT NULL DEFAULT 0,
                avg_battery_level FLOAT NOT NULL DEFAULT 0,
                total_error_count INTEGER NOT NULL DEFAULT 0,
                last_active TIMESTAMP NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """
    )

    # Operators created inside the `with DAG(...)` context attach to the DAG
    # automatically; the redundant dag=dag kwargs are dropped for consistency
    # with the init_data DAG.
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )

    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )

    load_task = PythonOperator(
        task_id='load_to_olap',
        python_callable=load_to_olap,
    )

    create_olap_table >> extract_task >> transform_task >> load_task
116 changes: 116 additions & 0 deletions airflow/dags/dag_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
import csv

# Defaults for every task in the init DAG; start_date anchors the @once run.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 12, 1),
}

def generate_crm():
    """Convert the CRM sample CSV into ./dags/sql/insert_crm.sql.

    Reads ./dags/data/crm_sample.csv (';'-delimited, header skipped) and
    writes one INSERT statement per data row for the downstream
    PostgresOperator to execute.
    """
    csv_path = './dags/data/crm_sample.csv'

    def quote(value):
        # Double any embedded single quote so a value like "O'Brien" cannot
        # break — or inject into — the generated SQL literal.
        return value.replace("'", "''")

    insert_queries = []
    with open(csv_path, 'r') as csvfile:
        csvreader = csv.reader(csvfile, delimiter=";")
        next(csvreader, None)  # skip the header row
        for row in csvreader:
            insert_queries.append(
                "INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) "
                f"VALUES ({row[0]}, '{quote(row[1])}', '{quote(row[2])}','{quote(row[3])}','{quote(row[4])}','{quote(row[5])}','{quote(row[6])}');"
            )

    with open('./dags/sql/insert_crm.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")

def generate_telemetry():
    """Convert the telemetry sample CSV into ./dags/sql/insert_telemetry.sql.

    Reads ./dags/data/telemetry_sample.csv (';'-delimited, header skipped)
    and writes one INSERT statement per data row for the downstream
    PostgresOperator to execute.
    """
    csv_path = './dags/data/telemetry_sample.csv'

    def quote(value):
        # Double any embedded single quote so string values (e.g. the JSON
        # error_codes column) cannot break the generated SQL literal.
        return value.replace("'", "''")

    insert_queries = []
    with open(csv_path, 'r') as csvfile:
        csvreader = csv.reader(csvfile, delimiter=";")
        next(csvreader, None)  # skip the header row
        for row in csvreader:
            insert_queries.append(
                "INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) "
                f"VALUES ({row[0]}, '{quote(row[1])}', '{quote(row[2])}',{row[3]},{row[4]},{row[5]},{row[6]},'{quote(row[7])}','{quote(row[8])}');"
            )

    with open('./dags/sql/insert_telemetry.sql', 'w') as f:
        for query in insert_queries:
            f.write(f"{query}\n")

# One-shot bootstrap DAG: (re)creates the source tables, renders the sample
# CSVs into SQL insert scripts, then executes those scripts against Postgres.
with DAG('init_data',
         default_args=default_args,
         schedule_interval='@once',
         tags=['init', 'etl'],
         catchup=False) as dag:

    # --- DDL: source tables are dropped and recreated from scratch. ---
    create_crm_table = PostgresOperator(
        task_id='create_crm_table',
        postgres_conn_id='write_to_pg',
        sql="""
            DROP TABLE IF EXISTS crm_table;
            CREATE TABLE crm_table (
                id SERIAL PRIMARY KEY,
                user_id UUID NOT NULL,
                user_name VARCHAR(100) NOT NULL,
                email VARCHAR(150) NOT NULL,
                prosthesis_model VARCHAR(100) NOT NULL,
                prosthesis_serial VARCHAR(50) NOT NULL UNIQUE,
                installation_date DATE NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """
    )

    create_telemetry_table = PostgresOperator(
        task_id='create_telemetry_table',
        postgres_conn_id='write_to_pg',
        sql="""
            DROP TABLE IF EXISTS telemetry_table;
            CREATE TABLE telemetry_table (
                id SERIAL PRIMARY KEY,
                user_id UUID NOT NULL,
                usage_date DATE NOT NULL,
                session_count INTEGER NOT NULL DEFAULT 0,
                total_usage_minutes INTEGER NOT NULL DEFAULT 0,
                max_force_application FLOAT NOT NULL DEFAULT 0,
                avg_battery_level FLOAT NOT NULL DEFAULT 0,
                error_codes JSONB DEFAULT '[]'::jsonb,
                last_active TIMESTAMP NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """
    )

    # --- Render the sample CSVs into SQL scripts under dags/sql/. ---
    generate_crm_queries = PythonOperator(
        task_id='generate_crm_queries',
        python_callable=generate_crm
    )

    generate_telemetry_queries = PythonOperator(
        task_id='generate_telemetry_queries',
        python_callable=generate_telemetry
    )

    # --- Execute the rendered scripts to seed both tables. ---
    run_insert_crm_queries = PostgresOperator(
        task_id='run_insert_crm_queries',
        postgres_conn_id='write_to_pg',
        sql='sql/insert_crm.sql'
    )

    run_insert_telemetry_queries = PostgresOperator(
        task_id='run_insert_telemetry_queries',
        postgres_conn_id='write_to_pg',
        sql='sql/insert_telemetry.sql'
    )

    # Strictly linear: DDL, then rendering, then seeding.
    chain = (create_crm_table >> create_telemetry_table
             >> generate_crm_queries >> generate_telemetry_queries
             >> run_insert_crm_queries >> run_insert_telemetry_queries)
11 changes: 11 additions & 0 deletions airflow/dags/data/crm_sample.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id;user_id;user_name;email;prosthesis_model;prosthesis_serial;installation_date
1;728fb50b-9774-4433-adb8-117133edf7ed;prothetic1;prothetic1@example.com;AlphaLimb Pro;SN-ALP2023-001;2023-05-15
2;0cda865e-05ea-40a1-8392-a3eef7e677bb;prothetic2;prothetic2@example.com;BioHand Plus;SN-BHP2023-045;2023-06-20
3;5e21faf7-f526-4c89-af6f-23562aed7e12;prothetic3;prothetic3@example.com;SmartGrip Ultra;SN-SGU2023-178;2023-07-10
4;728fb50b-9774-4433-adb8-117133edf7ed;prothetic1;prothetic1@example.com;AlphaLimb Pro v2;SN-ALP2024-002;2024-01-12
5;0cda865e-05ea-40a1-8392-a3eef7e677bb;prothetic2;prothetic2@example.com;BioHand Elite;SN-BHE2024-087;2024-02-18
6;5e21faf7-f526-4c89-af6f-23562aed7e12;prothetic3;prothetic3@example.com;SmartGrip Pro;SN-SGP2024-192;2024-03-22
7;728fb50b-9774-4433-adb8-117133edf7ed;prothetic1;prothetic1@example.com;AlphaLimb Pro;SN-ALP2023-003;2023-08-15
8;0cda865e-05ea-40a1-8392-a3eef7e677bb;prothetic2;prothetic2@example.com;BioHand Plus;SN-BHP2023-051;2023-09-30
9;5e21faf7-f526-4c89-af6f-23562aed7e12;prothetic3;prothetic3@example.com;SmartGrip Ultra;SN-SGU2023-181;2023-10-14
10;728fb50b-9774-4433-adb8-117133edf7ed;prothetic1;prothetic1@example.com;AlphaLimb Pro v2;SN-ALP2024-004;2024-04-01
11 changes: 11 additions & 0 deletions airflow/dags/data/telemetry_sample.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
id;user_id;usage_date;session_count;total_usage_minutes;max_force_application;avg_battery_level;error_codes;last_active
1;728fb50b-9774-4433-adb8-117133edf7ed;2024-12-01;8;320;12.5;87.2;"[""ERR-001"",""ERR-003""]";2024-12-01 18:45:32
2;0cda865e-05ea-40a1-8392-a3eef7e677bb;2024-12-01;12;480;15.8;92.1;"[""ERR-002""]";2024-12-01 19:30:15
3;5e21faf7-f526-4c89-af6f-23562aed7e12;2024-12-01;6;210;9.3;95.4;"[]";2024-12-01 17:20:48
4;728fb50b-9774-4433-adb8-117133edf7ed;2024-12-02;10;380;11.2;89.7;"[""ERR-001""]";2024-12-02 19:15:22
5;0cda865e-05ea-40a1-8392-a3eef7e677bb;2024-12-02;15;550;16.5;90.8;"[""ERR-002"",""ERR-004""]";2024-12-02 20:05:37
6;5e21faf7-f526-4c89-af6f-23562aed7e12;2024-12-02;7;260;10.1;93.2;"[]";2024-12-02 18:40:19
7;728fb50b-9774-4433-adb8-117133edf7ed;2024-12-03;9;340;13.1;88.5;"[""ERR-003""]";2024-12-03 18:30:45
8;0cda865e-05ea-40a1-8392-a3eef7e677bb;2024-12-03;11;420;14.9;91.3;"[""ERR-005""]";2024-12-03 19:45:28
9;5e21faf7-f526-4c89-af6f-23562aed7e12;2024-12-03;5;190;8.7;94.8;"[]";2024-12-03 16:55:12
10;728fb50b-9774-4433-adb8-117133edf7ed;2024-12-04;7;270;10.5;86.4;"[""ERR-001"",""ERR-006""]";2024-12-04 17:45:18
10 changes: 10 additions & 0 deletions airflow/dags/sql/insert_crm.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Seed data for crm_table. Generated from dags/data/crm_sample.csv by the
-- generate_crm task in dag_sample.py — do not edit by hand.
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (1, '728fb50b-9774-4433-adb8-117133edf7ed', 'prothetic1','prothetic1@example.com','AlphaLimb Pro','SN-ALP2023-001','2023-05-15');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (2, '0cda865e-05ea-40a1-8392-a3eef7e677bb', 'prothetic2','prothetic2@example.com','BioHand Plus','SN-BHP2023-045','2023-06-20');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (3, '5e21faf7-f526-4c89-af6f-23562aed7e12', 'prothetic3','prothetic3@example.com','SmartGrip Ultra','SN-SGU2023-178','2023-07-10');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (4, '728fb50b-9774-4433-adb8-117133edf7ed', 'prothetic1','prothetic1@example.com','AlphaLimb Pro v2','SN-ALP2024-002','2024-01-12');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (5, '0cda865e-05ea-40a1-8392-a3eef7e677bb', 'prothetic2','prothetic2@example.com','BioHand Elite','SN-BHE2024-087','2024-02-18');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (6, '5e21faf7-f526-4c89-af6f-23562aed7e12', 'prothetic3','prothetic3@example.com','SmartGrip Pro','SN-SGP2024-192','2024-03-22');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (7, '728fb50b-9774-4433-adb8-117133edf7ed', 'prothetic1','prothetic1@example.com','AlphaLimb Pro','SN-ALP2023-003','2023-08-15');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (8, '0cda865e-05ea-40a1-8392-a3eef7e677bb', 'prothetic2','prothetic2@example.com','BioHand Plus','SN-BHP2023-051','2023-09-30');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (9, '5e21faf7-f526-4c89-af6f-23562aed7e12', 'prothetic3','prothetic3@example.com','SmartGrip Ultra','SN-SGU2023-181','2023-10-14');
INSERT INTO crm_table (id,user_id,user_name,email,prosthesis_model,prosthesis_serial,installation_date) VALUES (10, '728fb50b-9774-4433-adb8-117133edf7ed', 'prothetic1','prothetic1@example.com','AlphaLimb Pro v2','SN-ALP2024-004','2024-04-01');
10 changes: 10 additions & 0 deletions airflow/dags/sql/insert_telemetry.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Seed data for telemetry_table. Generated from dags/data/telemetry_sample.csv
-- by the generate_telemetry task in dag_sample.py — do not edit by hand.
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (1, '728fb50b-9774-4433-adb8-117133edf7ed', '2024-12-01',8,320,12.5,87.2,'["ERR-001","ERR-003"]','2024-12-01 18:45:32');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (2, '0cda865e-05ea-40a1-8392-a3eef7e677bb', '2024-12-01',12,480,15.8,92.1,'["ERR-002"]','2024-12-01 19:30:15');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (3, '5e21faf7-f526-4c89-af6f-23562aed7e12', '2024-12-01',6,210,9.3,95.4,'[]','2024-12-01 17:20:48');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (4, '728fb50b-9774-4433-adb8-117133edf7ed', '2024-12-02',10,380,11.2,89.7,'["ERR-001"]','2024-12-02 19:15:22');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (5, '0cda865e-05ea-40a1-8392-a3eef7e677bb', '2024-12-02',15,550,16.5,90.8,'["ERR-002","ERR-004"]','2024-12-02 20:05:37');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (6, '5e21faf7-f526-4c89-af6f-23562aed7e12', '2024-12-02',7,260,10.1,93.2,'[]','2024-12-02 18:40:19');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (7, '728fb50b-9774-4433-adb8-117133edf7ed', '2024-12-03',9,340,13.1,88.5,'["ERR-003"]','2024-12-03 18:30:45');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (8, '0cda865e-05ea-40a1-8392-a3eef7e677bb', '2024-12-03',11,420,14.9,91.3,'["ERR-005"]','2024-12-03 19:45:28');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (9, '5e21faf7-f526-4c89-af6f-23562aed7e12', '2024-12-03',5,190,8.7,94.8,'[]','2024-12-03 16:55:12');
INSERT INTO telemetry_table (id,user_id,usage_date,session_count,total_usage_minutes,max_force_application,avg_battery_level,error_codes,last_active) VALUES (10, '728fb50b-9774-4433-adb8-117133edf7ed', '2024-12-04',7,270,10.5,86.4,'["ERR-001","ERR-006"]','2024-12-04 17:45:18');
2 changes: 2 additions & 0 deletions airflow/db/init-db.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Bootstrap script: create the application database and grant the existing
-- "airflow" role full access to it.
CREATE DATABASE sample;
GRANT ALL PRIVILEGES ON DATABASE sample TO airflow;
Loading