Merge branch '2.5.x-installer' into saas-installer-albert
Conflicts:
	.gitignore
	airtime_mvc/application/Bootstrap.php
	airtime_mvc/application/configs/conf.php
	airtime_mvc/application/controllers/SystemstatusController.php
	airtime_mvc/application/controllers/UpgradeController.php
	airtime_mvc/application/upgrade/Upgrades.php
	airtime_mvc/application/views/scripts/systemstatus/index.phtml
	airtime_mvc/build/airtime.conf
	airtime_mvc/build/sql/defaultdata.sql
	airtime_mvc/public/index.php
	airtime_mvc/tests/application/helpers/AirtimeInstall.php
	install_minimal/airtime-install
	install_minimal/include/airtime-constants.php
	install_minimal/include/airtime-copy-files.sh
	install_minimal/include/airtime-db-install.php
	install_minimal/include/airtime-initialize.sh
	install_minimal/include/airtime-install.php
	install_minimal/include/airtime-installed-check.php
	install_minimal/include/airtime-remove-files.sh
	install_minimal/include/airtime-upgrade.php
	python_apps/media-monitor/install/media-monitor-copy-files.py
	python_apps/monit/monit-airtime-generic.cfg
	python_apps/pypo/airtime-playout
	python_apps/pypo/install/pypo-copy-files.py
	python_apps/pypo/liquidsoap/generate_liquidsoap_cfg.py
	python_apps/pypo/liquidsoap/ls_script.liq
	python_apps/pypo/pypo/__main__.py
	python_apps/pypo/pypo/media/update/replaygain.py
	python_apps/pypo/pypo/media/update/replaygainupdater.py
	python_apps/pypo/pypo/media/update/silananalyzer.py
	python_apps/python-virtualenv/airtime_virtual_env.pybundle
	python_apps/python-virtualenv/requirements
	utils/airtime-check-system.php
commit 11c6818e61
335 changed files with 5114 additions and 10061 deletions
python_apps/pypo/pypo/AUTHORS (new file, 10 lines added)
@@ -0,0 +1,10 @@
This tool was born out of a collaboration between Open Broadcast
and Sourcefabric. The authors of the code are:

Original Authors:
Jonas Ohrstrom <jonas@digris.ch>
Paul Baranowski <paul.baranowski@sourcefabric.org>
James Moon <james.moon@sourcefabric.org>


Almost a complete refactor/rewrite by: Martin Konecny <martin.konecny@gmail.com>
python_apps/pypo/pypo/LICENSE (new file, 675 lines added)
@@ -0,0 +1,675 @@
GNU GENERAL PUBLIC LICENSE
Version 3, 29 June 2007

Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.

[The remainder of this new file is the unmodified, standard text of the GNU General Public License, version 3 (675 lines in total), ending with the "How to Apply These Terms to Your New Programs" appendix and the pointer to <http://www.gnu.org/philosophy/why-not-lgpl.html>.]
python_apps/pypo/pypo/__init__.py (new file, empty)
python_apps/pypo/pypo/__main__.py (new file, 299 lines added)
@@ -0,0 +1,299 @@
"""
Python part of radio playout (pypo)
"""

from optparse import OptionParser
from datetime import datetime

import telnetlib

import time
import sys
import signal
import logging
import locale
import os
import re

from Queue import Queue
from threading import Lock

from pypopush import PypoPush
from pypofetch import PypoFetch
from pypofile import PypoFile
from recorder import Recorder
from listenerstat import ListenerStat
from pypomessagehandler import PypoMessageHandler
from pypoliquidsoap import PypoLiquidsoap
from timeout import ls_timeout

from configobj import ConfigObj

# custom imports
from api_clients import api_client
from std_err_override import LogWriter
import pure

# Set up command-line options
parser = OptionParser()

# help screen / info
usage = "%prog [options]" + " - python playout system"
parser = OptionParser(usage=usage)

# Options
parser.add_option("-v", "--compat",
        help="Check compatibility with server API version",
        default=False,
        action="store_true",
        dest="check_compat")

parser.add_option("-t", "--test",
        help="Do a test to make sure everything is working properly.",
        default=False,
        action="store_true",
        dest="test")

parser.add_option("-b",
        "--cleanup",
        help="Cleanup",
        default=False,
        action="store_true",
        dest="cleanup")

parser.add_option("-c",
        "--check",
        help="Check the cached schedule and exit",
        default=False,
        action="store_true",
        dest="check")

# parse options
(options, args) = parser.parse_args()

LIQUIDSOAP_MIN_VERSION = "1.1.1"

PYPO_HOME = '/var/tmp/airtime/pypo/'

def configure_environment():
    os.environ["HOME"] = PYPO_HOME
    os.environ["TERM"] = 'xterm'

configure_environment()

# need to wait for Python 2.7 for this..
logging.captureWarnings(True)

# configure logging
try:
    logging.config.fileConfig("/etc/airtime/pypo_logging.cfg")
    logger = logging.getLogger()
    LogWriter.override_std_err(logger)
except Exception, e:
    print "Couldn't configure logging", e
    sys.exit(1)

def configure_locale():
    """
    Silly hacks to force Python 2.x to run in UTF-8 mode. Not portable at all,
    however serves our purpose at the moment.

    More information available here:
    http://stackoverflow.com/questions/3828723/why-we-need-sys-setdefaultencodingutf-8-in-a-py-script
    """
    logger.debug("Before %s", locale.nl_langinfo(locale.CODESET))
    current_locale = locale.getlocale()

    if current_locale[1] is None:
        logger.debug("No locale currently set. Attempting to get default locale.")
        default_locale = locale.getdefaultlocale()

        if default_locale[1] is None:
            logger.debug("No default locale exists. Let's try loading from \
                /etc/default/locale")
            if os.path.exists("/etc/default/locale"):
                locale_config = ConfigObj('/etc/default/locale')
                lang = locale_config.get('LANG')
                new_locale = lang
            else:
                logger.error("/etc/default/locale could not be found! Please \
                    run 'sudo update-locale' from command-line.")
                sys.exit(1)
        else:
            new_locale = default_locale

        logger.info("New locale set to: %s", \
                locale.setlocale(locale.LC_ALL, new_locale))

    reload(sys)
    sys.setdefaultencoding("UTF-8")
    current_locale_encoding = locale.getlocale()[1].lower()
    logger.debug("sys default encoding %s", sys.getdefaultencoding())
    logger.debug("After %s", locale.nl_langinfo(locale.CODESET))

    if current_locale_encoding not in ['utf-8', 'utf8']:
        logger.error("Need a UTF-8 locale. Currently '%s'. Exiting..." % \
                current_locale_encoding)
        sys.exit(1)


configure_locale()

# loading config file
try:
    config = ConfigObj('/etc/airtime/airtime.conf')
except Exception, e:
    logger.error('Error loading config file: %s', e)
    sys.exit(1)

class Global:
    def __init__(self, api_client):
        self.api_client = api_client

    def selfcheck(self):
        return self.api_client.is_server_compatible()

    def test_api(self):
        self.api_client.test()

def keyboardInterruptHandler(signum, frame):
    logger = logging.getLogger()
    logger.info('\nKeyboard Interrupt\n')
    sys.exit(0)

@ls_timeout
def liquidsoap_get_info(telnet_lock, host, port, logger):
    logger.debug("Checking to see if Liquidsoap is running")
    try:
        telnet_lock.acquire()
        tn = telnetlib.Telnet(host, port)
        msg = "version\n"
        tn.write(msg)
        tn.write("exit\n")
        response = tn.read_all()
    except Exception, e:
        logger.error(str(e))
        return None
    finally:
        telnet_lock.release()

    return get_liquidsoap_version(response)

def get_liquidsoap_version(version_string):
    m = re.match(r"Liquidsoap (\d+.\d+.\d+)", version_string)

    if m:
        return m.group(1)
    else:
        return None


    if m:
        current_version = m.group(1)
        return pure.version_cmp(current_version, LIQUIDSOAP_MIN_VERSION) >= 0
    return False

def liquidsoap_startup_test():

    liquidsoap_version_string = \
            liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger)
    while not liquidsoap_version_string:
        logger.warning("Liquidsoap doesn't appear to be running!, " + \
                "Sleeping and trying again")
        time.sleep(1)
        liquidsoap_version_string = \
                liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger)

    while pure.version_cmp(liquidsoap_version_string, LIQUIDSOAP_MIN_VERSION) < 0:
        logger.warning("Liquidsoap is running but in incorrect version! " + \
                "Make sure you have at least Liquidsoap %s installed" % LIQUIDSOAP_MIN_VERSION)
        time.sleep(1)
        liquidsoap_version_string = \
                liquidsoap_get_info(telnet_lock, ls_host, ls_port, logger)

    logger.info("Liquidsoap version string found %s" % liquidsoap_version_string)


if __name__ == '__main__':
    logger.info('###########################################')
    logger.info('# *** pypo *** #')
    logger.info('# Liquidsoap Scheduled Playout System #')
    logger.info('###########################################')

    #Although all of our calculations are in UTC, it is useful to know what timezone
    #the local machine is, so that we have a reference for what time the actual
    #log entries were made
    logger.info("Timezone: %s" % str(time.tzname))
    logger.info("UTC time: %s" % str(datetime.utcnow()))

    signal.signal(signal.SIGINT, keyboardInterruptHandler)

    api_client = api_client.AirtimeApiClient()
    g = Global(api_client)

    while not g.selfcheck():
        time.sleep(5)

    success = False
    while not success:
        try:
            api_client.register_component('pypo')
            success = True
        except Exception, e:
            logger.error(str(e))
            time.sleep(10)

    telnet_lock = Lock()

    ls_host = config['pypo']['ls_host']
    ls_port = config['pypo']['ls_port']

    liquidsoap_startup_test()

    if options.test:
        g.test_api()
        sys.exit(0)

    pypoFetch_q = Queue()
    recorder_q = Queue()
    pypoPush_q = Queue()

    pypo_liquidsoap = PypoLiquidsoap(logger, telnet_lock,\
            ls_host, ls_port)

    """
    This queue is shared between pypo-fetch and pypo-file, where pypo-file
    is the consumer. Pypo-fetch will send every schedule it gets to pypo-file
    and pypo will parse this schedule to determine which file has the highest
    priority, and retrieve it.
    """
    media_q = Queue()

    # Pass only the configuration sections needed; PypoMessageHandler only needs rabbitmq settings
    pmh = PypoMessageHandler(pypoFetch_q, recorder_q, config['rabbitmq'])
    pmh.daemon = True
    pmh.start()

    pfile = PypoFile(media_q, config['pypo'])
    pfile.daemon = True
    pfile.start()

    pf = PypoFetch(pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config['pypo'])
    pf.daemon = True
    pf.start()

    pp = PypoPush(pypoPush_q, telnet_lock, pypo_liquidsoap, config['pypo'])
    pp.daemon = True
    pp.start()

    recorder = Recorder(recorder_q)
    recorder.daemon = True
    recorder.start()

    stat = ListenerStat(config)
    stat.daemon = True
    stat.start()

    pf.join()

    logger.info("System exit")
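The docstring above about media_q describes a producer/consumer hand-off between daemon threads: pypo-fetch forwards every schedule it receives, and pypo-file consumes them and fetches the highest-priority file. As a rough illustration of that pattern only (not code from this commit, worker names invented), in the same Python 2 style as the module:

    # Sketch only: FetchWorker/FileWorker are invented stand-ins, not Airtime classes.
    from Queue import Queue
    from threading import Thread
    import time

    media_q = Queue()

    class FileWorker(Thread):            # plays the role of pypo-file (consumer)
        def run(self):
            while True:
                schedule = media_q.get() # blocks until a schedule arrives
                print "would fetch the highest-priority file from", schedule

    class FetchWorker(Thread):           # plays the role of pypo-fetch (producer)
        def run(self):
            for i in range(3):
                media_q.put({"schedule": i})   # every schedule is forwarded
                time.sleep(1)

    pfile = FileWorker()
    pfile.daemon = True
    pfile.start()

    pf = FetchWorker()
    pf.daemon = True
    pf.start()

    pf.join()   # mirrors the pf.join() above; daemon threads die with the process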
python_apps/pypo/pypo/eventtypes.py (new file, 6 lines added)
@@ -0,0 +1,6 @@
FILE = "file"
EVENT = "event"
STREAM_BUFFER_START = "stream_buffer_start"
STREAM_OUTPUT_START = "stream_output_start"
STREAM_BUFFER_END = "stream_buffer_end"
STREAM_OUTPUT_END = "stream_output_end"
python_apps/pypo/pypo/listenerstat.py (new file, 170 lines added)
|
@ -0,0 +1,170 @@
|
|||
from threading import Thread
|
||||
import urllib2
|
||||
import xml.dom.minidom
|
||||
import base64
|
||||
from datetime import datetime
|
||||
import traceback
|
||||
import logging
|
||||
import time
|
||||
|
||||
from api_clients import api_client
|
||||
|
||||
class ListenerStat(Thread):
|
||||
|
||||
HTTP_REQUEST_TIMEOUT = 30 # 30 second HTTP request timeout
|
||||
|
||||
def __init__(self, config, logger=None):
|
||||
Thread.__init__(self)
|
||||
self.config = config
|
||||
self.api_client = api_client.AirtimeApiClient()
|
||||
if logger is None:
|
||||
self.logger = logging.getLogger()
|
||||
else:
|
||||
self.logger = logger
|
||||
|
||||
def get_node_text(self, nodelist):
|
||||
rc = []
|
||||
for node in nodelist:
|
||||
if node.nodeType == node.TEXT_NODE:
|
||||
rc.append(node.data)
|
||||
return ''.join(rc)
|
||||
|
||||
def get_stream_parameters(self):
|
||||
#[{"user":"", "password":"", "url":"", "port":""},{},{}]
|
||||
return self.api_client.get_stream_parameters()
|
||||
|
||||
|
||||
def get_stream_server_xml(self, ip, url, is_shoutcast=False):
|
||||
encoded = base64.b64encode("%(admin_user)s:%(admin_pass)s" % ip)
|
||||
|
||||
header = {"Authorization":"Basic %s" % encoded}
|
||||
|
||||
if is_shoutcast:
|
||||
#user agent is required for shoutcast auth, otherwise it returns 404.
|
||||
user_agent = "Mozilla/5.0 (Linux; rv:22.0) Gecko/20130405 Firefox/22.0"
|
||||
header["User-Agent"] = user_agent
|
||||
|
||||
req = urllib2.Request(
|
||||
#assuming that the icecast stats path is /admin/stats.xml
|
||||
#need to fix this
|
||||
url=url,
|
||||
headers=header)
|
||||
|
||||
f = urllib2.urlopen(req, timeout=ListenerStat.HTTP_REQUEST_TIMEOUT)
|
||||
document = f.read()
|
||||
|
||||
return document
|
||||
|
||||
|
||||
def get_icecast_stats(self, ip):
|
||||
document = None
|
||||
if "airtime.pro" in ip["host"].lower():
|
||||
url = 'http://%(host)s:%(port)s/stats.xsl' % ip
|
||||
            document = self.get_stream_server_xml(ip, url)
        else:
            url = 'http://%(host)s:%(port)s/admin/stats.xml' % ip
            document = self.get_stream_server_xml(ip, url)
        dom = xml.dom.minidom.parseString(document)
        sources = dom.getElementsByTagName("source")

        mount_stats = None
        for s in sources:
            #drop the leading '/' character
            mount_name = s.getAttribute("mount")[1:]
            if mount_name == ip["mount"]:
                timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
                listeners = s.getElementsByTagName("listeners")
                num_listeners = 0
                if len(listeners):
                    num_listeners = self.get_node_text(listeners[0].childNodes)

                mount_stats = {"timestamp": timestamp, \
                        "num_listeners": num_listeners, \
                        "mount_name": mount_name}

        return mount_stats

    def get_shoutcast_stats(self, ip):
        url = 'http://%(host)s:%(port)s/admin.cgi?sid=1&mode=viewxml' % ip
        document = self.get_stream_server_xml(ip, url, is_shoutcast=True)
        dom = xml.dom.minidom.parseString(document)
        current_listeners = dom.getElementsByTagName("CURRENTLISTENERS")

        timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
        num_listeners = 0
        if len(current_listeners):
            num_listeners = self.get_node_text(current_listeners[0].childNodes)

        mount_stats = {"timestamp": timestamp, \
                "num_listeners": num_listeners, \
                "mount_name": "shoutcast"}

        return mount_stats

    def get_stream_stats(self, stream_parameters):
        stats = []

        #Iterate over stream_parameters, which is a list of dicts. Each dict
        #represents one Airtime stream (currently this limit is 3).
        #Note that there is room for optimization here: if all three
        #streams point at the same server, we still initiate 3 separate
        #connections.
        for k, v in stream_parameters.items():
            if v["enable"] == 'true':
                try:
                    if v["output"] == "icecast":
                        mount_stats = self.get_icecast_stats(v)
                        if mount_stats: stats.append(mount_stats)
                    else:
                        stats.append(self.get_shoutcast_stats(v))
                    self.update_listener_stat_error(v["mount"], 'OK')
                except Exception as e:
                    try:
                        self.update_listener_stat_error(v["mount"], str(e))
                    except Exception as e:
                        self.logger.error('Exception: %s', e)

        return stats

    def push_stream_stats(self, stats):
        self.api_client.push_stream_stats(stats)

    def update_listener_stat_error(self, stream_id, error):
        keyname = '%s_listener_stat_error' % stream_id
        data = {keyname: error}
        self.api_client.update_stream_setting_table(data)

    def run(self):
        #Wake up every 120 seconds and gather Icecast statistics. Note that we
        #are currently querying the server every 2 minutes for the list of
        #mountpoints as well. We could remove this query if we hooked into
        #rabbitmq events and listened for these changes instead.
        while True:
            try:
                stream_parameters = self.get_stream_parameters()
                stats = self.get_stream_stats(stream_parameters["stream_params"])

                if stats:
                    self.push_stream_stats(stats)
            except Exception, e:
                self.logger.error('Exception: %s', e)

            time.sleep(120)


if __name__ == "__main__":
    # create logger
    logger = logging.getLogger('std_out')
    logger.setLevel(logging.DEBUG)
    # create console handler and set level to debug
    #ch = logging.StreamHandler()
    #ch.setLevel(logging.DEBUG)
    # create formatter
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(lineno)s - %(levelname)s - %(message)s')
    # add formatter to ch
    #ch.setFormatter(formatter)
    # add ch to logger
    #logger.addHandler(ch)

    #ls = ListenerStat(logger=logger)
    #ls.run()
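A note on the parsing above: the mount matching works directly on Icecast's stats.xml DOM. A minimal, self-contained sketch of the same lookup against made-up XML (illustrative only, not part of the diff):

    import xml.dom.minidom

    doc = '<icestats><source mount="/airtime_128"><listeners>3</listeners></source></icestats>'
    dom = xml.dom.minidom.parseString(doc)
    for s in dom.getElementsByTagName("source"):
        mount_name = s.getAttribute("mount")[1:]            # drop the leading '/'
        listeners = s.getElementsByTagName("listeners")
        count = listeners[0].firstChild.nodeValue if listeners else 0
        print mount_name, count                             # airtime_128 3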
0
python_apps/pypo/pypo/media/__init__.py
Normal file
0
python_apps/pypo/pypo/media/__init__.py
Normal file
0
python_apps/pypo/pypo/media/update/__init__.py
Normal file
0
python_apps/pypo/pypo/media/update/__init__.py
Normal file
17
python_apps/pypo/pypo/pure.py
Normal file
17
python_apps/pypo/pypo/pure.py
Normal file
@@ -0,0 +1,17 @@
import re


def version_cmp(version1, version2):
    def normalize(v):
        return [int(x) for x in re.sub(r'(\.0+)*$', '', v).split(".")]
    return cmp(normalize(version1), normalize(version2))

def date_interval_to_seconds(interval):
    """
    Convert a timedelta object into a float representing the number of
    seconds. Unlike the clamped variants elsewhere in pypo, this version
    can return a negative value.
    """
    seconds = (interval.microseconds + \
         (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)

    return seconds
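A quick illustration of how these two helpers behave (illustrative only, not part of the diff; assumes Python 2, where the built-in cmp() exists, and that the module is importable as pure):

    from datetime import timedelta
    import pure

    # version_cmp ignores trailing ".0" segments, so "2.5.0" compares equal to "2.5"
    print pure.version_cmp("2.5.1", "2.5")    # 1  (first argument is newer)
    print pure.version_cmp("2.5.0", "2.5")    # 0  (treated as equal)

    # date_interval_to_seconds keeps sub-second precision as a float
    print pure.date_interval_to_seconds(timedelta(minutes=2, microseconds=500000))  # 120.5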
504
python_apps/pypo/pypo/pypofetch.py
Normal file
504
python_apps/pypo/pypo/pypofetch.py
Normal file
@@ -0,0 +1,504 @@
# -*- coding: utf-8 -*-

import os
import sys
import time
import logging.config
import json
import telnetlib
import copy
import subprocess
import signal
from datetime import datetime
import traceback
import pure

from Queue import Empty
from threading import Thread
from subprocess import Popen, PIPE

from api_clients import api_client
from std_err_override import LogWriter
from timeout import ls_timeout


# configure logging
logging_cfg = "/etc/airtime/pypo_logging.cfg"
logging.config.fileConfig(logging_cfg)
logger = logging.getLogger()
LogWriter.override_std_err(logger)

def keyboardInterruptHandler(signum, frame):
    logger = logging.getLogger()
    logger.info('\nKeyboard Interrupt\n')
    sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)

#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)

POLL_INTERVAL = 480

class PypoFetch(Thread):

    def __init__(self, pypoFetch_q, pypoPush_q, media_q, telnet_lock, pypo_liquidsoap, config):
        Thread.__init__(self)

        #Hacky...
        PypoFetch.ref = self

        self.api_client = api_client.AirtimeApiClient()
        self.fetch_queue = pypoFetch_q
        self.push_queue = pypoPush_q
        self.media_prepare_queue = media_q
        self.last_update_schedule_timestamp = time.time()
        self.config = config
        self.listener_timeout = POLL_INTERVAL

        self.telnet_lock = telnet_lock

        self.logger = logging.getLogger()

        self.pypo_liquidsoap = pypo_liquidsoap

        self.cache_dir = os.path.join(config["cache_dir"], "scheduler")
        self.logger.debug("Cache dir %s", self.cache_dir)

        try:
            if not os.path.isdir(self.cache_dir):
                """
                We get here if the path does not exist, or the path exists but
                is a file. We are not handling the second case, but we don't
                think we actually care about handling it.
                """
                self.logger.debug("Cache dir does not exist. Creating...")
                os.makedirs(self.cache_dir)
        except Exception, e:
            pass

        self.schedule_data = []
        self.logger.info("PypoFetch: init complete")

    """
    Handle a message from RabbitMQ, put it into our yucky global var.
    Hopefully there is a better way to do this.
    """
    def handle_message(self, message):
        try:
            self.logger.info("Received event from Pypo Message Handler: %s" % message)

            m = json.loads(message)
            command = m['event_type']
            self.logger.info("Handling command: " + command)

            if command == 'update_schedule':
                self.schedule_data = m['schedule']
                self.process_schedule(self.schedule_data)
            elif command == 'reset_liquidsoap_bootstrap':
                self.set_bootstrap_variables()
            elif command == 'update_stream_setting':
                self.logger.info("Updating stream setting...")
                self.regenerate_liquidsoap_conf(m['setting'])
            elif command == 'update_stream_format':
                self.logger.info("Updating stream format...")
                self.update_liquidsoap_stream_format(m['stream_format'])
            elif command == 'update_station_name':
                self.logger.info("Updating station name...")
                self.update_liquidsoap_station_name(m['station_name'])
            elif command == 'update_transition_fade':
                self.logger.info("Updating transition_fade...")
                self.update_liquidsoap_transition_fade(m['transition_fade'])
            elif command == 'switch_source':
                self.logger.info("switch_on_source show command received...")
                self.pypo_liquidsoap.\
                        get_telnet_dispatcher().\
                        switch_source(m['sourcename'], m['status'])
            elif command == 'disconnect_source':
                self.logger.info("disconnect_on_source show command received...")
                self.pypo_liquidsoap.get_telnet_dispatcher().\
                        disconnect_source(m['sourcename'])
            else:
                self.logger.info("Unknown command: %s" % command)

            # update timeout value
            if command == 'update_schedule':
                self.listener_timeout = POLL_INTERVAL
            else:
                self.listener_timeout = self.last_update_schedule_timestamp - time.time() + POLL_INTERVAL
                if self.listener_timeout < 0:
                    self.listener_timeout = 0
            self.logger.info("New timeout: %s" % self.listener_timeout)
        except Exception, e:
            top = traceback.format_exc()
            self.logger.error('Exception: %s', e)
            self.logger.error("traceback: %s", top)
            self.logger.error("Exception in handling Message Handler message: %s", e)


    def switch_source_temp(self, sourcename, status):
        self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
        command = "streams."
        if sourcename == "master_dj":
            command += "master_dj_"
        elif sourcename == "live_dj":
            command += "live_dj_"
        elif sourcename == "scheduled_play":
            command += "scheduled_play_"

        if status == "on":
            command += "start\n"
        else:
            command += "stop\n"

        return command

    """
    Initialize Liquidsoap environment
    """
    def set_bootstrap_variables(self):
        self.logger.debug('Getting information needed on bootstrap from Airtime')
        try:
            info = self.api_client.get_bootstrap_info()
        except Exception, e:
            self.logger.error('Unable to get bootstrap info.. Exiting pypo...')
            self.logger.error(str(e))

        self.logger.debug('info:%s', info)
        commands = []
        for k, v in info['switch_status'].iteritems():
            commands.append(self.switch_source_temp(k, v))

        stream_format = info['stream_label']
        station_name = info['station_name']
        fade = info['transition_fade']

        commands.append(('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8'))
        commands.append(('vars.station_name %s\n' % station_name).encode('utf-8'))
        commands.append(('vars.default_dj_fade %s\n' % fade).encode('utf-8'))
        self.pypo_liquidsoap.get_telnet_dispatcher().telnet_send(commands)

        self.pypo_liquidsoap.clear_all_queues()
        self.pypo_liquidsoap.clear_queue_tracker()

    def restart_liquidsoap(self):
        try:
            """do not block - if we receive the lock then good - no other thread
            will try communicating with Liquidsoap. If we don't receive it, it may
            mean some thread blocked and is still holding the lock. Restarting
            Liquidsoap will cause that thread to release the lock as an Exception
            will be thrown."""
            self.telnet_lock.acquire(False)


            self.logger.info("Restarting Liquidsoap")
            subprocess.call('/etc/init.d/airtime-liquidsoap restart', shell=True, close_fds=True)

            #Wait here and poll Liquidsoap until it has started up
            self.logger.info("Waiting for Liquidsoap to start")
            while True:
                try:
                    tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
                    tn.write("exit\n")
                    tn.read_all()
                    self.logger.info("Liquidsoap is up and running")
                    break
                except Exception, e:
                    #sleep 0.5 seconds and try again
                    time.sleep(0.5)

        except Exception, e:
            self.logger.error(e)
        finally:
            if self.telnet_lock.locked():
                self.telnet_lock.release()

    """
    TODO: This function needs to be way shorter, and refactored :/ - MK
    """
    def regenerate_liquidsoap_conf(self, setting):
        self.restart_liquidsoap()
        self.update_liquidsoap_connection_status()


    @ls_timeout
    def update_liquidsoap_connection_status(self):
        """
        Updates the status of the Liquidsoap connection to the streaming server.
        This function updates the bootup time variable in the Liquidsoap script.
        """

        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
            # update the boot up time of Liquidsoap. Since Liquidsoap is not restarting,
            # we are manually adjusting the bootup time variable so the status msg will get
            # updated.
            current_time = time.time()
            boot_up_time_command = "vars.bootup_time " + str(current_time) + "\n"
            self.logger.info(boot_up_time_command)
            tn.write(boot_up_time_command)

            connection_status = "streams.connection_status\n"
            self.logger.info(connection_status)
            tn.write(connection_status)

            tn.write('exit\n')

            output = tn.read_all()
        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

        output_list = output.split("\r\n")
        stream_info = output_list[2]

        # stream info is in the form of:
        # eg. s1:true,2:true,3:false
        streams = stream_info.split(",")
        self.logger.info(streams)

        fake_time = current_time + 1
        for s in streams:
            info = s.split(':')
            stream_id = info[0]
            status = info[1]
            if(status == "true"):
                self.api_client.notify_liquidsoap_status("OK", stream_id, str(fake_time))


    @ls_timeout
    def update_liquidsoap_stream_format(self, stream_format):
        # Push stream metadata to liquidsoap
        # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
            command = ('vars.stream_metadata_type %s\n' % stream_format).encode('utf-8')
            self.logger.info(command)
            tn.write(command)
            tn.write('exit\n')
            tn.read_all()
        except Exception, e:
            self.logger.error("Exception %s", e)
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def update_liquidsoap_transition_fade(self, fade):
        # Push stream metadata to liquidsoap
        # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
            command = ('vars.default_dj_fade %s\n' % fade).encode('utf-8')
            self.logger.info(command)
            tn.write(command)
            tn.write('exit\n')
            tn.read_all()
        except Exception, e:
            self.logger.error("Exception %s", e)
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def update_liquidsoap_station_name(self, station_name):
        # Push stream metadata to liquidsoap
        # TODO: THIS LIQUIDSOAP STUFF NEEDS TO BE MOVED TO PYPO-PUSH!!!
        try:
            try:
                self.telnet_lock.acquire()
                tn = telnetlib.Telnet(self.config['ls_host'], self.config['ls_port'])
                command = ('vars.station_name %s\n' % station_name).encode('utf-8')
                self.logger.info(command)
                tn.write(command)
                tn.write('exit\n')
                tn.read_all()
            except Exception, e:
                self.logger.error(str(e))
            finally:
                self.telnet_lock.release()
        except Exception, e:
            self.logger.error("Exception %s", e)

    """
    Process the schedule
     - Reads the scheduled entries of a given range (actual time +/- "prepare_ahead" / "cache_for")
     - Saves a serialized file of the schedule
     - Playlists are prepared (brought to liquidsoap format) and, if not mounted via nfs, files are copied
       to the cache dir (folder structure: cache/YYYY-MM-DD-hh-mm-ss)
     - Runs the cleanup routine to get rid of unused cached files
    """
    def process_schedule(self, schedule_data):
        self.last_update_schedule_timestamp = time.time()
        self.logger.debug(schedule_data)
        media = schedule_data["media"]
        media_filtered = {}

        # Download all the media and put playlists in liquidsoap "annotate" format
        try:

            """
            Make sure cache_dir exists
            """
            download_dir = self.cache_dir
            try:
                os.makedirs(download_dir)
            except Exception, e:
                pass

            media_copy = {}
            for key in media:
                media_item = media[key]
                if (media_item['type'] == 'file'):
                    self.sanity_check_media_item(media_item)
                    fileExt = os.path.splitext(media_item['uri'])[1]
                    dst = os.path.join(download_dir, unicode(media_item['id']) + fileExt)
                    media_item['dst'] = dst
                    media_item['file_ready'] = False
                    media_filtered[key] = media_item

                media_item['start'] = datetime.strptime(media_item['start'],
                        "%Y-%m-%d-%H-%M-%S")
                media_item['end'] = datetime.strptime(media_item['end'],
                        "%Y-%m-%d-%H-%M-%S")
                media_copy[key] = media_item


            self.media_prepare_queue.put(copy.copy(media_filtered))
        except Exception, e: self.logger.error("%s", e)

        # Send the data to pypo-push
        self.logger.debug("Pushing to pypo-push")
        self.push_queue.put(media_copy)


        # cleanup
        try: self.cache_cleanup(media)
        except Exception, e: self.logger.error("%s", e)

    #do basic validation of file parameters. Useful for debugging
    #purposes
    def sanity_check_media_item(self, media_item):
        start = datetime.strptime(media_item['start'], "%Y-%m-%d-%H-%M-%S")
        end = datetime.strptime(media_item['end'], "%Y-%m-%d-%H-%M-%S")

        length1 = pure.date_interval_to_seconds(end - start)
        length2 = media_item['cue_out'] - media_item['cue_in']

        if abs(length2 - length1) > 1:
            self.logger.error("end - start length: %s", length1)
            self.logger.error("cue_out - cue_in length: %s", length2)
            self.logger.error("Two lengths are not equal!!!")

    def is_file_opened(self, path):
        #Capture stderr to avoid polluting py-interpreter.log
        proc = Popen(["lsof", path], stdout=PIPE, stderr=PIPE)
        out = proc.communicate()[0].strip()
        return bool(out)

    def cache_cleanup(self, media):
        """
        Get the list of all files in the cache dir and remove them if they aren't being used anymore.
        The input dict() media lists all files that are scheduled or currently playing. Not being in this
        dict() means the file is safe to remove.
        """
        cached_file_set = set(os.listdir(self.cache_dir))
        scheduled_file_set = set()

        for mkey in media:
            media_item = media[mkey]
            if media_item['type'] == 'file':
                fileExt = os.path.splitext(media_item['uri'])[1]
                scheduled_file_set.add(unicode(media_item["id"]) + fileExt)

        expired_files = cached_file_set - scheduled_file_set

        self.logger.debug("Files to remove " + str(expired_files))
        for f in expired_files:
            try:
                path = os.path.join(self.cache_dir, f)
                self.logger.debug("Removing %s" % path)

                #check if this file is opened (sometimes Liquidsoap is still
                #playing the file because our knowledge of the track length
                #is incorrect!)
                if not self.is_file_opened(path):
                    os.remove(path)
                    self.logger.info("File '%s' removed" % path)
                else:
                    self.logger.info("File '%s' not removed. Still busy!" % path)
            except Exception, e:
                self.logger.error("Problem removing file '%s'" % f)
                self.logger.error(traceback.format_exc())

    def manual_schedule_fetch(self):
        success, self.schedule_data = self.api_client.get_schedule()
        if success:
            self.process_schedule(self.schedule_data)
        return success

    def persistent_manual_schedule_fetch(self, max_attempts=1):
        success = False
        num_attempts = 0
        while not success and num_attempts < max_attempts:
            success = self.manual_schedule_fetch()
            num_attempts += 1

        return success


    def main(self):
        #Make sure all Liquidsoap queues are empty. This is important in the
        #case where we've just restarted the pypo scheduler, but Liquidsoap
        #is still playing tracks. In this case let's just restart everything
        #from scratch so that we can repopulate the dictionary that keeps
        #track of what Liquidsoap is playing much more easily.
        self.pypo_liquidsoap.clear_all_queues()

        self.set_bootstrap_variables()

        # Bootstrap: since we are just starting up, we need to grab the
        # most recent schedule. After that we fetch the schedule every 30
        # minutes or wait for schedule updates to get pushed.
        success = self.persistent_manual_schedule_fetch(max_attempts=5)

        if success:
            self.logger.info("Bootstrap schedule received: %s", self.schedule_data)

        loops = 1
        while True:
            self.logger.info("Loop #%s", loops)
            try:
                """
                Our fetch_queue.get() call has a timeout; when it expires we
                fetch the Airtime schedule manually. It is important to fetch
                the schedule periodically, because otherwise we would only
                get schedule updates via RabbitMq if the user was constantly
                using the Airtime interface.

                If the user is not using the interface, RabbitMq messages are not
                sent, and we will have very stale (or non-existent!) data about the
                schedule.

                Currently we are checking every POLL_INTERVAL seconds.
                """


                message = self.fetch_queue.get(block=True, timeout=self.listener_timeout)
                self.handle_message(message)
            except Empty, e:
                self.logger.info("Queue timeout. Fetching schedule manually")
                self.persistent_manual_schedule_fetch(max_attempts=5)
            except Exception, e:
                top = traceback.format_exc()
                self.logger.error('Exception: %s', e)
                self.logger.error("traceback: %s", top)

            loops += 1

    def run(self):
        """
        Entry point of the thread
        """
        self.main()
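The timeout bookkeeping in handle_message() keeps the manual schedule fetch roughly on a fixed POLL_INTERVAL grid, even when unrelated commands keep waking the loop up. A standalone sketch of that arithmetic, with made-up timestamps (illustrative only, not part of the diff):

    import time

    POLL_INTERVAL = 480                 # seconds between forced schedule fetches

    last_update = time.time() - 100     # pretend the last schedule push was 100s ago

    # A non-schedule command arrived: shrink the wait so the next manual
    # fetch still happens POLL_INTERVAL seconds after the last update.
    timeout = last_update - time.time() + POLL_INTERVAL
    if timeout < 0:
        timeout = 0
    print "next manual fetch in about %ss" % int(timeout)   # ~380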
213
python_apps/pypo/pypo/pypofile.py
Normal file
213
python_apps/pypo/pypo/pypofile.py
Normal file
@@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-

from threading import Thread
from Queue import Empty

import logging
import logging.config
import shutil
import os
import sys
import stat
import requests
import ConfigParser
import json
import hashlib
from requests.exceptions import ConnectionError, HTTPError, Timeout

from std_err_override import LogWriter

CONFIG_PATH = '/etc/airtime/airtime.conf'

# configure logging
logging_cfg = "/etc/airtime/pypo_logging.cfg"
logging.config.fileConfig(logging_cfg)
logger = logging.getLogger()
LogWriter.override_std_err(logger)

#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)


class PypoFile(Thread):

    def __init__(self, schedule_queue, config):
        Thread.__init__(self)
        self.logger = logging.getLogger()
        self.media_queue = schedule_queue
        self.media = None
        self.cache_dir = os.path.join(config["cache_dir"], "scheduler")

    def copy_file(self, media_item):
        """
        Copy media_item from the local library directory to the local cache directory.
        """
        src = media_item['uri']
        dst = media_item['dst']

        src_size = media_item['filesize']

        dst_exists = True
        try:
            dst_size = os.path.getsize(dst)
        except Exception, e:
            dst_exists = False

        do_copy = False
        if dst_exists:
            if src_size != dst_size:
                do_copy = True
            else:
                self.logger.debug("file %s already exists in local cache as %s, skipping copying..." % (src, dst))
        else:
            do_copy = True

        media_item['file_ready'] = not do_copy

        if do_copy:
            self.logger.debug("copying from %s to local cache %s" % (src, dst))
            try:
                config = self.read_config_file(CONFIG_PATH)
                CONFIG_SECTION = "general"
                username = config.get(CONFIG_SECTION, 'api_key')

                host = config.get(CONFIG_SECTION, 'base_url')
                url = "http://%s/rest/media/%s/download" % (host, media_item["id"])
                with open(dst, "wb") as handle:
                    response = requests.get(url, auth=requests.auth.HTTPBasicAuth(username, ''), stream=True, verify=False)

                    if not response.ok:
                        self.logger.error(response)
                        raise Exception("%s - Error occurred downloading file" % response.status_code)

                    for chunk in response.iter_content(1024):
                        if not chunk:
                            break

                        handle.write(chunk)

                #make file world readable
                os.chmod(dst, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)

                if media_item['filesize'] == 0:
                    file_size = self.report_file_size_and_md5_to_airtime(dst, media_item["id"], host, username)
                    media_item["filesize"] = file_size

                media_item['file_ready'] = True
            except Exception, e:
                self.logger.error("Could not copy from %s to %s" % (src, dst))
                self.logger.error(e)

    def report_file_size_and_md5_to_airtime(self, file_path, file_id, host_name, api_key):
        try:
            file_size = os.path.getsize(file_path)

            with open(file_path, 'rb') as fh:
                m = hashlib.md5()
                while True:
                    data = fh.read(8192)
                    if not data:
                        break
                    m.update(data)
                md5_hash = m.hexdigest()
        except (OSError, IOError) as e:
            file_size = 0
            self.logger.error("Error getting file size and md5 hash for file id %s" % file_id)
            self.logger.error(e)

        # Make PUT request to Airtime to update the file size and hash
        error_msg = "Could not update media file %s with file size and md5 hash" % file_id
        try:
            put_url = "http://%s/rest/media/%s" % (host_name, file_id)
            payload = json.dumps({'filesize': file_size, 'md5': md5_hash})
            response = requests.put(put_url, data=payload, auth=requests.auth.HTTPBasicAuth(api_key, ''))
            if not response.ok:
                self.logger.error(error_msg)
        except (ConnectionError, Timeout):
            self.logger.error(error_msg)
        except Exception as e:
            self.logger.error(error_msg)
            self.logger.error(e)

        return file_size

    def get_highest_priority_media_item(self, schedule):
        """
        Get the highest priority media_item in the queue. Currently the highest
        priority is decided by how close the start time is to "now".
        """
        if schedule is None or len(schedule) == 0:
            return None

        sorted_keys = sorted(schedule.keys())

        if len(sorted_keys) == 0:
            return None

        highest_priority = sorted_keys[0]
        media_item = schedule[highest_priority]

        self.logger.debug("Highest priority item: %s" % highest_priority)

        """
        Remove this media_item from the dictionary. On the next iteration
        (from the main function) we won't consider it for prioritization
        anymore. If on the next iteration we have received a new schedule,
        it is very possible we will have to deal with the same media_items
        again. In this situation, the worst possible case is that we try to
        copy the file again and realize we already have it (thus aborting the copy).
        """
        del schedule[highest_priority]

        return media_item

    def read_config_file(self, config_path):
        """Parse the application's config file located at config_path."""
        config = ConfigParser.SafeConfigParser()
        try:
            config.readfp(open(config_path))
        except IOError as e:
            logging.debug("Failed to open config file at %s: %s" % (config_path, e.strerror))
            sys.exit()
        except Exception as e:
            logging.debug(e.strerror)
            sys.exit()

        return config

    def main(self):
        while True:
            try:
                if self.media is None or len(self.media) == 0:
                    """
                    We have no schedule, so we have nothing else to do. Let's
                    do a blocking wait on the queue.
                    """
                    self.media = self.media_queue.get(block=True)
                else:
                    """
                    We have a schedule we need to process, but we also want
                    to check if a newer schedule is available. In this case
                    do a non-blocking queue.get and in either case (whether we
                    get something or not), get back to work on preparing the files.
                    """
                    try:
                        self.media = self.media_queue.get_nowait()
                    except Empty, e:
                        pass


                media_item = self.get_highest_priority_media_item(self.media)
                if media_item is not None:
                    self.copy_file(media_item)
            except Exception, e:
                import traceback
                top = traceback.format_exc()
                self.logger.error(str(e))
                self.logger.error(top)
                raise

    def run(self):
        """
        Entry point of the thread
        """
        self.main()
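The md5 reporting above hashes the downloaded file in 8 KB chunks, so large media files never have to be read into memory at once. The same pattern in isolation, with a hypothetical path (illustrative only, not part of the diff):

    import hashlib

    def md5_of_file(path, chunk_size=8192):
        m = hashlib.md5()
        with open(path, 'rb') as fh:
            while True:
                data = fh.read(chunk_size)
                if not data:
                    break
                m.update(data)
        return m.hexdigest()

    # print md5_of_file("/tmp/example.mp3")   # hypothetical file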
89
python_apps/pypo/pypo/pypoliqqueue.py
Normal file
89
python_apps/pypo/pypo/pypoliqqueue.py
Normal file
@@ -0,0 +1,89 @@
from threading import Thread
from collections import deque
from datetime import datetime

import logging
import traceback
import sys
import time


from Queue import Empty

import signal
def keyboardInterruptHandler(signum, frame):
    logger = logging.getLogger()
    logger.info('\nKeyboard Interrupt\n')
    sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)

class PypoLiqQueue(Thread):
    def __init__(self, q, pypo_liquidsoap, logger):
        Thread.__init__(self)
        self.queue = q
        self.logger = logger
        self.pypo_liquidsoap = pypo_liquidsoap

    def main(self):
        time_until_next_play = None
        schedule_deque = deque()
        media_schedule = None

        while True:
            try:
                if time_until_next_play is None:
                    self.logger.info("waiting indefinitely for schedule")
                    media_schedule = self.queue.get(block=True)
                else:
                    self.logger.info("waiting %ss until next scheduled item" % \
                            time_until_next_play)
                    media_schedule = self.queue.get(block=True, \
                            timeout=time_until_next_play)
            except Empty, e:
                #Time to push a scheduled item.
                media_item = schedule_deque.popleft()
                self.pypo_liquidsoap.play(media_item)
                if len(schedule_deque):
                    time_until_next_play = \
                            self.date_interval_to_seconds(
                                    schedule_deque[0]['start'] - datetime.utcnow())
                    if time_until_next_play < 0:
                        time_until_next_play = 0
                else:
                    time_until_next_play = None
            else:
                self.logger.info("New schedule received: %s", media_schedule)

                #new schedule received. Replace the old one with this.
                schedule_deque.clear()

                keys = sorted(media_schedule.keys())
                for i in keys:
                    schedule_deque.append(media_schedule[i])

                if len(keys):
                    time_until_next_play = self.date_interval_to_seconds(
                            media_schedule[keys[0]]['start'] -
                            datetime.utcnow())

                else:
                    time_until_next_play = None


    def date_interval_to_seconds(self, interval):
        """
        Convert timedelta object into int representing the number of seconds. If
        number of seconds is less than 0, then return 0.
        """
        seconds = (interval.microseconds + \
             (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
        if seconds < 0: seconds = 0

        return seconds

    def run(self):
        try: self.main()
        except Exception, e:
            self.logger.error('PypoLiqQueue Exception: %s', traceback.format_exc())
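The scheduling trick in main() is that Queue.Empty is not treated as an error: a timed-out get() is exactly the signal that the head of the deque is due to play. A stripped-down sketch of that pattern with made-up items (illustrative only, not part of the diff):

    from Queue import Queue, Empty
    from collections import deque

    q = Queue()
    pending = deque([{'name': 'jingle', 'wait': 2}])     # made-up schedule items
    timeout = pending[0]['wait']

    try:
        newer_schedule = q.get(block=True, timeout=timeout)
        # a fresh schedule arrived first: rebuild the deque from it
    except Empty:
        item = pending.popleft()                         # nothing newer: play the due item
        print "playing %s" % item['name']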
240
python_apps/pypo/pypo/pypoliquidsoap.py
Normal file
240
python_apps/pypo/pypo/pypoliquidsoap.py
Normal file
@@ -0,0 +1,240 @@
from pypofetch import PypoFetch
from telnetliquidsoap import TelnetLiquidsoap

from datetime import datetime
from datetime import timedelta

import eventtypes
import time

class PypoLiquidsoap():
    def __init__(self, logger, telnet_lock, host, port):
        self.logger = logger
        self.liq_queue_tracker = {
                "s0": None,
                "s1": None,
                "s2": None,
                "s3": None,
                "s4": None,
                }

        self.telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, \
                                                  logger,\
                                                  host,\
                                                  port,\
                                                  self.liq_queue_tracker.keys())

    def get_telnet_dispatcher(self):
        return self.telnet_liquidsoap


    def play(self, media_item):
        if media_item["type"] == eventtypes.FILE:
            self.handle_file_type(media_item)
        elif media_item["type"] == eventtypes.EVENT:
            self.handle_event_type(media_item)
        elif media_item["type"] == eventtypes.STREAM_BUFFER_START:
            self.telnet_liquidsoap.start_web_stream_buffer(media_item)
        elif media_item["type"] == eventtypes.STREAM_OUTPUT_START:
            if media_item['row_id'] != self.telnet_liquidsoap.current_prebuffering_stream_id:
                #this is called if the stream wasn't scheduled sufficiently ahead of time
                #so that the prebuffering stage could take effect. Let's do the prebuffering now.
                self.telnet_liquidsoap.start_web_stream_buffer(media_item)
            self.telnet_liquidsoap.start_web_stream(media_item)
        elif media_item['type'] == eventtypes.STREAM_BUFFER_END:
            self.telnet_liquidsoap.stop_web_stream_buffer()
        elif media_item['type'] == eventtypes.STREAM_OUTPUT_END:
            self.telnet_liquidsoap.stop_web_stream_output()
        else: raise UnknownMediaItemType(str(media_item))

    def handle_file_type(self, media_item):
        """
        Wait 200 seconds (2000 iterations) for the file to become ready,
        otherwise give up on it.
        """
        iter_num = 0
        while not media_item['file_ready'] and iter_num < 2000:
            time.sleep(0.1)
            iter_num += 1

        if media_item['file_ready']:
            available_queue = self.find_available_queue()

            try:
                self.telnet_liquidsoap.queue_push(available_queue, media_item)
                self.liq_queue_tracker[available_queue] = media_item
            except Exception as e:
                self.logger.error(e)
                raise
        else:
            self.logger.warn("File %s did not become ready within 200 seconds. Skipping...", media_item['dst'])

    def handle_event_type(self, media_item):
        if media_item['event_type'] == "kick_out":
            self.telnet_liquidsoap.disconnect_source("live_dj")
        elif media_item['event_type'] == "switch_off":
            self.telnet_liquidsoap.switch_source("live_dj", "off")


    def is_media_item_finished(self, media_item):
        if media_item is None:
            return True
        else:
            return datetime.utcnow() > media_item['end']

    def find_available_queue(self):
        available_queue = None
        for i in self.liq_queue_tracker:
            mi = self.liq_queue_tracker[i]
            if mi == None or self.is_media_item_finished(mi):
                #queue "i" is available. Push to this queue
                available_queue = i

        if available_queue == None:
            raise NoQueueAvailableException()

        return available_queue


    def verify_correct_present_media(self, scheduled_now):
        #verify whether Liquidsoap is currently playing the correct files.
        #if we find an item that Liquidsoap is not playing, then push it
        #into one of Liquidsoap's queues. If Liquidsoap is already playing
        #it, do nothing. If Liquidsoap is playing a track that isn't in
        #currently_playing, then stop it.

        #Check for Liquidsoap media we should source.skip
        #get liquidsoap items for each queue. Since each queue can only have one
        #item, we should have a max of 8 items.

        #2013-03-21-22-56-00_0: {
        #id: 1,
        #type: "stream_output_start",
        #row_id: 41,
        #uri: "http://stream2.radioblackout.org:80/blackout.ogg",
        #start: "2013-03-21-22-56-00",
        #end: "2013-03-21-23-26-00",
        #show_name: "Untitled Show",
        #independent_event: true
        #},

        try:
            scheduled_now_files = \
                    filter(lambda x: x["type"] == eventtypes.FILE, scheduled_now)

            scheduled_now_webstream = \
                    filter(lambda x: x["type"] == eventtypes.STREAM_OUTPUT_START, \
                           scheduled_now)

            schedule_ids = set(map(lambda x: x["row_id"], scheduled_now_files))

            row_id_map = {}
            liq_queue_ids = set()
            for i in self.liq_queue_tracker:
                mi = self.liq_queue_tracker[i]
                if not self.is_media_item_finished(mi):
                    liq_queue_ids.add(mi["row_id"])
                    row_id_map[mi["row_id"]] = mi

            to_be_removed = set()
            to_be_added = set()

            #Iterate over the new files and compare them to the currently scheduled
            #tracks. Tracks already in a liquidsoap queue still need a check that
            #their attributes haven't changed.
            #If replay gain changes, it shouldn't change the amplification of the currently playing song.
            for i in scheduled_now_files:
                if i["row_id"] in row_id_map:
                    mi = row_id_map[i["row_id"]]
                    correct = mi['start'] == i['start'] and \
                              mi['end'] == i['end'] and \
                              mi['row_id'] == i['row_id']

                    if not correct:
                        #need to re-add
                        self.logger.info("Track %s found to have new attr." % i)
                        to_be_removed.add(i["row_id"])
                        to_be_added.add(i["row_id"])


            to_be_removed.update(liq_queue_ids - schedule_ids)
            to_be_added.update(schedule_ids - liq_queue_ids)

            if to_be_removed:
                self.logger.info("Need to remove items from Liquidsoap: %s" % \
                        to_be_removed)

                #remove files from Liquidsoap's queue
                for i in self.liq_queue_tracker:
                    mi = self.liq_queue_tracker[i]
                    if mi is not None and mi["row_id"] in to_be_removed:
                        self.stop(i)

            if to_be_added:
                self.logger.info("Need to add items to Liquidsoap *now*: %s" % \
                        to_be_added)

                for i in scheduled_now_files:
                    if i["row_id"] in to_be_added:
                        self.modify_cue_point(i)
                        self.play(i)

            #handle webstreams
            current_stream_id = self.telnet_liquidsoap.get_current_stream_id()
            if scheduled_now_webstream:
                if int(current_stream_id) != int(scheduled_now_webstream[0]["row_id"]):
                    self.play(scheduled_now_webstream[0])
            elif current_stream_id != "-1":
                #something is playing and it shouldn't be.
                self.telnet_liquidsoap.stop_web_stream_buffer()
                self.telnet_liquidsoap.stop_web_stream_output()
        except KeyError as e:
            self.logger.error("Error: Malformed event in schedule. " + str(e))


    def stop(self, queue):
        self.telnet_liquidsoap.queue_remove(queue)
        self.liq_queue_tracker[queue] = None

    def is_file(self, media_item):
        return media_item["type"] == eventtypes.FILE

    def clear_queue_tracker(self):
        for i in self.liq_queue_tracker.keys():
            self.liq_queue_tracker[i] = None

    def modify_cue_point(self, link):
        assert self.is_file(link)

        tnow = datetime.utcnow()

        link_start = link['start']

        diff_td = tnow - link_start
        diff_sec = self.date_interval_to_seconds(diff_td)

        if diff_sec > 0:
            self.logger.debug("media item was supposed to start %s ago. Preparing to start..", diff_sec)
            original_cue_in_td = timedelta(seconds=float(link['cue_in']))
            link['cue_in'] = self.date_interval_to_seconds(original_cue_in_td) + diff_sec

    def date_interval_to_seconds(self, interval):
        """
        Convert timedelta object into int representing the number of seconds. If
        number of seconds is less than 0, then return 0.
        """
        seconds = (interval.microseconds + \
             (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
        if seconds < 0: seconds = 0

        return seconds

    def clear_all_queues(self):
        self.telnet_liquidsoap.queue_clear_all()


class UnknownMediaItemType(Exception):
    pass

class NoQueueAvailableException(Exception):
    pass
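verify_correct_present_media() is, at its core, set reconciliation on row_ids: whatever Liquidsoap holds but the schedule no longer wants is stopped, and whatever the schedule wants but Liquidsoap lacks is cued. The core of that with toy data (illustrative only, not part of the diff):

    liq_queue_ids = set([41, 42])        # what Liquidsoap is currently holding (made up)
    schedule_ids = set([42, 43])         # what should be playing right now (made up)

    to_be_removed = liq_queue_ids - schedule_ids    # 41: stop it
    to_be_added = schedule_ids - liq_queue_ids      # 43: fix its cue_in, then play it
    print to_be_removed, to_be_added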
142
python_apps/pypo/pypo/pypomessagehandler.py
Normal file
142
python_apps/pypo/pypo/pypomessagehandler.py
Normal file
@@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-

import logging
import logging.config
import traceback
import os
import sys
from threading import Thread
import time
# For RabbitMQ
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue
from kombu.simple import SimpleQueue
from amqplib.client_0_8.exceptions import AMQPConnectionException
import json

from std_err_override import LogWriter

# configure logging
logging_cfg = "/etc/airtime/pypo_logging.cfg"
logging.config.fileConfig(logging_cfg)
logger = logging.getLogger('message_h')
LogWriter.override_std_err(logger)

#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)


class PypoMessageHandler(Thread):
    def __init__(self, pq, rq, config):
        Thread.__init__(self)
        self.logger = logging.getLogger('message_h')
        self.pypo_queue = pq
        self.recorder_queue = rq
        self.config = config

    def init_rabbit_mq(self):
        self.logger.info("Initializing RabbitMQ stuff")
        try:
            schedule_exchange = Exchange("airtime-pypo", "direct", durable=True, auto_delete=True)
            schedule_queue = Queue("pypo-fetch", exchange=schedule_exchange, key="foo")
            connection = BrokerConnection(self.config["host"], \
                    self.config["user"], \
                    self.config["password"], \
                    self.config["vhost"])

            channel = connection.channel()
            self.simple_queue = SimpleQueue(channel, schedule_queue)
        except Exception, e:
            self.logger.error(e)
            return False

        return True

    """
    Handle a message from RabbitMQ, put it into our yucky global var.
    Hopefully there is a better way to do this.
    """
    def handle_message(self, message):
        try:
            self.logger.info("Received event from RabbitMQ: %s" % message)

            m = json.loads(message)
            command = m['event_type']
            self.logger.info("Handling command: " + command)

            if command == 'update_schedule':
                self.logger.info("Updating schedule...")
                self.pypo_queue.put(message)
            elif command == 'reset_liquidsoap_bootstrap':
                self.logger.info("Resetting bootstrap vars...")
                self.pypo_queue.put(message)
            elif command == 'update_stream_setting':
                self.logger.info("Updating stream setting...")
                self.pypo_queue.put(message)
            elif command == 'update_stream_format':
                self.logger.info("Updating stream format...")
                self.pypo_queue.put(message)
            elif command == 'update_station_name':
                self.logger.info("Updating station name...")
                self.pypo_queue.put(message)
            elif command == 'switch_source':
                self.logger.info("switch_source command received...")
                self.pypo_queue.put(message)
            elif command == 'update_transition_fade':
                self.logger.info("Updating transition fade...")
                self.pypo_queue.put(message)
            elif command == 'disconnect_source':
                self.logger.info("disconnect_source command received...")
                self.pypo_queue.put(message)
            elif command == 'update_recorder_schedule':
                self.recorder_queue.put(message)
            elif command == 'cancel_recording':
                self.recorder_queue.put(message)
            else:
                self.logger.info("Unknown command: %s" % command)
        except Exception, e:
            self.logger.error("Exception in handling RabbitMQ message: %s", e)

    def main(self):
        while not self.init_rabbit_mq():
            self.logger.error("Error connecting to RabbitMQ Server. Trying again in a few seconds")
            time.sleep(5)

        loops = 1
        while True:
            self.logger.info("Loop #%s", loops)
            try:
                message = self.simple_queue.get(block=True)
                self.handle_message(message.payload)
                # ACK the message to take it off the queue
                message.ack()
            except (IOError, AttributeError, AMQPConnectionException), e:
                self.logger.error('Exception: %s', e)
                self.logger.error("traceback: %s", traceback.format_exc())
                while not self.init_rabbit_mq():
                    self.logger.error("Error connecting to RabbitMQ Server. Trying again in a few seconds")
                    time.sleep(5)
            except Exception, e:
                """
                sleep 5 seconds so that we don't spin inside this
                while loop and eat all the CPU
                """
                time.sleep(5)

                """
                There is a problem with the RabbitMq messenger service. Let's
                log the error and get the schedule via HTTP polling
                """
                self.logger.error('Exception: %s', e)
                self.logger.error("traceback: %s", traceback.format_exc())

            loops += 1

    """
    Main loop of the thread:
    Wait for schedule updates from RabbitMQ, but in case there aren't any,
    poll the server to get the upcoming schedule.
    """
    def run(self):
        while True:
            self.main()
162
python_apps/pypo/pypo/pypopush.py
Normal file
162
python_apps/pypo/pypo/pypopush.py
Normal file
@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-

from datetime import datetime
from datetime import timedelta
from configobj import ConfigObj

import sys
import time
import logging.config
import telnetlib
import calendar
import math
import traceback
import os

from pypofetch import PypoFetch
from pypoliqqueue import PypoLiqQueue

from Queue import Empty, Queue

from threading import Thread

from api_clients import api_client
from std_err_override import LogWriter
from timeout import ls_timeout


# configure logging
logging_cfg = "/etc/airtime/pypo_logging.cfg"
logging.config.fileConfig(logging_cfg)
logger = logging.getLogger()
LogWriter.override_std_err(logger)

#need to wait for Python 2.7 for this..
#logging.captureWarnings(True)

PUSH_INTERVAL = 2


def is_stream(media_item):
    return media_item['type'] == 'stream_output_start'

def is_file(media_item):
    return media_item['type'] == 'file'

class PypoPush(Thread):
    def __init__(self, q, telnet_lock, pypo_liquidsoap, config):
        Thread.__init__(self)
        self.api_client = api_client.AirtimeApiClient()
        self.queue = q

        self.telnet_lock = telnet_lock
        self.config = config

        self.pushed_objects = {}
        self.logger = logging.getLogger('push')
        self.current_prebuffering_stream_id = None
        self.queue_id = 0

        self.future_scheduled_queue = Queue()
        self.pypo_liquidsoap = pypo_liquidsoap

        self.plq = PypoLiqQueue(self.future_scheduled_queue, \
                self.pypo_liquidsoap, \
                self.logger)
        self.plq.daemon = True
        self.plq.start()


    def main(self):
        loops = 0
        heartbeat_period = math.floor(30 / PUSH_INTERVAL)

        media_schedule = None

        while True:
            try:
                media_schedule = self.queue.get(block=True)
            except Exception, e:
                self.logger.error(str(e))
                raise
            else:
                self.logger.debug(media_schedule)
                #separate media_schedule list into currently_playing and
                #scheduled_for_future lists
                currently_playing, scheduled_for_future = \
                        self.separate_present_future(media_schedule)

                self.pypo_liquidsoap.verify_correct_present_media(currently_playing)
                self.future_scheduled_queue.put(scheduled_for_future)

                if loops % heartbeat_period == 0:
                    self.logger.info("heartbeat")
                    loops = 0
                loops += 1


    def separate_present_future(self, media_schedule):
        tnow = datetime.utcnow()

        present = []
        future = {}

        sorted_keys = sorted(media_schedule.keys())
        for mkey in sorted_keys:
            media_item = media_schedule[mkey]

            diff_td = tnow - media_item['start']
            diff_sec = self.date_interval_to_seconds(diff_td)

            if diff_sec >= 0:
                present.append(media_item)
            else:
                future[mkey] = media_item

        return present, future

    def date_interval_to_seconds(self, interval):
        """
        Convert a timedelta object into a float representing the number of
        seconds. The result can be negative.
        """
        seconds = (interval.microseconds + \
             (interval.seconds + interval.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)

        return seconds

    @ls_timeout
    def stop_web_stream_all(self):
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.config['LS_HOST'], self.config['LS_PORT'])

            #msg = 'dynamic_source.read_stop_all xxx\n'
            msg = 'http.stop\n'
            self.logger.debug(msg)
            tn.write(msg)

            msg = 'dynamic_source.output_stop\n'
            self.logger.debug(msg)
            tn.write(msg)

            msg = 'dynamic_source.id -1\n'
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())

        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    def run(self):
        while True:
            try: self.main()
            except Exception, e:
                top = traceback.format_exc()
                self.logger.error('Pypo Push Exception: %s', top)
                time.sleep(5)
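separate_present_future() splits each incoming schedule dict on utcnow(): anything already started goes straight to Liquidsoap, anything else is queued for PypoLiqQueue. A toy run of the same split (illustrative only, not part of the diff; keys and items are made up):

    from datetime import datetime, timedelta

    now = datetime.utcnow()
    media_schedule = {                                   # made-up keys and items
        '2015-01-01-00-00-00_0': {'start': now - timedelta(seconds=5)},
        '2015-01-01-00-10-00_0': {'start': now + timedelta(minutes=10)},
    }

    present, future = [], {}
    for key in sorted(media_schedule.keys()):
        item = media_schedule[key]
        if item['start'] <= now:
            present.append(item)      # already started: hand straight to Liquidsoap
        else:
            future[key] = item        # not due yet: queued for PypoLiqQueue
    print len(present), len(future)   # 1 1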
354
python_apps/pypo/pypo/recorder.py
Normal file
354
python_apps/pypo/pypo/recorder.py
Normal file
|
@ -0,0 +1,354 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
import json
|
||||
import time
|
||||
import datetime
|
||||
import os
|
||||
import sys
|
||||
import pytz
|
||||
import signal
|
||||
import math
|
||||
import traceback
|
||||
import re
|
||||
|
||||
from configobj import ConfigObj
|
||||
|
||||
from poster.encode import multipart_encode
|
||||
from poster.streaminghttp import register_openers
|
||||
|
||||
from subprocess import Popen
|
||||
from subprocess import PIPE
|
||||
from threading import Thread
|
||||
|
||||
import mutagen
|
||||
|
||||
from api_clients import api_client as apc
|
||||
|
||||
def api_client(logger):
|
||||
"""
|
||||
api_client returns the correct instance of AirtimeApiClient. Although there is only one
|
||||
instance to choose from at the moment.
|
||||
"""
|
||||
return apc.AirtimeApiClient(logger)
|
||||
|
||||
# loading config file
|
||||
try:
|
||||
config = ConfigObj('/etc/airtime/airtime.conf')
|
||||
except Exception, e:
|
||||
print ('Error loading config file: %s', e)
|
||||
sys.exit()
|
||||
|
||||
# TODO : add docstrings everywhere in this module
|
||||
|
||||
def getDateTimeObj(time):
|
||||
# TODO : clean up for this function later.
|
||||
# - use tuples to parse result from split (instead of indices)
|
||||
# - perhaps validate the input before doing dangerous casts?
|
||||
# - rename this function to follow the standard convention
|
||||
# - rename time to something else so that the module name does not get
|
||||
# shadowed
|
||||
# - add docstring to document all behaviour of this function
|
||||
timeinfo = time.split(" ")
|
||||
date = [ int(x) for x in timeinfo[0].split("-") ]
|
||||
my_time = [ int(x) for x in timeinfo[1].split(":") ]
|
||||
return datetime.datetime(date[0], date[1], date[2], my_time[0], my_time[1], my_time[2], 0, None)
|
||||
|
||||
PUSH_INTERVAL = 2
|
||||
|
||||
class ShowRecorder(Thread):
|
||||
|
||||
def __init__ (self, show_instance, show_name, filelength, start_time):
|
||||
Thread.__init__(self)
|
||||
self.logger = logging.getLogger('recorder')
|
||||
self.api_client = api_client(self.logger)
|
||||
self.filelength = filelength
|
||||
self.start_time = start_time
|
||||
self.show_instance = show_instance
|
||||
self.show_name = show_name
|
||||
self.p = None
|
||||
|
||||
def record_show(self):
|
||||
length = str(self.filelength)
|
||||
filename = self.start_time
|
||||
filename = filename.replace(" ", "-")
|
||||
|
||||
if config["pypo"]["record_file_type"] in ["mp3", "ogg"]:
|
||||
filetype = config["pypo"]["record_file_type"]
|
||||
else:
|
||||
filetype = "ogg";
|
||||
|
||||
joined_path = os.path.join(config["base_recorded_files"], filename)
|
||||
filepath = "%s.%s" % (joined_path, filetype)
|
||||
|
||||
br = config["pypo"]["record_bitrate"]
|
||||
sr = config["pypo"]["record_samplerate"]
|
||||
c = config["pypo"]["record_channels"]
|
||||
ss = config["pypo"]["record_sample_size"]
|
||||
|
||||
#-f:16,2,44100
|
||||
#-b:256
|
||||
command = "ecasound -f:%s,%s,%s -i alsa -o %s,%s000 -t:%s" % \
|
||||
(ss, c, sr, filepath, br, length)
|
||||
args = command.split(" ")
|
||||
|
||||
self.logger.info("starting record")
|
||||
self.logger.info("command " + command)
|
||||
|
||||
self.p = Popen(args,stdout=PIPE)
|
||||
|
||||
#blocks at the following line until the child process
|
||||
#quits
|
||||
self.p.wait()
|
||||
outmsgs = self.p.stdout.readlines()
|
||||
for msg in outmsgs:
|
||||
m = re.search('^ERROR',msg)
|
||||
if not m == None:
|
||||
self.logger.info('Recording error is found: %s', outmsgs)
|
||||
self.logger.info("finishing record, return code %s", self.p.returncode)
|
||||
code = self.p.returncode
|
||||
|
||||
self.p = None
|
||||
|
||||
return code, filepath
|
||||
|
||||
def cancel_recording(self):
|
||||
#send signal interrupt (2)
|
||||
self.logger.info("Show manually cancelled!")
|
||||
if (self.p is not None):
|
||||
self.p.send_signal(signal.SIGINT)
|
||||
|
||||
#if self.p is defined, then the child process ecasound is recording
|
||||
def is_recording(self):
|
||||
return (self.p is not None)
|
||||
|
||||
def upload_file(self, filepath):
|
||||
|
||||
filename = os.path.split(filepath)[1]
|
||||
|
||||
# Register the streaming http handlers with urllib2
|
||||
register_openers()
|
||||
|
||||
# headers contains the necessary Content-Type and Content-Length
|
||||
# datagen is a generator object that yields the encoded parameters
|
||||
datagen, headers = multipart_encode({"file": open(filepath, "rb"), 'name': filename, 'show_instance': self.show_instance})
|
||||
|
||||
self.api_client.upload_recorded_show(datagen, headers)
|
||||
|
||||
def set_metadata_and_save(self, filepath):
|
||||
"""
|
||||
Writes song to 'filepath'. Uses metadata from:
|
||||
self.start_time, self.show_name, self.show_instance
|
||||
"""
|
||||
try:
|
||||
full_date, full_time = self.start_time.split(" ",1)
|
||||
# No idea why we translated - to : before
|
||||
#full_time = full_time.replace(":","-")
|
||||
self.logger.info("time: %s" % full_time)
|
||||
artist = "Airtime Show Recorder"
|
||||
#set some metadata for our file daemon
|
||||
recorded_file = mutagen.File(filepath, easy = True)
|
||||
recorded_file['artist'] = artist
|
||||
recorded_file['date'] = full_date
|
||||
recorded_file['title'] = "%s-%s-%s" % (self.show_name,
|
||||
full_date, full_time)
|
||||
#You cannot pass ints into the metadata of a file. Even tracknumber needs to be a string
|
||||
recorded_file['tracknumber'] = unicode(self.show_instance)
|
||||
recorded_file.save()
|
||||
|
||||
except Exception, e:
|
||||
top = traceback.format_exc()
|
||||
self.logger.error('Exception: %s', e)
|
||||
self.logger.error("traceback: %s", top)
|
||||
|
||||
|
||||
def run(self):
|
||||
code, filepath = self.record_show()
|
||||
|
||||
if code == 0:
|
||||
try:
|
||||
self.logger.info("Preparing to upload %s" % filepath)
|
||||
|
||||
self.set_metadata_and_save(filepath)
|
||||
|
||||
self.upload_file(filepath)
|
||||
os.remove(filepath)
|
||||
except Exception, e:
|
||||
self.logger.error(e)
|
||||
else:
|
||||
self.logger.info("problem recording show")
|
||||
os.remove(filepath)
|
||||
|
||||
class Recorder(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.logger = logging.getLogger('recorder')
        self.api_client = api_client(self.logger)
        self.sr = None
        self.shows_to_record = {}
        self.server_timezone = ''
        self.queue = q
        self.loops = 0
        self.logger.info("RecorderFetch: init complete")

        success = False
        while not success:
            try:
                self.api_client.register_component('show-recorder')
                success = True
            except Exception, e:
                self.logger.error(str(e))
                time.sleep(10)

    def handle_message(self):
        if not self.queue.empty():
            message = self.queue.get()
            msg = json.loads(message)
            command = msg["event_type"]
            self.logger.info("Received msg from Pypo Message Handler: %s", msg)
            if command == 'cancel_recording':
                if self.currently_recording():
                    self.cancel_recording()
            else:
                self.process_recorder_schedule(msg)
                self.loops = 0

        if self.shows_to_record:
            self.start_record()

    def process_recorder_schedule(self, m):
        self.logger.info("Parsing recording show schedules...")
        temp_shows_to_record = {}
        shows = m['shows']
        for show in shows:
            show_starts = getDateTimeObj(show[u'starts'])
            show_end = getDateTimeObj(show[u'ends'])
            time_delta = show_end - show_starts

            temp_shows_to_record[show[u'starts']] = [time_delta,
                    show[u'instance_id'], show[u'name'], m['server_timezone']]
        self.shows_to_record = temp_shows_to_record

    def get_time_till_next_show(self):
        if len(self.shows_to_record) != 0:
            tnow = datetime.datetime.utcnow()
            sorted_show_keys = sorted(self.shows_to_record.keys())

            start_time = sorted_show_keys[0]
            next_show = getDateTimeObj(start_time)

            delta = next_show - tnow
            s = '%s.%s' % (delta.seconds, delta.microseconds)
            out = float(s)

            if out < 5:
                self.logger.debug("Shows %s", self.shows_to_record)
                self.logger.debug("Next show %s", next_show)
                self.logger.debug("Now %s", tnow)
            return out

    def cancel_recording(self):
        self.sr.cancel_recording()
        self.sr = None

    def currently_recording(self):
        if self.sr is not None and self.sr.is_recording():
            return True
        else:
            return False

    def start_record(self):
        if len(self.shows_to_record) == 0: return None
        try:
            delta = self.get_time_till_next_show()
            if delta < 5:
                self.logger.debug("sleeping %s seconds until show", delta)
                time.sleep(delta)

                sorted_show_keys = sorted(self.shows_to_record.keys())
                start_time = sorted_show_keys[0]
                show_length = self.shows_to_record[start_time][0]
                show_instance = self.shows_to_record[start_time][1]
                show_name = self.shows_to_record[start_time][2]
                server_timezone = self.shows_to_record[start_time][3]

                T = pytz.timezone(server_timezone)
                start_time_on_UTC = getDateTimeObj(start_time)
                start_time_on_server = start_time_on_UTC.replace(tzinfo=pytz.utc).astimezone(T)
                start_time_formatted = '%(year)d-%(month)02d-%(day)02d %(hour)02d:%(min)02d:%(sec)02d' % \
                    {'year': start_time_on_server.year, 'month': start_time_on_server.month, 'day': start_time_on_server.day, \
                     'hour': start_time_on_server.hour, 'min': start_time_on_server.minute, 'sec': start_time_on_server.second}

                seconds_waiting = 0

                #avoiding CC-5299
                while(True):
                    if self.currently_recording():
                        self.logger.info("Previous record not finished, sleeping 100ms")
                        seconds_waiting = seconds_waiting + 0.1
                        time.sleep(0.1)
                    else:
                        show_length_seconds = show_length.seconds - seconds_waiting

                        self.sr = ShowRecorder(show_instance, show_name, show_length_seconds, start_time_formatted)
                        self.sr.start()
                        break

                #remove show from shows to record.
                del self.shows_to_record[start_time]
                #self.time_till_next_show = self.get_time_till_next_show()
        except Exception, e:
            top = traceback.format_exc()
            self.logger.error('Exception: %s', e)
            self.logger.error("traceback: %s", top)

    def run(self):
        """
        Main loop of the thread:
        Wait for schedule updates from RabbitMQ, but in case there aren't any,
        poll the server to get the upcoming schedule.
        """
        try:
            self.logger.info("Started...")
            # Bootstrap: since we are just starting up, we need to grab the
            # most recent schedule. After that we can just wait for updates.
            try:
                temp = self.api_client.get_shows_to_record()
                if temp is not None:
                    self.process_recorder_schedule(temp)
                self.logger.info("Bootstrap recorder schedule received: %s", temp)
            except Exception, e:
                self.logger.error(traceback.format_exc())
                self.logger.error(e)

            self.logger.info("Bootstrap complete: got initial copy of the schedule")

            self.loops = 0
            heartbeat_period = math.floor(30 / PUSH_INTERVAL)

            while True:
                if self.loops * PUSH_INTERVAL > 3600:
                    self.loops = 0
                    """
                    Fetch recorder schedule
                    """
                    try:
                        temp = self.api_client.get_shows_to_record()
                        if temp is not None:
                            self.process_recorder_schedule(temp)
                        self.logger.info("updated recorder schedule received: %s", temp)
                    except Exception, e:
                        self.logger.error(traceback.format_exc())
                        self.logger.error(e)

                try: self.handle_message()
                except Exception, e:
                    self.logger.error(traceback.format_exc())
                    self.logger.error('Pypo Recorder Exception: %s', e)

                time.sleep(PUSH_INTERVAL)
                self.loops += 1
        except Exception, e:
            top = traceback.format_exc()
            self.logger.error('Exception: %s', e)
            self.logger.error("traceback: %s", top)

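A rough sketch of the bookkeeping the Recorder does above, with made-up timestamps. It shows how shows_to_record is keyed by the show's UTC "starts" string and how the seconds-until-next-show float is derived; strptime stands in here for the getDateTimeObj() helper used in the real code:

    import datetime

    # Hypothetical entry, keyed the same way process_recorder_schedule() keys it:
    # "starts" string -> [show length, instance id, show name, server timezone]
    shows_to_record = {
        "2014-01-01 12:00:00": [datetime.timedelta(minutes=60), 7, "Morning Show", "UTC"],
    }

    next_start = sorted(shows_to_record.keys())[0]
    next_show = datetime.datetime.strptime(next_start, "%Y-%m-%d %H:%M:%S")
    delta = next_show - datetime.datetime.utcnow()

    # Same idea as get_time_till_next_show(): seconds.microseconds as a float.
    seconds_until_show = float("%s.%s" % (delta.seconds, delta.microseconds))
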
329  python_apps/pypo/pypo/telnetliquidsoap.py  Normal file
@@ -0,0 +1,329 @@
import telnetlib
import traceback  # used by disconnect_source() below
from timeout import ls_timeout

def create_liquidsoap_annotation(media):
    # We need the liq_start_next value in the annotation. That is the value
    # that controls the overlap duration of the crossfade.

    filename = media['dst']
    annotation = ('annotate:media_id="%s",liq_start_next="0",liq_fade_in="%s",' + \
        'liq_fade_out="%s",liq_cue_in="%s",liq_cue_out="%s",' + \
        'schedule_table_id="%s",replay_gain="%s dB"') % \
        (media['id'],
         float(media['fade_in']) / 1000,
         float(media['fade_out']) / 1000,
         float(media['cue_in']),
         float(media['cue_out']),
         media['row_id'],
         media['replay_gain'])

    # Override the artist/title that Liquidsoap extracts from a file's metadata
    # with the metadata we get from Airtime. (You can modify metadata in Airtime's
    # library, which doesn't get saved back to the file.)
    if 'metadata' in media:

        if 'artist_name' in media['metadata']:
            artist_name = media['metadata']['artist_name']
            if isinstance(artist_name, str):
                annotation += ',artist="%s"' % (artist_name.replace('"', '\\"'))

        if 'track_title' in media['metadata']:
            track_title = media['metadata']['track_title']
            if isinstance(track_title, str):
                annotation += ',title="%s"' % (track_title.replace('"', '\\"'))

    annotation += ":" + filename

    return annotation

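To make the annotate format concrete, here is what the function above produces for a hypothetical media item (all field values invented for illustration; the output is a single line, wrapped here for readability):

    media = {
        'id': 123, 'row_id': 9, 'dst': '/srv/airtime/stor/song.mp3',
        'fade_in': 500, 'fade_out': 500,     # milliseconds
        'cue_in': 0.0, 'cue_out': 180.0,     # seconds
        'replay_gain': '-3.2',
        'metadata': {'artist_name': 'Some Artist', 'track_title': 'Some Title'},
    }

    print create_liquidsoap_annotation(media)
    # annotate:media_id="123",liq_start_next="0",liq_fade_in="0.5",liq_fade_out="0.5",
    #   liq_cue_in="0.0",liq_cue_out="180.0",schedule_table_id="9",replay_gain="-3.2 dB",
    #   artist="Some Artist",title="Some Title":/srv/airtime/stor/song.mp3
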
class TelnetLiquidsoap:

    def __init__(self, telnet_lock, logger, ls_host, ls_port, queues):
        self.telnet_lock = telnet_lock
        self.ls_host = ls_host
        self.ls_port = ls_port
        self.logger = logger
        self.queues = queues
        self.current_prebuffering_stream_id = None

    def __connect(self):
        return telnetlib.Telnet(self.ls_host, self.ls_port)

    def __is_empty(self, queue_id):
        # Short-circuited: the queue check below is currently unreachable.
        return True
        tn = self.__connect()
        msg = '%s.queue\nexit\n' % queue_id
        tn.write(msg)
        output = tn.read_all().splitlines()
        if len(output) == 3:
            return len(output[0]) == 0
        else:
            raise Exception("Unexpected list length returned: %s" % output)

    @ls_timeout
    def queue_clear_all(self):
        try:
            self.telnet_lock.acquire()
            tn = self.__connect()

            for i in self.queues:
                msg = 'queues.%s_skip\n' % i
                self.logger.debug(msg)
                tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())
        except Exception:
            raise
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def queue_remove(self, queue_id):
        try:
            self.telnet_lock.acquire()
            tn = self.__connect()

            msg = 'queues.%s_skip\n' % queue_id
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())
        except Exception:
            raise
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def queue_push(self, queue_id, media_item):
        try:
            self.telnet_lock.acquire()

            if not self.__is_empty(queue_id):
                raise QueueNotEmptyException()

            tn = self.__connect()
            annotation = create_liquidsoap_annotation(media_item)
            msg = '%s.push %s\n' % (queue_id, annotation.encode('utf-8'))
            self.logger.debug(msg)
            tn.write(msg)

            show_name = media_item['show_name']
            msg = 'vars.show_name %s\n' % show_name.encode('utf-8')
            tn.write(msg)
            self.logger.debug(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())
        except Exception:
            raise
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def stop_web_stream_buffer(self):
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.ls_host, self.ls_port)
            #dynamic_source.stop http://87.230.101.24:80/top100station.mp3

            msg = 'http.stop\n'
            self.logger.debug(msg)
            tn.write(msg)

            msg = 'dynamic_source.id -1\n'
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())

        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def stop_web_stream_output(self):
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.ls_host, self.ls_port)
            #dynamic_source.stop http://87.230.101.24:80/top100station.mp3

            msg = 'dynamic_source.output_stop\n'
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())

        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def start_web_stream(self, media_item):
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.ls_host, self.ls_port)

            #TODO: Do we need this?
            msg = 'streams.scheduled_play_start\n'
            tn.write(msg)

            msg = 'dynamic_source.output_start\n'
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())

            self.current_prebuffering_stream_id = None
        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def start_web_stream_buffer(self, media_item):
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.ls_host, self.ls_port)

            msg = 'dynamic_source.id %s\n' % media_item['row_id']
            self.logger.debug(msg)
            tn.write(msg)

            msg = 'http.restart %s\n' % media_item['uri'].encode('latin-1')
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            self.logger.debug(tn.read_all())

            self.current_prebuffering_stream_id = media_item['row_id']
        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def get_current_stream_id(self):
        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.ls_host, self.ls_port)

            msg = 'dynamic_source.get_id\n'
            self.logger.debug(msg)
            tn.write(msg)

            tn.write("exit\n")
            stream_id = tn.read_all().splitlines()[0]
            self.logger.debug("stream_id: %s" % stream_id)

            return stream_id
        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def disconnect_source(self, sourcename):
        self.logger.debug('Disconnecting source: %s', sourcename)
        command = ""
        if(sourcename == "master_dj"):
            command += "master_harbor.kick\n"
        elif(sourcename == "live_dj"):
            command += "live_dj_harbor.kick\n"

        try:
            self.telnet_lock.acquire()
            tn = telnetlib.Telnet(self.ls_host, self.ls_port)
            self.logger.info(command)
            tn.write(command)
            tn.write('exit\n')
            tn.read_all()
        except Exception, e:
            self.logger.error(traceback.format_exc())
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def telnet_send(self, commands):
        try:
            self.telnet_lock.acquire()

            tn = telnetlib.Telnet(self.ls_host, self.ls_port)
            for i in commands:
                self.logger.info(i)
                tn.write(i)

            tn.write('exit\n')
            tn.read_all()
        except Exception, e:
            self.logger.error(str(e))
        finally:
            self.telnet_lock.release()

    def switch_source(self, sourcename, status):
        self.logger.debug('Switching source: %s to "%s" status', sourcename, status)
        command = "streams."
        if sourcename == "master_dj":
            command += "master_dj_"
        elif sourcename == "live_dj":
            command += "live_dj_"
        elif sourcename == "scheduled_play":
            command += "scheduled_play_"

        if status == "on":
            command += "start\n"
        else:
            command += "stop\n"

        self.telnet_send([command])

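A hypothetical wiring of the class above. The host, port and queue names are illustrative (in pypo they come from the Airtime configuration), and the calls assume a Liquidsoap instance is actually listening on that telnet port:

    import logging
    from threading import Lock
    from telnetliquidsoap import TelnetLiquidsoap

    telnet_lock = Lock()
    logger = logging.getLogger('pypo')

    # "s0".."s3" mirror the queue names used elsewhere in pypo.
    tls = TelnetLiquidsoap(telnet_lock, logger, "localhost", 1234,
                           ["s0", "s1", "s2", "s3"])

    tls.switch_source("scheduled_play", "on")   # sends "streams.scheduled_play_start\n"
    tls.queue_clear_all()                       # skips whatever is queued on s0..s3
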
class DummyTelnetLiquidsoap:

    def __init__(self, telnet_lock, logger):
        self.telnet_lock = telnet_lock
        self.liquidsoap_mock_queues = {}
        self.logger = logger

        for i in range(4):
            self.liquidsoap_mock_queues["s" + str(i)] = []

    @ls_timeout
    def queue_push(self, queue_id, media_item):
        try:
            self.telnet_lock.acquire()

            self.logger.info("Pushing %s to queue %s" % (media_item, queue_id))
            from datetime import datetime
            print "Time now: %s" % datetime.utcnow()

            annotation = create_liquidsoap_annotation(media_item)
            self.liquidsoap_mock_queues[queue_id].append(annotation)
        except Exception:
            raise
        finally:
            self.telnet_lock.release()

    @ls_timeout
    def queue_remove(self, queue_id):
        try:
            self.telnet_lock.acquire()

            self.logger.info("Purging queue %s" % queue_id)
            from datetime import datetime
            print "Time now: %s" % datetime.utcnow()

        except Exception:
            raise
        finally:
            self.telnet_lock.release()


class QueueNotEmptyException(Exception):
    pass

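Callers of queue_push() are expected to handle the exception above (in practice it is never raised while __is_empty() is short-circuited to return True). A minimal, hypothetical retry pattern might look like this:

    import time

    def push_when_free(tls, queue_id, media_item, retries=3):
        # Hypothetical helper: retry briefly if the target Liquidsoap queue is occupied.
        for _ in range(retries):
            try:
                tls.queue_push(queue_id, media_item)
                return True
            except QueueNotEmptyException:
                time.sleep(0.5)
        return False
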
98  python_apps/pypo/pypo/testpypoliqqueue.py  Normal file
@@ -0,0 +1,98 @@
from pypoliqqueue import PypoLiqQueue
from telnetliquidsoap import DummyTelnetLiquidsoap, TelnetLiquidsoap


from Queue import Queue
from threading import Lock

import sys
import signal
import logging
from datetime import datetime
from datetime import timedelta

def keyboardInterruptHandler(signum, frame):
    logger = logging.getLogger()
    logger.info('\nKeyboard Interrupt\n')
    sys.exit(0)
signal.signal(signal.SIGINT, keyboardInterruptHandler)

# configure logging
format = '%(levelname)s - %(pathname)s - %(lineno)s - %(asctime)s - %(message)s'
logging.basicConfig(level=logging.DEBUG, format=format)
logging.captureWarnings(True)

telnet_lock = Lock()
pypoPush_q = Queue()


pypoLiq_q = Queue()
liq_queue_tracker = {
    "s0": None,
    "s1": None,
    "s2": None,
    "s3": None,
}

#dummy_telnet_liquidsoap = DummyTelnetLiquidsoap(telnet_lock, logging)
# Note: TelnetLiquidsoap.__init__ also expects the list of queue names.
dummy_telnet_liquidsoap = TelnetLiquidsoap(telnet_lock, logging, \
                                           "localhost", \
                                           1234, \
                                           ["s0", "s1", "s2", "s3"])

plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, \
                   dummy_telnet_liquidsoap)
plq.daemon = True
plq.start()


print "Time now: %s" % datetime.utcnow()

media_schedule = {}

start_dt = datetime.utcnow() + timedelta(seconds=1)
end_dt = datetime.utcnow() + timedelta(seconds=6)

media_schedule[start_dt] = {"id": 5, \
        "type":"file", \
        "row_id":9, \
        "uri":"", \
        "dst":"/home/martin/Music/ipod/Hot Chocolate - You Sexy Thing.mp3", \
        "fade_in":0, \
        "fade_out":0, \
        "cue_in":0, \
        "cue_out":300, \
        "start": start_dt, \
        "end": end_dt, \
        "show_name":"Untitled", \
        "replay_gain": 0, \
        "independent_event": True \
        }


start_dt = datetime.utcnow() + timedelta(seconds=2)
end_dt = datetime.utcnow() + timedelta(seconds=6)

media_schedule[start_dt] = {"id": 5, \
        "type":"file", \
        "row_id":9, \
        "uri":"", \
        "dst":"/home/martin/Music/ipod/Good Charlotte - bloody valentine.mp3", \
        "fade_in":0, \
        "fade_out":0, \
        "cue_in":0, \
        "cue_out":300, \
        "start": start_dt, \
        "end": end_dt, \
        "show_name":"Untitled", \
        "replay_gain": 0, \
        "independent_event": True \
        }
pypoLiq_q.put(media_schedule)

plq.join()

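The commented-out line in the script above shows the intended offline variant. Running the same harness without a live Liquidsoap instance would simply mean constructing the dummy instead (same lock and logger, no host or port), along the lines of:

    # Offline variant of the harness above: no telnet connection is made;
    # pushed items end up in DummyTelnetLiquidsoap.liquidsoap_mock_queues instead.
    dummy = DummyTelnetLiquidsoap(telnet_lock, logging)
    plq = PypoLiqQueue(pypoLiq_q, telnet_lock, logging, liq_queue_tracker, dummy)
    plq.daemon = True
    plq.start()
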
38  python_apps/pypo/pypo/timeout.py  Normal file
@@ -0,0 +1,38 @@
import threading
import pypofetch

def __timeout(func, timeout_duration, default, args, kwargs):

    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = func(*args, **kwargs)

    first_attempt = True

    while True:
        it = InterruptableThread()
        it.start()
        if not first_attempt:
            timeout_duration = timeout_duration * 2
        it.join(timeout_duration)

        if it.isAlive():
            """Restart Liquidsoap and try the command one more time. If it
            fails again then there is something critically wrong..."""
            if first_attempt:
                #restart liquidsoap
                pypofetch.PypoFetch.ref.restart_liquidsoap()
            else:
                raise Exception("Thread did not terminate")
        else:
            return it.result

        first_attempt = False

def ls_timeout(f, timeout=15, default=None):
    def new_f(*args, **kwargs):
        return __timeout(f, timeout, default, args, kwargs)
    return new_f

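Because ls_timeout is used as a plain decorator throughout telnetliquidsoap.py, its effect can be sketched in isolation. The decorated function below is hypothetical, and importing timeout pulls in pypofetch, so this assumes pypo's environment:

    import time
    from timeout import ls_timeout

    @ls_timeout        # join the worker thread for 15s; on timeout, restart Liquidsoap and retry once
    def slow_command():
        time.sleep(2)  # stand-in for a blocking telnet exchange
        return "done"

    print slow_command()  # returns "done" well inside the 15-second budget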