CC-5990, CC-5991 - Python cleanup, removed need for /usr/lib/airtime
This commit is contained in:
parent
cd102b984b
commit
875a9dfd8b
115 changed files with 248 additions and 212 deletions
1
python_apps/media-monitor/mm2/tests/__init__.py
Normal file
1
python_apps/media-monitor/mm2/tests/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
|
115
python_apps/media-monitor/mm2/tests/api_client.cfg
Normal file
115
python_apps/media-monitor/mm2/tests/api_client.cfg
Normal file
|
@ -0,0 +1,115 @@
|
|||
bin_dir = "/usr/lib/airtime/api_clients"
|
||||
|
||||
#############################
|
||||
## Common
|
||||
#############################
|
||||
|
||||
# Value needed to access the API
|
||||
api_key = '3MP2IUR45E6KYQ01CUYK'
|
||||
|
||||
# Path to the base of the API
|
||||
api_base = 'api'
|
||||
|
||||
# URL to get the version number of the server API
|
||||
version_url = 'version/api_key/%%api_key%%'
|
||||
|
||||
#URL to register a components IP Address with the central web server
|
||||
register_component = 'register-component/format/json/api_key/%%api_key%%/component/%%component%%'
|
||||
|
||||
# Hostname
|
||||
base_url = 'localhost'
|
||||
base_port = 80
|
||||
|
||||
#############################
|
||||
## Config for Media Monitor
|
||||
#############################
|
||||
|
||||
# URL to setup the media monitor
|
||||
media_setup_url = 'media-monitor-setup/format/json/api_key/%%api_key%%'
|
||||
|
||||
# Tell Airtime the file id associated with a show instance.
|
||||
upload_recorded = 'upload-recorded/format/json/api_key/%%api_key%%/fileid/%%fileid%%/showinstanceid/%%showinstanceid%%'
|
||||
|
||||
# URL to tell Airtime to update file's meta data
|
||||
update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%'
|
||||
|
||||
# URL to tell Airtime we want a listing of all files it knows about
|
||||
list_all_db_files = 'list-all-files/format/json/api_key/%%api_key%%/dir_id/%%dir_id%%'
|
||||
|
||||
# URL to tell Airtime we want a listing of all dirs its watching (including the stor dir)
|
||||
list_all_watched_dirs = 'list-all-watched-dirs/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
add_watched_dir = 'add-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
remove_watched_dir = 'remove-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
set_storage_dir = 'set-storage-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime about file system mount change
|
||||
update_fs_mount = 'update-file-system-mount/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to tell Airtime about file system mount change
|
||||
handle_watched_dir_missing = 'handle-watched-dir-missing/format/json/api_key/%%api_key%%/dir/%%dir%%'
|
||||
|
||||
#############################
|
||||
## Config for Recorder
|
||||
#############################
|
||||
|
||||
# URL to get the schedule of shows set to record
|
||||
show_schedule_url = 'recorded-shows/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to upload the recorded show's file to Airtime
|
||||
upload_file_url = 'upload-file/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to commit multiple updates from media monitor at the same time
|
||||
|
||||
reload_metadata_group = 'reload-metadata-group/format/json/api_key/%%api_key%%'
|
||||
|
||||
#number of retries to upload file if connection problem
|
||||
upload_retries = 3
|
||||
|
||||
#time to wait between attempts to upload file if connection problem (in seconds)
|
||||
upload_wait = 60
|
||||
|
||||
################################################################################
|
||||
# Uncomment *one of the sets* of values from the API clients below, and comment
|
||||
# out all the others.
|
||||
################################################################################
|
||||
|
||||
#############################
|
||||
## Config for Pypo
|
||||
#############################
|
||||
|
||||
# Schedule export path.
|
||||
# %%from%% - starting date/time in the form YYYY-MM-DD-hh-mm
|
||||
# %%to%% - starting date/time in the form YYYY-MM-DD-hh-mm
|
||||
export_url = 'schedule/api_key/%%api_key%%'
|
||||
|
||||
get_media_url = 'get-media/file/%%file%%/api_key/%%api_key%%'
|
||||
|
||||
# Update whether a schedule group has begun playing.
|
||||
update_item_url = 'notify-schedule-group-play/api_key/%%api_key%%/schedule_id/%%schedule_id%%'
|
||||
|
||||
# Update whether an audio clip is currently playing.
|
||||
update_start_playing_url = 'notify-media-item-start-play/api_key/%%api_key%%/media_id/%%media_id%%/schedule_id/%%schedule_id%%'
|
||||
|
||||
# URL to tell Airtime we want to get stream setting
|
||||
get_stream_setting = 'get-stream-setting/format/json/api_key/%%api_key%%/'
|
||||
|
||||
#URL to update liquidsoap status
|
||||
update_liquidsoap_status = 'update-liquidsoap-status/format/json/api_key/%%api_key%%/msg/%%msg%%/stream_id/%%stream_id%%/boot_time/%%boot_time%%'
|
||||
|
||||
#URL to check live stream auth
|
||||
check_live_stream_auth = 'check-live-stream-auth/format/json/api_key/%%api_key%%/username/%%username%%/password/%%password%%/djtype/%%djtype%%'
|
||||
|
||||
#URL to update source status
|
||||
update_source_status = 'update-source-status/format/json/api_key/%%api_key%%/sourcename/%%sourcename%%/status/%%status%%'
|
||||
|
||||
get_bootstrap_info = 'get-bootstrap-info/format/json/api_key/%%api_key%%'
|
||||
|
||||
get_files_without_replay_gain = 'get-files-without-replay-gain/api_key/%%api_key%%/dir_id/%%dir_id%%'
|
||||
|
||||
update_replay_gain_value = 'update-replay-gain-value/api_key/%%api_key%%'
|
138
python_apps/media-monitor/mm2/tests/live_client.cfg
Normal file
138
python_apps/media-monitor/mm2/tests/live_client.cfg
Normal file
|
@ -0,0 +1,138 @@
|
|||
bin_dir = "/usr/lib/airtime/api_clients"
|
||||
|
||||
############################################
|
||||
# RabbitMQ settings #
|
||||
############################################
|
||||
rabbitmq_host = 'localhost'
|
||||
rabbitmq_user = 'guest'
|
||||
rabbitmq_password = 'guest'
|
||||
rabbitmq_vhost = '/'
|
||||
|
||||
############################################
|
||||
# Media-Monitor preferences #
|
||||
############################################
|
||||
check_filesystem_events = 5 #how long to queue up events performed on the files themselves.
|
||||
check_airtime_events = 30 #how long to queue metadata input from airtime.
|
||||
|
||||
touch_interval = 5
|
||||
chunking_number = 450
|
||||
request_max_wait = 3.0
|
||||
rmq_event_wait = 0.5
|
||||
logpath = '/home/rudi/throwaway/mm2.log'
|
||||
|
||||
#############################
|
||||
## Common
|
||||
#############################
|
||||
|
||||
|
||||
index_path = '/home/rudi/Airtime/python_apps/media-monitor2/sample_post.txt'
|
||||
|
||||
# Value needed to access the API
|
||||
api_key = '5LF5D953RNS3KJSHN6FF'
|
||||
|
||||
# Path to the base of the API
|
||||
api_base = 'api'
|
||||
|
||||
# URL to get the version number of the server API
|
||||
version_url = 'version/api_key/%%api_key%%'
|
||||
|
||||
#URL to register a components IP Address with the central web server
|
||||
register_component = 'register-component/format/json/api_key/%%api_key%%/component/%%component%%'
|
||||
|
||||
# Hostname
|
||||
base_url = 'localhost'
|
||||
base_port = 80
|
||||
|
||||
#############################
|
||||
## Config for Media Monitor
|
||||
#############################
|
||||
|
||||
# URL to setup the media monitor
|
||||
media_setup_url = 'media-monitor-setup/format/json/api_key/%%api_key%%'
|
||||
|
||||
# Tell Airtime the file id associated with a show instance.
|
||||
upload_recorded = 'upload-recorded/format/json/api_key/%%api_key%%/fileid/%%fileid%%/showinstanceid/%%showinstanceid%%'
|
||||
|
||||
# URL to tell Airtime to update file's meta data
|
||||
update_media_url = 'reload-metadata/format/json/api_key/%%api_key%%/mode/%%mode%%'
|
||||
|
||||
# URL to tell Airtime we want a listing of all files it knows about
|
||||
list_all_db_files = 'list-all-files/format/json/api_key/%%api_key%%/dir_id/%%dir_id%%'
|
||||
|
||||
# URL to tell Airtime we want a listing of all dirs its watching (including the stor dir)
|
||||
list_all_watched_dirs = 'list-all-watched-dirs/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
add_watched_dir = 'add-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
remove_watched_dir = 'remove-watched-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime we want to add watched directory
|
||||
set_storage_dir = 'set-storage-dir/format/json/api_key/%%api_key%%/path/%%path%%'
|
||||
|
||||
# URL to tell Airtime about file system mount change
|
||||
update_fs_mount = 'update-file-system-mount/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to tell Airtime about file system mount change
|
||||
handle_watched_dir_missing = 'handle-watched-dir-missing/format/json/api_key/%%api_key%%/dir/%%dir%%'
|
||||
|
||||
#############################
|
||||
## Config for Recorder
|
||||
#############################
|
||||
|
||||
# URL to get the schedule of shows set to record
|
||||
show_schedule_url = 'recorded-shows/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to upload the recorded show's file to Airtime
|
||||
upload_file_url = 'upload-file/format/json/api_key/%%api_key%%'
|
||||
|
||||
# URL to commit multiple updates from media monitor at the same time
|
||||
|
||||
reload_metadata_group = 'reload-metadata-group/format/json/api_key/%%api_key%%'
|
||||
|
||||
#number of retries to upload file if connection problem
|
||||
upload_retries = 3
|
||||
|
||||
#time to wait between attempts to upload file if connection problem (in seconds)
|
||||
upload_wait = 60
|
||||
|
||||
################################################################################
|
||||
# Uncomment *one of the sets* of values from the API clients below, and comment
|
||||
# out all the others.
|
||||
################################################################################
|
||||
|
||||
#############################
|
||||
## Config for Pypo
|
||||
#############################
|
||||
|
||||
# Schedule export path.
|
||||
# %%from%% - starting date/time in the form YYYY-MM-DD-hh-mm
|
||||
# %%to%% - starting date/time in the form YYYY-MM-DD-hh-mm
|
||||
export_url = 'schedule/api_key/%%api_key%%'
|
||||
|
||||
get_media_url = 'get-media/file/%%file%%/api_key/%%api_key%%'
|
||||
|
||||
# Update whether a schedule group has begun playing.
|
||||
update_item_url = 'notify-schedule-group-play/api_key/%%api_key%%/schedule_id/%%schedule_id%%'
|
||||
|
||||
# Update whether an audio clip is currently playing.
|
||||
update_start_playing_url = 'notify-media-item-start-play/api_key/%%api_key%%/media_id/%%media_id%%/schedule_id/%%schedule_id%%'
|
||||
|
||||
# URL to tell Airtime we want to get stream setting
|
||||
get_stream_setting = 'get-stream-setting/format/json/api_key/%%api_key%%/'
|
||||
|
||||
#URL to update liquidsoap status
|
||||
update_liquidsoap_status = 'update-liquidsoap-status/format/json/api_key/%%api_key%%/msg/%%msg%%/stream_id/%%stream_id%%/boot_time/%%boot_time%%'
|
||||
|
||||
#URL to check live stream auth
|
||||
check_live_stream_auth = 'check-live-stream-auth/format/json/api_key/%%api_key%%/username/%%username%%/password/%%password%%/djtype/%%djtype%%'
|
||||
|
||||
#URL to update source status
|
||||
update_source_status = 'update-source-status/format/json/api_key/%%api_key%%/sourcename/%%sourcename%%/status/%%status%%'
|
||||
|
||||
get_bootstrap_info = 'get-bootstrap-info/format/json/api_key/%%api_key%%'
|
||||
|
||||
get_files_without_replay_gain = 'get-files-without-replay-gain/api_key/%%api_key%%/dir_id/%%dir_id%%'
|
||||
|
||||
update_replay_gain_value = 'update-replay-gain-value/api_key/%%api_key%%'
|
14
python_apps/media-monitor/mm2/tests/prepare_tests.py
Normal file
14
python_apps/media-monitor/mm2/tests/prepare_tests.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
# The tests rely on a lot of absolute paths so this file
|
||||
# configures all of that
|
||||
|
||||
music_folder = u'/home/rudi/music'
|
||||
o_path = u'/home/rudi/throwaway/ACDC_-_Back_In_Black-sample-64kbps.ogg'
|
||||
watch_path = u'/home/rudi/throwaway/watch/',
|
||||
real_path1 = u'/home/rudi/throwaway/watch/unknown/unknown/ACDC_-_Back_In_Black-sample-64kbps-64kbps.ogg'
|
||||
opath = u"/home/rudi/Airtime/python_apps/media-monitor2/tests/"
|
||||
ppath = u"/home/rudi/Airtime/python_apps/media-monitor2/media/"
|
||||
|
||||
api_client_path = '/etc/airtime/airtime.conf'
|
||||
# holdover from the time we had a special config for testing
|
||||
sample_config = api_client_path
|
||||
real_config = api_client_path
|
7
python_apps/media-monitor/mm2/tests/run_tests.pl
Executable file
7
python_apps/media-monitor/mm2/tests/run_tests.pl
Executable file
|
@ -0,0 +1,7 @@
|
|||
#!/usr/bin/perl
|
||||
use strict;
|
||||
use warnings;
|
||||
|
||||
foreach my $file (glob "*.py") {
|
||||
system("python $file") unless $file =~ /prepare_tests.py/;
|
||||
}
|
38
python_apps/media-monitor/mm2/tests/test_api_client.py
Normal file
38
python_apps/media-monitor/mm2/tests/test_api_client.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import os
|
||||
import sys
|
||||
from api_clients import api_client as apc
|
||||
|
||||
|
||||
import prepare_tests
|
||||
|
||||
class TestApiClient(unittest.TestCase):
|
||||
def setUp(self):
|
||||
test_path = prepare_tests.api_client_path
|
||||
print("Running from api_config: %s" % test_path)
|
||||
if not os.path.exists(test_path):
|
||||
print("path for config does not exist: '%s' % test_path")
|
||||
# TODO : is there a cleaner way to exit the unit testing?
|
||||
sys.exit(1)
|
||||
self.apc = apc.AirtimeApiClient(config_path=test_path)
|
||||
self.apc.register_component("api-client-tester")
|
||||
# All of the following requests should error out in some way
|
||||
self.bad_requests = [
|
||||
{ 'mode' : 'foo', 'is_record' : 0 },
|
||||
{ 'mode' : 'bar', 'is_record' : 1 },
|
||||
{ 'no_mode' : 'at_all' }, ]
|
||||
|
||||
def test_bad_requests(self):
|
||||
responses = self.apc.send_media_monitor_requests(self.bad_requests, dry=True)
|
||||
for response in responses:
|
||||
self.assertTrue( 'key' in response )
|
||||
self.assertTrue( 'error' in response )
|
||||
print( "Response: '%s'" % response )
|
||||
|
||||
# We don't actually test any well formed requests because it is more
|
||||
# involved
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
||||
|
||||
|
24
python_apps/media-monitor/mm2/tests/test_config.cfg
Normal file
24
python_apps/media-monitor/mm2/tests/test_config.cfg
Normal file
|
@ -0,0 +1,24 @@
|
|||
api_client = 'airtime'
|
||||
|
||||
# where the binary files live
|
||||
bin_dir = '/usr/lib/airtime/media-monitor'
|
||||
|
||||
# where the logging files live
|
||||
log_dir = '/var/log/airtime/media-monitor'
|
||||
|
||||
|
||||
############################################
|
||||
# RabbitMQ settings #
|
||||
############################################
|
||||
rabbitmq_host = 'localhost'
|
||||
rabbitmq_user = 'guest'
|
||||
rabbitmq_password = 'guest'
|
||||
rabbitmq_vhost = '/'
|
||||
|
||||
############################################
|
||||
# Media-Monitor preferences #
|
||||
############################################
|
||||
check_filesystem_events = '5'
|
||||
check_airtime_events = '30'
|
||||
|
||||
list_value_testing = 'val1', 'val2', 'val3'
|
28
python_apps/media-monitor/mm2/tests/test_config.py
Normal file
28
python_apps/media-monitor/mm2/tests/test_config.py
Normal file
|
@ -0,0 +1,28 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import pprint
|
||||
|
||||
from media.monitor.config import MMConfig
|
||||
from media.monitor.exceptions import NoConfigFile, ConfigAccessViolation
|
||||
|
||||
pp = pprint.PrettyPrinter(indent=4)
|
||||
|
||||
class TestMMConfig(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.real_config = MMConfig("./test_config.cfg")
|
||||
#pp.pprint(self.real_config.cfg.dict)
|
||||
|
||||
def test_bad_config(self):
|
||||
self.assertRaises( NoConfigFile, lambda : MMConfig("/fake/stuff/here") )
|
||||
|
||||
def test_no_set(self):
|
||||
def myf(): self.real_config['bad'] = 'change'
|
||||
self.assertRaises( ConfigAccessViolation, myf )
|
||||
|
||||
def test_copying(self):
|
||||
k = 'list_value_testing'
|
||||
mycopy = self.real_config[k]
|
||||
mycopy.append("another element")
|
||||
self.assertTrue( len(mycopy) , len(self.real_config[k]) + 1 )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
31
python_apps/media-monitor/mm2/tests/test_emf.py
Normal file
31
python_apps/media-monitor/mm2/tests/test_emf.py
Normal file
|
@ -0,0 +1,31 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
#from pprint import pprint as pp
|
||||
|
||||
from media.metadata.process import global_reader
|
||||
from media.monitor.metadata import Metadata
|
||||
|
||||
import media.metadata.definitions as defs
|
||||
defs.load_definitions()
|
||||
|
||||
class TestMMP(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.maxDiff = None
|
||||
|
||||
def metadatas(self,f):
|
||||
return global_reader.read_mutagen(f), Metadata(f).extract()
|
||||
|
||||
def test_old_metadata(self):
|
||||
path = "/home/rudi/music/Nightingale.mp3"
|
||||
m = global_reader.read_mutagen(path)
|
||||
self.assertTrue( len(m) > 0 )
|
||||
n = Metadata(path)
|
||||
self.assertEqual(n.extract(), m)
|
||||
|
||||
def test_recorded(self):
|
||||
recorded_file = "./15:15:00-Untitled Show-256kbps.ogg"
|
||||
emf, old = self.metadatas(recorded_file)
|
||||
self.assertEqual(emf, old)
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
58
python_apps/media-monitor/mm2/tests/test_eventcontractor.py
Normal file
58
python_apps/media-monitor/mm2/tests/test_eventcontractor.py
Normal file
|
@ -0,0 +1,58 @@
|
|||
import unittest
|
||||
from media.monitor.eventcontractor import EventContractor
|
||||
#from media.monitor.exceptions import BadSongFile
|
||||
from media.monitor.events import FakePyinotify, NewFile, MoveFile, \
|
||||
DeleteFile
|
||||
|
||||
class TestMMP(unittest.TestCase):
|
||||
def test_event_registered(self):
|
||||
ev = EventContractor()
|
||||
e1 = NewFile( FakePyinotify('bull.mp3') ).proxify()
|
||||
e2 = MoveFile( FakePyinotify('bull.mp3') ).proxify()
|
||||
ev.register(e1)
|
||||
self.assertTrue( ev.event_registered(e2) )
|
||||
|
||||
def test_get_old_event(self):
|
||||
ev = EventContractor()
|
||||
e1 = NewFile( FakePyinotify('bull.mp3') ).proxify()
|
||||
e2 = MoveFile( FakePyinotify('bull.mp3') ).proxify()
|
||||
ev.register(e1)
|
||||
self.assertEqual( ev.get_old_event(e2), e1 )
|
||||
|
||||
def test_register(self):
|
||||
ev = EventContractor()
|
||||
e1 = NewFile( FakePyinotify('bull.mp3') ).proxify()
|
||||
e2 = DeleteFile( FakePyinotify('bull.mp3') ).proxify()
|
||||
self.assertTrue( ev.register(e1) )
|
||||
|
||||
self.assertFalse( ev.register(e2) )
|
||||
|
||||
self.assertEqual( len(ev.store.keys()), 1 )
|
||||
|
||||
delete_ev = e1.safe_pack()[0]
|
||||
self.assertEqual( delete_ev['mode'], u'delete')
|
||||
self.assertEqual( len(ev.store.keys()), 0 )
|
||||
|
||||
e3 = DeleteFile( FakePyinotify('horse.mp3') ).proxify()
|
||||
self.assertTrue( ev.register(e3) )
|
||||
self.assertTrue( ev.register(e2) )
|
||||
|
||||
|
||||
def test_register2(self):
|
||||
ev = EventContractor()
|
||||
p = 'bull.mp3'
|
||||
events = [
|
||||
NewFile( FakePyinotify(p) ),
|
||||
NewFile( FakePyinotify(p) ),
|
||||
DeleteFile( FakePyinotify(p) ),
|
||||
NewFile( FakePyinotify(p) ),
|
||||
NewFile( FakePyinotify(p) ), ]
|
||||
events = map(lambda x: x.proxify(), events)
|
||||
actual_events = []
|
||||
for e in events:
|
||||
if ev.register(e):
|
||||
actual_events.append(e)
|
||||
self.assertEqual( len(ev.store.keys()), 1 )
|
||||
#packed = [ x.safe_pack() for x in actual_events ]
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
21
python_apps/media-monitor/mm2/tests/test_instance.py
Normal file
21
python_apps/media-monitor/mm2/tests/test_instance.py
Normal file
|
@ -0,0 +1,21 @@
|
|||
import unittest
|
||||
from copy import deepcopy
|
||||
from media.saas.airtimeinstance import AirtimeInstance, NoConfigFile
|
||||
|
||||
class TestAirtimeInstance(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.cfg = {
|
||||
'api_client' : 'tests/test_instance.py',
|
||||
'media_monitor' : 'tests/test_instance.py',
|
||||
'logging' : 'tests/test_instance.py',
|
||||
}
|
||||
|
||||
def test_init_good(self):
|
||||
AirtimeInstance("/root", self.cfg)
|
||||
self.assertTrue(True)
|
||||
|
||||
def test_init_bad(self):
|
||||
cfg = deepcopy(self.cfg)
|
||||
cfg['api_client'] = 'bs'
|
||||
with self.assertRaises(NoConfigFile):
|
||||
AirtimeInstance("/root", cfg)
|
77
python_apps/media-monitor/mm2/tests/test_listeners.py
Normal file
77
python_apps/media-monitor/mm2/tests/test_listeners.py
Normal file
|
@ -0,0 +1,77 @@
|
|||
import os, shutil
|
||||
import time
|
||||
import pyinotify
|
||||
import unittest
|
||||
from pydispatch import dispatcher
|
||||
|
||||
from media.monitor.listeners import OrganizeListener
|
||||
from media.monitor.events import OrganizeFile
|
||||
|
||||
from os.path import join, normpath, abspath
|
||||
|
||||
def create_file(p):
|
||||
with open(p, 'w') as f: f.write(" ")
|
||||
|
||||
class TestOrganizeListener(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.organize_path = 'test_o'
|
||||
self.sig = 'org'
|
||||
def my_abs_path(x):
|
||||
return normpath(join(os.getcwd(), x))
|
||||
self.sample_files = [ my_abs_path(join(self.organize_path, f))
|
||||
for f in [ "gogi.mp3",
|
||||
"gio.mp3",
|
||||
"mimino.ogg" ] ]
|
||||
os.mkdir(self.organize_path)
|
||||
|
||||
def test_flush_events(self):
|
||||
org = self.create_org()
|
||||
self.create_sample_files()
|
||||
received = [0]
|
||||
def pass_event(sender, event):
|
||||
if isinstance(event, OrganizeFile):
|
||||
received[0] += 1
|
||||
self.assertTrue( abspath(event.path) in self.sample_files )
|
||||
dispatcher.connect(pass_event, signal=self.sig, sender=dispatcher.Any,
|
||||
weak=True)
|
||||
org.flush_events( self.organize_path )
|
||||
self.assertEqual( received[0], len(self.sample_files) )
|
||||
self.delete_sample_files()
|
||||
|
||||
def test_process(self):
|
||||
org = self.create_org()
|
||||
received = [0]
|
||||
def pass_event(sender, event):
|
||||
if isinstance(event, OrganizeFile):
|
||||
self.assertTrue( event.path in self.sample_files )
|
||||
received[0] += 1
|
||||
dispatcher.connect(pass_event, signal=self.sig, sender=dispatcher.Any,
|
||||
weak=True)
|
||||
wm = pyinotify.WatchManager()
|
||||
def stopper(notifier):
|
||||
return received[0] == len(self.sample_files)
|
||||
tn = pyinotify.ThreadedNotifier(wm, default_proc_fun=org)
|
||||
tn.daemon = True
|
||||
tn.start()
|
||||
wm.add_watch(self.organize_path, pyinotify.ALL_EVENTS, rec=True,
|
||||
auto_add=True)
|
||||
time.sleep(0.5)
|
||||
self.create_sample_files()
|
||||
time.sleep(1)
|
||||
self.assertEqual( len(self.sample_files), received[0] )
|
||||
self.delete_sample_files()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.organize_path)
|
||||
|
||||
def create_sample_files(self):
|
||||
for f in self.sample_files: create_file(f)
|
||||
|
||||
def delete_sample_files(self):
|
||||
for f in self.sample_files: os.remove(f)
|
||||
|
||||
def create_org(self):
|
||||
return OrganizeListener( signal=self.sig )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
||||
|
41
python_apps/media-monitor/mm2/tests/test_manager.py
Normal file
41
python_apps/media-monitor/mm2/tests/test_manager.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
import unittest
|
||||
from media.monitor.manager import Manager
|
||||
|
||||
def add_paths(m,paths):
|
||||
for path in paths:
|
||||
m.add_watch_directory(path)
|
||||
|
||||
class TestManager(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.opath = "/home/rudi/Airtime/python_apps/media-monitor2/tests/"
|
||||
self.ppath = "/home/rudi/Airtime/python_apps/media-monitor2/media/"
|
||||
self.paths = [self.opath, self.ppath]
|
||||
|
||||
def test_init(self):
|
||||
man = Manager()
|
||||
self.assertTrue( len(man.watched_directories) == 0 )
|
||||
self.assertTrue( man.watch_channel is not None )
|
||||
self.assertTrue( man.organize_channel is not None )
|
||||
|
||||
def test_organize_path(self):
|
||||
man = Manager()
|
||||
man.set_organize_path( self.opath )
|
||||
self.assertEqual( man.get_organize_path(), self.opath )
|
||||
man.set_organize_path( self.ppath )
|
||||
self.assertEqual( man.get_organize_path(), self.ppath )
|
||||
|
||||
def test_add_watch_directory(self):
|
||||
man = Manager()
|
||||
add_paths(man, self.paths)
|
||||
for path in self.paths:
|
||||
self.assertTrue( man.has_watch(path) )
|
||||
|
||||
def test_remove_watch_directory(self):
|
||||
man = Manager()
|
||||
add_paths(man, self.paths)
|
||||
for path in self.paths:
|
||||
self.assertTrue( man.has_watch(path) )
|
||||
man.remove_watch_directory( path )
|
||||
self.assertTrue( not man.has_watch(path) )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
44
python_apps/media-monitor/mm2/tests/test_metadata.py
Normal file
44
python_apps/media-monitor/mm2/tests/test_metadata.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import unittest
|
||||
import sys
|
||||
import media.monitor.metadata as mmm
|
||||
|
||||
class TestMetadata(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.music_folder = u'/home/rudi/music'
|
||||
|
||||
def test_got_music_folder(self):
|
||||
t = os.path.exists(self.music_folder)
|
||||
self.assertTrue(t)
|
||||
if not t:
|
||||
print("'%s' must exist for this test to run." % self.music_folder )
|
||||
sys.exit(1)
|
||||
|
||||
def test_metadata(self):
|
||||
full_paths = (os.path.join(self.music_folder,filename) for filename in os.listdir(self.music_folder))
|
||||
i = 0
|
||||
for full_path in full_paths:
|
||||
if os.path.isfile(full_path):
|
||||
md_full = mmm.Metadata(full_path)
|
||||
md = md_full.extract()
|
||||
if i < 3:
|
||||
i += 1
|
||||
print("Sample metadata: '%s'" % md)
|
||||
self.assertTrue( len( md.keys() ) > 0 )
|
||||
utf8 = md_full.utf8()
|
||||
for k,v in md.iteritems():
|
||||
if hasattr(utf8[k], 'decode'):
|
||||
self.assertEqual( utf8[k].decode('utf-8'), md[k] )
|
||||
else: print("Skipping '%s' because it's a directory" % full_path)
|
||||
|
||||
def test_airtime_mutagen_dict(self):
|
||||
for muta,airtime in mmm.mutagen2airtime.iteritems():
|
||||
self.assertEqual( mmm.airtime2mutagen[airtime], muta )
|
||||
|
||||
def test_format_length(self):
|
||||
# TODO : add some real tests for this function
|
||||
x1 = 123456
|
||||
print("Formatting '%s' to '%s'" % (x1, mmm.format_length(x1)))
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
44
python_apps/media-monitor/mm2/tests/test_metadata_def.py
Normal file
44
python_apps/media-monitor/mm2/tests/test_metadata_def.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
|
||||
import media.metadata.process as md
|
||||
|
||||
class TestMetadataDef(unittest.TestCase):
|
||||
def test_simple(self):
|
||||
|
||||
with md.metadata('MDATA_TESTING') as t:
|
||||
t.optional(True)
|
||||
t.depends('ONE','TWO')
|
||||
t.default('unknown')
|
||||
t.translate(lambda kw: kw['ONE'] + kw['TWO'])
|
||||
|
||||
h = { 'ONE' : "testing", 'TWO' : "123" }
|
||||
result = md.global_reader.read('test_path',h)
|
||||
self.assertTrue( 'MDATA_TESTING' in result )
|
||||
self.assertEqual( result['MDATA_TESTING'], 'testing123' )
|
||||
h1 = { 'ONE' : 'big testing', 'two' : 'nothing' }
|
||||
result1 = md.global_reader.read('bs path', h1)
|
||||
self.assertEqual( result1['MDATA_TESTING'], 'unknown' )
|
||||
|
||||
def test_topo(self):
|
||||
with md.metadata('MDATA_TESTING') as t:
|
||||
t.depends('shen','sheni')
|
||||
t.default('megitzda')
|
||||
t.translate(lambda kw: kw['shen'] + kw['sheni'])
|
||||
|
||||
with md.metadata('shen') as t:
|
||||
t.default('vaxo')
|
||||
|
||||
with md.metadata('sheni') as t:
|
||||
t.default('gio')
|
||||
|
||||
with md.metadata('vaxo') as t:
|
||||
t.depends('shevetsi')
|
||||
|
||||
v = md.global_reader.read('bs mang', {})
|
||||
self.assertEqual(v['MDATA_TESTING'], 'vaxogio')
|
||||
self.assertTrue( 'vaxo' not in v )
|
||||
|
||||
md.global_reader.clear()
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
64
python_apps/media-monitor/mm2/tests/test_notifier.py
Normal file
64
python_apps/media-monitor/mm2/tests/test_notifier.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import json
|
||||
|
||||
from media.monitor.airtime import AirtimeNotifier, AirtimeMessageReceiver
|
||||
from mock import patch, Mock
|
||||
from media.monitor.config import MMConfig
|
||||
|
||||
from media.monitor.manager import Manager
|
||||
|
||||
def filter_ev(d): return { i : j for i,j in d.iteritems() if i != 'event_type' }
|
||||
|
||||
class TestReceiver(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# TODO : properly mock this later
|
||||
cfg = {}
|
||||
self.amr = AirtimeMessageReceiver(cfg, Manager())
|
||||
|
||||
def test_supported(self):
|
||||
# Every supported message should fire something
|
||||
for event_type in self.amr.dispatch_table.keys():
|
||||
msg = { 'event_type' : event_type, 'extra_param' : 123 }
|
||||
filtered = filter_ev(msg)
|
||||
# There should be a better way to test the following without
|
||||
# patching private methods
|
||||
with patch.object(self.amr, '_execute_message') as mock_method:
|
||||
mock_method.side_effect = None
|
||||
ret = self.amr.message(msg)
|
||||
self.assertTrue(ret)
|
||||
mock_method.assert_called_with(event_type, filtered)
|
||||
|
||||
def test_no_mod_message(self):
|
||||
ev = { 'event_type' : 'new_watch', 'directory' : 'something here' }
|
||||
filtered = filter_ev(ev)
|
||||
with patch.object(self.amr, '_execute_message') as mock_method:
|
||||
mock_method.return_value = "tested"
|
||||
ret = self.amr.message(ev)
|
||||
self.assertTrue( ret ) # message passing worked
|
||||
mock_method.assert_called_with(ev['event_type'], filtered)
|
||||
# test that our copy of the message does not get modified
|
||||
self.assertTrue( 'event_type' in ev )
|
||||
|
||||
class TestAirtimeNotifier(unittest.TestCase):
|
||||
def test_handle_message(self):
|
||||
#from configobj import ConfigObj
|
||||
test_cfg = MMConfig('./test_config.cfg')
|
||||
ran = [False]
|
||||
class MockReceiver(object):
|
||||
def message(me,m):
|
||||
self.assertTrue( 'event_type' in m )
|
||||
self.assertEqual( m['path'], '/bs/path' )
|
||||
ran[0] = True
|
||||
airtime = AirtimeNotifier(cfg=test_cfg, message_receiver=MockReceiver())
|
||||
m1 = Mock()
|
||||
m1.ack = "ack'd message"
|
||||
m2 = Mock()
|
||||
m2.body = json.dumps({ 'event_type' : 'file_delete', 'path' : '/bs/path' })
|
||||
airtime.handle_message(body=m1,message=m2)
|
||||
self.assertTrue( ran[0] )
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
36
python_apps/media-monitor/mm2/tests/test_owners.py
Normal file
36
python_apps/media-monitor/mm2/tests/test_owners.py
Normal file
|
@ -0,0 +1,36 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
from media.monitor import owners
|
||||
|
||||
class TestMMP(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.f = "test.mp3"
|
||||
|
||||
def test_has_owner(self):
|
||||
owners.reset_owners()
|
||||
o = 12345
|
||||
self.assertTrue( owners.add_file_owner(self.f,o) )
|
||||
self.assertTrue( owners.has_owner(self.f) )
|
||||
|
||||
def test_add_file_owner(self):
|
||||
owners.reset_owners()
|
||||
self.assertFalse( owners.add_file_owner('testing', -1) )
|
||||
self.assertTrue( owners.add_file_owner(self.f, 123) )
|
||||
self.assertTrue( owners.add_file_owner(self.f, 123) )
|
||||
self.assertTrue( owners.add_file_owner(self.f, 456) )
|
||||
|
||||
def test_remove_file_owner(self):
|
||||
owners.reset_owners()
|
||||
self.assertTrue( owners.add_file_owner(self.f, 123) )
|
||||
self.assertTrue( owners.remove_file_owner(self.f) )
|
||||
self.assertFalse( owners.remove_file_owner(self.f) )
|
||||
|
||||
def test_get_owner(self):
|
||||
owners.reset_owners()
|
||||
self.assertTrue( owners.add_file_owner(self.f, 123) )
|
||||
self.assertEqual( owners.get_owner(self.f), 123, "file is owned" )
|
||||
self.assertEqual( owners.get_owner("random_stuff.txt"), -1,
|
||||
"file is not owned" )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
||||
|
75
python_apps/media-monitor/mm2/tests/test_pure.py
Normal file
75
python_apps/media-monitor/mm2/tests/test_pure.py
Normal file
|
@ -0,0 +1,75 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import os
|
||||
import media.monitor.pure as mmp
|
||||
|
||||
class TestMMP(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.md1 = {'MDATA_KEY_MD5': '71185323c2ab0179460546a9d0690107',
|
||||
'MDATA_KEY_FTYPE': 'audioclip',
|
||||
'MDATA_KEY_MIME': 'audio/vorbis',
|
||||
'MDATA_KEY_DURATION': '0:0:25.000687',
|
||||
'MDATA_KEY_SAMPLERATE': 48000,
|
||||
'MDATA_KEY_BITRATE': 64000,
|
||||
'MDATA_KEY_REPLAYGAIN': 0,
|
||||
'MDATA_KEY_TITLE': u'ACDC_-_Back_In_Black-sample-64kbps'}
|
||||
|
||||
def test_apply_rules(self):
|
||||
sample_dict = {
|
||||
'key' : 'val',
|
||||
'test' : 'IT',
|
||||
}
|
||||
rules = {
|
||||
'key' : lambda x : x.upper(),
|
||||
'test' : lambda y : y.lower()
|
||||
}
|
||||
sample_dict = mmp.apply_rules_dict(sample_dict, rules)
|
||||
self.assertEqual(sample_dict['key'], 'VAL')
|
||||
self.assertEqual(sample_dict['test'], 'it')
|
||||
|
||||
def test_default_to(self):
|
||||
sd = { }
|
||||
def_keys = ['one','two','three']
|
||||
sd = mmp.default_to(dictionary=sd, keys=def_keys, default='DEF')
|
||||
for k in def_keys: self.assertEqual( sd[k], 'DEF' )
|
||||
|
||||
def test_file_md5(self):
|
||||
p = os.path.realpath(__file__)
|
||||
m1 = mmp.file_md5(p)
|
||||
m2 = mmp.file_md5(p,10)
|
||||
self.assertTrue( m1 != m2 )
|
||||
self.assertRaises( ValueError, lambda : mmp.file_md5('/file/path') )
|
||||
self.assertTrue( m1 == mmp.file_md5(p) )
|
||||
|
||||
def test_sub_path(self):
|
||||
f1 = "/home/testing/123.mp3"
|
||||
d1 = "/home/testing"
|
||||
d2 = "/home/testing/"
|
||||
self.assertTrue( mmp.sub_path(d1, f1) )
|
||||
self.assertTrue( mmp.sub_path(d2, f1) )
|
||||
|
||||
def test_parse_int(self):
|
||||
self.assertEqual( mmp.parse_int("123"), "123" )
|
||||
self.assertEqual( mmp.parse_int("123asf"), "123" )
|
||||
self.assertEqual( mmp.parse_int("asdf"), None )
|
||||
|
||||
def test_truncate_to_length(self):
|
||||
s1 = "testing with non string literal"
|
||||
s2 = u"testing with unicode literal"
|
||||
self.assertEqual( len(mmp.truncate_to_length(s1, 5)), 5)
|
||||
self.assertEqual( len(mmp.truncate_to_length(s2, 8)), 8)
|
||||
|
||||
|
||||
def test_owner_id(self):
|
||||
start_path = "testing.mp3"
|
||||
id_path = "testing.mp3.identifier"
|
||||
o_id = 123
|
||||
f = open(id_path, 'w')
|
||||
f.write("123")
|
||||
f.close()
|
||||
possible_id = mmp.owner_id(start_path)
|
||||
self.assertFalse( os.path.exists(id_path) )
|
||||
self.assertEqual( possible_id, o_id )
|
||||
self.assertEqual( -1, mmp.owner_id("something.random") )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
48
python_apps/media-monitor/mm2/tests/test_requestsync.py
Normal file
48
python_apps/media-monitor/mm2/tests/test_requestsync.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
import unittest
|
||||
from mock import MagicMock
|
||||
|
||||
from media.monitor.request import RequestSync
|
||||
|
||||
class TestRequestSync(unittest.TestCase):
|
||||
|
||||
def apc_mock(self):
|
||||
fake_apc = MagicMock()
|
||||
fake_apc.send_media_monitor_requests = MagicMock()
|
||||
return fake_apc
|
||||
|
||||
def watcher_mock(self):
|
||||
fake_watcher = MagicMock()
|
||||
fake_watcher.flag_done = MagicMock()
|
||||
return fake_watcher
|
||||
|
||||
def request_mock(self):
|
||||
fake_request = MagicMock()
|
||||
fake_request.safe_pack = MagicMock(return_value=[])
|
||||
return fake_request
|
||||
|
||||
def test_send_media_monitor(self):
|
||||
fake_apc = self.apc_mock()
|
||||
fake_requests = [ self.request_mock() for x in range(1,5) ]
|
||||
fake_watcher = self.watcher_mock()
|
||||
rs = RequestSync(fake_watcher, fake_requests, fake_apc)
|
||||
rs.run_request()
|
||||
self.assertEquals(fake_apc.send_media_monitor_requests.call_count, 1)
|
||||
|
||||
def test_flag_done(self):
|
||||
fake_apc = self.apc_mock()
|
||||
fake_requests = [ self.request_mock() for x in range(1,5) ]
|
||||
fake_watcher = self.watcher_mock()
|
||||
rs = RequestSync(fake_watcher, fake_requests, fake_apc)
|
||||
rs.run_request()
|
||||
self.assertEquals(fake_watcher.flag_done.call_count, 1)
|
||||
|
||||
def test_safe_pack(self):
|
||||
fake_apc = self.apc_mock()
|
||||
fake_requests = [ self.request_mock() for x in range(1,5) ]
|
||||
fake_watcher = self.watcher_mock()
|
||||
rs = RequestSync(fake_watcher, fake_requests, fake_apc)
|
||||
rs.run_request()
|
||||
for req in fake_requests:
|
||||
self.assertEquals(req.safe_pack.call_count, 1)
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
35
python_apps/media-monitor/mm2/tests/test_syncdb.py
Normal file
35
python_apps/media-monitor/mm2/tests/test_syncdb.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import os
|
||||
from media.monitor.syncdb import AirtimeDB
|
||||
from media.monitor.log import get_logger
|
||||
from media.monitor.pure import partition
|
||||
import api_clients.api_client as ac
|
||||
import prepare_tests
|
||||
|
||||
class TestAirtimeDB(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ac = ac.AirtimeApiClient(logger=get_logger(),
|
||||
config_path=prepare_tests.real_config)
|
||||
|
||||
def test_syncdb_init(self):
|
||||
sdb = AirtimeDB(self.ac)
|
||||
self.assertTrue( len(sdb.list_storable_paths()) > 0 )
|
||||
|
||||
def test_list(self):
|
||||
self.sdb = AirtimeDB(self.ac)
|
||||
for watch_dir in self.sdb.list_storable_paths():
|
||||
self.assertTrue( os.path.exists(watch_dir) )
|
||||
|
||||
def test_directory_get_files(self):
|
||||
sdb = AirtimeDB(self.ac)
|
||||
print(sdb.list_storable_paths())
|
||||
for wdir in sdb.list_storable_paths():
|
||||
files = sdb.directory_get_files(wdir)
|
||||
print( "total files: %d" % len(files) )
|
||||
self.assertTrue( len(files) >= 0 )
|
||||
self.assertTrue( isinstance(files, set) )
|
||||
exist, deleted = partition(os.path.exists, files)
|
||||
print("(exist, deleted) = (%d, %d)" % ( len(exist), len(deleted) ) )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
64
python_apps/media-monitor/mm2/tests/test_thread.py
Normal file
64
python_apps/media-monitor/mm2/tests/test_thread.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import time
|
||||
from media.saas.thread import InstanceThread, InstanceInheritingThread
|
||||
|
||||
# ugly but necessary for 2.7
|
||||
signal = False
|
||||
signal2 = False
|
||||
|
||||
class TestInstanceThread(unittest.TestCase):
|
||||
def test_user_inject(self):
|
||||
global signal
|
||||
signal = False
|
||||
u = "rudi"
|
||||
class T(InstanceThread):
|
||||
def run(me):
|
||||
global signal
|
||||
super(T, me).run()
|
||||
signal = True
|
||||
self.assertEquals(u, me.user())
|
||||
t = T(u, name="test_user_inject")
|
||||
t.daemon = True
|
||||
t.start()
|
||||
time.sleep(0.2)
|
||||
self.assertTrue(signal)
|
||||
|
||||
def test_inheriting_thread(utest):
|
||||
global signal2
|
||||
u = "testing..."
|
||||
|
||||
class TT(InstanceInheritingThread):
|
||||
def run(self):
|
||||
global signal2
|
||||
utest.assertEquals(self.user(), u)
|
||||
signal2 = True
|
||||
|
||||
class T(InstanceThread):
|
||||
def run(self):
|
||||
super(T, self).run()
|
||||
child_thread = TT(name="child thread")
|
||||
child_thread.daemon = True
|
||||
child_thread.start()
|
||||
|
||||
parent_thread = T(u, name="Parent instance thread")
|
||||
parent_thread.daemon = True
|
||||
parent_thread.start()
|
||||
|
||||
time.sleep(0.2)
|
||||
utest.assertTrue(signal2)
|
||||
|
||||
def test_different_user(utest):
|
||||
u1, u2 = "ru", "di"
|
||||
class T(InstanceThread):
|
||||
def run(self):
|
||||
super(T, self).run()
|
||||
|
||||
for u in [u1, u2]:
|
||||
t = T(u)
|
||||
t.daemon = True
|
||||
t.start()
|
||||
utest.assertEquals(t.user(), u)
|
||||
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
54
python_apps/media-monitor/mm2/tests/test_toucher.py
Normal file
54
python_apps/media-monitor/mm2/tests/test_toucher.py
Normal file
|
@ -0,0 +1,54 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import unittest
|
||||
import time
|
||||
import media.monitor.pure as mmp
|
||||
from media.monitor.toucher import Toucher, ToucherThread
|
||||
|
||||
class BaseTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.p = "api_client.cfg"
|
||||
|
||||
class TestToucher(BaseTest):
|
||||
def test_toucher(self):
|
||||
t1 = mmp.last_modified(self.p)
|
||||
t = Toucher(self.p)
|
||||
t()
|
||||
t2 = mmp.last_modified(self.p)
|
||||
print("(t1,t2) = (%d, %d) diff => %d" % (t1, t2, t2 - t1))
|
||||
self.assertTrue( t2 > t1 )
|
||||
|
||||
class TestToucherThread(BaseTest):
|
||||
def test_thread(self):
|
||||
t1 = mmp.last_modified(self.p)
|
||||
ToucherThread(self.p, interval=1)
|
||||
time.sleep(2)
|
||||
t2 = mmp.last_modified(self.p)
|
||||
print("(t1,t2) = (%d, %d) diff => %d" % (t1, t2, t2 - t1))
|
||||
self.assertTrue( t2 > t1 )
|
||||
|
||||
if __name__ == '__main__': unittest.main()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue