#!/usr/bin/python3
# pylint: disable=line-too-long
# kate: space-indent on; indent-width 4; replace-tabs on; indent-mode python; remove-trailing-space modified;
# vim: expandtab ts=4
# pylint: enable=line-too-long
############################################################################
# Copyright © 2018 José Manuel Santamaría Lema <panfaust@gmail.com> #
# #
# This program is free software; you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation; either version 2 of the License, or #
# (at your option) any later version. #
############################################################################
"""
This module provides various functions to operate with tritemio's wannabuild
and reprepro setup.
"""
#pylint: disable=too-many-arguments,too-many-locals
import subprocess
import time
import sys
from libka.ka_configuration import get_ka_config_parser
from libka.ka_data_utils import read_packages_file
from libka.ka_print import ka_print_plain
from libka.ka_print import ka_print_error
[docs]
def tritemio_exec_ssh(user, host, command, retry_number=10, retry_delay=3, exit_on_error=True):
    """Executes the given ssh command at tritemio

    The command is run as 'ssh user@host <command...>', with stderr merged
    into stdout. If ssh exits with a non-zero status the call is repeated,
    sleeping 'retry_delay' seconds between attempts, until 'retry_number'
    attempts have been made. At least one attempt is always made, even if
    'retry_number' is zero or negative.

    Parameters:
        user: remote user name used for the ssh connection.
        host: remote host name used for the ssh connection.
        command: list with the remote command and its arguments.
        retry_number: maximum number of attempts.
        retry_delay: seconds to wait between two attempts.
        exit_on_error: if True, call sys.exit(1) once every attempt failed;
            if False, return None instead.

    Returns the output of the command, decoded as UTF-8 and stripped, or None
    if every attempt failed and 'exit_on_error' is False.
    """
    #pylint: disable=line-too-long
    #FIXME: If we don't have the host 'tritemio' in ssh's known_hosts the
    #       subprocess.check_output(ssh_command) below is probably going to
    #       hang. Maybe we should add it to known_hosts in advance just in
    #       case? see:
    #       https://stackoverflow.com/questions/13597115/how-to-add-hostname-to-known-hosts-using-python
    #pylint: enable=line-too-long
    ssh_command = ['ssh', '%s@%s' % (user, host)] + command
    #Always try at least once, otherwise there would be no output to return
    attempts_left = max(1, retry_number)
    while True:
        attempts_left -= 1
        try:
            output_bytes = subprocess.check_output(ssh_command, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as exception:
            if attempts_left > 0:
                #There are still attempts left, wait a bit and try again
                time.sleep(retry_delay)
                continue
            #Last attempt failed: report the error and the captured output
            ka_print_error(str(exception))
            #Undecodable bytes are replaced so reporting the failure can't crash
            ka_print_plain(exception.output.decode("utf-8", errors="replace").strip())
            if exit_on_error:
                sys.exit(1)
            return None
        #The ssh command was successful, return its output
        return output_bytes.decode("utf-8", errors="replace").strip()
[docs]
def tritemio_exec_reprepro(command, retry_number=10, retry_delay=3, exit_on_error=True):
    """Runs a reprepro command at tritemio's repository host through ssh

    The ssh host, the ssh user and the reprepro base directory are read from
    the 'tritemio repository' section of the ka configuration. 'command' is
    the list of reprepro arguments, it gets appended to 'reprepro -b <dir>'.
    The remaining parameters are handed over untouched to tritemio_exec_ssh()
    and its result is returned as is.
    """
    #Fetch the repository settings from the configuration
    repo_settings = get_ka_config_parser()['tritemio repository']
    ssh_host = repo_settings['host_repo_fqdn']
    ssh_user = repo_settings['host_ssh_user']
    base_dir = repo_settings['host_repo_dir']
    #Build the complete remote command line and run it
    remote_command = ['reprepro', '-b', base_dir] + command
    return tritemio_exec_ssh(ssh_user, ssh_host, remote_command,
                             retry_number=retry_number,
                             retry_delay=retry_delay,
                             exit_on_error=exit_on_error)
[docs]
def tritemio_list_all_srcs(dist):
    """Lists the names of the source packages found in a tritemio distribution

    Runs 'reprepro -A source list <dist>' and returns a list with one source
    package name per line of the reply, in the same order.
    """
    listing = tritemio_exec_reprepro(['-A', 'source', 'list', dist])
    #Every line of the listing looks like this one:
    #ubuntu-exp3|main|source: umbrello 4:17.12.3-0ubuntu1+tritemio1
    #so the package name is the second space-separated token.
    return [entry.split(' ')[1] for entry in listing.splitlines()]
[docs]
def tritemio_pkglist(release_type, dist):
    """
    Builds the package list of a release type for a tritemio distribution.

    'release_type' can be:
      - 'qt', 'frameworks', 'plasma' or 'applications': the list is read from
        the file 'tritemio/package-name-lists/<release_type>-<dist>'.
      - 'all': every source package available in the distribution.
      - 'other': the source packages available in the distribution which are
        not listed for any of qt/frameworks/plasma/applications, for instance
        tritemio_pkglist('other', 'ubuntu-exp3').

    Any other release type raises ValueError.
    """
    real_release_types = ['qt', 'frameworks', 'plasma', 'applications']
    list_file_pattern = 'tritemio/package-name-lists/%s-%s'
    #Real release types have their own package list file
    if release_type in real_release_types:
        return read_packages_file(list_file_pattern % (release_type, dist))
    if release_type == 'all':
        return tritemio_list_all_srcs(dist)
    if release_type != 'other':
        raise ValueError('Wrong release type: %s' % release_type)
    #Virtual 'other' release type: start with everything available in the dist
    available_srcs = tritemio_list_all_srcs(dist)
    #Collect the names which already belong to some real release type
    claimed_srcs = set()
    for real_release_type in real_release_types:
        claimed_srcs.update(read_packages_file(list_file_pattern % (real_release_type, dist)))
    #Whatever is left over is supposed to belong to 'other'
    return [src_name for src_name in available_srcs if src_name not in claimed_srcs]
[docs]
def tritemio_ls(package_name, arch='any'):
    """Runs 'reprepro ls' for a package at tritemio and returns its output

    With the default arch 'any' no architecture filter is used, otherwise the
    query is restricted with 'reprepro -A <arch>'.
    """
    ls_arguments = ['ls', package_name]
    if arch != 'any':
        #Restrict the query to the requested architecture
        ls_arguments = ['-A', arch] + ls_arguments
    return tritemio_exec_reprepro(ls_arguments)
[docs]
def tritemio_copysrc(origin_dist, destination_dist, src_package_name):
    """Copies a source package from origin_dist into destination_dist

    Returns the reply of the reprepro command. The command is run with
    exit_on_error=False, so a failure does not terminate the program.
    """
    #Note that the destination goes first in the reprepro command line
    copy_arguments = ['copysrc', destination_dist, origin_dist, src_package_name]
    return tritemio_exec_reprepro(copy_arguments, exit_on_error=False)
[docs]
def tritemio_removesrc(dist, src_package_name):
    """Removes a source package from the given distribution

    Returns the reply of the reprepro command. The command is run with
    exit_on_error=False, so a failure does not terminate the program.
    """
    remove_arguments = ['removesrc', dist, src_package_name]
    return tritemio_exec_reprepro(remove_arguments, exit_on_error=False)
[docs]
def tritemio_bin_nmu(dist, arch, src_package_name, changelog_entry='Rebuild.'):
    """Performs a binNMU of the given package

    First asks wanna-build (through ssh) for the information of the source
    package in order to find out its current version and binNMU number, then
    requests a binNMU with the next binNMU number and 'changelog_entry' as
    the message.

    Returns the output of the wanna-build binNMU command, or an empty string
    if the information query failed or its reply had no 'Version' field.
    """
    #Load configuration
    ka_config = get_ka_config_parser()
    #Read configuration variables
    wannabuild_ssh_user = ka_config['tritemio database']['host_ssh_user']
    wannabuild_ssh_host = ka_config['tritemio database']['host_ssh_fqdn']
    #Get the package information from wannabuild
    info_command = ['wanna-build', '-d', dist, '-A', arch, '--info', src_package_name]
    ssh_info_reply = tritemio_exec_ssh(wannabuild_ssh_user, wannabuild_ssh_host,
                                       info_command, exit_on_error=False)
    #Abort if the wanna-build command failed
    if ssh_info_reply is None:
        return ''
    #Parse the ssh info reply in order to get a dictionary with fields and values
    ssh_info_reply_dict = {}
    for line in ssh_info_reply.splitlines():
        field_name, separator, field_value = line.partition(':')
        if not separator:
            #Not a 'field: value' line (e.g. a blank line), nothing to parse
            continue
        ssh_info_reply_dict[field_name.strip()] = field_value.strip()
    #Find out the current version
    if 'Version' not in ssh_info_reply_dict:
        #Abort if we couldn't find info for the given package
        ka_print_error('Package %s not found' % src_package_name)
        return ''
    src_package_version = ssh_info_reply_dict['Version']
    #Find out the current binNMU number and increase it
    bin_nmu_number = 0
    if 'Binary-NMU-Version' in ssh_info_reply_dict:
        bin_nmu_number = int(ssh_info_reply_dict['Binary-NMU-Version'])
    bin_nmu_number += 1
    #Exec the wannabuild binNMU command and return its output
    bin_nmu_command = ['wanna-build', '-d', dist, '-A', arch,
                       '--binNMU', str(bin_nmu_number),
                       '--message', changelog_entry,
                       '%s_%s' % (src_package_name, src_package_version)]
    ssh_bin_nmu_reply = tritemio_exec_ssh(wannabuild_ssh_user,
                                          wannabuild_ssh_host,
                                          bin_nmu_command,
                                          exit_on_error=True)
    return ssh_bin_nmu_reply
[docs]
def tritemio_give_back(dist, arch, src_package_name):
    """Performs a give-back of the given package

    First asks wanna-build (through ssh) for the information of the source
    package in order to find out its current version, then requests a
    give-back of '<src_package_name>_<version>'.

    Returns the output of the wanna-build give-back command, or an empty
    string if the information query failed or its reply had no 'Version'
    field.
    """
    #Load configuration
    ka_config = get_ka_config_parser()
    #Read configuration variables
    wannabuild_ssh_user = ka_config['tritemio database']['host_ssh_user']
    wannabuild_ssh_host = ka_config['tritemio database']['host_ssh_fqdn']
    #Get the package information from wannabuild
    info_command = ['wanna-build', '-d', dist, '-A', arch, '--info', src_package_name]
    ssh_info_reply = tritemio_exec_ssh(wannabuild_ssh_user, wannabuild_ssh_host,
                                       info_command, exit_on_error=False)
    #Abort if the wanna-build command failed
    if ssh_info_reply is None:
        return ''
    #Parse the ssh info reply in order to get a dictionary with fields and values
    ssh_info_reply_dict = {}
    for line in ssh_info_reply.splitlines():
        field_name, separator, field_value = line.partition(':')
        if not separator:
            #Not a 'field: value' line (e.g. a blank line), nothing to parse
            continue
        ssh_info_reply_dict[field_name.strip()] = field_value.strip()
    #Find out the current version
    if 'Version' not in ssh_info_reply_dict:
        #Abort if we couldn't find info for the given package
        ka_print_error('Package %s not found' % src_package_name)
        return ''
    src_package_version = ssh_info_reply_dict['Version']
    #Exec the wannabuild give-back command and return its output
    give_back_command = ['wanna-build', '-d', dist, '-A', arch,
                         '--give-back',
                         '%s_%s' % (src_package_name, src_package_version)]
    ssh_give_back_reply = tritemio_exec_ssh(wannabuild_ssh_user,
                                            wannabuild_ssh_host,
                                            give_back_command,
                                            exit_on_error=True)
    return ssh_give_back_reply