التكامل مع Python
تقدم هذه الصفحة أمثلة Python عملية للتعامل مع FGA Logger — قراءة البيانات التسلسلية المباشرة، والحفظ في CSV، وتحميل البيانات للتحليل.
المتطلبات:
التثبيت:
pip install pyserial pandas
قراءة البيانات التسلسلية المباشرة
يتصل هذا المثال بـ FGA Logger عبر USB ويطبع كل صف CSV وارد:
import serial
# غيّر إلى المنفذ الخاص بك:
# Windows: 'COM3', 'COM4', إلخ.
# Linux: '/dev/ttyUSB0'
# macOS: '/dev/tty.usbserial-XXXX'
PORT = 'COM3'
BAUD = 115200 # اضبط ليتطابق مع إعدادات FGA Logger التسلسلية
def read_logger(port, baud):
with serial.Serial(port, baud, timeout=2) as ser:
print(f"Connected to {port} at {baud} baud")
print("Waiting for data...\n")
while True:
line = ser.readline().decode('utf-8', errors='replace').strip()
if line:
print(line)
if __name__ == '__main__':
read_logger(PORT, BAUD)
حفظ البيانات التسلسلية في CSV
يقرأ هذا المثال من FGA Logger ويحفظ جميع الصفوف الواردة في ملف CSV محلي. تُكتب أول سطر ترويسة مستلم مرة واحدة؛ تتبعه جميع صفوف البيانات اللاحقة.
import serial
import csv
from datetime import datetime
PORT = 'COM3'
BAUD = 115200
OUTFILE = f"fga_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
# ترويسة CSV المتوقعة من FGA Logger
EXPECTED_HEADER = [
'Timestamp_ms', 'B1x_nT', 'B1y_nT', 'B1z_nT', 'B1v_nT',
'B2x_nT', 'B2y_nT', 'B2z_nT', 'B2v_nT',
'Lat_deg', 'Lon_deg', 'Alt_m', 'SIV', 'Fix', 'HDOP_m'
]
def save_to_csv(port, baud, outfile):
header_written = False
with serial.Serial(port, baud, timeout=5) as ser, \
open(outfile, 'w', newline='') as f:
writer = csv.writer(f)
print(f"Logging to {outfile}")
try:
while True:
line = ser.readline().decode('utf-8', errors='replace').strip()
if not line:
continue
fields = line.split(',')
if not header_written:
if fields[0] == 'Timestamp_ms':
writer.writerow(fields)
else:
writer.writerow(EXPECTED_HEADER)
writer.writerow(fields)
header_written = True
f.flush()
continue
writer.writerow(fields)
f.flush()
print(f" {line[:80]}")
except KeyboardInterrupt:
print(f"\nLogging stopped. File saved: {outfile}")
if __name__ == '__main__':
save_to_csv(PORT, BAUD, OUTFILE)
تحميل ملف CSV باستخدام pandas
بمجرد حصولك على ملف CSV — إما من بطاقة SD أو محفوظ عبر الاتصال التسلسلي — قم بتحميله باستخدام pandas للتحليل:
import pandas as pd
CSV_FILE = 'fga_log_20240315_143022.csv'
df = pd.read_csv(CSV_FILE)
print("Shape:", df.shape)
print("\nFirst rows:")
print(df.head())
print("\nColumn types:")
print(df.dtypes)
print("\nBasic statistics:")
print(df[['B1x_nT', 'B1y_nT', 'B1z_nT', 'B1v_nT']].describe())
حساب التدرج (وضع مقياس التدرج)
في تكوينات مقياس التدرج بتجميعَي مستشعرَين، احسب التدرج لكل محور:
import pandas as pd
df = pd.read_csv('fga_log.csv')
# احسب التدرج (المستشعر 1 ناقص المستشعر 2) لكل محور
df['Grad_x_nT'] = df['B1x_nT'] - df['B2x_nT']
df['Grad_y_nT'] = df['B1y_nT'] - df['B2y_nT']
df['Grad_z_nT'] = df['B1z_nT'] - df['B2z_nT']
# مقدار التدرج الكلي
df['Grad_v_nT'] = (
df['Grad_x_nT']**2 +
df['Grad_y_nT']**2 +
df['Grad_z_nT']**2
) ** 0.5
print(df[['Timestamp_ms', 'Grad_x_nT', 'Grad_y_nT', 'Grad_z_nT', 'Grad_v_nT']].head(10))
التصفية حسب جودة GPS
قم بتصفية الصفوف التي لا يوجد فيها إصلاح GPS أو الدقة منخفضة قبل التحليل:
import pandas as pd
df = pd.read_csv('fga_log.csv')
# احتفظ فقط بالصفوف ذات إصلاح GPS ثلاثي الأبعاد ودقة HDOP أقل من 2.0
df_clean = df[
(df['Fix'] == 3) &
(df['HDOP_m'] < 2.0) &
(df['SIV'] >= 4)
].copy()
print(f"Total rows: {len(df)}")
print(f"Clean rows: {len(df_clean)}")
print(f"Removed: {len(df) - len(df_clean)}")
رسم الحقل الكلي
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv('fga_log.csv')
plt.figure(figsize=(12, 4))
plt.plot(df['Timestamp_ms'] / 1000, df['B1v_nT'], linewidth=0.8)
plt.xlabel('Time (s)')
plt.ylabel('Total Field B1 (nT)')
plt.title('FGA Logger — Total Magnetic Field')
plt.tight_layout()
plt.savefig('field_plot.png', dpi=150)
plt.show()
تصدير مسار GPS إلى GeoJSON
تصدير مسار GPS للاستخدام في QGIS أو أدوات GIS الأخرى:
import pandas as pd
import json
df = pd.read_csv('fga_log.csv')
df_gps = df[df['Fix'] >= 2].dropna(subset=['Lat_deg', 'Lon_deg'])
features = []
for _, row in df_gps.iterrows():
feature = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [row['Lon_deg'], row['Lat_deg'], row['Alt_m']]
},
"properties": {
"timestamp_ms": row['Timestamp_ms'],
"B1v_nT": row['B1v_nT'],
"B2v_nT": row['B2v_nT'],
}
}
features.append(feature)
geojson = {"type": "FeatureCollection", "features": features}
with open('track.geojson', 'w') as f:
json.dump(geojson, f, indent=2)
print(f"Exported {len(features)} points to track.geojson")