こんにちは。割と本気で精神を病んでます。
TS抜きチューナーやワンセグチューナーからパイプで標準入力に入ってきた地上デジタル放送のストリームを読み込んで、その中にあるTDT(時刻情報パケット)を取り出して現在時刻を表示する時計をPython3で作りました。
./recfsusb2n –trim 10000 27 – – | ./isdbclock.py
たとえばrecfsusb2nならこのように動かすと…
GUIで地デジベースの時刻を教えてくれます。これで電波時計の電波は届かなくて、インターネットにもつながってないけど、なぜか地デジの電波だけ入る場所でも時刻がわかるね!!
残念ながら、たぶん変調復調、CATVでのメディアコンバートのせいで遅延があって、0.7秒くらい標準時から遅れます。なのでネタ用途にしか使えないです。
時計GUI部分はむかしつくったNTP時計を流用してるので、微妙なコードです。そのへんはご愛嬌ですね。
地デジ部分は、sys.stdin.buffer.read(packet_length)とやって標準入力から読み込んで、いったんio.BytesIOにwrite()してバッファしてから読み込んでいます。ファイルから読み込むのよりもやっぱり難しいですね。でもこれでやり方がつかめたので、いろいろ活用が広げられそうです。
パースはbitstringを使用していますが、Pure Pythonなのでめっっっちゃ遅いです。i5でも5秒分の処理に7秒位かかります。仕方ないので今回は2つにひとつのパケットのみ処理することにしました。syncとPIDのパースを自分で実装したら、結構早くなるかもです。
bitstringのfindは文字列で検索バイトを渡すというのと、BytesIOやStringIOは終端ではなくてseek位置から書き込んで、seek位置から読み込むというのと、バイナリがほしいならPython3ではsys.stdin.bufferを使うのあたりでハマりました。
以上です。今度もまたなんか面白いことやりたいです。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
#coding:utf-8 | |
import json | |
import random | |
import time | |
try: | |
from gi import pygtkcompat | |
except ImportError: | |
pygtkcompat = None | |
if pygtkcompat is not None: | |
pygtkcompat.enable() | |
pygtkcompat.enable_gtk(version='3.0') | |
import gtk | |
import gobject | |
import math | |
import bitstring | |
import sys | |
import io | |
import datetime | |
packet_length = 188 | |
#packet_length = 204 | |
syncbyte = 0x47 | |
count = 0 | |
failcount = 0 | |
readpos = 0 | |
bio = io.BytesIO() | |
#quoted from https://gist.github.com/jiffyclub/1294443 | |
def mjd_to_date(mjd): | |
jd = mjd + 2400000.5 | |
jd = jd + 0.5 | |
F, I = math.modf(jd) | |
I = int(I) | |
A = math.trunc((I – 1867216.25)/36524.25) | |
if I > 2299160: | |
B = I + 1 + A – math.trunc(A / 4.) | |
else: | |
B = I | |
C = B + 1524 | |
D = math.trunc((C – 122.1) / 365.25) | |
E = math.trunc(365.25 * D) | |
G = math.trunc((C – E) / 30.6001) | |
day = int(C – E + F – math.trunc(30.6001 * G)) | |
if G < 13.5: | |
month = G – 1 | |
else: | |
month = G – 13 | |
if month > 2.5: | |
year = D – 4716 | |
else: | |
year = D – 4715 | |
return year, month, day | |
class TS: | |
def __init__(self, packet): | |
self.packet = packet | |
def unpack(self): | |
if packet_length == 188: | |
#188-5(header+pointer)-4(CRC)=179, 179*8=1432bits | |
self.sync, self.error, self.pid, self.scramble, self.counter, self.pointer, self.payload, self.crc = \ | |
self.packet.unpack('uint:8, bool, pad:2, uint:13, bits:2, pad:2, uint:4, uint:8, bits:1432, bytes:4') | |
else: | |
#204(16がリードソロモン) | |
self.sync, self.error, self.pid, self.scramble, self.counter, self.pointer, self.payload, self.crc, self.reedsolomon = \ | |
self.packet.unpack('uint:8, bool, pad:2, uint:13, bits:2, pad:2, uint:4, uint:8, bits:1432, bytes:4, bytes:16') | |
#find sync | |
if self.sync != syncbyte: | |
found = self.packet.find('0x47', bytealigned=True) #findには文字列で渡す… | |
return True | |
return False | |
def tdt_unpack(self): | |
table_id, mjd16, h1, h2, m1, m2, s1, s2 = \ | |
self.payload.unpack('uint:8, pad:16, uint:16, uint:4, uint:4, uint:4, uint:4, uint:4, uint:4') | |
year, month, day = mjd_to_date(mjd16) | |
hour = int(str(h1)+str(h2)) | |
minute = int(str(m1)+str(m2)) | |
sec = int(str(s1)+str(s2)) | |
print('TIME {}-{}-{} {}:{}:{} JST'.format(year, month, day, hour, minute, sec)) | |
return datetime.datetime(year, month, day, hour, minute, sec) | |
while True: | |
#新規パケットをwriteする前に終端にseek(seek位置から書くから) | |
bio.seek(0, io.SEEK_END) | |
bio.write(sys.stdin.buffer.read(packet_length*2)) | |
#読むので前回位置にseek | |
bio.seek(readpos) | |
read_data = bio.read(packet_length) | |
#前回seek位置を次のに合わせる | |
if count%10000 != 0: | |
readpos += packet_length*2 #一つ飛ばし | |
else: | |
#10000ごとにバッファクリア | |
bio = io.BytesIO() | |
readpos = 0 | |
packet = bitstring.ConstBitStream(bytes=read_data) | |
ts = TS(packet) | |
if ts.unpack(): | |
#syncできなかったら戻る | |
if ts.packet.bytepos != 0: #このパケット内にsyncがあったら | |
if readpos > packet_length*2: | |
readpos = readpos – packet_length*2 + ts.packet.bytepos #一つ飛ばし考慮 | |
else: | |
readpos = ts.packet.bytepos | |
failcount += 1 | |
else: | |
#TDT | |
if ts.pid == 0x14 and not ts.error: | |
nowtime = ts.tdt_unpack() | |
break | |
count += 1 | |
#マシン時計の標準時に対する遅延を返す | |
def delayer(nowtime): | |
unixtime_isdb = nowtime.timestamp() | |
unixtime_machine = time.time() | |
unixtime_delay = unixtime_isdb–unixtime_machine | |
return unixtime_delay | |
delay = delayer(nowtime) | |
print('delay:', delay) | |
class gui: | |
def __init__(self): | |
#Window | |
self.window = gtk.Window() | |
self.window.set_title('ISDB-T Clock') | |
self.window.connect('destroy_event',self.end_app) | |
self.window.connect('delete_event',self.end_app) | |
#self.window.set_icon_from_file("icon.ico") | |
self.window.set_default_size(200,50) | |
time_str, wait_msecs = self.make_time(delay) | |
if 0<wait_msecs: | |
time.sleep(wait_msecs) | |
else: | |
#正に修正 | |
time.sleep(1+wait_msecs) | |
#Label | |
self.label_clock = gtk.Label() | |
self.label_clock.set_markup(time_str) | |
#Button | |
#self.adjust_button = gtk.Button() | |
#self.adjust_button.set_label("再校正") | |
#self.adjust_button.connect("clicked",self.redelayer) | |
self.vbox = gtk.VBox() | |
self.vbox.add(self.label_clock) | |
#self.vbox.add(self.adjust_button) | |
self.window.add(self.vbox) | |
self.window.show_all() | |
gobject.timeout_add(1000,self.on_timeout) | |
def make_time(self, delay): | |
nowunixtime = time.time() + delay | |
nowtime_str = time.ctime(nowunixtime) | |
#小数点以下を出1- | |
wait_msecs = 1–math.modf(nowunixtime)[0] | |
return '<b>'+nowtime_str + '</b>', wait_msecs | |
def end_app(self,widget,data=None): | |
#gtk.main_quit() | |
exit() | |
return False | |
def on_timeout(self,*args): | |
wait_sec = self.make_time(delay)[1] | |
#誤差修正 | |
if 0<wait_sec: | |
time.sleep(wait_sec) | |
#多少の誤差は許容 | |
elif 0>wait_sec+0.05: | |
time.sleep(1+wait_msec) | |
time_str = self.make_time(delay)[0] | |
self.label_clock.set_markup(time_str) | |
return True | |
def redelayer(self,widget,data=None): | |
delayer(uri) | |
gobject.threads_init() | |
gui_instance = gui() | |
gtk.main() |