mmdvm_watch.py

# coding: utf-8
import os
import time
import json
import re
import requests
from datetime import datetime, timezone, timedelta
import socket
import logging
import signal
import sys

# --- 設定 ---
ENDPOINT = "http://log.forums.gr.jp/wp-json/wpsd/v1/ingest"
TOKEN = "UguaDxA2ZMCr8sn"
LOG_DIR = "/var/log/pi-star"
STATE_FILE = os.path.join(LOG_DIR, ".mmdvm_watch.state")

# --- ログ設定 ---
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)

# --- 正規表現パターン ---
HEADER_PATTERN = re.compile(
r'(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2}:\d{2}.\d{3})\s+DMR\s+Slot\s+(\d+),\s+received\s+(RF|NETWORK)\s+voice\s+header\s+from\s+(.+?)\s+to\s+(TG|PC|REF)\s+(\d+|ALL)',
re.IGNORECASE
)
END_PATTERN = re.compile(
r'(\d{4}-\d{2}-\d{2})\s+(\d{2}:\d{2}:\d{2}.\d{3})\s+DMR\s+Slot\s+(\d+),\s+received\s+(RF|NETWORK)\s+end\s+of\s+voice\s+transmission\s+from\s+(.+?)\s+to\s+(TG|PC|REF)\s+(\d+|ALL),?.*',
re.IGNORECASE
)

def load_state():
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r') as f:
return json.load(f)
except (IOError, json.JSONDecodeError) as e:
logging.error(f"Failed to load state file: {e}. Starting fresh.")
return {}

def save_state(state):
try:
with open(STATE_FILE, 'w') as f:
json.dump(state, f)
except IOError as e:
logging.error(f"Failed to save state file: {e}")

def create_session_payload(header_line, end_line):
match_header = HEADER_PATTERN.search(header_line)
match_end = END_PATTERN.search(end_line)
if not match_header or not match_end:
logging.error("Failed to match header or end line for payload creation.")
return None
date_str, time_str, slot, src, callsign, dst_type, dst = match_header.groups()[:7]
try:
dt_utc = datetime.strptime(f'{date_str} {time_str}', '%Y-%m-%d %H:%M:%S.%f').replace(tzinfo=timezone.utc)
dt_jst = dt_utc + timedelta(hours=9)
except ValueError:
logging.error(f"Timestamp parsing failed for line: {header_line}")
return None
node_name = socket.gethostname()
payload = {
"type": "session",
"node": node_name,
"timestamp": dt_jst.strftime("%Y-%m-%d %H:%M:%S"),
"timestamp_local": dt_jst.isoformat(),
"dmr": {
"slot": int(slot),
"src": src.upper(),
"callsign": callsign.upper(),
"dst_type": dst_type.upper(),
"dst": dst
},
"raw": {
"header": header_line.strip(),
"end": end_line.strip()
},
"callsign": callsign.upper(),
"dmr_id": None,
"name": None
}
if dst_type.upper() == "TG":
if dst.isdigit():
payload['dmr']['tg'] = int(dst)
else:
payload['dmr']['tg'] = dst
return payload

def post_json(data):
payload = json.dumps(data).encode('utf-8')
headers = {
"Content-Type": "application/json; charset=utf-8",
"X-WPSD-Token": TOKEN,
"Authorization": f"Bearer {TOKEN}",
}
urls = [ENDPOINT]
if ENDPOINT.endswith("/wpsd/v1/ingest"):
base_url = ENDPOINT.rsplit("/wp-json", 1)[0]
urls.append(f"{base_url}/?rest_route=/wpsd/v1/ingest")
for url in urls:
try:
response = requests.post(url, data=payload, headers=headers, timeout=8)
if response.status_code == 200:
callsign = data.get('callsign', 'Unknown')
logging.info(f"Successfully POSTed data for {callsign} to {url}")
return True
else:
logging.warning(f"POST failed with status code {response.status_code} at {url}")
except requests.exceptions.RequestException as e:
logging.error(f"An error occurred during POST: {e}")
return False
logging.error("POST failed after all retries.")
return False

def signal_handler(sig, frame):
logging.info("Stopping the watcher.")
sys.exit(0)

def main():
signal.signal(signal.SIGINT, signal_handler)
if not os.path.isdir(LOG_DIR):
logging.error(f"Log directory not found: {LOG_DIR}")
return

prev_files = load_state()
session_buffer = {}

logging.info(f"Watching directory: {LOG_DIR}")

current_files_on_start = set(os.listdir(LOG_DIR))
for file in current_files_on_start:
filepath = os.path.join(LOG_DIR, file)
if os.path.isfile(filepath) and re.match(r'MMDVM-\d{4}-\d{2}-\d{2}\.log', file):
if file not in prev_files:
prev_files[file] = os.path.getsize(filepath)
logging.info(f"Initializing state for new log file {file} from its end.")
save_state(prev_files)

while True:
time.sleep(2)
current_files = set(os.listdir(LOG_DIR))

added = current_files - set(prev_files.keys())
removed = set(prev_files.keys()) - current_files

for file in added:
filepath = os.path.join(LOG_DIR, file)
if os.path.isfile(filepath) and re.match(r'MMDVM-\d{4}-\d{2}-\d{2}\.log', file):
prev_files[file] = os.path.getsize(filepath)
logging.info(f"New log file detected: {file}. Monitoring from its end.")
save_state(prev_files)

for file in removed:
logging.info(f"Log file removed: {file}")
if file in prev_files:
del prev_files[file]
save_state(prev_files)

for file in current_files.intersection(set(prev_files.keys())):
filepath = os.path.join(LOG_DIR, file)

if not re.match(r'MMDVM-\d{4}-\d{2}-\d{2}\.log', file):
continue

try:
current_size = os.path.getsize(filepath)
except OSError as e:
logging.error(f"Error accessing file {filepath}: {e}")
continue

prev_size = prev_files.get(file, 0)

if current_size > prev_size:
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
f.seek(prev_size)
new_content = f.read()
for line in new_content.strip().splitlines():
if 'voice header' in line:
match = HEADER_PATTERN.search(line)
if match:
callsign = match.group(5)
slot = match.group(3)
session_key = f"{callsign}_{slot}"
session_buffer[session_key] = line
elif 'end of voice' in line:
match = END_PATTERN.search(line)
if match:
callsign = match.group(5)
slot = match.group(3)
session_key = f"{callsign}_{slot}"
if session_key in session_buffer:
payload = create_session_payload(session_buffer[session_key], line)
if payload:
post_json(payload)
del session_buffer[session_key]
prev_files[file] = current_size
save_state(prev_files)

elif current_size < prev_size:
logging.info(f"File truncated or rotated: {file}. Re-initializing monitoring.")
prev_files[file] = current_size
session_buffer.clear()
save_state(prev_files)

if __name__ == "__main__":
main()

Leave a Reply

Your email address will not be published. Required fields are marked *