-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdata_analysis.py
More file actions
268 lines (227 loc) · 9.04 KB
/
data_analysis.py
File metadata and controls
268 lines (227 loc) · 9.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
"""
数据分析模块 - 为数学建模论文生成示例数据和统计分析
"""
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional
import json
class DataGenerator:
    """Data generator - produces simulated datasets by problem type
    (time series, multivariate, categorical, optimization)."""

    @staticmethod
    def generate_time_series_data(n_points: int = 100, trend: str = "linear",
                                  noise_level: float = 0.1, seasonality: bool = True) -> pd.DataFrame:
        """
        Generate time-series data.

        :param n_points: number of data points
        :param trend: trend type ('linear', 'exponential', 'polynomial');
            any other value falls back to a unit-slope linear trend
        :param noise_level: noise level, expressed as a fraction of the mean trend value
        :param seasonality: whether to include a seasonal component (period 12)
        :return: DataFrame with 'time', 'value', 'trend' and 'seasonal' columns
        """
        time = np.arange(n_points)
        if trend == "linear":
            base = 100 + 2 * time
        elif trend == "exponential":
            base = 100 * np.exp(0.02 * time)
        elif trend == "polynomial":
            base = 100 + 0.1 * time ** 2
        else:
            base = 100 + time
        # Use an array of zeros (not a scalar 0) when seasonality is off so the
        # 'seasonal' column is always a proper per-row series with a stable dtype.
        if seasonality:
            seasonal = 10 * np.sin(2 * np.pi * time / 12)
        else:
            seasonal = np.zeros(n_points)
        noise = np.random.normal(0, noise_level * np.mean(base), n_points)
        value = base + seasonal + noise
        return pd.DataFrame({
            'time': time,
            'value': value,
            'trend': base,
            'seasonal': seasonal
        })

    @staticmethod
    def generate_multivariate_data(n_samples: int = 200, n_features: int = 5,
                                   correlation: float = 0.5) -> pd.DataFrame:
        """
        Generate multivariate data (usable for regression, classification, etc.).

        :param n_samples: number of samples
        :param n_features: number of features
        :param correlation: pairwise correlation between features
        :return: DataFrame with the features plus a '目标变量' target column
        """
        # Build an equicorrelated covariance matrix: off-diagonal entries equal
        # `correlation`, diagonal forced back to 1.
        mean = np.zeros(n_features)
        cov = np.eye(n_features) * (1 - correlation) + np.ones((n_features, n_features)) * correlation
        cov[np.diag_indices_from(cov)] = 1
        data = np.random.multivariate_normal(mean, cov, n_samples)
        # Feature names (kept in Chinese — they are part of the public schema)
        columns = [f'特征{i+1}' for i in range(n_features)]
        df = pd.DataFrame(data, columns=columns)
        # Target variable: linear combination of the first three features plus noise
        target = np.sum(data[:, :3], axis=1) + np.random.normal(0, 0.5, n_samples)
        df['目标变量'] = target
        return df

    @staticmethod
    def generate_category_data(categories: List[str], n_per_category: int = 50) -> pd.DataFrame:
        """
        Generate categorical data.

        :param categories: list of category labels
        :param n_per_category: number of samples per category
        :return: DataFrame with '类别', '数值1', '数值2', '数值3' columns
        """
        data = []
        for cat in categories:
            # Stable per-category offset.  The builtin hash() is randomized per
            # interpreter run (PYTHONHASHSEED), which made the per-category means
            # irreproducible across runs; a character-code sum is deterministic.
            cat_key = sum(ord(ch) for ch in cat)
            for _ in range(n_per_category):
                data.append({
                    '类别': cat,
                    '数值1': np.random.normal(50 + cat_key % 20, 10),
                    '数值2': np.random.normal(30 + cat_key % 15, 8),
                    '数值3': np.random.uniform(0, 100)
                })
        return pd.DataFrame(data)

    @staticmethod
    def generate_optimization_data(n_items: int = 20) -> pd.DataFrame:
        """
        Generate optimization-problem data (e.g. resource allocation, knapsack).

        :param n_items: number of items
        :return: DataFrame with item id, value, cost, weight and demand columns
        """
        return pd.DataFrame({
            '物品编号': range(1, n_items + 1),
            '价值': np.random.uniform(10, 100, n_items),
            '成本': np.random.uniform(5, 50, n_items),
            '权重': np.random.uniform(1, 10, n_items),
            '需求量': np.random.randint(1, 20, n_items)
        })
class StatisticalAnalyzer:
    """Statistical analyzer - performs statistical analysis on data."""

    @staticmethod
    def basic_statistics(df: pd.DataFrame, column: str) -> Dict:
        """
        Compute basic descriptive statistics for one column.

        :param df: DataFrame
        :param column: column name
        :return: dict of statistics (mean, median, std, min, max, quartiles,
            skewness, kurtosis) keyed by their Chinese labels
        """
        return {
            '均值': float(df[column].mean()),
            '中位数': float(df[column].median()),
            '标准差': float(df[column].std()),
            '最小值': float(df[column].min()),
            '最大值': float(df[column].max()),
            '四分位数Q1': float(df[column].quantile(0.25)),
            '四分位数Q3': float(df[column].quantile(0.75)),
            '偏度': float(df[column].skew()),
            '峰度': float(df[column].kurtosis())
        }

    @staticmethod
    def correlation_analysis(df: pd.DataFrame) -> pd.DataFrame:
        """
        Correlation analysis over all numeric columns.

        :param df: DataFrame
        :return: correlation matrix
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        return df[numeric_cols].corr()

    @staticmethod
    def regression_analysis(df: pd.DataFrame, x_col: str, y_col: str) -> Dict:
        """
        Simple linear regression analysis.

        :param df: DataFrame
        :param x_col: independent-variable column name
        :param y_col: dependent-variable column name
        :return: dict with slope, intercept, R, R², p-value and standard error
        """
        from scipy import stats
        x = df[x_col].values
        y = df[y_col].values
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
        return {
            '斜率': float(slope),
            '截距': float(intercept),
            '相关系数R': float(r_value),
            'R²': float(r_value ** 2),
            'p值': float(p_value),
            '标准误差': float(std_err)
        }

    @staticmethod
    def time_series_analysis(df: pd.DataFrame, time_col: str = 'time', value_col: str = 'value') -> Dict:
        """
        Time-series analysis: growth rate, volatility, trend and moving average.

        :param df: DataFrame
        :param time_col: time column name (kept for interface compatibility)
        :param value_col: value column name
        :return: dict of analysis results
        """
        values = df[value_col].values
        # Period-over-period growth rates, in percent
        growth_rates = np.diff(values) / values[:-1] * 100
        avg_growth_rate = np.mean(growth_rates)
        # Moving average.  Clamp the window to at least 1: the old
        # min(7, len // 10) gave window=0 for series shorter than 10 points,
        # and pandas rolling(window=0) raises ValueError.
        window = max(1, min(7, len(values) // 10))
        moving_avg = pd.Series(values).rolling(window=window).mean().values
        return {
            '平均增长率(%)': float(avg_growth_rate),
            '波动率': float(np.std(growth_rates)),
            '趋势': '上升' if avg_growth_rate > 0 else '下降',
            '移动平均': moving_avg.tolist() if len(moving_avg) > 0 else []
        }
class DataProcessor:
    """Data processor - data cleaning and preprocessing."""

    @staticmethod
    def normalize_data(df: pd.DataFrame, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Standardize data (Z-score).

        :param df: DataFrame
        :param columns: columns to standardize; None means all numeric columns
        :return: standardized copy of the DataFrame (constant columns left as-is)
        """
        df_copy = df.copy()
        if columns is None:
            columns = df_copy.select_dtypes(include=[np.number]).columns
        for col in columns:
            mean = df_copy[col].mean()
            std = df_copy[col].std()
            if std > 0:  # skip constant columns to avoid division by zero
                df_copy[col] = (df_copy[col] - mean) / std
        return df_copy

    @staticmethod
    def min_max_scale(df: pd.DataFrame, columns: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Min-max normalization to [0, 1].

        :param df: DataFrame
        :param columns: columns to normalize; None means all numeric columns
        :return: normalized copy of the DataFrame (constant columns left as-is)
        """
        df_copy = df.copy()
        if columns is None:
            columns = df_copy.select_dtypes(include=[np.number]).columns
        for col in columns:
            min_val = df_copy[col].min()
            max_val = df_copy[col].max()
            if max_val > min_val:  # skip constant columns to avoid division by zero
                df_copy[col] = (df_copy[col] - min_val) / (max_val - min_val)
        return df_copy

    @staticmethod
    def handle_missing_values(df: pd.DataFrame, strategy: str = 'mean') -> pd.DataFrame:
        """
        Handle missing values in numeric columns.

        :param df: DataFrame
        :param strategy: strategy ('mean', 'median', 'mode', 'drop')
        :return: processed copy of the DataFrame
        """
        df_copy = df.copy()
        numeric_cols = df_copy.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            if df_copy[col].isna().any():
                # Reassign instead of `df_copy[col].fillna(..., inplace=True)`:
                # in-place fillna on a column selection is chained assignment,
                # which is deprecated and unreliable under pandas copy-on-write.
                if strategy == 'mean':
                    df_copy[col] = df_copy[col].fillna(df_copy[col].mean())
                elif strategy == 'median':
                    df_copy[col] = df_copy[col].fillna(df_copy[col].median())
                elif strategy == 'mode':
                    mode_vals = df_copy[col].mode()
                    # mode() is empty for an all-NaN column; the old mode()[0]
                    # raised IndexError in that case.
                    if not mode_vals.empty:
                        df_copy[col] = df_copy[col].fillna(mode_vals[0])
                elif strategy == 'drop':
                    df_copy = df_copy.dropna(subset=[col])
        return df_copy
# Export the main classes (public API of this module)
__all__ = ['DataGenerator', 'StatisticalAnalyzer', 'DataProcessor']