إنتقل إلى المحتوى الرئيسي

التكامل مع Python

تقدم هذه الصفحة أمثلة Python عملية للتعامل مع FGA Logger — قراءة البيانات التسلسلية المباشرة، والحفظ في CSV، وتحميل البيانات للتحليل.

المتطلبات:

  • pyserial — اتصال المنفذ التسلسلي
  • pandas — تحليل البيانات (اختياري، لأمثلة التحليل)

التثبيت:

pip install pyserial pandas

قراءة البيانات التسلسلية المباشرة

يتصل هذا المثال بـ FGA Logger عبر USB ويطبع كل صف CSV وارد:

import serial

# غيّر إلى المنفذ الخاص بك:
# Windows: 'COM3', 'COM4', إلخ.
# Linux: '/dev/ttyUSB0'
# macOS: '/dev/tty.usbserial-XXXX'
PORT = 'COM3'
BAUD = 115200 # اضبط ليتطابق مع إعدادات FGA Logger التسلسلية

def read_logger(port, baud):
with serial.Serial(port, baud, timeout=2) as ser:
print(f"Connected to {port} at {baud} baud")
print("Waiting for data...\n")

while True:
line = ser.readline().decode('utf-8', errors='replace').strip()
if line:
print(line)

if __name__ == '__main__':
read_logger(PORT, BAUD)

حفظ البيانات التسلسلية في CSV

يقرأ هذا المثال من FGA Logger ويحفظ جميع الصفوف الواردة في ملف CSV محلي. تُكتب أول سطر ترويسة مستلم مرة واحدة؛ تتبعه جميع صفوف البيانات اللاحقة.

import serial
import csv
from datetime import datetime

PORT = 'COM3'
BAUD = 115200
OUTFILE = f"fga_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

# ترويسة CSV المتوقعة من FGA Logger
EXPECTED_HEADER = [
'Timestamp_ms', 'B1x_nT', 'B1y_nT', 'B1z_nT', 'B1v_nT',
'B2x_nT', 'B2y_nT', 'B2z_nT', 'B2v_nT',
'Lat_deg', 'Lon_deg', 'Alt_m', 'SIV', 'Fix', 'HDOP_m'
]

def save_to_csv(port, baud, outfile):
header_written = False

with serial.Serial(port, baud, timeout=5) as ser, \
open(outfile, 'w', newline='') as f:

writer = csv.writer(f)
print(f"Logging to {outfile}")

try:
while True:
line = ser.readline().decode('utf-8', errors='replace').strip()
if not line:
continue

fields = line.split(',')

if not header_written:
if fields[0] == 'Timestamp_ms':
writer.writerow(fields)
else:
writer.writerow(EXPECTED_HEADER)
writer.writerow(fields)
header_written = True
f.flush()
continue

writer.writerow(fields)
f.flush()
print(f" {line[:80]}")

except KeyboardInterrupt:
print(f"\nLogging stopped. File saved: {outfile}")

if __name__ == '__main__':
save_to_csv(PORT, BAUD, OUTFILE)

تحميل ملف CSV باستخدام pandas

بمجرد حصولك على ملف CSV — إما من بطاقة SD أو محفوظ عبر الاتصال التسلسلي — قم بتحميله باستخدام pandas للتحليل:

import pandas as pd

CSV_FILE = 'fga_log_20240315_143022.csv'

df = pd.read_csv(CSV_FILE)

print("Shape:", df.shape)
print("\nFirst rows:")
print(df.head())

print("\nColumn types:")
print(df.dtypes)

print("\nBasic statistics:")
print(df[['B1x_nT', 'B1y_nT', 'B1z_nT', 'B1v_nT']].describe())

حساب التدرج (وضع مقياس التدرج)

في تكوينات مقياس التدرج بتجميعَي مستشعرَين، احسب التدرج لكل محور:

import pandas as pd

df = pd.read_csv('fga_log.csv')

# احسب التدرج (المستشعر 1 ناقص المستشعر 2) لكل محور
df['Grad_x_nT'] = df['B1x_nT'] - df['B2x_nT']
df['Grad_y_nT'] = df['B1y_nT'] - df['B2y_nT']
df['Grad_z_nT'] = df['B1z_nT'] - df['B2z_nT']

# مقدار التدرج الكلي
df['Grad_v_nT'] = (
df['Grad_x_nT']**2 +
df['Grad_y_nT']**2 +
df['Grad_z_nT']**2
) ** 0.5

print(df[['Timestamp_ms', 'Grad_x_nT', 'Grad_y_nT', 'Grad_z_nT', 'Grad_v_nT']].head(10))

التصفية حسب جودة GPS

قم بتصفية الصفوف التي لا يوجد فيها إصلاح GPS أو الدقة منخفضة قبل التحليل:

import pandas as pd

df = pd.read_csv('fga_log.csv')

# احتفظ فقط بالصفوف ذات إصلاح GPS ثلاثي الأبعاد ودقة HDOP أقل من 2.0
df_clean = df[
(df['Fix'] == 3) &
(df['HDOP_m'] < 2.0) &
(df['SIV'] >= 4)
].copy()

print(f"Total rows: {len(df)}")
print(f"Clean rows: {len(df_clean)}")
print(f"Removed: {len(df) - len(df_clean)}")

رسم الحقل الكلي

import pandas as pd
import matplotlib.pyplot as plt

df = pd.read_csv('fga_log.csv')

plt.figure(figsize=(12, 4))
plt.plot(df['Timestamp_ms'] / 1000, df['B1v_nT'], linewidth=0.8)
plt.xlabel('Time (s)')
plt.ylabel('Total Field B1 (nT)')
plt.title('FGA Logger — Total Magnetic Field')
plt.tight_layout()
plt.savefig('field_plot.png', dpi=150)
plt.show()

تصدير مسار GPS إلى GeoJSON

تصدير مسار GPS للاستخدام في QGIS أو أدوات GIS الأخرى:

import pandas as pd
import json

df = pd.read_csv('fga_log.csv')

df_gps = df[df['Fix'] >= 2].dropna(subset=['Lat_deg', 'Lon_deg'])

features = []
for _, row in df_gps.iterrows():
feature = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [row['Lon_deg'], row['Lat_deg'], row['Alt_m']]
},
"properties": {
"timestamp_ms": row['Timestamp_ms'],
"B1v_nT": row['B1v_nT'],
"B2v_nT": row['B2v_nT'],
}
}
features.append(feature)

geojson = {"type": "FeatureCollection", "features": features}

with open('track.geojson', 'w') as f:
json.dump(geojson, f, indent=2)

print(f"Exported {len(features)} points to track.geojson")