From 0b3c5199791f6f43a0b1f3f8b84bb5e58b5d52ee Mon Sep 17 00:00:00 2001 From: Naomi Aro Date: Mon, 23 Sep 2013 11:32:48 +0200 Subject: [PATCH] trying to fix master.. --- install_full/ubuntu/install_log.txt | 497 +++++++++++++++ python_apps/pypo/airtime-liquidsoap-init-d | 125 ++++ python_apps/pypo/airtime-playout-init-d | 83 +++ python_apps/pypo/eventtypes.py | 6 + python_apps/pypo/listenerstat.py | 160 +++++ python_apps/pypo/logging.cfg | 58 ++ python_apps/pypo/monit-airtime-liquidsoap.cfg | 17 + python_apps/pypo/monit-airtime-playout.cfg | 9 + .../pypo/monit-pre530-airtime-liquidsoap.cfg | 8 + python_apps/pypo/notify_logging.cfg | 28 + python_apps/pypo/pure.py | 17 + python_apps/pypo/pypo.cfg | 85 +++ python_apps/pypo/pypofetch.py | 592 ++++++++++++++++++ python_apps/pypo/pypofile.py | 146 +++++ python_apps/pypo/pypoliqqueue.py | 89 +++ python_apps/pypo/pypoliquidsoap.py | 237 +++++++ python_apps/pypo/pyponotify.py | 139 ++++ python_apps/pypo/recorder.py | 335 ++++++++++ python_apps/pypo/telnetliquidsoap.py | 310 +++++++++ python_apps/pypo/testpypoliqqueue.py | 98 +++ 20 files changed, 3039 insertions(+) create mode 100644 install_full/ubuntu/install_log.txt create mode 100755 python_apps/pypo/airtime-liquidsoap-init-d create mode 100755 python_apps/pypo/airtime-playout-init-d create mode 100644 python_apps/pypo/eventtypes.py create mode 100644 python_apps/pypo/listenerstat.py create mode 100644 python_apps/pypo/logging.cfg create mode 100644 python_apps/pypo/monit-airtime-liquidsoap.cfg create mode 100644 python_apps/pypo/monit-airtime-playout.cfg create mode 100644 python_apps/pypo/monit-pre530-airtime-liquidsoap.cfg create mode 100644 python_apps/pypo/notify_logging.cfg create mode 100644 python_apps/pypo/pure.py create mode 100644 python_apps/pypo/pypo.cfg create mode 100644 python_apps/pypo/pypofetch.py create mode 100644 python_apps/pypo/pypofile.py create mode 100644 python_apps/pypo/pypoliqqueue.py create mode 100644 python_apps/pypo/pypoliquidsoap.py 
create mode 100644 python_apps/pypo/pyponotify.py create mode 100644 python_apps/pypo/recorder.py create mode 100644 python_apps/pypo/telnetliquidsoap.py create mode 100644 python_apps/pypo/testpypoliqqueue.py diff --git a/install_full/ubuntu/install_log.txt b/install_full/ubuntu/install_log.txt new file mode 100644 index 000000000..7e196ecc8 --- /dev/null +++ b/install_full/ubuntu/install_log.txt @@ -0,0 +1,497 @@ +---------------------------------------------------- + 1. Install Packages +---------------------------------------------------- +deb http://apt.sourcefabric.org/ precise main +Hit http://dl.google.com stable Release.gpg +Hit http://dl.google.com stable Release.gpg +Hit http://dl.google.com stable Release +Get:1 http://security.ubuntu.com precise-security Release.gpg [198 B] +Get:2 http://extras.ubuntu.com precise Release.gpg [72 B] +Hit http://archive.canonical.com precise Release.gpg +Hit http://ppa.launchpad.net precise Release.gpg +Hit http://ppa.launchpad.net precise Release.gpg +Hit http://ppa.launchpad.net precise Release.gpg +Hit http://dl.google.com stable Release +Hit http://dl.google.com stable/main i386 Packages +Get:3 http://security.ubuntu.com precise-security Release [49.6 kB] +Hit http://extras.ubuntu.com precise Release +Hit http://archive.canonical.com precise Release +Ign http://dl.google.com stable/main TranslationIndex +Hit http://ppa.launchpad.net precise Release +Hit http://dl.google.com stable/main i386 Packages +Ign http://dl.google.com stable/main TranslationIndex +Hit http://extras.ubuntu.com precise/main Sources +Hit http://archive.canonical.com precise/partner i386 Packages +Hit http://ppa.launchpad.net precise Release +Ign http://archive.canonical.com precise/partner TranslationIndex +Hit http://ppa.launchpad.net precise Release +Hit http://extras.ubuntu.com precise/main i386 Packages +Ign http://extras.ubuntu.com precise/main TranslationIndex +Hit http://ppa.launchpad.net precise/main Sources +Hit http://ppa.launchpad.net 
precise/main i386 Packages +Ign http://ppa.launchpad.net precise/main TranslationIndex +Get:4 http://security.ubuntu.com precise-security/main Sources [81.4 kB] +Hit http://ppa.launchpad.net precise/main Sources +Hit http://ppa.launchpad.net precise/main i386 Packages +Ign http://ppa.launchpad.net precise/main TranslationIndex +Hit http://ppa.launchpad.net precise/main Sources +Hit http://ppa.launchpad.net precise/main i386 Packages +Ign http://ppa.launchpad.net precise/main TranslationIndex +Ign http://dl.google.com stable/main Translation-en_US +Ign http://dl.google.com stable/main Translation-en +Ign http://dl.google.com stable/main Translation-en_US +Ign http://dl.google.com stable/main Translation-en +Get:5 http://security.ubuntu.com precise-security/restricted Sources [2,494 B] +Get:6 http://security.ubuntu.com precise-security/universe Sources [26.3 kB] +Get:7 http://security.ubuntu.com precise-security/multiverse Sources [1,383 B] +Get:8 http://security.ubuntu.com precise-security/main i386 Packages [308 kB] +Ign http://extras.ubuntu.com precise/main Translation-en_US +Ign http://archive.canonical.com precise/partner Translation-en_US +Ign http://extras.ubuntu.com precise/main Translation-en +Ign http://archive.canonical.com precise/partner Translation-en +Get:9 http://security.ubuntu.com precise-security/restricted i386 Packages [4,620 B] +Get:10 http://security.ubuntu.com precise-security/universe i386 Packages [78.7 kB] +Ign http://ppa.launchpad.net precise/main Translation-en_US +Ign http://ppa.launchpad.net precise/main Translation-en +Ign http://ppa.launchpad.net precise/main Translation-en_US +Ign http://ppa.launchpad.net precise/main Translation-en +Ign http://ppa.launchpad.net precise/main Translation-en_US +Get:11 http://security.ubuntu.com precise-security/multiverse i386 Packages [2,367 B] +Hit http://security.ubuntu.com precise-security/main TranslationIndex +Hit http://security.ubuntu.com precise-security/multiverse TranslationIndex +Hit 
http://security.ubuntu.com precise-security/restricted TranslationIndex +Hit http://security.ubuntu.com precise-security/universe TranslationIndex +Ign http://ppa.launchpad.net precise/main Translation-en +Hit http://security.ubuntu.com precise-security/main Translation-en +Hit http://security.ubuntu.com precise-security/multiverse Translation-en +Hit http://security.ubuntu.com precise-security/restricted Translation-en +Hit http://security.ubuntu.com precise-security/universe Translation-en +Get:12 http://cran.ma.imperial.ac.uk precise/ Release.gpg [490 B] +Hit http://apt.sourcefabric.org precise Release.gpg +Hit http://cz.archive.ubuntu.com precise Release.gpg +Hit http://apt.sourcefabric.org precise Release +Get:13 http://cz.archive.ubuntu.com precise-updates Release.gpg [198 B] +Get:14 http://cran.ma.imperial.ac.uk precise/ Release [2,280 B] +Hit http://apt.sourcefabric.org precise/main i386 Packages +Hit http://cz.archive.ubuntu.com precise Release +Ign http://apt.sourcefabric.org precise/main TranslationIndex +Hit http://cran.ma.imperial.ac.uk precise/ Packages +Get:15 http://cz.archive.ubuntu.com precise-updates Release [49.6 kB] +Hit http://cz.archive.ubuntu.com precise/main Sources +Hit http://cz.archive.ubuntu.com precise/restricted Sources +Hit http://cz.archive.ubuntu.com precise/universe Sources +Hit http://cz.archive.ubuntu.com precise/multiverse Sources +Hit http://cz.archive.ubuntu.com precise/main i386 Packages +Hit http://cz.archive.ubuntu.com precise/restricted i386 Packages +Hit http://cz.archive.ubuntu.com precise/universe i386 Packages +Hit http://cz.archive.ubuntu.com precise/multiverse i386 Packages +Hit http://cz.archive.ubuntu.com precise/main TranslationIndex +Hit http://cz.archive.ubuntu.com precise/multiverse TranslationIndex +Hit http://cz.archive.ubuntu.com precise/restricted TranslationIndex +Hit http://cz.archive.ubuntu.com precise/universe TranslationIndex +Get:16 http://cz.archive.ubuntu.com precise-updates/main Sources [397 kB] 
+Ign http://apt.sourcefabric.org precise/main Translation-en_US +Ign http://apt.sourcefabric.org precise/main Translation-en +Get:17 http://cz.archive.ubuntu.com precise-updates/restricted Sources [5,467 B] +Get:18 http://cz.archive.ubuntu.com precise-updates/universe Sources [90.9 kB] +Get:19 http://cz.archive.ubuntu.com precise-updates/multiverse Sources [6,571 B] +Get:20 http://cz.archive.ubuntu.com precise-updates/main i386 Packages [661 kB] +Get:21 http://cz.archive.ubuntu.com precise-updates/restricted i386 Packages [10.0 kB] +Get:22 http://cz.archive.ubuntu.com precise-updates/universe i386 Packages [209 kB] +Get:23 http://cz.archive.ubuntu.com precise-updates/multiverse i386 Packages [13.8 kB] +Hit http://cz.archive.ubuntu.com precise-updates/main TranslationIndex +Hit http://cz.archive.ubuntu.com precise-updates/multiverse TranslationIndex +Hit http://cz.archive.ubuntu.com precise-updates/restricted TranslationIndex +Hit http://cz.archive.ubuntu.com precise-updates/universe TranslationIndex +Hit http://cz.archive.ubuntu.com precise/main Translation-en +Hit http://cz.archive.ubuntu.com precise/multiverse Translation-en +Hit http://cz.archive.ubuntu.com precise/restricted Translation-en +Ign http://cran.ma.imperial.ac.uk precise/ Translation-en_US +Hit http://cz.archive.ubuntu.com precise/universe Translation-en +Hit http://cz.archive.ubuntu.com precise-updates/main Translation-en +Hit http://cz.archive.ubuntu.com precise-updates/multiverse Translation-en +Hit http://cz.archive.ubuntu.com precise-updates/restricted Translation-en +Hit http://cz.archive.ubuntu.com precise-updates/universe Translation-en +Ign http://cran.ma.imperial.ac.uk precise/ Translation-en +Fetched 2,002 kB in 8s (246 kB/s) +Reading package lists... +Reading package lists... +Building dependency tree... +Reading state information... +flac is already the newest version. +gzip is already the newest version. +libesd0 is already the newest version. 
+libportaudio2 is already the newest version. +libsamplerate0 is already the newest version. +lsof is already the newest version. +patch is already the newest version. +pwgen is already the newest version. +rabbitmq-server is already the newest version. +tar is already the newest version. +vorbis-tools is already the newest version. +vorbisgain is already the newest version. +ecasound is already the newest version. +libao-ocaml is already the newest version. +libcamomile-ocaml-data is already the newest version. +libfaad2 is already the newest version. +libmad-ocaml is already the newest version. +libsoundtouch-ocaml is already the newest version. +libtaglib-ocaml is already the newest version. +monit is already the newest version. +mp3gain is already the newest version. +mpg123 is already the newest version. +multitail is already the newest version. +odbc-postgresql is already the newest version. +python-virtualenv is already the newest version. +curl is already the newest version. +libpulse0 is already the newest version. +lsb-release is already the newest version. +php-pear is already the newest version. +php5-curl is already the newest version. +php5-gd is already the newest version. +php5-pgsql is already the newest version. +postgresql is already the newest version. +sudo is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +The following extra packages will be installed: + python-minimal +Suggested packages: + python-doc +The following packages will be upgraded: + python python-minimal +2 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Need to get 196 kB of archives. +After this operation, 0 B of additional disk space will be used. 
+Get:1 http://cz.archive.ubuntu.com/ubuntu/ precise-updates/main python-minimal i386 2.7.3-0ubuntu2.2 [29.2 kB] +Get:2 http://cz.archive.ubuntu.com/ubuntu/ precise-updates/main python i386 2.7.3-0ubuntu2.2 [166 kB] +Fetched 196 kB in 4s (40.4 kB/s) +(Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 354760 files and directories currently installed.) +Preparing to replace python-minimal 2.7.3-0ubuntu2 (using .../python-minimal_2.7.3-0ubuntu2.2_i386.deb) ... +Unpacking replacement python-minimal ... +Processing triggers for man-db ... +Setting up python-minimal (2.7.3-0ubuntu2.2) ... +(Reading database ... (Reading database ... 5% (Reading database ... 10% (Reading database ... 15% (Reading database ... 20% (Reading database ... 25% (Reading database ... 30% (Reading database ... 35% (Reading database ... 40% (Reading database ... 45% (Reading database ... 50% (Reading database ... 55% (Reading database ... 60% (Reading database ... 65% (Reading database ... 70% (Reading database ... 75% (Reading database ... 80% (Reading database ... 85% (Reading database ... 90% (Reading database ... 95% (Reading database ... 100% (Reading database ... 354760 files and directories currently installed.) +Preparing to replace python 2.7.3-0ubuntu2 (using .../python_2.7.3-0ubuntu2.2_i386.deb) ... +Unpacking replacement python ... +Processing triggers for man-db ... +Processing triggers for doc-base ... +Processing 1 changed doc-base file... 
+Registering documents with scrollkeeper... +Setting up python (2.7.3-0ubuntu2.2) ... +Reading package lists... +Building dependency tree... +Reading state information... +icecast2 is already the newest version. +lame is already the newest version. +libmp3lame-dev is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +libzend-framework-php is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +coreutils is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +libvo-aacenc0 is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... 
+Building dependency tree... +Reading state information... +sourcefabric-keyring is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +liquidsoap is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +silan is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +libopus0 is already the newest version. +The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +Reading package lists... +Building dependency tree... +Reading state information... +apache2 is already the newest version. +libapache2-mod-php5 is already the newest version. 
+The following packages were automatically installed and are no longer required: + linux-headers-3.2.0-33-generic-pae linux-headers-3.2.0-33 libev4 + libv8-3.7.12.22 linux-headers-3.2.0-33-generic libc-ares2 +Use 'apt-get autoremove' to remove them. +0 upgraded, 0 newly installed, 0 to remove and 23 not upgraded. +---------------------------------------------------- +2. Apache Config File +---------------------------------------------------- +Apache config for Airtime already exists... +---------------------------------------------------- +3. Enable Icecast +---------------------------------------------------- +Starting icecast2: +---------------------------------------------------- +4. Enable Monit +---------------------------------------------------- +---------------------------------------------------- +5. Run Airtime Install +---------------------------------------------------- +* Making sure /etc/default/locale is set properly +LANG="en_US.UTF-8" + * None found. +* Temporarily stopping any previous running services + +******************************** Install Begin ********************************* +Ensuring python-virtualenv version > 1.4.8...Success! + +*** Creating Virtualenv for Airtime *** +Already using interpreter /usr/bin/python +New python executable in /usr/lib/airtime/airtime_virtualenv/bin/python +Installing setuptools............done. +Installing pip...............done. 
+ +*** Installing Python Libraries *** +Unpacking /home/naomiaro/airtime/python_apps/python-virtualenv/airtime_virtual_env.pybundle +Downloading/unpacking pyinotify + Running setup.py egg_info for package pyinotify + +Downloading/unpacking mutagen + Running setup.py egg_info for package mutagen + +Downloading/unpacking kombu + Running setup.py egg_info for package kombu + +Downloading/unpacking anyjson + Running setup.py egg_info for package anyjson + +Downloading/unpacking amqplib + Running setup.py egg_info for package amqplib + +Requirement already satisfied (use --upgrade to upgrade): argparse in /usr/lib/python2.7 +Downloading/unpacking poster + Running setup.py egg_info for package poster + +Downloading/unpacking PyDispatcher + Running setup.py egg_info for package PyDispatcher + /usr/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'use_2to3' + warnings.warn(msg) + + warning: no previously-included files matching '*.bat' found anywhere in distribution + warning: no previously-included files matching './CVS' found anywhere in distribution + warning: no previously-included files matching '.cvsignore' found anywhere in distribution +Downloading/unpacking docopt + Running setup.py egg_info for package docopt + +Requirement already satisfied (use --upgrade to upgrade): wsgiref in /usr/lib/python2.7 +Downloading/unpacking pytz + Running setup.py egg_info for package pytz + + warning: no files found matching '*.pot' under directory 'pytz' + warning: no previously-included files found matching 'test_zdump.py' +Downloading/unpacking configobj + Running setup.py egg_info for package configobj + +Installing collected packages: pyinotify, mutagen, kombu, anyjson, amqplib, poster, PyDispatcher, docopt, pytz, configobj + Running setup.py install for pyinotify + + Running setup.py install for mutagen + changing mode of build/scripts-2.7/moggsplit from 644 to 755 + changing mode of build/scripts-2.7/mid3iconv from 644 to 755 + changing mode of 
build/scripts-2.7/mid3v2 from 644 to 755 + changing mode of build/scripts-2.7/mutagen-inspect from 644 to 755 + changing mode of build/scripts-2.7/mutagen-pony from 644 to 755 + + changing mode of /usr/lib/airtime/airtime_virtualenv/bin/moggsplit to 755 + changing mode of /usr/lib/airtime/airtime_virtualenv/bin/mid3iconv to 755 + changing mode of /usr/lib/airtime/airtime_virtualenv/bin/mid3v2 to 755 + changing mode of /usr/lib/airtime/airtime_virtualenv/bin/mutagen-inspect to 755 + changing mode of /usr/lib/airtime/airtime_virtualenv/bin/mutagen-pony to 755 + Running setup.py install for kombu + + Running setup.py install for anyjson + + Running setup.py install for amqplib + + Running setup.py install for poster + + Running setup.py install for PyDispatcher + /usr/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'use_2to3' + warnings.warn(msg) + + warning: no previously-included files matching '*.bat' found anywhere in distribution + warning: no previously-included files matching './CVS' found anywhere in distribution + warning: no previously-included files matching '.cvsignore' found anywhere in distribution + Running setup.py install for docopt + + Running setup.py install for pytz + + warning: no files found matching '*.pot' under directory 'pytz' + warning: no previously-included files found matching 'test_zdump.py' + Running setup.py install for configobj + +Successfully installed pyinotify mutagen kombu anyjson amqplib poster PyDispatcher docopt pytz configobj +Cleaning up... 
+* Checking for user pypo + * Creating user pypo +* Creating INI files +* Initializing INI files +* Airtime Version: 2.4.0 +* Storage directory setup +* Directory /srv/airtime/stor created +* Giving Apache permission to access /srv/airtime/stor +* Directory /srv/airtime/stor/organize created +* Giving Apache permission to access /srv/airtime/stor/organize +* Checking database for correct encoding + +* Database Installation + * Creating Airtime database user + * Database user 'airtime' created. + * Creating Airtime database + * Postgres scripting language already installed + * Creating database tables + * Setting Airtime version + * Inserting stor directory location /srv/airtime/stor/ into music_dirs table +Creating vhost "/airtime" ... +...done. +Creating user "airtime" ... +...done. +Setting permissions for user "airtime" in vhost "/airtime" ... +...done. +* Creating /etc/airtime +* Creating /etc/monit/conf.d/monit-airtime-generic.cfg +* Creating /etc/cron.d/airtime-crons +* Creating /usr/lib/airtime +* Creating symbolic links in /usr/bin +* Creating /var/log/airtime +* Creating /usr/share/airtime +* Creating /var/log/airtime +* Creating /var/tmp/airtime +Generating locales +Generating locales... + cs_CZ.UTF-8... done +Generation complete. +Generating locales... + de_AT.UTF-8... done +Generation complete. +Generating locales... + de_DE.UTF-8... done +Generation complete. +Generating locales... + el_GR.UTF-8... done +Generation complete. +Generating locales... + en_CA.UTF-8... done +Generation complete. +Generating locales... + en_GB.UTF-8... done +Generation complete. +Generating locales... + en_US.UTF-8... up-to-date +Generation complete. +Generating locales... + es_ES.UTF-8... done +Generation complete. +Generating locales... + fr_FR.UTF-8... done +Generation complete. +Generating locales... + hu_HU.UTF-8... done +Generation complete. +Generating locales... + it_IT.UTF-8... done +Generation complete. +Generating locales... + ko_KR.UTF-8... 
done +Generation complete. +Generating locales... + pl_PL.UTF-8... done +Generation complete. +Generating locales... + pt_BR.UTF-8... done +Generation complete. +Generating locales... + ru_RU.UTF-8... done +Generation complete. +Generating locales... + zh_CN.UTF-8... done +Generation complete. +Starting Airtime Media Monitor: Done. +* Waiting for media-monitor processes to start... +* Detecting OS: ... Found Ubuntu 12.04.2 LTS (precise) on i386 architecture + * Creating symlink to Liquidsoap binary +* Clearing previous pypo cache +* Waiting for pypo processes to start... + * Stopping daemon monitor monit + ...done. + * Starting daemon monitor monit + ...done. + +*** Verifying your system environment, running airtime-check-system *** +AIRTIME_STATUS_URL = http://localhost:80/api/status/format/json/api_key/%%api_key%% +AIRTIME_SERVER_RESPONDING = OK +KERNEL_VERSION = 3.2.0-34-generic-pae +MACHINE_ARCHITECTURE = i686 +TOTAL_MEMORY_MBYTES = 3982936 +TOTAL_SWAP_MBYTES = 4052988 +AIRTIME_VERSION = 2.4.0+c2e9f90:1371848298 +OS = Ubuntu 12.04.2 LTS i686 +CPU = Intel(R) Core(TM) i5 CPU M 480 @ 2.67GHz +WEB_SERVER = Apache/2.2.22 (Ubuntu) +PLAYOUT_ENGINE_PROCESS_ID = 10489 +PLAYOUT_ENGINE_RUNNING_SECONDS = 11 +PLAYOUT_ENGINE_MEM_PERC = 0.2% +PLAYOUT_ENGINE_CPU_PERC = 0.0% +LIQUIDSOAP_PROCESS_ID = 10456 +LIQUIDSOAP_RUNNING_SECONDS = 12 +LIQUIDSOAP_MEM_PERC = 0.3% +LIQUIDSOAP_CPU_PERC = 0.0% +MEDIA_MONITOR_PROCESS_ID = 10364 +MEDIA_MONITOR_RUNNING_SECONDS = 12 +MEDIA_MONITOR_MEM_PERC = 0.2% +MEDIA_MONITOR_CPU_PERC = 0.0% +-- Your installation of Airtime looks OK! 
+ +******************************* Install Complete ******************************* diff --git a/python_apps/pypo/airtime-liquidsoap-init-d b/python_apps/pypo/airtime-liquidsoap-init-d new file mode 100755 index 000000000..f8b749c2b --- /dev/null +++ b/python_apps/pypo/airtime-liquidsoap-init-d @@ -0,0 +1,125 @@ +#!/bin/bash + +### BEGIN INIT INFO +# Provides: airtime-liquidsoap +# Required-Start: $local_fs $remote_fs $network $syslog $all +# Required-Stop: $local_fs $remote_fs $network $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Liquidsoap daemon +### END INIT INFO + +USERID=pypo +GROUPID=pypo +NAME="Liquidsoap Playout Engine" + +DAEMON=/usr/lib/airtime/pypo/bin/airtime-liquidsoap +PIDFILE=/var/run/airtime-liquidsoap.pid +EXEC='/usr/bin/airtime-liquidsoap' + +start () { + mkdir -p /var/log/airtime/pypo-liquidsoap + chown $USERID:$GROUPID /var/log/airtime/pypo-liquidsoap + + chown $USERID:$GROUPID /etc/airtime + touch /etc/airtime/liquidsoap.cfg + chown $USERID:$GROUPID /etc/airtime/liquidsoap.cfg + + touch $PIDFILE + chown $USERID:$GROUPID $PIDFILE + + #start-stop-daemon --start --quiet --chuid $USERID:$GROUPID \ + #--pidfile $PIDFILE --nicelevel -15 --startas $DAEMON + start-stop-daemon --start --quiet --chuid $USERID:$GROUPID \ + --nicelevel -15 --startas $DAEMON --exec $EXEC +} + +stop () { + timeout --version >/dev/null 2>&1 + RESULT="$?" + + #send term signal after 10 seconds + if [ "$RESULT" = "0" ]; then + timeout -s9 10s /usr/lib/airtime/airtime_virtualenv/bin/python \ + /usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap_prepare_terminate.py + else + #some earlier versions of Ubuntu (Lucid) had a different timeout + #command that takes different input parameters. + timeout 10 /usr/lib/airtime/airtime_virtualenv/bin/python \ + /usr/lib/airtime/pypo/bin/liquidsoap_scripts/liquidsoap_prepare_terminate.py + fi + # Send TERM after 5 seconds, wait at most 30 seconds. 
+ #start-stop-daemon --stop --oknodo --retry=TERM/10/KILL/5 --quiet --pidfile $PIDFILE + start-stop-daemon --stop --oknodo --retry=TERM/10/KILL/5 --quiet --exec $EXEC + + rm -f $PIDFILE + sleep 2 +} + +start_with_monit () { + start + monit monitor airtime-liquidsoap >/dev/null 2>&1 +} + +stop_with_monit() { + monit unmonitor airtime-liquidsoap >/dev/null 2>&1 + stop +} + + + + +case "${1:-''}" in + 'stop') + echo "* Stopping Liquidsoap without notifying Monit process watchdog. If Monit is +* running it will automatically bring the Liquidsoap back up. You probably want +* 'stop-with-monit' instead of 'stop'." + echo -n "Stopping $NAME: " + stop + echo "Done." + ;; + 'start') + echo "* Starting $NAME without Monit process watchdog. To make sure Monit is watching +* Liquidsoap, use 'start-with-monit' instead of 'start'." + echo -n "Starting $NAME: " + start + echo "Done." + ;; + 'restart') + # restart commands here + echo -n "Restarting $NAME: " + stop + start + echo "Done." + ;; + + 'status') + if [ -f "$PIDFILE" ]; then + pid=`cat $PIDFILE` + if [ -d "/proc/$pid" ]; then + echo "$NAME is running" + exit 0 + fi + fi + echo "$NAME is not running" + exit 1 + ;; + 'start-with-monit') + # restart commands here + echo -n "Starting $NAME: " + start_with_monit + echo "Done." + ;; + 'stop-with-monit') + # restart commands here + echo -n "Stopping $NAME: " + stop_with_monit + echo "Done." 
+ ;; + + *) # no parameter specified + echo "Usage: $SELF start|stop|restart|status" + exit 1 + ;; + +esac diff --git a/python_apps/pypo/airtime-playout-init-d b/python_apps/pypo/airtime-playout-init-d new file mode 100755 index 000000000..1d760d43a --- /dev/null +++ b/python_apps/pypo/airtime-playout-init-d @@ -0,0 +1,83 @@ +#!/bin/bash + +### BEGIN INIT INFO +# Provides: airtime-playout +# Required-Start: $local_fs $remote_fs $network $syslog $all +# Required-Stop: $local_fs $remote_fs $network $syslog +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: Manage airtime-playout daemon +### END INIT INFO + +USERID=root +NAME="Airtime Scheduler" + +DAEMON=/usr/lib/airtime/pypo/bin/airtime-playout +PIDFILE=/var/run/airtime-playout.pid + +start () { + mkdir -p /var/log/airtime/pypo + + start-stop-daemon --start --background --quiet --chuid $USERID:$USERID \ + --make-pidfile --pidfile $PIDFILE --startas $DAEMON +} + +stop () { + # Send TERM after 5 seconds, wait at most 30 seconds. + start-stop-daemon --stop --oknodo --retry TERM/5/0/30 --quiet --pidfile $PIDFILE + rm -f $PIDFILE +} + +start_with_monit() { + start + monit monitor airtime-playout >/dev/null 2>&1 +} + +stop_with_monit() { + monit unmonitor airtime-playout >/dev/null 2>&1 + stop +} + +case "${1:-''}" in + 'start') + # start commands here + echo -n "Starting $NAME: " + start + echo "Done." + ;; + 'stop') + # stop commands here + echo -n "Stopping $NAME: " + stop + echo "Done." + ;; + 'restart') + # restart commands here + echo -n "Restarting $NAME: " + stop + start + echo "Done." + ;; + 'start-with-monit') + # restart commands here + echo -n "Starting $NAME: " + start_with_monit + echo "Done." + ;; + 'stop-with-monit') + # restart commands here + echo -n "Stopping $NAME: " + stop_with_monit + echo "Done." 
+ ;; + + 'status') + # status commands here + /usr/bin/airtime-check-system + ;; + *) # no parameter specified + echo "Usage: $SELF start|stop|restart|status" + exit 1 + ;; + +esac diff --git a/python_apps/pypo/eventtypes.py b/python_apps/pypo/eventtypes.py new file mode 100644 index 000000000..5f9c871db --- /dev/null +++ b/python_apps/pypo/eventtypes.py @@ -0,0 +1,6 @@ +FILE = "file" +EVENT = "event" +STREAM_BUFFER_START = "stream_buffer_start" +STREAM_OUTPUT_START = "stream_output_start" +STREAM_BUFFER_END = "stream_buffer_end" +STREAM_OUTPUT_END = "stream_output_end" diff --git a/python_apps/pypo/listenerstat.py b/python_apps/pypo/listenerstat.py new file mode 100644 index 000000000..10d9359fd --- /dev/null +++ b/python_apps/pypo/listenerstat.py @@ -0,0 +1,160 @@ +from threading import Thread +import urllib2 +import xml.dom.minidom +import base64 +from datetime import datetime +import traceback +import logging +import time + +from api_clients import api_client + +class ListenerStat(Thread): + def __init__(self, logger=None): + Thread.__init__(self) + self.api_client = api_client.AirtimeApiClient() + if logger is None: + self.logger = logging.getLogger() + else: + self.logger = logger + + def get_node_text(self, nodelist): + rc = [] + for node in nodelist: + if node.nodeType == node.TEXT_NODE: + rc.append(node.data) + return ''.join(rc) + + def get_stream_parameters(self): + #[{"user":"", "password":"", "url":"", "port":""},{},{}] + return self.api_client.get_stream_parameters() + + + def get_stream_server_xml(self, ip, url, is_shoutcast=False): + encoded = base64.b64encode("%(admin_user)s:%(admin_pass)s" % ip) + + header = {"Authorization":"Basic %s" % encoded} + + if is_shoutcast: + #user agent is required for shoutcast auth, otherwise it returns 404. 
+ user_agent = "Mozilla/5.0 (Linux; rv:22.0) Gecko/20130405 Firefox/22.0" + header["User-Agent"] = user_agent + + req = urllib2.Request( + #assuming that the icecast stats path is /admin/stats.xml + #need to fix this + url=url, + headers=header) + + f = urllib2.urlopen(req) + document = f.read() + return document + + + def get_icecast_stats(self, ip): + url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip + document = self.get_stream_server_xml(ip, url) + dom = xml.dom.minidom.parseString(document) + sources = dom.getElementsByTagName("source") + + mount_stats = None + for s in sources: + #drop the leading '/' character + mount_name = s.getAttribute("mount")[1:] + if mount_name == ip["mount"]: + timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + listeners = s.getElementsByTagName("listeners") + num_listeners = 0 + if len(listeners): + num_listeners = self.get_node_text(listeners[0].childNodes) + + mount_stats = {"timestamp":timestamp, \ + "num_listeners": num_listeners, \ + "mount_name": mount_name} + + return mount_stats + + def get_shoutcast_stats(self, ip): + url = 'http://%(host)s:%(port)s/admin.cgi?sid=1&mode=viewxml' % ip + document = self.get_stream_server_xml(ip, url, is_shoutcast=True) + dom = xml.dom.minidom.parseString(document) + current_listeners = dom.getElementsByTagName("CURRENTLISTENERS") + + timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + num_listeners = 0 + if len(current_listeners): + num_listeners = self.get_node_text(current_listeners[0].childNodes) + + mount_stats = {"timestamp":timestamp, \ + "num_listeners": num_listeners, \ + "mount_name": "shoutcast"} + + return mount_stats + + def get_stream_stats(self, stream_parameters): + stats = [] + + #iterate over stream_parameters which is a list of dicts. Each dict + #represents one Airtime stream (currently this limit is 3). 
+ #Note that there can be optimizations done, since if all three + #streams are the same server, we will still initiate 3 separate + #connections + for k, v in stream_parameters.items(): + if v["enable"] == 'true': + try: + if v["output"] == "icecast": + mount_stats = self.get_icecast_stats(v) + if mount_stats: stats.append(mount_stats) + else: + stats.append(self.get_shoutcast_stats(v)) + self.update_listener_stat_error(v["mount"], 'OK') + except Exception, e: + try: + self.update_listener_stat_error(v["mount"], str(e)) + except Exception, e: + self.logger.error('Exception: %s', e) + + return stats + + def push_stream_stats(self, stats): + self.api_client.push_stream_stats(stats) + + def update_listener_stat_error(self, stream_id, error): + keyname = '%s_listener_stat_error' % stream_id + data = {keyname: error} + self.api_client.update_stream_setting_table(data) + + def run(self): + #Wake up every 120 seconds and gather icecast statistics. Note that we + #are currently querying the server every 2 minutes for list of + #mountpoints as well. We could remove this query if we hooked into + #rabbitmq events, and listened for these changes instead. 
+ while True: + try: + stream_parameters = self.get_stream_parameters() + stats = self.get_stream_stats(stream_parameters["stream_params"]) + + if stats: + self.push_stream_stats(stats) + except Exception, e: + self.logger.error('Exception: %s', e) + + time.sleep(120) + + +if __name__ == "__main__": + # create logger + logger = logging.getLogger('std_out') + logger.setLevel(logging.DEBUG) + # create console handler and set level to debug + #ch = logging.StreamHandler() + #ch.setLevel(logging.DEBUG) + # create formatter + formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s') + # add formatter to ch + #ch.setFormatter(formatter) + # add ch to logger + #logger.addHandler(ch) + + ls = ListenerStat(logger) + ls.run() diff --git a/python_apps/pypo/logging.cfg b/python_apps/pypo/logging.cfg new file mode 100644 index 000000000..4c1ddd172 --- /dev/null +++ b/python_apps/pypo/logging.cfg @@ -0,0 +1,58 @@ +[loggers] +keys=root,fetch,push,recorder,message_h + +[handlers] +keys=pypo,recorder,message_h + +[formatters] +keys=simpleFormatter + +[logger_root] +level=DEBUG +handlers=pypo + +[logger_fetch] +level=DEBUG +handlers=pypo +qualname=fetch +propagate=0 + +[logger_push] +level=DEBUG +handlers=pypo +qualname=push +propagate=0 + +[logger_recorder] +level=DEBUG +handlers=recorder +qualname=recorder +propagate=0 + +[logger_message_h] +level=DEBUG +handlers=message_h +qualname=message_h +propagate=0 + +[handler_pypo] +class=logging.handlers.RotatingFileHandler +level=DEBUG +formatter=simpleFormatter +args=("/var/log/airtime/pypo/pypo.log", 'a', 5000000, 10,) + +[handler_recorder] +class=logging.handlers.RotatingFileHandler +level=DEBUG +formatter=simpleFormatter +args=("/var/log/airtime/pypo/show-recorder.log", 'a', 1000000, 5,) + +[handler_message_h] +class=logging.handlers.RotatingFileHandler +level=DEBUG +formatter=simpleFormatter +args=("/var/log/airtime/pypo/message-handler.log", 'a', 1000000, 5,) + +[formatter_simpleFormatter] 
+format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s +datefmt= diff --git a/python_apps/pypo/monit-airtime-liquidsoap.cfg b/python_apps/pypo/monit-airtime-liquidsoap.cfg new file mode 100644 index 000000000..ac5031c27 --- /dev/null +++ b/python_apps/pypo/monit-airtime-liquidsoap.cfg @@ -0,0 +1,17 @@ + set daemon 15 # Poll at 5 second intervals + set logfile /var/log/monit.log + + set httpd port 2812 + + check process airtime-liquidsoap matching "airtime-liquidsoap.*airtime.*ls_script" + if does not exist for 3 cycles then restart + + start program = "/etc/init.d/airtime-liquidsoap start" with timeout 30 seconds + stop program = "/etc/init.d/airtime-liquidsoap stop" + + if mem > 600 MB for 3 cycles then restart + if failed host localhost port 1234 + send "version\r\nexit\r\n" + expect "Liquidsoap" + with timeout 2 seconds retry 3 for 2 cycles + then restart diff --git a/python_apps/pypo/monit-airtime-playout.cfg b/python_apps/pypo/monit-airtime-playout.cfg new file mode 100644 index 000000000..453f4efec --- /dev/null +++ b/python_apps/pypo/monit-airtime-playout.cfg @@ -0,0 +1,9 @@ + set daemon 10 # Poll at 5 second intervals + set logfile /var/log/monit.log + + set httpd port 2812 + + check process airtime-playout + with pidfile "/var/run/airtime-playout.pid" + start program = "/etc/init.d/airtime-playout start" with timeout 5 seconds + stop program = "/etc/init.d/airtime-playout stop" diff --git a/python_apps/pypo/monit-pre530-airtime-liquidsoap.cfg b/python_apps/pypo/monit-pre530-airtime-liquidsoap.cfg new file mode 100644 index 000000000..4b2265f18 --- /dev/null +++ b/python_apps/pypo/monit-pre530-airtime-liquidsoap.cfg @@ -0,0 +1,8 @@ + set daemon 15 # Poll at 5 second intervals + set logfile /var/log/monit.log + + set httpd port 2812 + + check process airtime-liquidsoap with pidfile "/var/run/airtime-liquidsoap.pid" + start program = "/etc/init.d/airtime-liquidsoap start" with timeout 5 seconds + stop program = 
"/etc/init.d/airtime-liquidsoap stop" diff --git a/python_apps/pypo/notify_logging.cfg b/python_apps/pypo/notify_logging.cfg new file mode 100644 index 000000000..30eddc2f4 --- /dev/null +++ b/python_apps/pypo/notify_logging.cfg @@ -0,0 +1,28 @@ +[loggers] +keys=root,notify + +[handlers] +keys=notify + +[formatters] +keys=simpleFormatter + +[logger_root] +level=DEBUG +handlers=notify + +[logger_notify] +level=DEBUG +handlers=notify +qualname=notify +propagate=0 + +[handler_notify] +class=logging.handlers.RotatingFileHandler +level=DEBUG +formatter=simpleFormatter +args=("/var/log/airtime/pypo/notify.log", 'a', 1000000, 5,) + +[formatter_simpleFormatter] +format=%(asctime)s %(levelname)s - [%(filename)s : %(funcName)s() : line %(lineno)d] - %(message)s +datefmt= diff --git a/python_apps/pypo/pure.py b/python_apps/pypo/pure.py new file mode 100644 index 000000000..3aeeeebcb --- /dev/null +++ b/python_apps/pypo/pure.py @@ -0,0 +1,17 @@ +import re + + +def version_cmp(version1, version2): + def normalize(v): + return [int(x) for x in re.sub(r'(\.0+)*$','', v).split(".")] + return cmp(normalize(version1), normalize(version2)) + +def date_interval_to_seconds(interval): + """ + Convert timedelta object into int representing the number of seconds. If + number of seconds is less than 0, then return 0. + """ + seconds = (interval.microseconds + \ + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) + + return seconds diff --git a/python_apps/pypo/pypo.cfg b/python_apps/pypo/pypo.cfg new file mode 100644 index 000000000..9ffc390e4 --- /dev/null +++ b/python_apps/pypo/pypo.cfg @@ -0,0 +1,85 @@ +############################################ +# pypo - configuration # +############################################ + +# Set the type of client you are using. 
+# Currently supported types: +# 1) "obp" = Open Broadcast Platform +# 2) "airtime" +# +api_client = "airtime" + +############################################ +# Cache Directories # +# *include* trailing slash !! # +############################################ +cache_dir = '/var/tmp/airtime/pypo/cache/' +file_dir = '/var/tmp/airtime/pypo/files/' +tmp_dir = '/var/tmp/airtime/pypo/tmp/' + +############################################ +# Setup Directories # +# Do *not* include trailing slash !! # +############################################ +cache_base_dir = '/var/tmp/airtime/pypo' +bin_dir = '/usr/lib/airtime/pypo' +log_base_dir = '/var/log/airtime' +pypo_log_dir = '/var/log/airtime/pypo' +liquidsoap_log_dir = '/var/log/airtime/pypo-liquidsoap' + +############################################ +# Liquidsoap settings # +############################################ +ls_host = '127.0.0.1' +ls_port = '1234' + +############################################ +# RabbitMQ settings # +############################################ +rabbitmq_host = 'localhost' +rabbitmq_user = 'guest' +rabbitmq_password = 'guest' +rabbitmq_vhost = '/' + +############################################ +# pypo preferences # +############################################ + +# Poll interval in seconds. +# +# This will rarely need to be changed because any schedule changes are +# automatically sent to pypo immediately. +# +# This is how often the poll script downloads new schedules and files from the +# server in the event that no changes are made to the schedule. +# +poll_interval = 3600 # in seconds. + + +# Push interval in seconds. +# +# This is how often the push script checks whether it has something new to +# push to liquidsoap. +# +# It's hard to imagine a situation where this should be more than 1 second. +# +push_interval = 1 # in seconds + +# 'pre' or 'otf'. 
'pre' cues while playlist preparation +# while 'otf' (on the fly) cues while loading into ls +# (needs the post_processor patch) +cue_style = 'pre' + +############################################ +# Recorded Audio settings # +############################################ +record_bitrate = 256 +record_samplerate = 44100 +record_channels = 2 +record_sample_size = 16 + +#can be either ogg|mp3, mp3 recording requires installation of the package "lame" +record_file_type = 'ogg' + +# base path to store recordered shows at +base_recorded_files = '/var/tmp/airtime/show-recorder/' diff --git a/python_apps/pypo/pypofetch.py b/python_apps/pypo/pypofetch.py new file mode 100644 index 000000000..32b36f134 --- /dev/null +++ b/python_apps/pypo/pypofetch.py @@ -0,0 +1,592 @@ +# -*- coding: utf-8 -*- + +import os +import sys +import time +import logging.config +import json +import telnetlib +import copy +import subprocess +import signal +from datetime import datetime +import traceback +import pure + +from Queue import Empty +from threading import Thread +from subprocess import Popen, PIPE + +from api_clients import api_client +from std_err_override import LogWriter +from timeout import ls_timeout + + +# configure logging +logging_cfg = os.path.join(os.path.dirname(__file__), "logging.cfg") +logging.config.fileConfig(logging_cfg) +logger = logging.getLogger() +LogWriter.override_std_err(logger) + +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + +#need to wait for Python 2.7 for this.. +#logging.captureWarnings(True) + +POLL_INTERVAL = 1800 + +class PypoFetch(Thread): + + def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config): + Thread.__init__(self) + + #Hacky... 
+ PypoFetch.ref = self + + self.api_client = api_client.AirtimeApiClient() + self.fetch_queue = pypoFetch_q + self.push_queue = pypoPush_q + self.media_prepare_queue = media_q + self.last_update_schedule_timestamp = time.time() + self.config = config + self.listener_timeout = POLL_INTERVAL + + self.telnet_lock = telnet_lock + + self.logger = logging.getLogger() + + self.pypo_liquidsoap = pypo_liquidsoap + + self.cache_dir = os.path.join(config["cache_dir"], "scheduler") + self.logger.debug("Cache dir %s", self.cache_dir) + + try: + if not os.path.isdir(dir): + """ + We get here if path does not exist, or path does exist but + is a file. We are not handling the second case, but don't + think we actually care about handling it. + """ + self.logger.debug("Cache dir does not exist. Creating...") + os.makedirs(dir) + except Exception, e: + pass + + self.schedule_data = [] + self.logger.info("PypoFetch: init complete") + + """ + Handle a message from RabbitMQ, put it into our yucky global var. + Hopefully there is a better way to do this. 
+ """ + def handle_message(self, message): + try: + self.logger.info("Received event from Pypo Message Handler: %s" % message) + + m = json.loads(message) + command = m['event_type'] + self.logger.info("Handling command: " + command) + + if command == 'update_schedule': + self.schedule_data = m['schedule'] + self.process_schedule(self.schedule_data) + elif command == 'reset_liquidsoap_bootstrap': + self.set_bootstrap_variables() + elif command == 'update_stream_setting': + self.logger.info("Updating stream setting...") + self.regenerate_liquidsoap_conf(m['setting']) + elif command == 'update_stream_format': + self.logger.info("Updating stream format...") + self.update_liquidsoap_stream_format(m['stream_format']) + elif command == 'update_station_name': + self.logger.info("Updating station name...") + self.update_liquidsoap_station_name(m['station_name']) + elif command == 'update_transition_fade': + self.logger.info("Updating transition_fade...") + self.update_liquidsoap_transition_fade(m['transition_fade']) + elif command == 'switch_source': + self.logger.info("switch_on_source show command received...") + self.pypo_liquidsoap.\ + get_telnet_dispatcher().\ + switch_source(m['sourcename'], m['status']) + elif command == 'disconnect_source': + self.logger.info("disconnect_on_source show command received...") + self.pypo_liquidsoap.get_telnet_dispatcher().\ + disconnect_source(m['sourcename']) + else: + self.logger.info("Unknown command: %s" % command) + + # update timeout value + if command == 'update_schedule': + self.listener_timeout = POLL_INTERVAL + else: + self.listener_timeout = self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL + if self.listener_timeout < 0: + self.listener_timeout = 0 + self.logger.info("New timeout: %s" % self.listener_timeout) + except Exception, e: + top = traceback.format_exc() + self.logger.error('Exception: %s', e) + self.logger.error("traceback: %s", top) + self.logger.error("Exception in handling Message Handler 
message: %s", e) + + + def switch_source_temp(self, sourcename, status): + self.logger.debug('Switching source: %s to "%s" status', sourcename, status) + command = "streams." + if sourcename == "master_dj": + command += "master_dj_" + elif sourcename == "live_dj": + command += "live_dj_" + elif sourcename == "scheduled_play": + command += "scheduled_play_" + + if status == "on": + command += "start\n" + else: + command += "stop\n" + + return command + + """ + Initialize Liquidsoap environment + """ + def set_bootstrap_variables(self): + self.logger.debug('Getting information needed on bootstrap from Airtime') + try: + info = self.api_client.get_bootstrap_info() + except Exception, e: + self.logger.error('Unable to get bootstrap info.. Exiting pypo...') + self.logger.error(str(e)) + + self.logger.debug('info:%s', info) + commands = [] + for k, v in info['switch_status'].iteritems(): + commands.append(self.switch_source_temp(k, v)) + + stream_format = info['stream_label'] + station_name = info['station_name'] + fade = info['transition_fade'] + + commands.append(('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')) + commands.append(('vars.station_name %s\n' % station_name).encode('utf-8')) + commands.append(('vars.default_dj_fade %s\n' % fade).encode('utf-8')) + self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands) + + self.pypo_liquidsoap.clear_all_queues() + self.pypo_liquidsoap.clear_queue_tracker() + + def restart_liquidsoap(self): + try: + """do not block - if we receive the lock then good - no other thread + will try communicating with Liquidsoap. If we don't receive, it may + mean some thread blocked and is still holding the lock. 
Restarting + Liquidsoap will cause that thread to release the lock as an Exception + will be thrown.""" + self.telnet_lock.acquire(False) + + + self.logger.info("Restarting Liquidsoap") + subprocess.call('/etc/init.d/airtime-liquidsoap restart', shell=True) + + #Wait here and poll Liquidsoap until it has started up + self.logger.info("Waiting for Liquidsoap to start") + while True: + try: + tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) + tn.write("exit\n") + tn.read_all() + self.logger.info("Liquidsoap is up and running") + break + except Exception, e: + #sleep 0.5 seconds and try again + time.sleep(0.5) + + except Exception, e: + self.logger.error(e) + finally: + if self.telnet_lock.locked(): + self.telnet_lock.release() + + """ + TODO: This function needs to be way shorter, and refactored :/ - MK + """ + def regenerate_liquidsoap_conf(self, setting): + existing = {} + + setting = sorted(setting.items()) + try: + fh = open('/etc/airtime/liquidsoap.cfg', 'r') + except IOError, e: + #file does not exist + self.restart_liquidsoap() + return + + self.logger.info("Reading existing config...") + # read existing conf file and build dict + while True: + line = fh.readline() + + # empty line means EOF + if not line: + break + + line = line.strip() + + if not len(line) or line[0] == "#": + continue + + try: + key, value = line.split('=', 1) + except ValueError: + continue + key = key.strip() + value = value.strip() + value = value.replace('"', '') + if value == '' or value == "0": + value = '' + existing[key] = value + fh.close() + + # dict flag for any change in config + change = {} + # this flag is to detect disable -> disable change + # in that case, we don't want to restart even if there are changes. 
+ state_change_restart = {} + #restart flag + restart = False + + self.logger.info("Looking for changes...") + # look for changes + for k, s in setting: + if "output_sound_device" in s[u'keyname'] or "icecast_vorbis_metadata" in s[u'keyname']: + dump, stream = s[u'keyname'].split('_', 1) + state_change_restart[stream] = False + # This is the case where restart is required no matter what + if (existing[s[u'keyname']] != str(s[u'value'])): + self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) + restart = True; + elif "master_live_stream_port" in s[u'keyname'] or "master_live_stream_mp" in s[u'keyname'] or "dj_live_stream_port" in s[u'keyname'] or "dj_live_stream_mp" in s[u'keyname'] or "off_air_meta" in s[u'keyname']: + if (existing[s[u'keyname']] != s[u'value']): + self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) + restart = True; + else: + stream, dump = s[u'keyname'].split('_', 1) + if "_output" in s[u'keyname']: + if (existing[s[u'keyname']] != s[u'value']): + self.logger.info("'Need-to-restart' state detected for %s...", s[u'keyname']) + restart = True; + state_change_restart[stream] = True + elif (s[u'value'] != 'disabled'): + state_change_restart[stream] = True + else: + state_change_restart[stream] = False + else: + # setting inital value + if stream not in change: + change[stream] = False + if not (s[u'value'] == existing[s[u'keyname']]): + self.logger.info("Keyname: %s, Current value: %s, New Value: %s", s[u'keyname'], existing[s[u'keyname']], s[u'value']) + change[stream] = True + + # set flag change for sound_device alway True + self.logger.info("Change:%s, State_Change:%s...", change, state_change_restart) + + for k, v in state_change_restart.items(): + if k == "sound_device" and v: + restart = True + elif v and change[k]: + self.logger.info("'Need-to-restart' state detected for %s...", k) + restart = True + # rewrite + if restart: + self.restart_liquidsoap() + else: + self.logger.info("No change 
detected in setting...") + self.update_liquidsoap_connection_status() + + @ls_timeout + def update_liquidsoap_connection_status(self): + """ + updates the status of Liquidsoap connection to the streaming server + This function updates the bootup time variable in Liquidsoap script + """ + + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) + # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting, + # we are manually adjusting the bootup time variable so the status msg will get + # updated. + current_time = time.time() + boot_up_time_command = "vars.bootup_time " + str(current_time) + "\n" + self.logger.info(boot_up_time_command) + tn.write(boot_up_time_command) + + connection_status = "streams.connection_status\n" + self.logger.info(connection_status) + tn.write(connection_status) + + tn.write('exit\n') + + output = tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + output_list = output.split("\r\n") + stream_info = output_list[2] + + # streamin info is in the form of: + # eg. s1:true,2:true,3:false + streams = stream_info.split(",") + self.logger.info(streams) + + fake_time = current_time + 1 + for s in streams: + info = s.split(':') + stream_id = info[0] + status = info[1] + if(status == "true"): + self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time)) + + + @ls_timeout + def update_liquidsoap_stream_format(self, stream_format): + # Push stream metadata to liquidsoap + # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! 
+ try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) + command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error("Exception %s", e) + finally: + self.telnet_lock.release() + + @ls_timeout + def update_liquidsoap_transition_fade(self, fade): + # Push stream metadata to liquidsoap + # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) + command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error("Exception %s", e) + finally: + self.telnet_lock.release() + + @ls_timeout + def update_liquidsoap_station_name(self, station_name): + # Push stream metadata to liquidsoap + # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!! + try: + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port']) + command = ('vars.station_name %s\n' % station_name).encode('utf-8') + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + except Exception, e: + self.logger.error("Exception %s", e) + + """ + Process the schedule + - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for") + - Saves a serialized file of the schedule + - playlists are prepared. 
(brought to liquidsoap format) and, if not mounted via nfs, files are copied
Useful for debugging + #purposes + def sanity_check_media_item(self, media_item): + start = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S") + end = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S") + + length1 = pure.date_interval_to_seconds(end - start) + length2 = media_item['cue_out'] - media_item['cue_in'] + + if abs(length2 - length1) > 1: + self.logger.error("end - start length: %s", length1) + self.logger.error("cue_out - cue_in length: %s", length2) + self.logger.error("Two lengths are not equal!!!") + + def is_file_opened(self, path): + #Capture stderr to avoid polluting py-interpreter.log + proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE) + out = proc.communicate()[0].strip() + return bool(out) + + def cache_cleanup(self, media): + """ + Get list of all files in the cache dir and remove them if they aren't being used anymore. + Input dict() media, lists all files that are scheduled or currently playing. Not being in this + dict() means the file is safe to remove. + """ + cached_file_set = set(os.listdir(self.cache_dir)) + scheduled_file_set = set() + + for mkey in media: + media_item = media[mkey] + if media_item['type'] == 'file': + fileExt = os.path.splitext(media_item['uri'])[1] + scheduled_file_set.add(unicode(media_item["id"]) + fileExt) + + expired_files = cached_file_set - scheduled_file_set + + self.logger.debug("Files to remove " + str(expired_files)) + for f in expired_files: + try: + path = os.path.join(self.cache_dir, f) + self.logger.debug("Removing %s" % path) + + #check if this file is opened (sometimes Liquidsoap is still + #playing the file due to our knowledge of the track length + #being incorrect!) + if not self.is_file_opened(path): + os.remove(path) + self.logger.info("File '%s' removed" % path) + else: + self.logger.info("File '%s' not removed. Still busy!" 
% path) + except Exception, e: + self.logger.error("Problem removing file '%s'" % f) + self.logger.error(traceback.format_exc()) + + def manual_schedule_fetch(self): + success, self.schedule_data = self.api_client.get_schedule() + if success: + self.process_schedule(self.schedule_data) + return success + + def persistent_manual_schedule_fetch(self, max_attempts=1): + success = False + num_attempts = 0 + while not success and num_attempts < max_attempts: + success = self.manual_schedule_fetch() + num_attempts += 1 + + return success + + + def main(self): + #Make sure all Liquidsoap queues are empty. This is important in the + #case where we've just restarted the pypo scheduler, but Liquidsoap still + #is playing tracks. In this case let's just restart everything from scratch + #so that we can repopulate our dictionary that keeps track of what + #Liquidsoap is playing much more easily. + self.pypo_liquidsoap.clear_all_queues() + + self.set_bootstrap_variables() + + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + success = self.persistent_manual_schedule_fetch(max_attempts=5) + + if success: + self.logger.info("Bootstrap schedule received: %s", self.schedule_data) + + loops = 1 + while True: + self.logger.info("Loop #%s", loops) + try: + """ + our simple_queue.get() requires a timeout, in which case we + fetch the Airtime schedule manually. It is important to fetch + the schedule periodically because if we didn't, we would only + get schedule updates via RabbitMq if the user was constantly + using the Airtime interface. + + If the user is not using the interface, RabbitMq messages are not + sent, and we will have very stale (or non-existent!) data about the + schedule. 
+ + Currently we are checking every POLL_INTERVAL seconds + """ + + + message = self.fetch_queue.get(block=True, timeout=self.listener_timeout) + self.handle_message(message) + except Empty, e: + self.logger.info("Queue timeout. Fetching schedule manually") + self.persistent_manual_schedule_fetch(max_attempts=5) + except Exception, e: + top = traceback.format_exc() + self.logger.error('Exception: %s', e) + self.logger.error("traceback: %s", top) + + loops += 1 + + def run(self): + """ + Entry point of the thread + """ + self.main() diff --git a/python_apps/pypo/pypofile.py b/python_apps/pypo/pypofile.py new file mode 100644 index 000000000..c636e374c --- /dev/null +++ b/python_apps/pypo/pypofile.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- + +from threading import Thread +from Queue import Empty + +import logging +import shutil +import os +import sys +import stat + +from std_err_override import LogWriter + +# configure logging +logging.config.fileConfig("logging.cfg") +logger = logging.getLogger() +LogWriter.override_std_err(logger) + +#need to wait for Python 2.7 for this.. +#logging.captureWarnings(True) + + +class PypoFile(Thread): + + def __init__(self, schedule_queue, config): + Thread.__init__(self) + self.logger = logging.getLogger() + self.media_queue = schedule_queue + self.media = None + self.cache_dir = os.path.join(config["cache_dir"], "scheduler") + + def copy_file(self, media_item): + """ + Copy media_item from local library directory to local cache directory. + """ + src = media_item['uri'] + dst = media_item['dst'] + + try: + src_size = os.path.getsize(src) + except Exception, e: + self.logger.error("Could not get size of source file: %s", src) + return + + dst_exists = True + try: + dst_size = os.path.getsize(dst) + except Exception, e: + dst_exists = False + + do_copy = False + if dst_exists: + if src_size != dst_size: + do_copy = True + else: + self.logger.debug("file %s already exists in local cache as %s, skipping copying..." 
% (src, dst)) + else: + do_copy = True + + media_item['file_ready'] = not do_copy + + if do_copy: + self.logger.debug("copying from %s to local cache %s" % (src, dst)) + try: + + """ + copy will overwrite dst if it already exists + """ + shutil.copy(src, dst) + + #make file world readable + os.chmod(dst, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + + media_item['file_ready'] = True + except Exception, e: + self.logger.error("Could not copy from %s to %s" % (src, dst)) + self.logger.error(e) + + def get_highest_priority_media_item(self, schedule): + """ + Get highest priority media_item in the queue. Currently the highest + priority is decided by how close the start time is to "now". + """ + if schedule is None or len(schedule) == 0: + return None + + sorted_keys = sorted(schedule.keys()) + + if len(sorted_keys) == 0: + return None + + highest_priority = sorted_keys[0] + media_item = schedule[highest_priority] + + self.logger.debug("Highest priority item: %s" % highest_priority) + + """ + Remove this media_item from the dictionary. On the next iteration + (from the main function) we won't consider it for prioritization + anymore. If on the next iteration we have received a new schedule, + it is very possible we will have to deal with the same media_items + again. In this situation, the worst possible case is that we try to + copy the file again and realize we already have it (thus aborting the copy). + """ + del schedule[highest_priority] + + return media_item + + + def main(self): + while True: + try: + if self.media is None or len(self.media) == 0: + """ + We have no schedule, so we have nothing else to do. Let's + do a blocked wait on the queue + """ + self.media = self.media_queue.get(block=True) + else: + """ + We have a schedule we need to process, but we also want + to check if a newer schedule is available. In this case + do a non-blocking queue.get and in either case (we get something + or we don't), get back to work on preparing getting files. 
def keyboardInterruptHandler(signum, frame):
    """SIGINT handler: log the interrupt and exit cleanly with status 0."""
    # Bug fix: this module never imported `logging`, so the original handler
    # raised NameError instead of exiting when Ctrl-C was pressed. A local
    # import keeps the fix self-contained.
    import logging
    logger = logging.getLogger()
    logger.info('\nKeyboard Interrupt\n')
    sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)
+ media_item = schedule_deque.popleft() + self.pypo_liquidsoap.play(media_item) + if len(schedule_deque): + time_until_next_play = \ + self.date_interval_to_seconds( + schedule_deque[0]['start'] - datetime.utcnow()) + if time_until_next_play < 0: + time_until_next_play = 0 + else: + time_until_next_play = None + else: + self.logger.info("New schedule received: %s", media_schedule) + + #new schedule received. Replace old one with this. + schedule_deque.clear() + + keys = sorted(media_schedule.keys()) + for i in keys: + schedule_deque.append(media_schedule[i]) + + if len(keys): + time_until_next_play = self.date_interval_to_seconds( + media_schedule[keys[0]]['start'] - + datetime.utcnow()) + + else: + time_until_next_play = None + + + def date_interval_to_seconds(self, interval): + """ + Convert timedelta object into int representing the number of seconds. If + number of seconds is less than 0, then return 0. + """ + seconds = (interval.microseconds + \ + (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6) + if seconds < 0: seconds = 0 + + return seconds + + def run(self): + try: self.main() + except Exception, e: + self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc()) + + + diff --git a/python_apps/pypo/pypoliquidsoap.py b/python_apps/pypo/pypoliquidsoap.py new file mode 100644 index 000000000..50a47db94 --- /dev/null +++ b/python_apps/pypo/pypoliquidsoap.py @@ -0,0 +1,237 @@ +from pypofetch import PypoFetch +from telnetliquidsoap import TelnetLiquidsoap + +from datetime import datetime +from datetime import timedelta + +import eventtypes +import time + +class PypoLiquidsoap(): + def __init__(self, logger, telnet_lock, host, port): + self.logger = logger + self.liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + + self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \ + logger,\ + host,\ + port,\ + self.liq_queue_tracker.keys()) + + def get_telnet_dispatcher(self): + return 
self.telnet_liquidsoap + + + def play(self, media_item): + if media_item["type"] == eventtypes.FILE: + self.handle_file_type(media_item) + elif media_item["type"] == eventtypes.EVENT: + self.handle_event_type(media_item) + elif media_item["type"] == eventtypes.STREAM_BUFFER_START: + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + elif media_item["type"] == eventtypes.STREAM_OUTPUT_START: + if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id: + #this is called if the stream wasn't scheduled sufficiently ahead of time + #so that the prebuffering stage could take effect. Let's do the prebuffering now. + self.telnet_liquidsoap.start_web_stream_buffer(media_item) + self.telnet_liquidsoap.start_web_stream(media_item) + elif media_item['type'] == eventtypes.STREAM_BUFFER_END: + self.telnet_liquidsoap.stop_web_stream_buffer() + elif media_item['type'] == eventtypes.STREAM_OUTPUT_END: + self.telnet_liquidsoap.stop_web_stream_output() + else: raise UnknownMediaItemType(str(media_item)) + + def handle_file_type(self, media_item): + """ + Wait maximum 5 seconds (50 iterations) for file to become ready, + otherwise give up on it. + """ + iter_num = 0 + while not media_item['file_ready'] and iter_num < 50: + time.sleep(0.1) + iter_num += 1 + + if media_item['file_ready']: + available_queue = self.find_available_queue() + + try: + self.telnet_liquidsoap.queue_push(available_queue, media_item) + self.liq_queue_tracker[available_queue] = media_item + except Exception as e: + self.logger.error(e) + raise + else: + self.logger.warn("File %s did not become ready in less than 5 seconds. 
Skipping...", media_item['dst']) + + def handle_event_type(self, media_item): + if media_item['event_type'] == "kick_out": + self.telnet_liquidsoap.disconnect_source("live_dj") + elif media_item['event_type'] == "switch_off": + self.telnet_liquidsoap.switch_source("live_dj", "off") + + + def is_media_item_finished(self, media_item): + if media_item is None: + return True + else: + return datetime.utcnow() > media_item['end'] + + def find_available_queue(self): + available_queue = None + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi == None or self.is_media_item_finished(mi): + #queue "i" is available. Push to this queue + available_queue = i + + if available_queue == None: + raise NoQueueAvailableException() + + return available_queue + + + def verify_correct_present_media(self, scheduled_now): + #verify whether Liquidsoap is currently playing the correct files. + #if we find an item that Liquidsoap is not playing, then push it + #into one of Liquidsoap's queues. If Liquidsoap is already playing + #it do nothing. If Liquidsoap is playing a track that isn't in + #currently_playing then stop it. + + #Check for Liquidsoap media we should source.skip + #get liquidsoap items for each queue. Since each queue can only have one + #item, we should have a max of 8 items. 
+ + #2013-03-21-22-56-00_0: { + #id: 1, + #type: "stream_output_start", + #row_id: 41, + #uri: "http://stream2.radioblackout.org:80/blackout.ogg", + #start: "2013-03-21-22-56-00", + #end: "2013-03-21-23-26-00", + #show_name: "Untitled Show", + #independent_event: true + #}, + + + scheduled_now_files = \ + filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now) + + scheduled_now_webstream = \ + filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \ + scheduled_now) + + schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files)) + + row_id_map = {} + liq_queue_ids = set() + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if not self.is_media_item_finished(mi): + liq_queue_ids.add(mi["row_id"]) + row_id_map[mi["row_id"]] = mi + + to_be_removed = set() + to_be_added = set() + + #Iterate over the new files, and compare them to currently scheduled + #tracks. If already in liquidsoap queue still need to make sure they don't + #have different attributes such replay_gain etc. + for i in scheduled_now_files: + if i["row_id"] in row_id_map: + mi = row_id_map[i["row_id"]] + correct = mi['start'] == i['start'] and \ + mi['end'] == i['end'] and \ + mi['row_id'] == i['row_id'] and \ + mi['replay_gain'] == i['replay_gain'] + + if not correct: + #need to re-add + self.logger.info("Track %s found to have new attr." 
% i) + to_be_removed.add(i["row_id"]) + to_be_added.add(i["row_id"]) + + + to_be_removed.update(liq_queue_ids - schedule_ids) + to_be_added.update(schedule_ids - liq_queue_ids) + + if to_be_removed: + self.logger.info("Need to remove items from Liquidsoap: %s" % \ + to_be_removed) + + #remove files from Liquidsoap's queue + for i in self.liq_queue_tracker: + mi = self.liq_queue_tracker[i] + if mi is not None and mi["row_id"] in to_be_removed: + self.stop(i) + + if to_be_added: + self.logger.info("Need to add items to Liquidsoap *now*: %s" % \ + to_be_added) + + for i in scheduled_now: + if i["row_id"] in to_be_added: + self.modify_cue_point(i) + self.play(i) + + #handle webstreams + current_stream_id = self.telnet_liquidsoap.get_current_stream_id() + if scheduled_now_webstream: + if current_stream_id != scheduled_now_webstream[0]: + self.play(scheduled_now_webstream[0]) + elif current_stream_id != "-1": + #something is playing and it shouldn't be. + self.telnet_liquidsoap.stop_web_stream_buffer() + self.telnet_liquidsoap.stop_web_stream_output() + + def stop(self, queue): + self.telnet_liquidsoap.queue_remove(queue) + self.liq_queue_tracker[queue] = None + + def is_file(self, media_item): + return media_item["type"] == eventtypes.FILE + + def clear_queue_tracker(self): + for i in self.liq_queue_tracker.keys(): + self.liq_queue_tracker[i] = None + + def modify_cue_point(self, link): + if not self.is_file(link): + return + + tnow = datetime.utcnow() + + link_start = link['start'] + + diff_td = tnow - link_start + diff_sec = self.date_interval_to_seconds(diff_td) + + if diff_sec > 0: + self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec) + original_cue_in_td = timedelta(seconds=float(link['cue_in'])) + link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec + + def date_interval_to_seconds(self, interval): + """ + Convert timedelta object into int representing the number of seconds. 
class UnknownMediaItemType(Exception):
    """Raised by play() when a media_item's "type" matches no known event type."""
    pass

class NoQueueAvailableException(Exception):
    """Raised when all Liquidsoap file queues are busy with unfinished items."""
    pass
# help screen / info
usage = "%prog [options] - notification gateway"
parser = OptionParser(usage=usage)

# Options: each one maps a Liquidsoap callback to a server API notification.
parser.add_option("-d", "--data", help="Pass JSON data from Liquidsoap into this script.", metavar="data")
parser.add_option("-m", "--media-id", help="ID of the file that is currently playing.", metavar="media_id")
parser.add_option("-e", "--error", action="store", dest="error", type="string", help="Liquidsoap error msg.", metavar="error_msg")
parser.add_option("-s", "--stream-id", help="ID stream", metavar="stream_id")
parser.add_option("-c", "--connect", help="Liquidsoap connected", action="store_true", metavar="connect")
parser.add_option("-t", "--time", help="Liquidsoap boot up time", action="store", dest="time", metavar="time", type="string")
parser.add_option("-x", "--source-name", help="source connection name", metavar="source_name")
parser.add_option("-y", "--source-status", help="source connection status", metavar="source_status")
parser.add_option("-w", "--webstream", help="JSON metadata associated with webstream", metavar="json_data")
parser.add_option("-n", "--liquidsoap-started", help="notify liquidsoap started", metavar="json_data", action="store_true", default=False)

# parse options
(options, args) = parser.parse_args()
+#logging.captureWarnings(True) + +# loading config file +try: + config = ConfigObj('/etc/airtime/pypo.cfg') + +except Exception, e: + logger.error('Error loading config file: %s', e) + sys.exit() + + +class Notify: + def __init__(self): + self.api_client = api_client.AirtimeApiClient(logger=logger) + + def notify_liquidsoap_started(self): + logger.debug("Notifying server that Liquidsoap has started") + self.api_client.notify_liquidsoap_started() + + def notify_media_start_playing(self, media_id): + logger.debug('#################################################') + logger.debug('# Calling server to update about what\'s playing #') + logger.debug('#################################################') + response = self.api_client.notify_media_item_start_playing(media_id) + logger.debug("Response: " + json.dumps(response)) + + # @pram time: time that LS started + def notify_liquidsoap_status(self, msg, stream_id, time): + logger.debug('#################################################') + logger.debug('# Calling server to update liquidsoap status #') + logger.debug('#################################################') + logger.debug('msg = ' + str(msg)) + response = self.api_client.notify_liquidsoap_status(msg, stream_id, time) + logger.debug("Response: " + json.dumps(response)) + + def notify_source_status(self, source_name, status): + logger.debug('#################################################') + logger.debug('# Calling server to update source status #') + logger.debug('#################################################') + logger.debug('msg = ' + str(source_name) + ' : ' + str(status)) + response = self.api_client.notify_source_status(source_name, status) + logger.debug("Response: " + json.dumps(response)) + + def notify_webstream_data(self, data, media_id): + logger.debug('#################################################') + logger.debug('# Calling server to update webstream data #') + logger.debug('#################################################') + response 
= self.api_client.notify_webstream_data(data, media_id) + logger.debug("Response: " + json.dumps(response)) + + def run_with_options(self, options): + if options.error and options.stream_id: + self.notify_liquidsoap_status(options.error, options.stream_id, options.time) + elif options.connect and options.stream_id: + self.notify_liquidsoap_status("OK", options.stream_id, options.time) + elif options.source_name and options.source_status: + self.notify_source_status(options.source_name, options.source_status) + elif options.webstream: + self.notify_webstream_data(options.webstream, options.media_id) + elif options.media_id: + self.notify_media_start_playing(options.media_id) + elif options.liquidsoap_started: + self.notify_liquidsoap_started() + else: + logger.debug("Unrecognized option in options(%s). Doing nothing" \ + % str(options)) + + +if __name__ == '__main__': + print + print '#########################################' + print '# *** pypo *** #' + print '# pypo notification gateway #' + print '#########################################' + + # initialize + try: + n = Notify() + n.run_with_options(options) + except Exception as e: + print( traceback.format_exc() ) + diff --git a/python_apps/pypo/recorder.py b/python_apps/pypo/recorder.py new file mode 100644 index 000000000..e61083738 --- /dev/null +++ b/python_apps/pypo/recorder.py @@ -0,0 +1,335 @@ +# -*- coding: utf-8 -*- + +import logging +import json +import time +import datetime +import os +import sys +import pytz +import signal +import math +import traceback +import re + +from configobj import ConfigObj + +from poster.encode import multipart_encode +from poster.streaminghttp import register_openers + +from subprocess import Popen +from subprocess import PIPE +from threading import Thread + +import mutagen + +from api_clients import api_client as apc + +def api_client(logger): + """ + api_client returns the correct instance of AirtimeApiClient. 
def getDateTimeObj(time):
    """
    Parse a "YYYY-MM-DD HH:MM:SS" timestamp string into a naive
    datetime.datetime (microsecond 0, no tzinfo).

    :param time: timestamp string, date and time separated by one space
    :raises ValueError: if the string does not match the expected format

    Note: the parameter keeps its historical name `time` (it shadows the
    stdlib module inside this function only) so keyword callers still work.
    """
    # strptime replaces the old hand-rolled split()/int() parsing: it
    # produces the same datetime for valid input while actually validating
    # the format instead of raising arbitrary IndexError/ValueError mixes.
    return datetime.datetime.strptime(time, "%Y-%m-%d %H:%M:%S")
args = command.split(" ") + + self.logger.info("starting record") + self.logger.info("command " + command) + + self.p = Popen(args,stdout=PIPE) + + #blocks at the following line until the child process + #quits + self.p.wait() + outmsgs = self.p.stdout.readlines() + for msg in outmsgs: + m = re.search('^ERROR',msg) + if not m == None: + self.logger.info('Recording error is found: %s', msg) + self.logger.info("finishing record, return code %s", self.p.returncode) + code = self.p.returncode + + self.p = None + + return code, filepath + + def cancel_recording(self): + #add 3 second delay before actually cancelling the show. The reason + #for this is because it appears that ecasound starts 1 second later than + #it should, and therefore this method is sometimes incorrectly called 1 + #second before the show ends. + #time.sleep(3) + + #send signal interrupt (2) + self.logger.info("Show manually cancelled!") + if (self.p is not None): + self.p.send_signal(signal.SIGINT) + + #if self.p is defined, then the child process ecasound is recording + def is_recording(self): + return (self.p is not None) + + def upload_file(self, filepath): + + filename = os.path.split(filepath)[1] + + # Register the streaming http handlers with urllib2 + register_openers() + + # headers contains the necessary Content-Type and Content-Length + # datagen is a generator object that yields the encoded parameters + datagen, headers = multipart_encode({"file": open(filepath, "rb"), 'name': filename, 'show_instance': self.show_instance}) + + self.api_client.upload_recorded_show(datagen, headers) + + def set_metadata_and_save(self, filepath): + """ + Writes song to 'filepath'. 
Uses metadata from: + self.start_time, self.show_name, self.show_instance + """ + try: + full_date, full_time = self.start_time.split(" ",1) + # No idea why we translated - to : before + #full_time = full_time.replace(":","-") + self.logger.info("time: %s" % full_time) + artist = "Airtime Show Recorder" + #set some metadata for our file daemon + recorded_file = mutagen.File(filepath, easy = True) + recorded_file['artist'] = artist + recorded_file['date'] = full_date + recorded_file['title'] = "%s-%s-%s" % (self.show_name, + full_date, full_time) + #You cannot pass ints into the metadata of a file. Even tracknumber needs to be a string + recorded_file['tracknumber'] = unicode(self.show_instance) + recorded_file.save() + + except Exception, e: + top = traceback.format_exc() + self.logger.error('Exception: %s', e) + self.logger.error("traceback: %s", top) + + + def run(self): + code, filepath = self.record_show() + + if code == 0: + try: + self.logger.info("Preparing to upload %s" % filepath) + + self.set_metadata_and_save(filepath) + + self.upload_file(filepath) + os.remove(filepath) + except Exception, e: + self.logger.error(e) + else: + self.logger.info("problem recording show") + os.remove(filepath) + +class Recorder(Thread): + def __init__(self, q): + Thread.__init__(self) + self.logger = logging.getLogger('recorder') + self.api_client = api_client(self.logger) + self.sr = None + self.shows_to_record = {} + self.server_timezone = '' + self.queue = q + self.loops = 0 + self.logger.info("RecorderFetch: init complete") + + success = False + while not success: + try: + self.api_client.register_component('show-recorder') + success = True + except Exception, e: + self.logger.error(str(e)) + time.sleep(10) + + def handle_message(self): + if not self.queue.empty(): + message = self.queue.get() + msg = json.loads(message) + command = msg["event_type"] + self.logger.info("Received msg from Pypo Message Handler: %s", msg) + if command == 'cancel_recording': + if self.sr is 
not None and self.sr.is_recording(): + self.sr.cancel_recording() + else: + self.process_recorder_schedule(msg) + self.loops = 0 + + if self.shows_to_record: + self.start_record() + + def process_recorder_schedule(self, m): + self.logger.info("Parsing recording show schedules...") + temp_shows_to_record = {} + shows = m['shows'] + for show in shows: + show_starts = getDateTimeObj(show[u'starts']) + show_end = getDateTimeObj(show[u'ends']) + time_delta = show_end - show_starts + + temp_shows_to_record[show[u'starts']] = [time_delta, + show[u'instance_id'], show[u'name'], m['server_timezone']] + self.shows_to_record = temp_shows_to_record + + def get_time_till_next_show(self): + if len(self.shows_to_record) != 0: + tnow = datetime.datetime.utcnow() + sorted_show_keys = sorted(self.shows_to_record.keys()) + + start_time = sorted_show_keys[0] + next_show = getDateTimeObj(start_time) + + delta = next_show - tnow + s = '%s.%s' % (delta.seconds, delta.microseconds) + out = float(s) + + if out < 5: + self.logger.debug("Shows %s", self.shows_to_record) + self.logger.debug("Next show %s", next_show) + self.logger.debug("Now %s", tnow) + return out + + def start_record(self): + if len(self.shows_to_record) == 0: return None + try: + delta = self.get_time_till_next_show() + if delta < 5: + self.logger.debug("sleeping %s seconds until show", delta) + time.sleep(delta) + + sorted_show_keys = sorted(self.shows_to_record.keys()) + start_time = sorted_show_keys[0] + show_length = self.shows_to_record[start_time][0] + show_instance = self.shows_to_record[start_time][1] + show_name = self.shows_to_record[start_time][2] + server_timezone = self.shows_to_record[start_time][3] + + T = pytz.timezone(server_timezone) + start_time_on_UTC = getDateTimeObj(start_time) + start_time_on_server = start_time_on_UTC.replace(tzinfo=pytz.utc).astimezone(T) + start_time_formatted = '%(year)d-%(month)02d-%(day)02d %(hour)02d:%(min)02d:%(sec)02d' % \ + {'year': start_time_on_server.year, 'month': 
start_time_on_server.month, 'day': start_time_on_server.day, \ + 'hour': start_time_on_server.hour, 'min': start_time_on_server.minute, 'sec': start_time_on_server.second} + self.sr = ShowRecorder(show_instance, show_name, show_length.seconds, start_time_formatted) + self.sr.start() + #remove show from shows to record. + del self.shows_to_record[start_time] + #self.time_till_next_show = self.get_time_till_next_show() + except Exception, e : + top = traceback.format_exc() + self.logger.error('Exception: %s', e) + self.logger.error("traceback: %s", top) + + def run(self): + """ + Main loop of the thread: + Wait for schedule updates from RabbitMQ, but in case there arent any, + poll the server to get the upcoming schedule. + """ + try: + self.logger.info("Started...") + # Bootstrap: since we are just starting up, we need to grab the + # most recent schedule. After that we can just wait for updates. + try: + temp = self.api_client.get_shows_to_record() + if temp is not None: + self.process_recorder_schedule(temp) + self.logger.info("Bootstrap recorder schedule received: %s", temp) + except Exception, e: + self.logger.error( traceback.format_exc() ) + self.logger.error(e) + + self.logger.info("Bootstrap complete: got initial copy of the schedule") + + self.loops = 0 + heartbeat_period = math.floor(30 / PUSH_INTERVAL) + + while True: + if self.loops * PUSH_INTERVAL > 3600: + self.loops = 0 + """ + Fetch recorder schedule + """ + try: + temp = self.api_client.get_shows_to_record() + if temp is not None: + self.process_recorder_schedule(temp) + self.logger.info("updated recorder schedule received: %s", temp) + except Exception, e: + self.logger.error( traceback.format_exc() ) + self.logger.error(e) + try: self.handle_message() + except Exception, e: + self.logger.error( traceback.format_exc() ) + self.logger.error('Pypo Recorder Exception: %s', e) + time.sleep(PUSH_INTERVAL) + self.loops += 1 + except Exception, e : + top = traceback.format_exc() + 
def create_liquidsoap_annotation(media):
    """
    Build the annotate: URI that tells Liquidsoap how to play one file:
    fades, cue points, replay gain and the schedule row id travel inline
    with the file path. liq_start_next stays "0"; that value controls the
    overlap duration of the crossfade.
    """
    fmt = ('annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",'
           'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",'
           'schedule_table_id="%s",replay_gain="%s dB":%s')
    fields = (media['id'],
              float(media['fade_in']) / 1000,   # ms -> seconds
              float(media['fade_out']) / 1000,  # ms -> seconds
              float(media['cue_in']),
              float(media['cue_out']),
              media['row_id'],
              media['replay_gain'],
              media['dst'])
    return fmt % fields
'queues.%s_skip\n' % queue_id + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception: + raise + finally: + self.telnet_lock.release() + + + @ls_timeout + def queue_push(self, queue_id, media_item): + try: + self.telnet_lock.acquire() + + if not self.__is_empty(queue_id): + raise QueueNotEmptyException() + + tn = self.__connect() + annotation = create_liquidsoap_annotation(media_item) + msg = '%s.push %s\n' % (queue_id, annotation.encode('utf-8')) + self.logger.debug(msg) + tn.write(msg) + + show_name = media_item['show_name'] + msg = 'vars.show_name %s\n' % show_name.encode('utf-8') + tn.write(msg) + self.logger.debug(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + except Exception: + raise + finally: + self.telnet_lock.release() + + + @ls_timeout + def stop_web_stream_buffer(self): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 + + msg = 'http.stop\n' + self.logger.debug(msg) + tn.write(msg) + + msg = 'dynamic_source.id -1\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + @ls_timeout + def stop_web_stream_output(self): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + #dynamic_source.stop http://87.230.101.24:80/top100station.mp3 + + msg = 'dynamic_source.output_stop\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + @ls_timeout + def start_web_stream(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + #TODO: DO we need this? 
+ msg = 'streams.scheduled_play_start\n' + tn.write(msg) + + msg = 'dynamic_source.output_start\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + self.current_prebuffering_stream_id = None + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + @ls_timeout + def start_web_stream_buffer(self, media_item): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + msg = 'dynamic_source.id %s\n' % media_item['row_id'] + self.logger.debug(msg) + tn.write(msg) + + msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1') + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + self.logger.debug(tn.read_all()) + + self.current_prebuffering_stream_id = media_item['row_id'] + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + @ls_timeout + def get_current_stream_id(self): + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + + msg = 'dynamic_source.get_id\n' + self.logger.debug(msg) + tn.write(msg) + + tn.write("exit\n") + stream_id = tn.read_all().splitlines()[0] + self.logger.debug("stream_id: %s" % stream_id) + + return stream_id + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + @ls_timeout + def disconnect_source(self, sourcename): + self.logger.debug('Disconnecting source: %s', sourcename) + command = "" + if(sourcename == "master_dj"): + command += "master_harbor.kick\n" + elif(sourcename == "live_dj"): + command += "live_dj_harbor.kick\n" + + try: + self.telnet_lock.acquire() + tn = telnetlib.Telnet(self.ls_host, self.ls_port) + self.logger.info(command) + tn.write(command) + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(traceback.format_exc()) + finally: + self.telnet_lock.release() + + @ls_timeout + def telnet_send(self, commands): + try: + self.telnet_lock.acquire() + + 
tn = telnetlib.Telnet(self.ls_host, self.ls_port) + for i in commands: + self.logger.info(i) + tn.write(i) + + tn.write('exit\n') + tn.read_all() + except Exception, e: + self.logger.error(str(e)) + finally: + self.telnet_lock.release() + + + def switch_source(self, sourcename, status): + self.logger.debug('Switching source: %s to "%s" status', sourcename, status) + command = "streams." + if sourcename == "master_dj": + command += "master_dj_" + elif sourcename == "live_dj": + command += "live_dj_" + elif sourcename == "scheduled_play": + command += "scheduled_play_" + + if status == "on": + command += "start\n" + else: + command += "stop\n" + + self.telnet_send([command]) + +class DummyTelnetLiquidsoap: + + def __init__(self, telnet_lock, logger): + self.telnet_lock = telnet_lock + self.liquidsoap_mock_queues = {} + self.logger = logger + + for i in range(4): + self.liquidsoap_mock_queues["s"+str(i)] = [] + + @ls_timeout + def queue_push(self, queue_id, media_item): + try: + self.telnet_lock.acquire() + + self.logger.info("Pushing %s to queue %s" % (media_item, queue_id)) + from datetime import datetime + print "Time now: %s" % datetime.utcnow() + + annotation = create_liquidsoap_annotation(media_item) + self.liquidsoap_mock_queues[queue_id].append(annotation) + except Exception: + raise + finally: + self.telnet_lock.release() + + @ls_timeout + def queue_remove(self, queue_id): + try: + self.telnet_lock.acquire() + + self.logger.info("Purging queue %s" % queue_id) + from datetime import datetime + print "Time now: %s" % datetime.utcnow() + + except Exception: + raise + finally: + self.telnet_lock.release() + +class QueueNotEmptyException(Exception): + pass diff --git a/python_apps/pypo/testpypoliqqueue.py b/python_apps/pypo/testpypoliqqueue.py new file mode 100644 index 000000000..f1847b34f --- /dev/null +++ b/python_apps/pypo/testpypoliqqueue.py @@ -0,0 +1,98 @@ +from pypoliqqueue import PypoLiqQueue +from telnetliquidsoap import DummyTelnetLiquidsoap, 
TelnetLiquidsoap + + +from Queue import Queue +from threading import Lock + +import sys +import signal +import logging +from datetime import datetime +from datetime import timedelta + +def keyboardInterruptHandler(signum, frame): + logger = logging.getLogger() + logger.info('\nKeyboard Interrupt\n') + sys.exit(0) +signal.signal(signal.SIGINT, keyboardInterruptHandler) + +# configure logging +format = '%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s' +logging.basicConfig(level=logging.DEBUG, format=format) +logging.captureWarnings(True) + +telnet_lock = Lock() +pypoPush_q = Queue() + + +pypoLiq_q = Queue() +liq_queue_tracker = { + "s0": None, + "s1": None, + "s2": None, + "s3": None, + } + +#dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging) +dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, \ + "localhost", \ + 1234) + +plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, \ + dummy_telnet_liquidsoap) +plq.daemon = True +plq.start() + + +print "Time now: %s" % datetime.utcnow() + +media_schedule = {} + +start_dt = datetime.utcnow() + timedelta(seconds=1) +end_dt = datetime.utcnow() + timedelta(seconds=6) + +media_schedule[start_dt] = {"id": 5, \ + "type":"file", \ + "row_id":9, \ + "uri":"", \ + "dst":"/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3", \ + "fade_in":0, \ + "fade_out":0, \ + "cue_in":0, \ + "cue_out":300, \ + "start": start_dt, \ + "end": end_dt, \ + "show_name":"Untitled", \ + "replay_gain": 0, \ + "independent_event": True \ + } + + + +start_dt = datetime.utcnow() + timedelta(seconds=2) +end_dt = datetime.utcnow() + timedelta(seconds=6) + +media_schedule[start_dt] = {"id": 5, \ + "type":"file", \ + "row_id":9, \ + "uri":"", \ + "dst":"/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3", \ + "fade_in":0, \ + "fade_out":0, \ + "cue_in":0, \ + "cue_out":300, \ + "start": start_dt, \ + "end": end_dt, \ + "show_name":"Untitled", \ + "replay_gain": 0, \ + 
                            "independent_event": True \
                            }
# Hand the whole schedule to PypoLiqQueue in one message; it is expected
# to dispatch each item as its start time arrives — TODO confirm against
# pypoliqqueue.py.
pypoLiq_q.put(media_schedule)

# Wait on the (daemon) queue thread; presumably runs until Ctrl-C —
# see keyboardInterruptHandler above.
plq.join()



