#!/bin/bash
#
# Copyright 2011-2018 Nicolas Thauvin. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 
#  1. Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#  2. Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
# 
# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#

version="2.3"

# Hardcoded configuration
PGXLOG=
BACKUP_DIR="/var/lib/pgsql/backups/pitr"
STORAGE="tar"
BACKUP_HOST=
BACKUP_USER=
BACKUP_COMPRESS_BIN="gzip -4"
BACKUP_COMPRESS_SUFFIX="gz"
BACKUP_UNCOMPRESS_BIN="gunzip"
PGPSQL="psql"
PRE_BACKUP_COMMAND=
POST_BACKUP_COMMAND=
LOG_TIMESTAMP="yes"
USE_ISO8601_TIMESTAMPS="no"
PGOWNER=`id -un`
OVERWRITE="no"
RESTORE_COMMAND=
PURGE_KEEP_COUNT=
PURGE_OLDER_THAN=
RSYNC_WHOLEFILE="no"
RSYNC_BWLIMIT=

# We do not hardcode defaults for ARCHIVE_HOST, ARCHIVE_USER and
# ARCHIVE_DIR so that they can be deduced from their BACKUP_
# counterpart by parse_target_uri()
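#
# For example (illustrative values): with BACKUP_HOST="10.0.0.5" and
# BACKUP_DIR="/backups/pitr", leaving ARCHIVE_DIR empty makes
# parse_target_uri() store the WAL archives in
# 10.0.0.5:/backups/pitr/archived_xlog, with the same user as the
# backups.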
ARCHIVE_COMPRESS="yes"
ARCHIVE_OVERWRITE="yes"
ARCHIVE_CHECK="no"
ARCHIVE_FLUSH="no"
ARCHIVE_COMPRESS_BIN="gzip -f -4"
ARCHIVE_COMPRESS_SUFFIX="gz"
ARCHIVE_UNCOMPRESS_BIN="gunzip"
SYSLOG="no"
SYSLOG_FACILITY="local0"
SYSLOG_IDENT="postgres"

BACKUP_ENCRYPT="no"
ARCHIVE_ENCRYPT="no"
GPG_ENCRYPT_KEYS=""
GPG_BIN="/usr/bin/gpg"

config_dir="/etc/pitrery"
config="pitrery.conf"

out_rc=0
rsync_opts=""

usage() {
    case $1 in
        list)
            echo "`basename $0` list - Display information about backups"
            echo
            echo "usage: `basename $0` list [options] [[[user@]host:]/path/to/backups]"
            echo
            echo "options:"
            echo "    -v              Display details of the backup"
            echo
            echo "    -?              Print help"
            echo
            exit ${2:-0}
            ;;
        backup)
            echo "`basename $0` backup - Perform a base backup"
            echo
            echo "usage: `basename $0` backup [options] [[[user@]host:]/path/to/backups]"
            echo
            echo "options:"
            echo "    -D dir               Path to \$PGDATA"
            echo "    -s mode              Storage method, tar or rsync"
            echo "    -c compress_bin      Compression command for tar method"
            echo "    -e compress_suffix   Suffix added by the compression program"
            echo "    -E                   Encrypt tar backups with GPG"
            echo "    -r keys:...          Colon separated list of recipients for GPG encryption"
            echo "    -t                   Use ISO 8601 format to name backups"
            echo "    -T                   Timestamp log messages"
            echo
            echo "    -P PSQL              path to the psql command"
            echo "    -h HOSTNAME          database server host or socket directory"
            echo "    -p PORT              database server port number"
            echo "    -U NAME              connect as specified database user"
            echo "    -d DATABASE          database to use for connection"
            echo
            echo "    -?                   Print help"
            echo
            exit ${2:-0}
            ;;
        restore)
            echo "`basename $0` restore - Restore a base backup and prepare PITR"
            echo
            echo "usage: `basename $0` restore [options] [[[user@]host:]/path/to/backups]"
            echo
            echo " options:"
            echo "    -D dir               Path to target \$PGDATA"
            echo "    -x dir               Path to the xlog directory (only if outside \$PGDATA)"
            echo "    -d date              Restore until this date"
            echo "    -O user              If run by root, owner of the files"
            echo "    -t tblspc:dir        Change the target directory of tablespace \"tblspc\""
            echo "                           this switch can be used many times"
            echo "    -n                   Dry run: show restore information only"
            echo "    -R                   Overwrite destination directories"
            echo "    -c compress_bin      Uncompression command for tar method"
            echo "    -e compress_suffix   Suffix added by the compression program"
            echo "    -r command           Command line to use in restore_command"
            echo "    -C config            Configuration file for restore_xlog in restore_command"
            echo "    -T                   Timestamp log messages"
            echo
            echo "    -?                   Print help"
            echo
            exit ${2:-0}
            ;;
        purge)
            echo "`basename $0` purge - Clean old base backups and archived WAL files"
            echo
            echo "usage: `basename $0` purge [options] [[[user@]host:]/path/to/backups]"
            echo
            echo "options:"
            echo "    -m count               Keep this number of backups"
            echo "    -d days                Purge backups older than this number of days"
            echo
            echo "    -a [[user@]host:]/dir  Path to WAL archives"
            echo
            echo "    -N                     Dry run: show what would be purged only"
            echo
            echo "    -T                     Timestamp log messages"
            echo "    -?                     Print help"
            echo
            exit ${2:-0}
            ;;
        check)
            echo "`basename $0` check - Verify configuration and backups integrity"
            echo
            echo "usage: `basename $0` check [options] [[[user@]host:]/path/to/backups]"
            echo
            echo "options:"
            echo "    -C conf                Configuration file"
            echo
            echo "    -B                     Check backups"
            echo "    -m count               Fail when the number of backups is less than count"
            echo "    -g age                 Fail when the newest backup is older than age"
            echo
            echo "    -A                     Check WAL archives"
            echo "    -a [[user@]host:]/dir  Path to WAL archives"
            echo "    -c command             Uncompression command"
            echo
            echo "    -n                     Nagios compatible output for -b and -A"
            echo
            echo "    -?                     Print help"
            echo
            exit ${2:-0}
            ;;
        configure)
            echo "`basename $0` configure - Create a configuration file from the command line"
            echo
            echo "usage: `basename $0` configure [options] [[user@]host:]/path/to/backups"
            echo
            echo "options:"
            echo "    -o config_file         Output configuration file"
            echo "    -f                     Overwrite the destination file"
            echo "    -C                     Do not connect to PostgreSQL"
            echo
            echo "    -s mode                Storage method, tar or rsync"
            echo "    -m count               Number of backups to keep when purging"
            echo "    -g days                Remove backups older than this number of days"
            echo "    -D dir                 Path to \$PGDATA"
            echo "    -a [[user@]host:]/dir  Place to store WAL archives"
            echo "    -E                     Encrypt tar backups with GPG"
            echo "    -r keys:...            Colon separated list of recipients for GPG encryption"
            echo
            echo "    -P psql                Path to the psql command"
            echo "    -h hostname            Database server host or socket directory"
            echo "    -p port                Database server port number"
            echo "    -U name                Connect as specified database user"
            echo "    -d database            Database to use for connection"
            echo
            echo "    -?                     Print help"
            echo
            exit ${2:-0}
            ;;
        help)
            echo "`basename $0` help - Print help, optionnally for an action"
            echo
            echo "usage: `basename $0` help [options] [action]"
            echo
            echo "options:"
            echo "    -?                     Print help"
            echo
            exit ${2:-0}
            ;;
    esac

    # Fallback to the main command usage
    echo "`basename $0` $version - PostgreSQL Point In Time Recovery made easy"
    echo
    echo "usage: `basename $0` [options] action [args]"
    echo
    echo "options:"
    echo "    -f file      Path to the configuration file"
    echo "    -l           List configuration files in the default directory"
    echo "    -V           Display the version and exit"
    echo "    -?           Print help"
    echo
    echo "actions:"
    echo "    list - Display information about backups"
    echo "    backup - Perform a base backup"
    echo "    restore - Restore a base backup and prepare PITR"
    echo "    purge - Clean old base backups and archived WAL files"
    echo "    check - Verify configuration and backups integrity"
    echo "    configure - Create a configuration file from the command line"
    echo "    help - Print help, optionnally for an action"
    echo
    exit ${2:-0}
}

# Apply an extra level of shell quoting to each of the arguments passed.
# This is necessary for remote-side arguments of ssh (including commands that
# are executed by the remote shell and remote paths for scp and rsync via ssh)
# since they will strip an extra level of quoting off on the remote side.
# This makes it safe for them to include spaces or other special characters
# which should not be interpreted or cause word-splitting on the remote side.
qw() {
    while (( $# > 1 )); do
	printf "%q " "$1"
	shift
    done
    (( $# == 1 )) && printf "%q" "$1"
}

can_cleanup="no"
cleanup() {
    # Ensure we have a safe guard against removing data too soon
    [ "$can_cleanup" = "yes" ] || return 0

    # Cleanup depends on the action and the location of the backups
    case $action in
        "backup")
            info "cleaning..."
            if [ "$backup_local" = "yes" ]; then
	        if [ -d "$backup_dir" ]; then
	            rm -rf -- "$backup_dir"
	        fi
            elif [ -n "$backup_ssh_target" ] && [ -n "$backup_dir" ]; then
	        bd=$(qw "$backup_dir")
	        if ! ssh -n -- "$backup_ssh_target" "test -d $bd && rm -rf -- $bd" 2>/dev/null; then
		    error "cleanup of $backup_dir failed on $backup_ssh_target"
		    exit 1
		fi
            fi

            [ -n "$tblspc_list" ] && rm -f -- "$tblspc_list"
            [ -n "$replslot_list" ] && rm -f -- "$replslot_list"
            [ -n "$psql_stderr" ] && rm -f -- "$psql_stderr"
            [ -n "$backup_label_file" ] && rm -f -- "$backup_label_file"
            [ -n "$tablespace_map_file"  ] && rm -f -- "$tablespace_map_file"
            [ -n "$pg_control_file"   ] && rm -f -- "$pg_control_file"
            [ -n "$tmpwal" ] && rm -f -- "$tmpwal"
            ;;
        "check")
            if [ "$output" = "nagios" ]; then
                # Fail with return code 3 (unknown) in die() when
                # behaving like a nagios plugin
                exit 3
            fi
            ;;
    esac
}

# Messaging functions
now() {
    [ "$LOG_TIMESTAMP" = "yes" ] && echo "$(date "+%F %T %Z ")"
}

die() {
    echo "$(now)FATAL: $*" 1>&2
    cleanup
    exit 1
}

error() {
    echo "$(now)ERROR: $*" 1>&2
    out_rc=1
}

warn() {
    echo "$(now)WARNING: $*" 1>&2
}

info() {
    echo "$(now)INFO: $*"
}
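
# For example, with LOG_TIMESTAMP="yes", info "starting" prints
# something like:
#   2018-01-02 03:04:05 UTC INFO: starting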


load_config() {
    # Load the configuration file
    if [ -f "$config" ]; then
	. "$config"

        # Export libpq variables sourced from the configuration file
        # so that we have them in subshells
        export PGUSER
        export PGPORT
        export PGHOST
        export PGDATABASE
    else
        # Output an error if the file is missing and the user
        # specified it on the command line
        if [ "$cli_config" = "yes" ]; then
	    error "cannot access configuration file: $config"
        fi
    fi
}

parse_target_uri() {
    local backup_target="$1"
    local archive_target="$2"

    if [ -n "$backup_target" ]; then
        # Parse the backup target into user, host and path
        backup_user="$(echo "$backup_target" | grep '@' | cut -d'@' -f1 )"
        backup_host="$(echo "$backup_target" | grep ':' | sed -re 's/(.*):(.*)/\1/' | cut -d'@' -f2 )"
        backup_dir="$(echo "$backup_target" | sed -re 's/(.*):(.*)/\2/')"
    else
        # Fallback to the values from the configuration file
        [ -n "$BACKUP_USER" ] && backup_user="$BACKUP_USER"
        [ -n "$BACKUP_HOST" ] && backup_host="$BACKUP_HOST"
        [ -n "$BACKUP_DIR" ] && backup_dir="$BACKUP_DIR"
    fi

    [ -n "$backup_dir" ] || error "missing backup directory"

    # Deduce if backup is local
    if [ -z "$backup_host" ]; then
        backup_local="yes"
    else
        backup_local="no"

        # Wrap IPv6 addresses with brackets
        echo "$backup_host" | grep -qi '^[0123456789abcdef:]*:[0123456789abcdef:]*$' && backup_host="[${backup_host}]"

        # Add a shortcut for ssh/rsync commands
        backup_ssh_target="${backup_user:+$backup_user@}$backup_host"
    fi

    # Ensure the backup directory is an absolute path
    if [ "$backup_local" = "yes" ]; then
        backup_dir="$(readlink -m -- "$backup_dir")"
    else
        backup_dir="$(ssh -n -- "$backup_ssh_target" "readlink -m -- $(qw "$backup_dir")")"
    fi

    # Parse archive target the same way
    if [ -n "$archive_target" ]; then
        archive_user="$(echo "$archive_target" | grep '@' | cut -d'@' -f1 )"
        archive_host="$(echo "$archive_target" | grep ':' | sed -re 's/(.*):(.*)/\1/' | cut -d'@' -f2 )"
        archive_dir="$(echo "$archive_target" | sed -re 's/(.*):(.*)/\2/')"
    else
        # Fallback to the values of the configuration file. When the
        # path is not provided in the config file, fallback to backup values
        if [ -n "$ARCHIVE_DIR" ]; then
            [ -n "$ARCHIVE_USER" ] && archive_user="$ARCHIVE_USER"
            [ -n "$ARCHIVE_HOST" ] && archive_host="$ARCHIVE_HOST"
            archive_dir=$ARCHIVE_DIR
        else
            archive_user="$backup_user"
            archive_host="$backup_host"
            archive_dir="$backup_dir/archived_xlog"
        fi
    fi

    # Deduce if archives are local
    if [ -z "$archive_host" ]; then
	archive_local="yes"
    else
	archive_local="no"

        # Wrap IPv6 addresses with brackets
        echo "$archive_host" | grep -qi '^[0123456789abcdef:]*:[0123456789abcdef:]*$' && archive_host="[${archive_host}]"

        # Add a shortcut for ssh/rsync commands
        archive_ssh_target="${archive_user:+$archive_user@}$archive_host"
    fi

    [ -n "$archive_dir" ] || error "missing archive directory"

    # Ensure the archives directory is an absolute path
    if [ "$archive_local" = "yes" ]; then
        archive_dir="$(readlink -m -- "$archive_dir")"
    else
        archive_dir="$(ssh -n -- "$archive_ssh_target" "readlink -m -- $(qw "$archive_dir")")"
    fi

    return $out_rc
}
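
# For example (hypothetical target):
#   parse_target_uri "postgres@10.0.0.5:/backups"
# sets backup_user=postgres, backup_host=10.0.0.5, backup_dir=/backups,
# backup_local=no and backup_ssh_target=postgres@10.0.0.5, while
#   parse_target_uri "/var/backups"
# only sets backup_dir and backup_local=yes. The archive_* variables
# follow the same rules.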

list_backups() {
    local dir="$1"
    
    if [ "$backup_local" = "yes" ]; then
        list=( "$dir/"[0-9]*/ )

        (( ${#list[@]} > 0 )) || die "could not find any backups in $dir/"
    else
        list=()
        while read -r -d '' d; do
	    list+=("$d")
        done < <(
	    ssh -n -- "$backup_ssh_target" \
	        "find $(qw "$dir") -mindepth 1 -maxdepth 1 -name '[0-9]*' -type d -print0 | sort -z"
        )

        (( ${#list[@]} > 0 )) || die "could not find any backups in $dir/ on $backup_host"
    fi
}
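
# Only directories whose name starts with a digit are considered
# backups, e.g. (illustrative layout) 2018.01.02_03.04.05/ is listed
# while the temporary "current" directory of a backup in progress is
# ignored.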

# Backup related functions
post_backup_hook() {
    if [ -n "$POST_BACKUP_COMMAND" ]; then
	# We need to set PITRERY_BACKUP_DIR again here, because it will have
	# changed since the PRE_BACKUP_COMMAND was run, unless something failed
	# and we're bailing out early via error_and_hook().
	info "running post backup command"
	PITRERY_HOOK="post_backup"
	PITRERY_BACKUP_DIR=$backup_dir
	export PITRERY_EXIT_CODE
	if ! $POST_BACKUP_COMMAND; then
	    die "post_backup command exited with a non-zero code"
	fi
    fi
}

# This special error function permits running the post hook when the
# backup fails: the post hook must always run after the pre hook,
# while failures may also happen before the pre hook (those use
# error())
die_and_hook() {
    echo "$(now)FATAL: $*" 1>&2
    PITRERY_EXIT_CODE=1
    post_backup_hook
    cleanup
    exit 1
}

stop_backup() {
    # This function is a signal handler, so block signals it handles
    trap '' INT TERM EXIT

    # Tell PostgreSQL the backup is done
    info "stopping the backup process"
    if (( $pg_version >= 90600 )) && (( ${BASH_VERSINFO[0]} >= 4 )); then

        # We have to parse the multi-column output of
        # pg_stop_backup(), so change the field separator to a comma:
        # the default pipe separator would conflict with the output of
        # get_psql_output, which uses pipe characters in place of
        # newlines (newlines get lost when storing the output as a string)
        echo '\pset fieldsep ,' >&${COPROC[1]}
        get_psql_output > /dev/null

        echo "select labelfile, spcmapfile from pg_stop_backup(false);" >&${COPROC[1]}
        if [ $? != 0 ]; then
            check_psql_stderr || cat $psql_stderr
            die_and_hook "could not stop backup process"
        fi

        result=$(get_psql_output)
        if [ -z "$result" ]; then
            check_psql_stderr || cat $psql_stderr
            die_and_hook "error while stopping the backup process"
        fi

        echo '\pset fieldsep |' >&${COPROC[1]}
        get_psql_output > /dev/null

        # We need a stop time for the backup so that automatic
        # time-based selection of a backup works. The stop time is not
        # part of the output of pg_stop_backup().
        echo "select to_char(now(), 'YYYY-MM-DD HH24:MI:SS TZ');" >&${COPROC[1]}
        if [ $? != 0 ]; then
            check_psql_stderr || cat $psql_stderr
            die_and_hook "could not get the stop time of the backup"
        fi

        stop_time=$(get_psql_output)
        if [ -z "$stop_time" ]; then
            check_psql_stderr || cat $psql_stderr
            die_and_hook "could not get the stop time of the backup"
        fi

        # Get the backup_label. We keep the contents in memory, so
        # that the signal handler does not create any temporary files
        label_contents=$(cut -d',' -f1 <<< "$result")

        # Add the stop time field usually found in the archived label
        # file from exclusive backups
        label_contents="${label_contents}STOP TIME: $stop_time"

        # Get the tablespace map file
        spcmap_contents=$(cut -d',' -f2- <<< "$result")


    else
        if ! "${psql_command[@]}" -Atc "SELECT pg_stop_backup();" -- "$psql_condb" >/dev/null; then
	    die_and_hook "could not stop backup process"
        fi
    fi

    # Reset the signal handler, this function should only be called once
    trap - INT TERM EXIT
}

# Restore related functions
check_and_fix_directory() {
    [ $# = 1 ] || die "check_and_fix_directory called with $# arguments"
    local dir=$1

    [ -n "$dir" ] || die "check_and_fix_directory called with empty dir argument"

    # Check if directory exists
    if [ ! -d "$dir" ]; then
        if [ "$dry_run" != "yes" ]; then
	    info "creating $dir with permission 0700"
	    # Note that if this creates any parent directories, their mode will be set
	    # according to the current umask, only the final leaf dir will be set 0700.
	    mkdir -p -m 700 -- "$dir" || die "could not create $dir"
        fi
    else
        # Check if directory is empty
	info "checking if $dir is empty"
	if [ -n "$(ls -A -- "$dir")" ]; then
            if [ "$dry_run" != "yes" ]; then
                [ "$OVERWRITE" = "yes" ] || die "$dir is not empty. Contents won't be overwritten"

	        # Cancel in case there may be a postmaster running.
	        if [ -e "$dir/postmaster.pid" ]; then
		    die "Found $dir/postmaster.pid. A postmaster may be running. Aborting."
	        fi

	        info "$dir is not empty, its contents will be overwritten"
	        # This is called after we know the storage
	        # method. When using "tar", we must clean the target
	        # directory. When using "rsync", we just let it do its
	        # diffs.
	        if [ "$STORAGE" = "tar" ]; then
		    info "Removing contents of $dir"
		    rm -rf -- "$dir/"*
	        fi
            else
                if [ "$OVERWRITE" = "yes" ]; then
                    warn "$dir is not empty, its contents WILL be overwritten"
                else
                    error "$dir is not empty, its contents won't be overwritten"
                fi
            fi
	else
	    # make rsync copy the whole files because target
	    # directories are empty
	    rsync_opts="$rsync_opts --whole-file"
	fi

	# Check permissions
	dperms=`stat -c %a -- "$dir" 2>/dev/null` || die "Unable to get permissions of $dir"

	if [ "$dperms" != "700" ]; then
            if [ "$dry_run" != "yes" ]; then
	        info "setting permissions of $dir"
	        chmod -- 700 "$dir" || die "$dir must have 0700 permission"
            fi
	fi
    fi

    if [ "$dry_run" != "yes" ]; then
        # Check owner
        downer=`stat -c %U -- "$dir" 2>/dev/null` || die "Unable to get owner of $dir"

        if [ "$downer" != "$PGOWNER" ]; then
	    if [ "`id -u`" = 0 ]; then
	        info "setting owner of $dir"
	        chown -- "$PGOWNER:" "$dir" || die "could not change owner of $dir to $PGOWNER"
	    else
	        die "$dir must be owned by $PGOWNER"
	    fi
        fi
    fi
}

# Check related functions
check_psql_command() {
    # Check if the provided psql works by asking its version
    if ! "${psql_command[@]}" -V >/dev/null 2>&1; then
        die "unable to execute psql"
    fi
}

check_ssh() {
    local ssh_target=$1

    if ssh -n -- "$ssh_target" "test -d /" 2>/dev/null; then
	info "ssh connection to $ssh_target ok"
	return 0
    else
	error "cannot connect to $ssh_target with ssh"
	return 1
    fi
}

check_remote_directory() {
    local ssh_target=$1
    local dest_dir=$2
    local dest_exists
    local dest_isdir
    local dest_writable

    dest_exists=$(ssh -n -- "$ssh_target" "[ -e $(qw "$dest_dir") ] || echo 'ko'")
    if [ $? = 0 ]; then
	if [ ! -n "$dest_exists" ]; then
	    dest_isdir=$(ssh -n -- "$ssh_target" "[ -d $(qw "$dest_dir") ] || echo 'ko'")
	    if [ $? = 0 ]; then
		if [ ! -n "$dest_isdir" ]; then
		    info "target directory '$dest_dir' exists"
		    dest_writable=$(ssh -n -- "$ssh_target" "[ -w $(qw "$dest_dir") ] || echo 'ko'")
		    if [ $? = 0 ]; then
			if [ ! -n "$dest_writable" ]; then
			    info "target directory '$dest_dir' is writable"
			else
			    error "target directory '$dest_dir' is NOT writable"
			fi
		    else
			error "could not check directory ${ssh_target}:$dest_dir"
		    fi
		else
		    error "target '$dest_dir' exists but is NOT a directory"
		fi
	    else
		error "could not check directory ${ssh_target}:$dest_dir"
	    fi
	else
	    error "target directory '$dest_dir' does NOT exist or is NOT reachable"
	fi
    else
	error "could not check directory ${ssh_target}:$dest_dir"
    fi
}

check_local_directory() {
    local dest_dir=$1

    if [ -e "$dest_dir" ]; then
	if [ -d "$dest_dir" ]; then
	    info "target directory '$dest_dir' exists"
	    if [ -w "$dest_dir" ]; then
		info "target directory '$dest_dir' is writable"
	    else
		error "target directory '$dest_dir' is NOT writable"
	    fi
	else
	    error "target '$dest_dir' exists but is NOT a directory"
	fi
    else
	error "target directory '$dest_dir' does NOT exist or is NOT reachable"
    fi
}

check_postgresql_config() {
    local out_rc=0

    # Get the complete version from PostgreSQL
    pg_dotted_version=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'server_version';" -- "$psql_condb" 2>/dev/null)
    rc=$?
    if [ $rc = 127 ]; then
        error "psql invocation error: command not found"
    elif [ $rc = 2 ]; then
        error "could not connect to PostgreSQL"
    elif [ $rc != 0 ]; then
        error "could not get the version of the server"
    else
        info "PostgreSQL version is: $pg_dotted_version"

        # Now get the numerical version of PostgreSQL so that we can compare
        # it
        if ! pg_version=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'server_version_num';" -- "$psql_condb"); then
	    error "could not get the numerical version of the server"
        fi

        # Check if the server is in hot standby. If so, basebackup
        # functions won't work. This check is only needed as of 9.0;
        # before that, a warm standby could not accept connections anyway.
        if (( 10#$pg_version >= 90000 )); then
	    if ! hot_standby=$("${psql_command[@]}" -Atc "SELECT pg_is_in_recovery();" -- "$psql_condb"); then
	        error "could not check if the server is in hot standby"
	    else
	        if [[ $hot_standby == "t" ]]; then
		    error "server is in hot standby, backup won't work"
	        fi
	    fi
        fi

        # Check the attributes of the connection role, it must be
        # superuser or have the replication attribute (>= 9.1)
        if (( 10#$pg_version >= 90100 )); then
	    if ! role=$("${psql_command[@]}" -Atc "SELECT rolname FROM pg_roles WHERE rolname = current_user AND (rolsuper OR rolreplication);" -- "$psql_condb"); then
	        error "could not check if connection role has privileges to run backup functions"
	    else
	        if [ -n "$role" ]; then
		    info "connection role can run backup functions"
	        else
		    error "connection role cannot run backup functions"
	        fi
	    fi
        else
	    if ! role=$("${psql_command[@]}" -Atc "SELECT rolname FROM pg_roles WHERE rolname = current_user AND rolsuper;" -- "$psql_condb"); then
	        error "could not check if connection role has privileges to run backup functions"
	    else
	        if [ -n "$role" ]; then
		    info "connection role can run backup functions"
	        else
		    error "connection role cannot run backup functions"
	        fi
	    fi
        fi

        # Check configuration of PostgreSQL
        info "current configuration:"
        if (( $pg_version >= 90000 )); then
	    if ! wal_level=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'wal_level';" -- "$psql_condb"); then
	        error "could not get the get the value of wal_level"
	    fi
	    info "  wal_level = $wal_level"
        fi

        if (( $pg_version >= 80300 )); then
	    if ! archive_mode=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'archive_mode';" -- "$psql_condb"); then
	        error "could not get the get the value of archive_mode"
	    fi
	    info "  archive_mode = $archive_mode"
        fi

        if ! archive_command=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'archive_command';" -- "$psql_condb"); then
	    error "could not get the get the value of archive_command"
        fi
        info "  archive_command = '$archive_command'"

	if ! syslog=$("${psql_command[@]}" -Atc "SELECT setting ~ 'syslog' FROM pg_settings WHERE name = 'log_destination';" -- "$psql_condb"); then
	    warn "could not get get the value of log_destination"
	fi

	if [ "$syslog" = "t" ]; then
	    if ! syslog_facility=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'syslog_facility';" -- "$psql_condb"); then
		warn "could not get get the value of syslog_facility"
	    fi

	    if ! syslog_ident=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'syslog_ident';" -- "$psql_condb"); then
		warn "could not get get the value of syslog_ident"
	    fi
	fi

        # wal_level must be set to something other than minimal
        if [ -n "$wal_level" ] && [ "$wal_level" = "minimal" ]; then
            if (( $pg_version >= 90000 )); then
                if (( $pg_version < 90600 )); then
	            error "wal_level must be set at least to archive"
                else
                    # archive and hot_standby levels have been merged into
                    # replica starting from 9.6
                    error "wal_level must be set at least to replica"
                fi
            fi
        fi

        if [ -n "$archive_mode" ] && [ $archive_mode = "off" ]; then
	    error "archive_mode must be set to on"
        fi

        if [ -z "$archive_command" ]; then
	    error "archive_command is empty"
        fi

        # Get the data directory from PostgreSQL, later we can compare it
        # to PGDATA variable and see if the configuration is correct.
        if ! data_directory=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'data_directory';" -- "$psql_condb"); then
	    error "could not get the get the value of data_directory"
        fi

    fi

    return $out_rc
}

# Simple helper function to output a parameter and replace it in the
# given file. Since some OS do not support the -i option of sed, we
# have to use a temporary file. Since we are in the configuration
# stage, switching a lot between temp files is not a performance
# issue.
output_param() {
    local param=$1
    local value=$2
    local file=$3

    echo "${param}=\"${value}\""

    if [ -n "$file" ]; then
	local tmpfile
	tmpfile=$(mktemp -t pitr_config_sed.XXXXXXXXXX) ||
            error "Failed to create temporary file"

	# Replace the parameter with sed, the value is quoted so that
	# commas in the value do not conflict with our sed
	# construct
	v=$(qw "$value")

	# qw can output a string starting with a $; sourcing the
	# configuration file later will interpret such a value
	# correctly, but if the sed that replaces the value inside the
	# configuration file puts double quotes around it, source would
	# no longer interpret the $-prefixed string correctly.
	if echo "$v" | grep -q '^\$'; then
	    sed -re "s/^#${param}=.*/${param}=${v//\//\\/}/" "$file" > "$tmpfile" ||
		error "Cannot change parameter in configuration file"
	else
	    sed -re "s/^#${param}=.*/${param}=\"${v//\//\\/}\"/" "$file" > "$tmpfile" ||
		error "Cannot change parameter in configuration file"
	fi

	mv "$tmpfile" "$file" || error "Cannot rename tmpfile to configuration file"
    fi
}
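
# For example, assuming the configuration template ships the commented
# default "#PGDATA=...":
#   output_param PGDATA /var/lib/pgsql/data pitrery.conf
# prints PGDATA="/var/lib/pgsql/data" and rewrites that line of
# pitrery.conf into PGDATA="/var/lib/pgsql/data".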

# Helpers for different formats in the list action
list_output_indent() {
    if [ -z "$indent_spc" ]; then
        indent_spc=0
    else
            if [ "$comma" = "yes" ]; then
                echo ","
                comma="no"
            else
                echo
            fi
                    if (( $indent_spc > 0 )); then
            printf ' %.0s' $(seq 1 $indent_spc)
        fi
    fi
}

list_output_start_hash() {
    case "$output" in
        "json")
            list_output_indent
            # wrap the lower-cased key in double quotes before opening the hash
            echo -n "${1:+"\"${1,,}\": "}{"
            indent_spc=$(($indent_spc + 2))
            ;;
        *)
            if [ -n "$1" ]; then
                echo "${1}:"
            fi
            ;;
    esac
}

list_output_end_hash() {
    case "$output" in
        "json")
            comma="no"
            (( ${indent_spc:-0} >= 2 )) && indent_spc=$(($indent_spc - 2))
            list_output_indent
            echo -n "}"
            comma="yes"
            # trailing newline when closing the json object
            (( ${indent_spc:-0} == 0 )) && echo
            ;;
    esac
}

list_output_text() {
    if [ "$output" != "json" ]; then
        echo "${1}${2:+": $2"}"
    fi
}

list_output_json() {
    if [ "$output" = "json" ]; then
        list_output_indent

        shopt -s extglob
        p=${1##*( )}

        if [ -z "$2" ]; then
            # default to null
            echo -n "\"${p,,}\": null"
        elif [ "$2" = "true" ] || [ "$2" = "false" ] || [ "$2" = "null" ] || [[ "$2" =~ ^[0-9]+$ ]]; then
            # do not quote numbers, booleans and null
            echo -n "\"${p,,}\": $2"
        else
            echo -n "\"${p,,}\": \"$2\""
        fi
        comma="yes"
        shopt -u extglob
    fi
}
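
# Sample fragments emitted in json mode (illustrative):
#   list_output_json "space_used" "49M"   ->   "space_used": "49M"
#   list_output_json "local" "true"       ->   "local": true
#   list_output_json "user"               ->   "user": null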

list_output_all() {
    case "$output" in
        "json")
            list_output_json "$@"
            ;;
        *)
            list_output_text "$@"
            ;;
    esac
}

list_output_start_list() {
    case "$output" in
        "json")
            list_output_indent
            echo -n "${1:+"\"${1,,}\": "}["
            [ -n "$1" ] && indent_spc=$(($indent_spc + 2))
            ;;
    esac
}

list_output_end_list() {
    case "$output" in
        "json")
            comma="no"
            (( ${indent_spc:-0} >= 2 )) && indent_spc=$(($indent_spc - 2))
            list_output_indent
            echo -n "]"
            comma="yes"
            ;;
    esac
}


init_rsync_opts() {
    if [[ "$RSYNC_WHOLEFILE" == "yes" ]]; then
        rsync_opts="$rsync_opts --whole-file"
    fi

    if [[ -n "$RSYNC_BWLIMIT" ]]; then
        if [[ "$RSYNC_BWLIMIT" == "$(sed -r -e 's/([^kmgbi0-9])//gi' <<< "$RSYNC_BWLIMIT")" ]]; then
            rsync_opts="$rsync_opts --bwlimit=$RSYNC_BWLIMIT"
        else
            warn "bad value for RSYNC_BWLIMIT: '$RSYNC_BWLIMIT'"
        fi
    fi
}
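
# The filter above only keeps digits and the k/m/g/b/i unit letters,
# so e.g. RSYNC_BWLIMIT="500k" or "10m" are accepted, while "fast"
# triggers the warning and the limit is not applied.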

while getopts "c:f:lV?" opt; do
    case $opt in
	c|f)
            config=$OPTARG
            cli_config="yes"
            ;;
	l) # List the configuration files and exit
            info "listing configuration files in $config_dir"
            if [ -d "$config_dir" ]; then
                find "$config_dir" -mindepth 1 -maxdepth 1 -name '*.conf' | xargs -i basename '{}' .conf
            fi
            exit 0
            ;;
	V) echo "pitrery $version"; exit 0;;
	'?') usage;;
	*) die "error while processing options";;
    esac
done

# Ensure failed globs will be empty, not left containing the literal glob pattern
shopt -s nullglob

# check for an action/subcommand, ensure OPTIND is correctly set to
# process the options of the action
if (( $# < 1 )); then
    error "missing action"
    usage 1
fi

action=${@:$OPTIND:1}
OPTIND=$(( $OPTIND + 1 ))

# Process the help action as soon as possible, it boils down to
# printing usage and exiting
if [ "$action" = "help" ]; then
    while getopts "?" arg  2>/dev/null; do
	case $arg in
            '?') usage "help";;
            *) die "error while processing options";;
        esac
    done

    usage ${@:$OPTIND:1}
fi

# Check if the config option is a path or just a name in the
# configuration directory.  Prepend the configuration directory and
# .conf when needed.
if [[ $config != */* ]]; then
    config="$config_dir/$(basename -- "$config" .conf).conf"
fi
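
# For example, "-f prod" and "-f prod.conf" both resolve to
# /etc/pitrery/prod.conf, while a value containing a slash, such as
# ./prod.conf, is used as-is.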


opts=()
case $action in
    list)
	load_config
        verbose=""
        output=""

	# Parse args after action: they should take precedence over the configuration
	while getopts "jv?" arg 2>/dev/null; do
	    case $arg in
                j) output="json"; verbose="yes";;
		v) verbose="yes";;
		'?') usage "list";;
	    esac
	done

        parse_target_uri "${@:$OPTIND:1}" || usage "list"

        list_backups "$backup_dir" # to the list array

        # Begin json
        list_output_start_hash
        
        # Print a header
        if [ "$backup_local" = "yes" ]; then
            list_output_text "List of local backups"

            list_output_json "local" "true"
            list_output_json "host" "$backup_host"
            list_output_json "user"
        else
            list_output_text "List of backups on $backup_host"

            list_output_json "local" "false"
            list_output_json "host" "$backup_host"
            list_output_json "user" "${backup_user:-$(id -un)}"
        fi

        list_output_json "directory" "$backup_dir"

        # Print the directory and stop time of each backup
        list_output_start_list "backups"

        for dir in "${list[@]%/}"; do
            # Check the permissions
            if [ "$backup_local" = "yes" ]; then
                if [[ ! -r "$dir" ]] || [[ ! -x "$dir" ]]; then
                    error "acces denied to $dir"
                    continue
                fi
            else
                if ! ssh -n -- "$backup_ssh_target" "([ -r $(qw "$dir") ] && [ -x $(qw "$dir") ])"; then
                   error "acces denied to $dir"
                   continue
                fi
            fi

            # Print the details of the backup dir
            if [ -n "$verbose" ]; then
	        list_output_text "----------------------------------------------------------------------"
                list_output_text "Directory:"
	        list_output_start_hash
                list_output_text "  $dir"

                list_output_json "directory" "$dir"
            else
	        echo -ne "$dir\t"
            fi

	    # Compute the size of full backup
            backup_size=
            if [ "$backup_local" = "yes" ]; then
	        backup_size=( $(du -sh -- "$dir" 2>/dev/null) )
            else
                backup_size=( $(ssh -n -- "$backup_ssh_target" "du -sh -- $(qw "$dir") 2>/dev/null" 2>/dev/null) )
            fi

            if [ -n "$backup_size" ]; then
	        if [ -n "$verbose" ]; then
                    list_output_text "  space used" "$backup_size"
                    list_output_json "space_used" "$backup_size"
	        else
	            echo -ne "$backup_size\t"
	        fi
            else
                error "could not find size of $dir"
	    fi
            
	    # Find the storage method of the backup, with compression suffix for tar
            storage=
            encryption="false"
            if [ "$backup_local" = "yes" ]; then
	        if [ -d "$dir/pgdata" ]; then
                    storage="rsync"
	        else
		    prefix=$dir/pgdata.tar.
		    tarfile=( "$prefix"* )
                    [[ ${#tarfile[@]} == 1 ]] && storage="tar"
                fi
            else
                if ssh -n -- "$backup_ssh_target" "test -d $(qw "$dir/pgdata")" 2>/dev/null; then
                    storage="rsync"
	        else
		    prefix=$dir/pgdata.tar.
		    tarfile=()
		    while read -r -d '' d; do
		        tarfile+=("$d")
		    done < <(
		        ssh -n -- "$backup_ssh_target" "find $(qw "$dir") -maxdepth 1 -name 'pgdata.tar.*' -type f -print0"
		    )
                    [[ ${#tarfile[@]} == 1 ]] && storage="tar"
                fi
            fi

            if [ -n "$verbose" ]; then
                case "$storage" in
                    rsync)
                        list_output_all "  storage" "rsync"
                        list_output_json "compression" ""
                        ;;
                    tar)
                        if [[ "${tarfile[0]}" =~ pgdata\.tar\.gpg ]]; then
                            encryption="true"
                        fi
		        suffix=${tarfile#$prefix}
		        list_output_text "  storage" "tar with ${suffix:-"no"} compression"
                        list_output_json "storage" "tar"
                        list_output_json "compression" "$suffix"
                        ;;
                    *)
		        error "no rsync dir and ${#tarfile[@]} pgdata.tar files found"
                        ;;
                esac

                list_output_text "  encryption" $encryption
                list_output_json "encryption" $encryption
	    fi

            # Print the minimum recovery target time with this backup
            stop_time=
            if [ "$backup_local" = "yes" ]; then
	        if [ -f "$dir/backup_label" ]; then
	            if ! stop_time=$(sed -n 's/STOP TIME: //p' -- "$dir/backup_label") || [ -z "$stop_time" ]; then
                        error "could not get \"stop time\" from $dir/backup_label"
                    fi
                else
                    error "could not find the backup_label file"
                fi
            else
                if ssh -n -- "$backup_ssh_target" "test -f $(qw "$dir/backup_label")" 2>/dev/null; then
                    if ! stop_time=$(ssh -n -- "$backup_ssh_target" "sed -n 's/STOP TIME: //p' -- $(qw "$dir/backup_label")") || [ -z "$stop_time" ]; then
                        error "could not get \"stop time\" from $backup_host:$dir/backup_label"
                    fi
                else
                    error "could find the backup_label file"
	        fi
            fi

            if [ -n "$stop_time" ]; then
                if [ -n "$verbose" ]; then
                    list_output_text "Minimum recovery target time:"
		    list_output_text "  $stop_time"
                    list_output_json "stop_time" "$stop_time"
                else
                    echo "  $stop_time"
                fi
	    fi

            # Name, path and space used at backup time of PGDATA and tablespaces
            tblspc_list=
            if [ "$backup_local" = "yes" ]; then
	        if [ -f "$dir/tblspc_list" ]; then
		    # Only show sizes of PGDATA if available
                    tblspc_list=$(< "$dir/tblspc_list") || error "failed to read $dir/tblspc_list"
                else
                    error "could not find the list of tablespaces (tblspc_list)"
                fi
            else
                if ssh -n -- "$backup_ssh_target" "test -f $(qw "$dir/tblspc_list")" 2>/dev/null; then
                    if ! tblspc_list=$(ssh -n -- "$backup_ssh_target" "cat $(qw "$dir/tblspc_list")" 2>/dev/null); then
                        error "could not read the list of tablespaces (tblspc_list)"
                    fi
                else
                    error "could not find the list of tablespaces (tblspc_list)"
                fi
            fi

            if [ -n "$verbose" ]; then
                list_output_start_list "tablespaces"
                list_output_text "PGDATA:"
                if [ -n "$tblspc_list" ]; then
                    while read -r l; do
                        tname=$(cut -d '|' -f 1 <<< "$l")
                        tdir=$(cut -d '|' -f 2 <<< "$l")
                        toid=$(cut -d '|' -f 3 <<< "$l")
                        tsize=$(cut -d '|' -f 4 <<< "$l")

                        # only show PGDATA
                        [ -n "$tdir" ] && continue

                        list_output_text "  $tname $tsize"

                        list_output_start_hash
                        list_output_json "name" "$tname"
                        list_output_json "location" "$tdir"
                        list_output_json "oid" "$toid"
                        list_output_json "size" "$tsize"
                        list_output_end_hash
                        comma="yes"
                    done <<< "$tblspc_list"

		    list_output_text "Tablespaces:"

                    while read -r l; do
                        tname=$(cut -d '|' -f 1 <<< "$l")
                        tdir=$(cut -d '|' -f 2 <<< "$l")
                        toid=$(cut -d '|' -f 3 <<< "$l")
                        tsize=$(cut -d '|' -f 4 <<< "$l")

                        # do not show PGDATA
                        [ -z "$tdir" ] && continue

                        list_output_text "  \"$tname\" $tdir ($toid) $tsize"

                        list_output_start_hash
                        list_output_json "name" "$tname"
                        list_output_json "location" "$tdir"
                        list_output_json "oid" "$toid"
                        list_output_json "size" "$tsize"
                        list_output_end_hash
                        comma="yes"
                    done <<< "$tblspc_list"
                else
                    error "no tablespaces listed"
	        fi
                list_output_end_list
            else
                [ -n "$tblspc_list" ] || error "no tablespaces listed"
            fi

            if [ "$out_rc" != 0 ]; then
                list_output_json "valid" "false"
	        error "this backup may be incomplete or corrupted"
                # We can safely reset out_rc here: only this for loop may call error()
                out_rc=0
            else
                list_output_json "valid" "true"
                [ -n "$verbose" ] && list_output_text ""
            fi

            list_output_end_hash
        done
        list_output_end_list
        list_output_end_hash
        ;;

    backup)
	load_config

	# Parse args after action: they should take precedence over the configuration
	while getopts "D:s:c:e:Er:tTP:h:p:U:d:?" arg 2>/dev/null; do
	    case $arg in
                D) PGDATA=$OPTARG;;
		s) STORAGE=$OPTARG;;
		c) BACKUP_COMPRESS_BIN=$OPTARG;;
		e) BACKUP_COMPRESS_SUFFIX=$OPTARG;;
                E) BACKUP_ENCRYPT="yes";;
                r) GPG_ENCRYPT_KEYS="$OPTARG";;
                t) USE_ISO8601_TIMESTAMPS="yes";;
		T) LOG_TIMESTAMP="yes";;

	        P) PGPSQL=( "$OPTARG" );;
	        h) dbhost=$OPTARG;;
	        p) dbport=$OPTARG;;
	        U) dbuser=$OPTARG;;
	        d) dbname=$OPTARG;;

		'?') usage "backup";;
	    esac
	done

        # set the variables needed to access storage (handling of cli
        # vs config is handled by the function)
        parse_target_uri "${@:$OPTIND:1}" || usage "backup"

        # initialize the target path
        backup_root=$backup_dir
        backup_dir="$backup_root/current"

        # Only tar or rsync are allowed as storage method
        if [ "$STORAGE" != "tar" ] && [ "$STORAGE" != "rsync" ]; then
            die "storage method must be 'tar' or 'rsync'"
        fi

        # PGDATA may be used directly from the environment, so check if it is ok
        [ -n "$PGDATA" ] || die "PGDATA is not set"
        [ -d "$PGDATA" ] || die "PGDATA is not a directory"

        # When GPG encryption is used, ensure the storage is tar or print a warning
        if [ "$BACKUP_ENCRYPT" = "yes" ] && [ "$STORAGE" != "tar" ]; then
            warn "encryption only available with the 'tar' storage method"
            BACKUP_ENCRYPT="no"
        fi

        # Check if we have recipients for encryption
        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
            if [ -z "$GPG_ENCRYPT_KEYS" ]; then
                die "missing recipients for GPG encryption"
            else
                # Prepare and check GPG command line
                gpg_command=( "$GPG_BIN" "--batch" "--yes" "--encrypt" )

                while read -r -d '' o; do
                    gpg_command+=( "--recipient" "$o" )
                done < <(tr ':' '\0' <<< "${GPG_ENCRYPT_KEYS}:")
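                # e.g. (illustrative keys) GPG_ENCRYPT_KEYS="alice:bob"
                # expands to: gpg --batch --yes --encrypt
                #             --recipient alice --recipient bob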

                info "Checking GPG encryption"
                echo "test" | "${gpg_command[@]}" > /dev/null
                rc=(${PIPESTATUS[*]})
                gpg_rc=${rc[1]}
                if [ "$gpg_rc" != 0 ]; then
                    die "failed to check GPG encryption. command line:" "${gpg_command[@]}"
                fi
            fi
        fi

        # Prepare the text that will be put in the backup_label
        label_text="pitrery_${version}_`date +%s`"

        # Prepare psql command line. Starting from 9.6 .psqlrc is sourced with
        # psql -c or -f, so we force -X
        psql_command=( "$PGPSQL" "-X" )
        [ -n "$dbhost" ] && psql_command+=( "-h" "$dbhost" )
        [ -n "$dbport" ] && psql_command+=( "-p" "$dbport" )
        [ -n "$dbuser" ] && psql_command+=( "-U" "$dbuser" )

        psql_condb=$dbname

        check_psql_command

        # Exports for both the pre and post backup hooks.
        export PITRERY_HOOK="pre_backup"
        export PITRERY_BACKUP_DIR=$backup_dir
        export PITRERY_PSQL="${psql_command[@]}"
        export PITRERY_DATABASE=$psql_condb
        export PITRERY_BACKUP_LOCAL=$backup_local
        export PITRERY_SSH_TARGET=$backup_ssh_target

        # Get the version of the server
        if ! pg_version=$("${psql_command[@]}" -Atc "SELECT setting FROM pg_settings WHERE name = 'server_version_num';" \
				               -- "$psql_condb"); then
            die "could not get the version of the server"
        fi

        # As of PostgreSQL 10, "xlog" has been changed to "wal", it
        # applies to directories and functions
        if (( 10#$pg_version >= 100000 )); then
            xlog_or_wal="wal"
        else
            xlog_or_wal="xlog"
        fi
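
        # For example, on version 10 the backup uses
        # pg_walfile_name_offset() and excludes the pg_wal directory,
        # while older releases use pg_xlogfile_name_offset() and pg_xlog.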

        # Check if the server is in hot standby, it can happen from 9.0
        # otherwise we would have already exited on error.
        if (( 10#$pg_version >= 90000 )); then
            if ! standby=$("${psql_command[@]}" -Atc "SELECT pg_is_in_recovery();" -- "$psql_condb"); then
	        die "could not check if the server is in recovery"
            fi

            if [ "$standby" = "t" ]; then
                # Starting from 9.6 it is possible to back up from a
                # standby using a non-exclusive backup
                if (( $pg_version < 90600 )); then
                    die "unable to perform a base backup on a server in recovery mode. Aborting"
                fi

                # We use a coprocess for non-exclusive backups; coprocesses
                # are available in bash 4 and later.
                if (( ${BASH_VERSINFO[0]} < 4 )); then
                    die "bash version is too old to perform a non-exclusive backup from a standby server. Aborting"
                else
                    info "performing backup from hot standby server"
                fi
            fi
        fi

        # Prepare target directories
        info "preparing directories in ${backup_ssh_target:+$backup_ssh_target:}$backup_root/"

        if [ "$backup_local" = "yes" ]; then
            # Ensure the destination is clean from failed backups and that no
            # concurrent backup is running, the "current" temporary directory
            # acts as a lock.
            if [ -e "$backup_dir" ]; then
                die "$backup_dir already exists, another backup may be in progress"
            fi

            can_cleanup="yes"
            if ! mkdir -p -- "$backup_dir/tblspc"; then
	        die "could not create $backup_dir/tblspc"
            fi

            # Make the backup directory private
            if ! chmod 700 -- "$backup_dir"; then
                die "could not change permissions of $backup_dir"
            fi
        else
            if ssh -n -- "$backup_ssh_target" "test -e $(qw "$backup_dir")" 2>/dev/null; then
	        die "$backup_dir already exists, another backup may be in progress"
            fi

            can_cleanup="yes"
            if ! ssh -n -- "$backup_ssh_target" "mkdir -p -- $(qw "$backup_dir/tblspc")" 2>/dev/null; then
	        die "could not create $backup_dir/tblspc"
            fi

            # Make the backup directory private
            if ! ssh -n -- "$backup_ssh_target" "chmod 700 -- $(qw "$backup_dir")" 2>/dev/null; then
	        die "could not change permissions of $backup_dir"
            fi
        fi

        # Execute the pre-backup command
        if [ -n "$PRE_BACKUP_COMMAND" ]; then
            info "running pre backup hook"
            if ! $PRE_BACKUP_COMMAND; then
	        die "pre_backup command exited with a non-zero code"
            fi
        fi

        # Get the list of tablespaces. It comes from PostgreSQL to be sure to
        # process only defined tablespaces.
        info "listing tablespaces"
        if ! tblspc_list=$(mktemp -t backup_pitr.XXXXXXXXXX); then
            die_and_hook "could not create temporary file"
        fi

        # Starting from 9.2, the location of tablespaces is no longer stored
        # in pg_tablespace. This makes it possible to change the location of
        # a tablespace by modifying the symbolic links in pg_tblspc. As a
        # result, the query used to get the list of tablespaces is different.

        # Ask PostgreSQL the list of tablespaces
        if (( $pg_version >= 90200 )); then
            "${psql_command[@]}" -Atc "SELECT spcname, pg_tablespace_location(oid), oid, pg_size_pretty(pg_tablespace_size(oid)) FROM pg_tablespace;" -- "$psql_condb" > "$tblspc_list"
            rc=$?
        else
            "${psql_command[@]}" -Atc "SELECT spcname, spclocation, oid, pg_size_pretty(pg_tablespace_size(oid)) FROM pg_tablespace;" -- "$psql_condb" > "$tblspc_list"
            rc=$?
        fi

        if [ $rc != 0 ]; then
            die_and_hook "could not get the list of tablespaces from PostgreSQL"
        fi

        # Start the backup
        info "starting the backup process"

        # Starting from 9.6, PostgreSQL supports concurrent base backups,
        # known as non-exclusive backups. The older behaviour of exclusive
        # backups may be deprecated.
        if (( $pg_version >= 90600 )) && (( ${BASH_VERSINFO[0]} >= 4 )); then
            info "performing a non-exclusive backup"
            # When taking a base backup in non-exclusive mode, the session
            # that issues the call to pg_start_backup() must stay connected
            # during the whole operation. We start psql inside a coprocess and
            # interact with it. Since the coprocess does not offer a pipe to
            # capture stderr, we redirect it to a temporary file. Testing if
            # this file is empty lets us know whether an error occurred.
            if ! psql_stderr=$(mktemp -t backup_pitr_psql_stderr.XXXXXXXXXX); then
                die_and_hook "could not create temporary file"
            fi

            coproc ${psql_command[@]} -At $psql_condb 2>$psql_stderr

            get_psql_output() {
                # First wait for output to be ready on the fd. When there is
                # an error, no output goes to the fd
                while ! read -t 0 -u ${COPROC[0]}; do
                    if ! grep -E "^(ERROR|FATAL)" $psql_stderr >/dev/null 2>&1; then
                        sleep 1
                    else
                        return 1
                    fi
                done

                # When the fd has some data, read everything and change
                # newlines to pipe characters, so that they are not lost
                # when the output is stored as a string.
                ret_str=''
                while read -t 1 -u ${COPROC[0]} line; do
                    [ -n "$line" ] && ret_str="$ret_str$line|"
                done
                echo -n "$ret_str" | sed 's/|$//'
                return 0
            }

            check_psql_stderr() {
                if grep -E "^(ERROR|FATAL)" $psql_stderr >/dev/null 2>&1; then
                    return 1
                else
                    return 0
                fi
            }
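
            # The interaction pattern with the coprocess is (sketch):
            #   echo "SELECT ...;" >&${COPROC[1]}   # send a query
            #   result=$(get_psql_output)           # read its output
            # with errors detected through the $psql_stderr file.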

            # Check if the connection works by getting the pid of the backend
            echo 'select pg_backend_pid();' >&${COPROC[1]} # 2>/dev/null
            if [ $? != 0 ]; then
                check_psql_stderr || cat $psql_stderr
                die_and_hook "could not check connection to PostgreSQL"
            fi
            psql_pid=$(get_psql_output)

            # Start the base backup
            echo "select pg_start_backup('${label_text}', true, false);"  >&${COPROC[1]}
            if [ $? != 0 ]; then
                check_psql_stderr || cat $psql_stderr
                die_and_hook "could not start backup process (command sending)"
            fi

            start_backup_lsn=$(get_psql_output)
            if [ -z "$start_backup_lsn" ]; then
                check_psql_stderr || cat $psql_stderr
                die_and_hook "could not start backup process (empty output)"
            fi
        else
            # Force a checkpoint for version >= 8.4. We also parse the
            # result of pg_xlogfile_name_offset (pg_walfile_name_offset
            # as of 10) applied to the LSN returned by pg_start_backup,
            # so that we know the name of the backup_label file that
            # will be archived once pg_stop_backup completes
            if (( $pg_version >= 80400 )); then
                start_backup_label_file=`${psql_command[@]} -Atc "select i.file_name ||'.'|| lpad(upper(to_hex(i.file_offset)), 8, '0') || '.backup' from pg_${xlog_or_wal}file_name_offset(pg_start_backup('${label_text}', true)) as i;" $psql_condb`
                rc=$?
            else
                start_backup_label_file=`${psql_command[@]} -Atc "select i.file_name ||'.'|| lpad(upper(to_hex(i.file_offset)), 8, '0') || '.backup' from pg_${xlog_or_wal}file_name_offset(pg_start_backup('${label_text}')) as i;" $psql_condb`
                rc=$?
            fi

            if [ $rc != 0 ]; then
                die_and_hook "could not start backup process"
            fi
        fi

        # Add a signal handler to avoid leaving the cluster in backup mode when
        # exiting on error. SIGKILL cannot be caught, so it is not listed.
        trap stop_backup INT TERM EXIT

        # When using rsync storage, search for the previous backup to prepare
        # the target directories. We try to optimize the space usage by
        # hardlinking the previous backup, so that files that have not changed
        # between backups are not duplicated from a filesystem point of view
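        # As an illustration (paths are examples), if the previous backup
        # lives in $backup_root/2018.06.01_12.00.00/pgdata, rsync is run with
        # --link-dest pointing at it: unchanged files become hard links and
        # only modified files consume additional disk space.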
        if [ "$STORAGE" = "rsync" ]; then
            if [ "$backup_local" = "yes" ]; then
	        list=( "$backup_root/"[0-9]*/ )
	        if (( ${#list[@]} > 0 )); then
	            _dir=${list[*]: -1}

	            # Since the previous backup can be in tar storage, check
	            # that a pgdata subdirectory exists
	            [ -d "${_dir%/}/pgdata" ] && prev_backup=${_dir%/}
	        fi
            else
	        _dir=$(ssh -n -- "$backup_ssh_target" "f=\$(find $(qw "$backup_root") -mindepth 1 -maxdepth 1 -name '[0-9]*' -type d -print0 | sort -rz | cut -d '' -f1) && printf '%s' \"\$f\"")
	        if ssh -n -- "$backup_ssh_target" "test -d $(qw "$_dir/pgdata")" 2>/dev/null; then
	            prev_backup="$_dir"
	        fi
            fi
        fi

        # Enable the extended pattern matching operators.
        # We use them here for replacing whitespace in the tablespace tarball names.
        shopt -s extglob

        # Copy the files
        case $STORAGE in
            "tar")
                # Tar $PGDATA
	        info "backing up PGDATA with tar"
	        was=`pwd`
	        if ! cd -- "$PGDATA"; then
	            die_and_hook "could not change current directory to $PGDATA"
	        fi

                # prepare the list of files and directories to exclude
                excluded=("pg_${xlog_or_wal}" 'pg_replslot/*' 'postmaster.*' 'pgsql_tmp' 'restored_config_files' 'backup_label.old' '*.sql')
                exclude_opts=()
                for i in "${excluded[@]}"; do
                    exclude_opts+=("--exclude=$i")
                done
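                # With the list above this expands to --exclude=pg_xlog (or
                # --exclude=pg_wal as of 10), --exclude=pg_replslot/* and so
                # on, applied relative to $PGDATA since tar runs from there.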
                
	        info "archiving $PGDATA"
	        if [ "$backup_local" = "yes" ]; then
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                        tar -cpf - --ignore-failed-read "${exclude_opts[@]}" -- * 2>/dev/null | "${gpg_command[@]}" -o "$backup_dir/pgdata.tar.gpg"
                        rc=(${PIPESTATUS[*]})
	                tar_rc=${rc[0]}
                        gpg_rc=${rc[1]}
                        if [ "$tar_rc" = 2 ] || [ "$gpg_rc" != 0 ]; then
		            die_and_hook "could not tar PGDATA"
	                fi
                    else
	                tar -cpf - --ignore-failed-read "${exclude_opts[@]}" -- * 2>/dev/null | $BACKUP_COMPRESS_BIN > "$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX"
	                rc=(${PIPESTATUS[*]})
	                tar_rc=${rc[0]}
	                compress_rc=${rc[1]}
	                if [ "$tar_rc" = 2 ] || [ "$compress_rc" != 0 ]; then
		            die_and_hook "could not tar PGDATA"
	                fi
                    fi
	        else
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                        tar -cpf - --ignore-failed-read "${exclude_opts[@]}" -- * 2>/dev/null | "${gpg_command[@]}" | ssh -- "$backup_ssh_target" "cat > $(qw "$backup_dir/pgdata.tar.gpg")" 2>/dev/null
                        rc=(${PIPESTATUS[*]})
	                tar_rc=${rc[0]}
                        gpg_rc=${rc[1]}
                        ssh_rc=${rc[2]}
                        if [ "$tar_rc" = 2 ] || [ "$gpg_rc" != 0 ] || [ "$ssh_rc" != 0 ]; then
		            die_and_hook "could not tar PGDATA"
	                fi
                    else
	                tar -cpf - --ignore-failed-read "${exclude_opts[@]}" -- * 2>/dev/null | $BACKUP_COMPRESS_BIN | ssh -- "$backup_ssh_target" "cat > $(qw "$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX")" 2>/dev/null
	                rc=(${PIPESTATUS[*]})
	                tar_rc=${rc[0]}
	                compress_rc=${rc[1]}
	                ssh_rc=${rc[2]}
	                if [ "$tar_rc" = 2 ] || [ "$compress_rc" != 0 ] || [ "$ssh_rc" != 0 ]; then
		            die_and_hook "could not tar PGDATA"
	                fi
	            fi
                fi
	        cd -- "$was"

	        # Tar the tablespaces
	        while read line ; do
	            name=$(cut -d '|' -f 1 <<< "$line")
	            _name=${name//+([[:space:]])/_}	# No space version, we want paths without spaces
	            location=$(cut -d '|' -f 2 <<< "$line")
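	            # tblspc_list lines are "name|location|oid"; e.g. a made-up
	            # line "spc1|/srv/pg/spc1|16385" gives name=spc1 and
	            # location=/srv/pg/spc1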

	            # Skip empty locations used for pg_default and pg_global, which are in PGDATA
	            [ -z "$location" ] && continue

	            info "backing up tablespace \"$name\" with tar"

                    # Change directory to the parent directory of the tablespace
                    # to be able to tar only the base directory
	            was=`pwd`
	            if ! cd -- "$location"; then
		        die_and_hook "could not change current directory to $location"
	            fi

	            # Tar the directory, directly to the remote location if needed.  The name
                    # of the tar file is the tablespace name defined in the cluster, which is
                    # unique.
	            info "archiving $location"
	            if [ "$backup_local" = "yes" ]; then
                        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                            tar -cpf - --ignore-failed-read --exclude='pgsql_tmp' -- * 2>/dev/null | "${gpg_command[@]}" -o "$backup_dir/tblspc/${_name}.tar.gpg"
                            rc=(${PIPESTATUS[*]})
		            tar_rc=${rc[0]}
                            gpg_rc=${rc[1]}
		            if [ "$tar_rc" = 2 ] || [ "$gpg_rc" != 0 ]; then
		                die_and_hook "could not tar tablespace \"$name\""
		            fi
                        else
		            tar -cpf - --ignore-failed-read --exclude='pgsql_tmp' -- * 2>/dev/null | $BACKUP_COMPRESS_BIN > "$backup_dir/tblspc/${_name}.tar.$BACKUP_COMPRESS_SUFFIX"
		            rc=(${PIPESTATUS[*]})
		            tar_rc=${rc[0]}
		            compress_rc=${rc[1]}
		            if [ "$tar_rc" = 2 ] || [ "$compress_rc" != 0 ]; then
		                die_and_hook "could not tar tablespace \"$name\""
		            fi
                        fi
	            else
                        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                            tar -cpf - --ignore-failed-read --exclude='pgsql_tmp' -- * 2>/dev/null | "${gpg_command[@]}" | ssh -- "$backup_ssh_target" "cat > $(qw "$backup_dir/tblspc/${_name}.tar.gpg")" 2>/dev/null
		            rc=(${PIPESTATUS[*]})
		            tar_rc=${rc[0]}
                            gpg_rc=${rc[1]}
		            ssh_rc=${rc[2]}
		            if [ "$tar_rc" = 2 ] || [ "$gpg_rc" != 0 ] || [ "$ssh_rc" != 0 ]; then
		                die_and_hook "could not tar tablespace \"$name\""
		            fi
                        else
		            tar -cpf - --ignore-failed-read --exclude='pgsql_tmp' -- * 2>/dev/null | $BACKUP_COMPRESS_BIN | ssh -- "$backup_ssh_target" "cat > $(qw "$backup_dir/tblspc/${_name}.tar.$BACKUP_COMPRESS_SUFFIX")" 2>/dev/null
		            rc=(${PIPESTATUS[*]})
		            tar_rc=${rc[0]}
		            compress_rc=${rc[1]}
		            ssh_rc=${rc[2]}
		            if [ "$tar_rc" = 2 ] || [ "$compress_rc" != 0 ] || [ "$ssh_rc" != 0 ]; then
		                die_and_hook "could not tar tablespace \"$name\""
		            fi
	                fi
                    fi

	            cd -- "$was"

	        done < "$tblspc_list"
	        ;;



            "rsync")
	        info "backing up PGDATA with rsync"
                init_rsync_opts
	        rsync_link=()
	        if [ -n "$prev_backup" ]; then
	            # Link previous backup of pgdata
	            info "backup with hardlinks from $prev_backup"
	            if [ "$backup_local" = "yes" ]; then
		        rsync_link=( '--link-dest' "$prev_backup/pgdata" )
	            else
		        rsync_link=( '--link-dest' "$(qw "$prev_backup/pgdata")" )
	            fi
	        fi

	        info "transferring data from $PGDATA"
	        if [ "$backup_local" = "yes" ]; then
	            rsync $rsync_opts -aq --delete-excluded --exclude 'pgsql_tmp' --exclude "pg_${xlog_or_wal}" --exclude 'pg_replslot/*' --exclude 'postmaster.*' --exclude 'restored_config_files' --exclude 'backup_label.old' --exclude '*.sql' "${rsync_link[@]}" -- "$PGDATA/" "$backup_dir/pgdata/"
	            rc=$?
	            if [ $rc != 0 ] && [ $rc != 24 ]; then
		        die_and_hook "rsync of PGDATA failed with exit code $rc"
	            fi
	        else
	            rsync $rsync_opts -e "ssh -o Compression=no" -zaq --delete-excluded --exclude 'pgsql_tmp' --exclude "pg_${xlog_or_wal}" --exclude 'pg_replslot/*' --exclude 'postmaster.*' --exclude 'restored_config_files' --exclude 'backup_label.old' --exclude '*.sql' "${rsync_link[@]}" -- "$PGDATA/" "$backup_ssh_target:$(qw "$backup_dir/pgdata/")"
	            rc=$?
	            if [ $rc != 0 ] && [ $rc != 24 ]; then
		        die_and_hook "rsync of PGDATA failed with exit code $rc"
	            fi
	        fi


	        # Tablespaces. We do the same as pgdata: hardlink the previous
	        # backup directory if possible, then rsync.
	        while read line; do
	            name=$(cut -d '|' -f 1 <<< "$line")
	            _name=${name//+([[:space:]])/_}	# No space version, we want paths without spaces
	            location=$(cut -d '|' -f 2 <<< "$line")

	            # Skip empty locations used for pg_default and pg_global, which are in PGDATA
	            [ -z "$location" ] && continue

	            info "backing up tablespace \"$name\" with rsync"

	            rsync_link=()
	            if [ -n "$prev_backup" ]; then
	    	        # Link previous backup of the tablespace
		        if [ "$backup_local" = "yes" ]; then
		            [ -d "$prev_backup/tblspc/$_name" ] && rsync_link=( '--link-dest' "$prev_backup/tblspc/$_name" )
		        else
                            if ssh -n -- "$backup_ssh_target" "test -d $(qw "$prev_backup/tblspc/$_name")" 2>/dev/null; then
		                rsync_link=( '--link-dest' "$(qw "$prev_backup/tblspc/$_name")" )
                            fi
		        fi
	            fi

	            # rsync
	            info "transferring data from $location"
	            if [ "$backup_local" = "yes" ]; then
		        rsync $rsync_opts -aq --delete-excluded --exclude 'pgsql_tmp' "${rsync_link[@]}" -- "$location/" "$backup_dir/tblspc/$_name/"
		        rc=$?
		        if [ $rc != 0 ] && [ $rc != 24 ]; then
	    	            die_and_hook "rsync of tablespace \"$name\" failed with exit code $rc"
	    	        fi
	            else
		        rsync $rsync_opts -e "ssh -o Compression=no" -zaq --delete-excluded --exclude 'pgsql_tmp' "${rsync_link[@]}" -- "$location/" "$backup_ssh_target:$(qw "$backup_dir/tblspc/$_name/")"
		        rc=$?
		        if [ $rc != 0 ] && [ $rc != 24 ]; then
	    	            die_and_hook "rsync of tablespace \"$name\" failed with exit code $rc"
	    	        fi
	            fi

	        done < "$tblspc_list"
	        ;;



            *)
	        die_and_hook "Unknown STORAGE method '$STORAGE'"
	        ;;
        esac

        # Backup replication slots information to a separate file. If we took
        # their status files and restored them, they would be restored as stale
        # slots. Instead we'll give the commands to recreate them after the
        # restore.
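        # With psql -At the fields come out pipe-separated, so a logical slot
        # could show up as e.g. (made-up) "myslot|test_decoding|logical|postgres".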
        if (( $pg_version >= 90400 )); then
            if ! replslot_list=$(mktemp -t backup_pitr.XXXXXXXXXX); then
	        die_and_hook "could not create temporary file"
            fi

            "${psql_command[@]}" -Atc \
	                         "SELECT slot_name,plugin,slot_type,database FROM pg_replication_slots;" \
	                         -- "$psql_condb" 2>/dev/null > "$replslot_list" ||
	        die_and_hook "could not get the list of replication slots from PostgreSQL"

            if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                "${gpg_command[@]}" -o "${replslot_list}.gpg" "$replslot_list" || die_and_hook "could not encrypt the list of replication slots"
            fi
        fi

        # Starting from 9.6 and when the backup is taken from a standby server,
        # PostgreSQL relies on the pg_control file being backed up last to
        # find the backup end location in the WAL. In this case, backup the
        # pg_control file just before ending the backup.
        if (( $pg_version >= 90600 )) && [ "$standby" = "t" ]; then
            info "backup it taken from a standby server, copying the pg_control file"

            if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                # Use a temporary file so that we can encrypt it away from PGDATA
                if ! pg_control_file=$(mktemp -t backup_pitr.XXXXXXXXXX); then
                    die_and_hook "could not create temporary file"
                fi

                "${gpg_command[@]}" -o "$pg_control_file" "$PGDATA/global/pg_control" || \
                    die_and_hook "could not encrypt the pg_control file"

                if [ "$backup_local" = "yes" ]; then
                    if ! cp -- "$pg_control_file" "$backup_dir/pg_control.gpg"; then
                        die_and_hook "could not copy the pg_control file to $backup_dir"
                    fi
                else
                    if ! scp -- "$pg_control_file" "$backup_ssh_target:$(qw "$backup_dir/pg_control.gpg")" > /dev/null; then
	                die_and_hook "could not copy the pg_control file to $backup_ssh_target:$backup_dir"
                    fi
                fi
            else
                if [ "$backup_local" = "yes" ]; then
                    if ! cp -- "$PGDATA/global/pg_control" "$backup_dir/"; then
                        die_and_hook "could not copy the pg_control file to $backup_dir"
                    fi
                else
                    if ! scp -- "$PGDATA/global/pg_control" "$backup_ssh_target:$(qw "$backup_dir/")" > /dev/null; then
                        die_and_hook "could not copy the pg_control file to $backup_ssh_target:$backup_dir"
                    fi
                fi

            fi
        fi

        # Stop backup
        stop_backup

        if (( $pg_version >= 90600 )) && (( ${BASH_VERSINFO[0]} >= 4 )); then
            # In non-exclusive mode, we have to write the backup_label files
            # ourselves. When using the tar storage, we cannot add the file to
            # PGDATA inside the tarball, so we just create files locally, and
            # copy them to the backup directory later. It is the job of the
            # restore to put them back in the correct location ($PGDATA) when
            # restoring.
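            # For illustration (made-up values), such a backup_label holds
            # lines like:
            #   START WAL LOCATION: 0/9000028 (file 000000010000000000000009)
            #   CHECKPOINT LOCATION: 0/9000060
            #   START TIME: 2018-06-01 12:00:00 CEST
            # label_contents stores them joined with '|', hence the tr below.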

            # Create the backup_label as a temporary file and write its contents
            if ! backup_label_file=$(mktemp -t backup_pitr_backup_label.XXXXXXXXXX); then
                die_and_hook "could not create temporary file"
            fi

            # Use an alternative name so we do not have to check the version
            # to remove this temp file
            backup_file=$backup_label_file
            
            echo "$label_contents" | tr '|' '\n' > "$backup_file"
            if [ $? != 0 ]; then
                die_and_hook "could not write temporary backup_label file"
            fi

            # Same goes for the tablespace mapfile (tablespace_map)
            if [ -n "$spcmap_contents" ]; then
                if ! tablespace_map_file=$(mktemp -t backup_pitr_tablespace_map.XXXXXXXXXX); then
                    die_and_hook "could not create temporary file"
                fi

                echo "$spcmap_contents" | tr '|' '\n' > "$tablespace_map_file"
                if [ $? != 0 ]; then
                    die_and_hook "could not write temporary tablespace_map file"
                fi
            fi
        else
            # The complete backup_label is going to be archived. We put it in the
            # backup, just in case and also use the stop time from the file to
            # name the backup directory and have the minimum datetime required to
            # select this backup on restore.
            backup_file="$PGDATA/pg_${xlog_or_wal}/$start_backup_label_file"

            # Get the stop date of the backup. 
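            # e.g. (illustrative) a history file line
            # "STOP TIME: 2018-06-01 12:00:05 CEST" yields that date string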
            stop_time=$(sed -n 's/STOP TIME: //p' -- "$backup_file")
        fi

        # Convert the stop time to UTC; this makes it easier to search for
        # the proper backup when restoring
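        # e.g. (illustrative) '2018-06-01 12:00:05 CEST' yields an epoch value
        # of 1527847205, saved later into the backup_timestamp file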
        if [ -n "$stop_time" ]; then
            timestamp=$(${psql_command[@]} -Atc "SELECT EXTRACT(EPOCH FROM TIMESTAMP WITH TIME ZONE '${stop_time}');" $psql_condb) ||
                warn "could not get the stop time timestamp from PostgreSQL"
        else
            die_and_hook "Failed to get STOP TIME from '$backup_file'"
        fi

        # Starting from 11, the size of WAL segments can be configured at
        # initdb time, so we ask PostgreSQL for the segment size to make the
        # check action work. Keeping the segment size in the backup is an
        # optimisation: we do not have to uncompress a WAL file to find out
        # its size.
        if (( $pg_version >= 110000 )); then
            wal_segsize=$(${psql_command[@]} -Atc "SELECT setting FROM pg_settings WHERE name = 'wal_segment_size'" $psql_condb) ||
                warn "could not get the size of a WAL segment from PostgreSQL"
        fi

        # Ask PostgreSQL where its configuration files are. When they are
        # outside PGDATA, copy them into the backup
        _pgdata=`readlink -f -- "$PGDATA"`

        while read -r -d '' f; do
            file=`readlink -f -- "$f"`
            if [[ ! $file =~ ^"$_pgdata" ]]; then
	        # the file is not inside PGDATA, copy it
	        destdir=$backup_dir/conf
	        dest=$destdir/$(basename -- "$file")
	        info "saving $f"

	        if [ "$backup_local" = "yes" ]; then
	            mkdir -p -- "$destdir"
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                        "${gpg_command[@]}" -o "${dest}.gpg" "$file" || \
                            die_and_hook "could not encrypt $f and store it in backup directory"
                    else
	                cp -- "$file" "$dest" || \
		            die_and_hook "could not copy $f to backup directory"
                    fi
	        else
	            ssh -n -- "$backup_ssh_target" "mkdir -p -- $(qw "$destdir")" 2>/dev/null
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                        # Use a temporary file so that we can encrypt it
                        if ! conf_file=$(mktemp -t backup_pitr.XXXXXXXXXX); then
                            die_and_hook "could not create temporary file"
                        fi

                        "${gpg_command[@]}" -o "$conf_file" "$file" || die_and_hook "could not encrypt $f"
	                if ! scp -- "$conf_file" "$backup_ssh_target:$(qw "${dest}.gpg")" >/dev/null; then
		            die_and_hook "could not copy $f to backup directory on $backup_ssh_target"
	                fi
                    else
                        if ! scp -- "$file" "$backup_ssh_target:$(qw "$dest")" >/dev/null; then
		            die_and_hook "could not copy $f to backup directory on $backup_ssh_target"
	                fi
                    fi
	        fi
            fi
        done < <(
            # The values of the settings depend on the user, which means
            # those paths can include characters such as newline that would
            # conflict with the record separator. We could use psql -0 but it
            # is not available before 9.2, so we loop and build NUL
            # separated output this way.
            for f in 'config_file' 'hba_file' 'ident_file'; do
	        "${psql_command[@]}" -Atc \
                                     "SELECT setting FROM pg_settings WHERE name = '$f';" \
	                             -- "$psql_condb" \
 	            || warn "could not get the list of configuration files from PostgreSQL"
	        printf "\0"
            done
        )

        # Compute the name of the backup directory from the stop time, using
        # date to format the stop time as ISO 8601 if required
        if [[ "$USE_ISO8601_TIMESTAMPS" == "yes" ]]; then
            backup_name=$(date -d "$stop_time" +"%FT%T%z") ||
                error_and_hook "could not format stop time to a directory name"
        else
            backup_name=$(echo "$stop_time" | awk '{ gsub(/[:-]/, "."); print $1"_"$2 }')
        fi
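        # e.g. (illustrative) a stop time of '2018-06-01 12:00:05 CEST' gives
        # '2018-06-01T12:00:05+0200' with ISO 8601 names, and
        # '2018.06.01_12.00.05' with the default format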
        new_backup_dir=$backup_root/$label_prefix/$backup_name

        # Finish the backup by copying needed files and rename the backup
        # directory to a useful name
        if [ "$backup_local" = "yes" ]; then
            [ ! -e "$new_backup_dir" ] ||
	        die_and_hook "backup directory '$new_backup_dir' already exists"

            # Rename the backup directory using the stop time
            if ! mv -- "$backup_dir" "$new_backup_dir"; then
	        die_and_hook "could not rename the backup directory"
            fi
            backup_dir=$new_backup_dir

            # Copy the backup history file
            info "copying the backup history file"
            if ! cp -- "$backup_file" "$backup_dir/backup_label"; then
	        die_and_hook "could not copy backup history file to $backup_dir"
            fi

            # Copy the tablespace mapfile from pg_stop_backup() in
            # non-exclusive mode
            if (( $pg_version >= 90600 )) && (( ${BASH_VERSINFO[0]} >= 4 )); then
                if [ -n "$tablespace_map_file" ]; then
                    info "copying the tablespace_map file"
                    if ! cp -- "$tablespace_map_file" "$backup_dir/tablespace_map"; then
                        die_and_hook "could not copy tablespace_map to $backup_dir"
                    fi
                fi
            fi

            # Save the end of backup timestamp to a file
            if [ -n "$timestamp" ]; then
	        echo "$timestamp" > "$backup_dir/backup_timestamp" || warn "could not save timestamp"
            fi

            # Save the size of a WAL segment for later check actions
            if [ -n "$wal_segsize" ]; then
                echo "$wal_segsize" > "$backup_dir/wal_segsize" || warn "could not save WAL segment size"
            fi

            # Add the name and location of the tablespaces to a helper file for
            # the restoration script
            info "copying the tablespaces list"
            if ! cp -- "$tblspc_list" "$backup_dir/tblspc_list"; then
	        die_and_hook "could not copy the tablespace list to $backup_dir"
            fi

            # Save the list of defined replication slots
            if [ -f "$replslot_list" ] && (( $(cat -- "$replslot_list" | wc -l) > 0 )); then
	        info "copying the replication slots list"
                replslot_list_target="$backup_dir/replslot_list"
                if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                    # since we know there are some replication slots
                    # to save, we can remove the clear-text tempfile
                    rm -- "$replslot_list"
                    replslot_list="${replslot_list}.gpg"
                    replslot_list_target="$backup_dir/replslot_list.gpg"
                fi
                cp -- "$replslot_list" "$replslot_list_target" ||
                    die_and_hook "could not copy the replication slots list to $backup_dir"
            fi

            # Copy the PG_VERSION file
            info "copying PG_VERSION"
            if ! cp -- "$PGDATA/PG_VERSION" "$backup_dir"; then
                die_and_hook "could not copy PG_VERSION to $backup_dir"
            fi

            # Store the commandline and configuration file
            echo "workdir: $(pwd)" > "$backup_dir/backup_command" 2>/dev/null
            echo "command: $0" "$@" >> "$backup_dir/backup_command" 2>/dev/null
            echo "libpq exported variables:" >> "$backup_dir/backup_command" 2>/dev/null
            env | grep ^PG 2>/dev/null >> "$backup_dir/backup_command" 2>/dev/null
            
            if [ -f "$config" ]; then
                echo "config: $config" >> "$backup_dir/backup_command" 2>/dev/null
                if ! cp -- "$config" "$backup_dir/pitrery.conf"; then
                    warn "could not copy $config to $backup_dir"
                fi
            fi
        else
            if ssh -n -- "$backup_ssh_target" "test -e $(qw "$new_backup_dir")" 2>/dev/null; then
	        die_and_hook "backup directory '$backup_ssh_target:$new_backup_dir' already exists"
            fi

            # Rename the backup directory using the stop time
            if ! ssh -n -- "$backup_ssh_target" "mv -- $(qw "$backup_dir" "$new_backup_dir")" 2>/dev/null; then
	        die_and_hook "could not rename the backup directory"
            fi
            backup_dir=$new_backup_dir

            # Save the end of backup timestamp to a file
            if [ -n "$timestamp" ]; then
	        ssh -n -- "$backup_ssh_target" "echo '$timestamp' > $(qw "$backup_dir/backup_timestamp")" 2>/dev/null ||
	            warn "could not save timestamp"
            fi

            # Save the size of a WAL segment for later check actions
            if [ -n "$wal_segsize" ]; then
                ssh -n -- "$backup_ssh_target" "echo '$wal_segsize' > $(qw "$backup_dir/wal_segsize")" 2>/dev/null ||
                    warn "could not save WAL segment size"
            fi

            # Copy the backup history file
            info "copying the backup history file"
            if ! scp -- "$backup_file" "$backup_ssh_target:$(qw "$backup_dir/backup_label")" > /dev/null; then
	        die_and_hook "could not copy backup history file to $backup_ssh_target:$backup_dir"
            fi

            # Copy the tablespace mapfile from pg_stop_backup() in
            # non-exclusive mode
            if (( $pg_version >= 90600 )) && (( ${BASH_VERSINFO[0]} >= 4 )); then
                if [ -n "$tablespace_map_file" ]; then
                    info "copying the tablespace_map file"
                    if ! scp -- $tablespace_map_file "$backup_ssh_target:$(qw "$backup_dir/tablespace_map")" > /dev/null; then
                        die_and_hook "could not copy tablespace_map to $backup_ssh_target:$backup_dir"
                    fi
                fi
            fi

            # Add the name and location of the tablespaces to a helper file for
            # the restoration script
            info "copying the tablespaces list"
            if ! scp -- "$tblspc_list" "$backup_ssh_target:$(qw "$backup_dir/tblspc_list")" >/dev/null; then
	        die_and_hook "could not copy the tablespace list to $backup_ssh_target:$backup_dir"
            fi

            # Save the list of defined replication slots
            if [ -f "$replslot_list" ] && (( $(cat -- "$replslot_list" | wc -l) > 0 )); then
	        info "copying the replication slots list"
                replslot_list_target="$backup_dir/replslot_list"
                if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                    # since we know there are some replication slots
                    # to save, we can remove the clear-text tempfile
                    rm -- "$replslot_list"
                    replslot_list="${replslot_list}.gpg"
                    replslot_list_target="$backup_dir/replslot_list.gpg"
                fi
	        scp -- "$replslot_list" "$backup_ssh_target:$(qw "$replslot_list_target")" >/dev/null ||
	            die_and_hook "could not copy the replication slots list to $backup_ssh_target:$backup_dir"
            fi

            # Copy the PG_VERSION file
            info "copying PG_VERSION"
            if ! scp -- "$PGDATA/PG_VERSION" "$backup_ssh_target:$(qw "$backup_dir")" >/dev/null; then
	        die_and_hook "could not copy PG_VERSION to $backup_ssh_target:$backup_dir"
            fi

            # Store the commandline and configuration file
            (echo "workdir: $(pwd)"
             echo "command: $0" "$@"
             echo "libpq exported variables:"
             env | grep ^PG) 2>/dev/null | ssh -- "$backup_ssh_target" "cat > $(qw "$backup_dir/backup_command")" 2>/dev/null

            if [ -f "$config" ]; then
                echo "config: $config" | ssh -- "$backup_ssh_target" "cat > $(qw "$backup_dir/backup_command")" 2>/dev/null
                if ! scp -- "$config" "$backup_ssh_target:$(qw "$backup_dir/pitrery.conf")"; then
                    warn "could not copy $config to $backup_ssh_target:$backup_dir"
                fi
            fi
        fi

        # Give the name of the backup
        info "backup directory is ${backup_ssh_target:+$backup_ssh_target:}$backup_dir"

        # Execute the post-backup command. It does not return on failure.
        PITRERY_EXIT_CODE=0
        post_backup_hook

        # Cleanup
        rm -f -- "$tblspc_list"
        [ -n "$replslot_list" ] && rm -f -- "$replslot_list"
        [ -n "$psql_stderr" ] && rm -f -- "$psql_stderr"
        [ -n "$backup_label_file" ] && rm -f -- "$backup_label_file"
        [ -n "$tablespace_map_file"  ] && rm -f -- "$tablespace_map_file"

        info "done"
	;;

    restore)
        dry_run="no"
        tsmv_list=()
        target_date=
        restore_xlog_config=$config
        
	load_config

	# Parse args after action: they should take precedence over the configuration
	while getopts "D:x:d:O:t:nRc:e:r:C:T?" arg 2>/dev/null; do
	    case $arg in
		D) PGDATA=$OPTARG;;
		x) PGXLOG=$OPTARG;;
		d) target_date=$OPTARG;;
		O) PGOWNER=$OPTARG;;
		t) tsmv_list+=( "$OPTARG" );;
		n) dry_run="yes";;
		R) OVERWRITE="yes";;
		c) BACKUP_UNCOMPRESS_BIN="$OPTARG";;
		e) BACKUP_COMPRESS_SUFFIX="$OPTARG";;
		r) RESTORE_COMMAND="$OPTARG";;
		C) restore_xlog_config=$OPTARG;;
		T) LOG_TIMESTAMP="yes";;
		'?') usage "restore";;
	    esac
	done

        parse_target_uri "${@:$OPTIND:1}" || usage "restore"

        # An unprivileged target owner is mandatory as PostgreSQL cannot run
        # as root.
        if [ "$(id -u -- "$PGOWNER")" = 0 ]; then
            die "the target owner cannot be root. Use -O when restoring as root"
        fi

        # When no restore_command is given, build it using restore_xlog
        if [ -z "$RESTORE_COMMAND" ]; then
            [[ "$restore_xlog_config" == */* ]] && restore_xlog_config=$(readlink -m "$restore_xlog_config")
            RESTORE_COMMAND="restore_xlog${restore_xlog_config:+ -C $(qw "$restore_xlog_config")} %f %p"
        fi
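        # e.g. (illustrative, assuming the configuration resolves to
        # /etc/pitrery/pitrery.conf) this builds:
        #   restore_command = 'restore_xlog -C /etc/pitrery/pitrery.conf %f %p'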

        # The parent directory of all backups must be saved so that we
        # can use backup_dir for the selected backup
        backup_root=$backup_dir
        backup_dir=

        # Find the backup according to the given date. The target date converted
        # to a timestamp is compared to the timestamp of the stop time of the
        # backup. Only after the stop time is a backup sure to be consistent.
        info "searching backup directory"

        # search the store
        if [ "$backup_local" = "yes" ]; then
            list=( "$backup_root/"[0-9]*/backup_timestamp )
            (( ${#list[@]} > 0 )) ||
	        die "Could not find any backup_timestamp files in $backup_root/*"
        else
            list=()
            while read -r -d '' d; do
	        list+=("$d")
            done < <(
	        # We could 'optimise' this slightly for the case where we only want the latest,
	        # by adding a `| cut -d '' -f1` after the sort, to only return the first one,
	        # but the amount of extra data transferred here is tiny compared to the rest of
	        # the backup, so it is probably better to just reuse this for both cases than to
	        # duplicate the logic needed just for that.
	        ssh -n -- "$backup_ssh_target" "find $(qw "$backup_root") -path $(qw "$backup_root/[0-9]*/backup_timestamp") -type f -print0 | sort -z"
            )

            (( ${#list[@]} > 0 )) ||
	        die "Could not find any backup_timestamp files in $backup_root/* on $backup_ssh_target"
        fi

        if [ -n "$target_date" ]; then
            # Target recovery time in seconds since the epoch, for easy archive searching.
            target_timestamp=$(date -d "$target_date" '+%s') || die "invalid target date '$target_date'"

            # Target recovery time in a format suitable for use in recovery.conf
            recovery_target_time=$(date -d "$target_date" '+%F %T %z') || die "invalid target date '$target_date'"

            # The timestamp must be a string of (only) digits, we do arithmetic with it below.
            # This shouldn't ever fail, but better to catch it here than let odd things happen later.
            [[ $target_timestamp =~ ^[[:digit:]]+$ ]] || die "invalid target_timestamp '$target_timestamp'"
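            # e.g. (illustrative) -d '2018-06-01 12:30:00 +0200' gives
            # target_timestamp=1527849000 and
            # recovery_target_time='2018-06-01 12:30:00 +0200'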

            # find the latest backup
            for t in "${list[@]}"; do
	        # get the timestamp of the end of the backup
	        if [ "$backup_local" = "yes" ]; then
	            backup_timestamp=$(< "$t")
	        else
	            backup_timestamp=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$t")")
	        fi

	        if [[ $backup_timestamp =~ ^[[:digit:]]+$ ]]; then
	            (( $backup_timestamp < $target_timestamp )) || break;
	            backup_dir=$(dirname -- "$t")
	        else
	            warn "could not get the ending timestamp of $t"
	        fi
            done
        else
            # Get the latest backup.
            # The test for list being empty here is just belt and braces,
            # we should have already failed with an error above if it is.
            (( ${#list[@]} > 0 )) && backup_dir=$(dirname -- "${list[*]: -1}")
        fi

        [ -n "$backup_dir" ] || die "Could not find a backup${recovery_target_time:+ for $recovery_target_time}"

        # As of PostgreSQL 10, "xlog" has been renamed to "wal" in
        # functions and directories, so we need to get the PostgreSQL
        # version of the backup
        info "retrieving the PostgreSQL version of the backup"
        if [ "$backup_local" = "yes" ]; then
            if [ -f "$backup_dir/PG_VERSION" ]; then
                info "PostgreSQL version: $(cat -- "$backup_dir/PG_VERSION")"
                vdigit=$(sed -nr 's/^([0-9]+).*/\1/p' -- "$backup_dir/PG_VERSION")
            fi
        else
            vfile=$(qw "$backup_dir/PG_VERSION")
            pgversion=$(ssh -n -- "$backup_ssh_target" "[ ! -f $vfile ] || cat -- $vfile") ||
                die "Failed to read $backup_ssh_target:$backup_dir/PG_VERSION"
            info "PostgreSQL version: $pgversion"
            vdigit=$(sed -nr 's/^([0-9]+).*/\1/p' <<< "$pgversion")
        fi

        if [ -z "$vdigit" ]; then
            warn "PG_VERSION not found or usable in the backup, assuming PostgreSQL < 10"
            xlog_or_wal="xlog"
        else
            if (( 10#$vdigit >= 10 )); then
                xlog_or_wal="wal"
            else
                xlog_or_wal="xlog"
            fi
        fi
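        # e.g. a PG_VERSION of "9.6" gives vdigit=9, hence pg_xlog, while
        # "11" gives vdigit=11, hence pg_wal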

        # get the tablespace list and check the directories
        info "searching for tablespaces information"
        if [ "$backup_local" = "yes" ]; then
            if [ -f "$backup_dir/tblspc_list" ]; then
	        tblspc_list=$(< "$backup_dir/tblspc_list") || die "Failed to read $backup_dir/tblspc_list"
            fi
        else
            tfile=$(qw "$backup_dir/tblspc_list")
            tblspc_list=$(ssh -n -- "$backup_ssh_target" "[ ! -f $tfile ] || cat -- $tfile") ||
	        die "Failed to read $backup_ssh_target:$backup_dir/tblspc_list"
        fi

        # Prepare the final list of tablespace directories
        if [ -n "$tblspc_list" ]; then
            while read -r l; do
	        tdir=$(cut -d '|' -f 2 <<< "$l")

	        # skip pg_default and pg_global, they are located inside PGDATA
	        [ -z "$tdir" ] && continue

	        i=${#tspc_name[@]}
	        tspc_name[$i]=$(cut -d '|' -f 1 <<< "$l")
	        tspc_dir[$i]=$tdir
	        tspc_oid[$i]=$(cut -d '|' -f 3 <<< "$l")
	        tspc_reloc[$i]="no"

	        for t in "${tsmv_list[@]}"; do
                    tname=$(cut -s -d ':' -f 1 <<< "$t")
                    tdir=$(cut -s -d ':' -f 2 <<< "$t")
                    [ -n "$tdir" ] || die "missing tablespace name in relocation option: $t"

                    # relocation can be done using the name or the oid of the tablespace
                    if [ "$tname" = "${tspc_name[$i]}" ] || [ "$tname" = "${tspc_oid[$i]}" ]; then
	                if [ "${tspc_dir[$i]}" != "$tdir" ]; then
	                    tspc_dir[$i]=$tdir
	                    tspc_reloc[$i]="yes"
	                fi
	                break
                    fi
                done
	    done <<< "$tblspc_list"
        fi
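        # e.g. (illustrative) restoring with -t spc1:/srv/new/spc1, or
        # -t 16385:/srv/new/spc1 using the oid, marks tablespace "spc1" for
        # relocation to /srv/new/spc1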

        tspc_count=${#tspc_name[@]}

        # Find out what storage method is used in the backup. If PGDATA is
        # stored as a compressed tarball, the method is tar; if it is a
        # directory, then rsync was used to put files there.
        if [ "$backup_local" = "yes" ]; then
            if [ -f "$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX" ]; then
	        storage="tar"
            elif [ -f "$backup_dir/pgdata.tar.gpg" ]; then
                storage="tar"
                BACKUP_ENCRYPT="yes"
            elif [ -d "$backup_dir/pgdata" ]; then
	        storage="rsync"
            else
	        # Check if we have a tarball with different compression to what we are expecting.
	        storage=$(find "$backup_dir" -mindepth 1 -maxdepth 1 -name 'pgdata.tar.*' -type f -printf '%f' -quit)
            fi
        else
            storage=$(ssh -n -- "$backup_ssh_target" "if [ -f $(qw "$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX") ]; then echo 'tar'; elif [ -d $(qw "$backup_dir/pgdata") ]; then echo 'rsync'; else find $(qw "$backup_dir") -maxdepth 1 -mindepth 1 -name 'pgdata.tar.*' -type f -printf '%f' -quit; fi")

            if [ "$storage" = "pgdata.tar.gpg" ]; then
                storage="tar"
                BACKUP_ENCRYPT="yes"
            fi
        fi

        [ -n "$storage" ] ||
            die "could not find what storage method is used in ${backup_ssh_target:+$backup_ssh_target:}$backup_dir"

        # Check if the compression suffix is unexpected, so that it
        # can be fixed by the user in the config or commandline
        if [[ $storage =~ ^pgdata\.tar\. ]]; then
            die "expecting '$BACKUP_COMPRESS_SUFFIX' compression, but found ${backup_ssh_target:+$backup_ssh_target:}$backup_dir/$storage"
        fi
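        # e.g. (illustrative) finding pgdata.tar.bz2 while expecting the "gz"
        # suffix aborts here; rerunning restore with -c bunzip2 -e bz2 would
        # match the backup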

        # Prepare the decryption commandline, with a gpg-agent
        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
            gpg_command=( "$GPG_BIN" "--batch" "--yes" "--decrypt" "--quiet" )
        fi

        # Display some info on the restore
        info
        info "backup:"
        info "  directory: $backup_dir"
        info "  storage: $storage"
        info "  encryption: $BACKUP_ENCRYPT"
        info
        info "destinations directories:"
        info "  PGDATA -> $PGDATA"

        [ -n "$PGXLOG" ] && info "  PGDATA/pg_${xlog_or_wal} -> $PGXLOG"

        # Populate an array with the tablespace directories to check for duplicates
        declare -a tspc_dedup

        # Print the tablespace relocation information
        for (( i=0; i<$tspc_count; ++i )); do
            info "  tablespace \"${tspc_name[$i]}\" (${tspc_oid[$i]}) -> ${tspc_dir[$i]} (relocated: ${tspc_reloc[$i]})"
            tspc_dedup+=( "${tspc_dir[$i]}" )
        done

        info
        info "recovery configuration:"
        info "  target owner of the restored files: $PGOWNER"
        info "  restore_command = '$RESTORE_COMMAND'"
        [ -n "$recovery_target_time" ] && info "  recovery_target_time = '$recovery_target_time'"
        info

        # Check if the tablespace relocation list has duplicates
        if (( $(for o in "${tspc_dedup[@]}"; do echo $o; done | sort -u | wc -l) < $tspc_count )); then
            die "found duplicates in tablespace relocations. Check options and the list of tablespaces of the backup"
        fi

        
        # Check target directories
        check_and_fix_directory "$PGDATA"

        if [ -n "$PGXLOG" ]; then
            [[ $PGXLOG == /* ]] || die "pg_${xlog_or_wal} must be an absolute path"

            if [ "$PGXLOG" = "$PGDATA/pg_${xlog_or_wal}" ]; then
	        die "pg_${xlog_or_wal} path cannot be \$PGDATA/pg_${xlog_or_wal}, this path is reserved. It seems you do not need -x"
            fi

            check_and_fix_directory "$PGXLOG"
        fi

        # Check the tablespaces directory and create them if possible
        for d in "${tspc_dir[@]}"; do
            check_and_fix_directory "$d"
        done

        if [ "$dry_run" = "yes" ]; then
            exit 0
        fi

        # Real work starts here

        # Extract everything
        case $storage in
            "tar")
	        # pgdata
	        info "extracting PGDATA to $PGDATA"
	        was=`pwd`
	        cd -- "$PGDATA"
	        if [ "$backup_local" = "yes" ]; then
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                        "${gpg_command[@]}" -o - "$backup_dir/pgdata.tar.gpg" | tar xf -
                        rc=(${PIPESTATUS[*]})
                        gpg_rc=${rc[0]}
	                tar_rc=${rc[1]}
                        if [ "$gpg_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		            die "could not extract $backup_dir/pgdata.tar.gpg to $PGDATA"
	                fi
                    else
	                $BACKUP_UNCOMPRESS_BIN -c -- "$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX" | tar xf -
	                rc=(${PIPESTATUS[*]})
	                uncompress_rc=${rc[0]}
	                tar_rc=${rc[1]}
	                if [ "$uncompress_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		            die "could not extract $backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX to $PGDATA"
	                fi
                    fi
	        else
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
	                ssh -n -- "$backup_ssh_target" "cat -- $(qw "$backup_dir/pgdata.tar.gpg")" 2>/dev/null | "${gpg_command[@]}" | tar xf - 2>/dev/null
	                rc=(${PIPESTATUS[*]})
	                ssh_rc=${rc[0]}
                        gpg_rc=${rc[1]}
	                tar_rc=${rc[2]}
	                if [ "$ssh_rc" != 0 ] || [ "$gpg_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		            die "could not extract $backup_ssh_target:$backup_dir/pgdata.tar.gpg to $PGDATA"
	                fi
                    else
                        ssh -n -- "$backup_ssh_target" "cat -- $(qw "$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX")" 2>/dev/null | $BACKUP_UNCOMPRESS_BIN | tar xf - 2>/dev/null
	                rc=(${PIPESTATUS[*]})
	                ssh_rc=${rc[0]}
	                uncompress_rc=${rc[1]}
	                tar_rc=${rc[2]}
	                if [ "$ssh_rc" != 0 ] || [ "$uncompress_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		            die "could not extract $backup_ssh_target:$backup_dir/pgdata.tar.$BACKUP_COMPRESS_SUFFIX to $PGDATA"
	                fi
                    fi
	        fi
	        cd -- "$was"
	        info "extraction of PGDATA successful"
	        ;;

            "rsync")
	        info "transferring PGDATA to $PGDATA with rsync"

                # Force remove the bandwidth limit when restoring
                RSYNC_BWLIMIT=""
                init_rsync_opts

	        if [ "$backup_local" = "yes" ]; then
	            rsync $rsync_opts -aq --delete -- "$backup_dir/pgdata/" "$PGDATA/"
	            rc=$?
	            if [ $rc != 0 ] && [ $rc != 24 ]; then
		        die "rsync of PGDATA failed with exit code $rc"
	            fi
	        else
	            rsync $rsync_opts -e "ssh -o Compression=no" -za --delete -- "$backup_ssh_target:$(qw "$backup_dir/pgdata/")" "$PGDATA/"
	            rc=$?
	            if [ $rc != 0 ] && [ $rc != 24 ]; then
		        die "rsync of PGDATA failed with exit code $rc"
	            fi
	        fi
	        info "transfer of PGDATA successful"
	        ;;

            *)
	        die "Unknown STORAGE method '$storage'"
	        ;;
        esac

        # On PostgreSQL >= 9.6 restore the backup_label file produced outside
        # of the PGDATA. It is always in the backup and it is mandatory for
        # PITR, so we just have to check if the file is in PGDATA and restore
        # it when missing.
        if [ ! -f "$PGDATA/backup_label" ]; then
            if [ "$backup_local" = "yes" ]; then
                if ! cp -- "$backup_dir/backup_label" "$PGDATA/backup_label"; then
                    die "could not restore the backup_label file to $PGDATA"
                fi
            else
                if ! scp -- "$backup_ssh_target:$backup_dir/backup_label" "$PGDATA/backup_label" >/dev/null; then
                    die "could not restore the backup_label file to $PGDATA"
                fi
            fi
        fi

        # Restore the configuration file in a subdirectory of PGDATA
        restored_conf="$PGDATA/restored_config_files"

        if [ "$backup_local" = "yes" ]; then
            # Check the directory; when configuration files are
            # inside PGDATA it does not exist
            if [ -d "$backup_dir/conf" ]; then
	        info "restoring configuration files to $restored_conf"
	        if ! cp -r -- "$backup_dir/conf" "$restored_conf"; then
	            die "could not copy $backup_dir/conf to $restored_conf"
	        fi
            fi

        else
            confdir=$(qw "$backup_dir/conf")
            if ssh -n -- "$backup_ssh_target" "test -d $confdir" 2>/dev/null; then
	        info "restoring configuration files to $restored_conf"
	        if ! scp -r -- "$backup_ssh_target:$confdir" "$restored_conf" >/dev/null; then
	            die "could not copy $backup_ssh_target:$backup_dir/conf to $restored_conf"
	        fi
            fi
        fi

        # Decrypt restored config files
        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
            _files=( "$restored_conf"/*.gpg )
            for f in "${_files[@]}"; do
                if ! "${gpg_command[@]}" -o "$restored_conf/$(basename "$f" .gpg)" "$f"; then
                    die "could not decrypt configuration file $f"
                else
                    rm -- "$f" || error "could not remove $f"
                fi
            done
        fi

        # change owner of PGDATA to the target owner
        if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
            info "setting owner of PGDATA ($PGDATA)"
            if ! chown -R -- "$PGOWNER:" "$PGDATA"; then
	        die "could not change owner of PGDATA to $PGOWNER"
            fi
        fi

        # Enable the extended pattern matching operators.
        # We use them here for replacing whitespace in the tablespace tarball names.
        shopt -s extglob

        # tablespaces
        for (( i=0; i<$tspc_count; ++i )); do
            name=${tspc_name[$i]}
            _name=${name//+([[:space:]])/_} # No space version, we want paths without spaces
            tbldir=${tspc_dir[$i]}
            oid=${tspc_oid[$i]}

            # Change the symlink in pg_tblspc when the tablespace directory changes
            if [ "${tspc_reloc[$i]}" = "yes" ]; then
	        ln -sf "$tbldir" "$PGDATA/pg_tblspc/$oid" || die "could not update the symbolic of tablespace $name ($oid) to $tbldir"

	        # Ensure the new link has the correct owner, the chown -R
	        # issued after extraction will not do it
	        if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
	            chown -h -- "$PGOWNER:" "$PGDATA/pg_tblspc/$oid"
	        fi
            fi

            # Get the data in place
            case $storage in
	        "tar")
	            info "extracting tablespace \"${name}\" to $tbldir"
	            was=`pwd`
	            cd -- "$tbldir"
	            if [ "$backup_local" = "yes" ]; then
                        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                            "${gpg_command[@]}" -o - "$backup_dir/tblspc/${_name}.tar.gpg" | tar xf -
		            rc=(${PIPESTATUS[*]})
                            gpg_rc=${rc[0]}
		            tar_rc=${rc[1]}
		            if [ "$gpg_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		                die "Could not extract tablespace $name to $tbldir"
		            fi
                        else
                            $BACKUP_UNCOMPRESS_BIN -c -- "$backup_dir/tblspc/${_name}.tar.$BACKUP_COMPRESS_SUFFIX" | tar xf -
		            rc=(${PIPESTATUS[*]})
		            uncompress_rc=${rc[0]}
		            tar_rc=${rc[1]}
		            if [ "$uncompress_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		                die "Could not extract tablespace $name to $tbldir"
		            fi
                        fi
	            else
                        if [ "$BACKUP_ENCRYPT" = "yes" ]; then
		            ssh -n -- "$backup_ssh_target" "cat -- $(qw "$backup_dir/tblspc/${_name}.tar.gpg")" 2>/dev/null | "${gpg_command[@]}" | tar xf - 2>/dev/null
		            rc=(${PIPESTATUS[*]})
		            ssh_rc=${rc[0]}
                            gpg_rc=${rc[1]}
	                    tar_rc=${rc[2]}
		            if [ "$ssh_rc" != 0 ] || [ "$gpg_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		                die "Could not extract tablespace $name to $tbldir"
		            fi
                        else
                            ssh -n -- "$backup_ssh_target" "cat -- $(qw "$backup_dir/tblspc/${_name}.tar.$BACKUP_COMPRESS_SUFFIX")" 2>/dev/null | $BACKUP_UNCOMPRESS_BIN | tar xf - 2>/dev/null
		            rc=(${PIPESTATUS[*]})
		            ssh_rc=${rc[0]}
		            uncompress_rc=${rc[1]}
		            tar_rc=${rc[2]}
		            if [ "$ssh_rc" != 0 ] || [ "$uncompress_rc" != 0 ] || [ "$tar_rc" != 0 ]; then
		                die "Could not extract tablespace $name to $tbldir"
		            fi
                        fi
	            fi
	            cd -- "$was"
	            info "extraction of tablespace \"${name}\" successful"
	            ;;

	        "rsync")
	            info "transferring tablespace \"${name}\" to $tbldir with rsync"
	            if [ "$backup_local" = "yes" ]; then
		        rsync $rsync_opts -aq --delete -- "$backup_dir/tblspc/${_name}/" "$tbldir/"
		        rc=$?
		        if [ $rc != 0 ] && [ $rc != 24 ]; then
		            die "rsync of tablespace \"${name}\" failed with exit code $rc"
		        fi
	            else
		        rsync $rsync_opts -e "ssh -o Compression=no" -za --delete -- "$backup_ssh_target:$(qw "$backup_dir/tblspc/${_name}/")" "$tbldir/"
		        rc=$?
		        if [ $rc != 0 ] && [ $rc != 24 ]; then
		            die "rsync of tablespace \"${name}\" failed with exit code $rc"
		        fi
	            fi
	            info "transfer of tablespace \"${name}\" successful"
	            ;;

	        *)
	            die "Unknown STORAGE method '$storage'"
	            ;;
            esac

            # change owner of the tablespace files to the target owner
            if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
	        info "setting owner of tablespace \"$name\" ($tbldir)"
	        if ! chown -R -- "$PGOWNER:" "$tbldir"; then
	            die "could not change owner of tablespace \"$name\" to $PGOWNER"
	        fi
            fi
        done

        # Starting from 9.6 and when the backup is from a standby server,
        # PostgreSQL relies on the pg_control file being backed up last to
        # find the backup end location in the WAL. If we find the pg_control
        # file at the root of the backup directory, restore it.
        if [ "$backup_local" = "yes" ]; then
            if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                if [ -f "$backup_dir/pg_control.gpg" ]; then
                    info "pg_control file found in $backup_dir, restoring it"
                    if ! "${gpg_command[@]}" -o "$PGDATA/global/pg_control" "$backup_dir/pg_control.gpg"; then
                        die "could not decrypt and restore pg_control file to $PGDATA/global/"
                    fi
                fi
            else
                if [ -f "$backup_dir/pg_control" ]; then
                    info "pg_control file found in $backup_dir, restoring it"
                    if ! cp -- "$backup_dir/pg_control" "$PGDATA/global/pg_control"; then
                        die "could not restore pg_control file to $PGDATA/global/"
                    fi
                fi
            fi
        else
            if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                if ssh -n -- "$backup_ssh_target" "test -f $(qw "$backup_dir/pg_control.gpg")"; then
                    info "pg_control file found in $backup_dir, restoring it"
                    ssh -n --  "$backup_ssh_target" "cat -- $(qw "$backup_dir/pg_control.gpg")" 2>/dev/null | "${gpg_command[@]}" > "$PGDATA/global/pg_control"
                    rc=(${PIPESTATUS[*]})
	            ssh_rc=${rc[0]}
                    gpg_rc=${rc[1]}
                    if [ "$ssh_rc" != 0 ] || [ "$gpg_rc" != 0 ]; then
                        die "could not decrypt and restore pg_control file to $PGDATA/global/"
                    fi
                fi
            else
                if ssh -n -- "$backup_ssh_target" "test -f $(qw "$backup_dir/pg_control")"; then
                    info "pg_control file found in $backup_dir, restoring it"
                    if ! scp -- "$backup_ssh_target:$(qw "$backup_dir/pg_control")" "$PGDATA/global/pg_control" >/dev/null; then
                        die "could not restore pg_control file to $PGDATA/global/"
                    fi
                fi
            fi
        fi

        # As of PostgreSQL 9.5, a map of tablespaces is created in the
        # tablespace_map file during the base backup. The postmaster uses the
        # file to recreate the symlinks in pg_tblspc when starting a
        # recovery. This ensures symlinks are correctly restored on Windows but
        # it interferes with our tablespace relocation feature. So we just
        # remove the tablespace_map file, because our own implementation
        # already works for versions older than 9.5.
        if [ -f "$PGDATA/tablespace_map" ]; then
            if ! rm -- "$PGDATA/tablespace_map"; then
                die "could not remove the tablespace_map file"
            fi
        fi

        # Create or symlink pg_xlog/pg_wal directory if needed
        if [ -d "$PGXLOG" ]; then
            info "creating symbolic link pg_${xlog_or_wal} to $PGXLOG"
            if ! ln -sf -- "$PGXLOG" "$PGDATA/pg_${xlog_or_wal}"; then
	        die "could not create $PGDATA/pg_${xlog_or_wal} symbolic link"
            fi
            if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
	        if ! chown -h -- "$PGOWNER:" "$PGDATA/pg_${xlog_or_wal}"; then
	            die "could not change owner of pg_${xlog_or_wal} symbolic link to $PGOWNER"
	        fi
            fi
        fi

        if [ ! -d "$PGDATA/pg_${xlog_or_wal}/archive_status" ]; then
            info "preparing pg_${xlog_or_wal} directory"
            if ! mkdir -p -- "$PGDATA/pg_${xlog_or_wal}/archive_status"; then
	        die "could not create $PGDATA/pg_${xlog_or_wal}"
            fi

            if ! chmod -- 700 "$PGDATA/pg_${xlog_or_wal}" "$PGDATA/pg_${xlog_or_wal}/archive_status" 2>/dev/null; then
	        die "could not set permissions of $PGDATA/pg_${xlog_or_wal} and $PGDATA/pg_${xlog_or_wal}/archive_status"
            fi

            if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
	        if ! chown -R -- "$PGOWNER:" "$PGDATA/pg_${xlog_or_wal}"; then
	            die "could not change owner of $dir to $PGOWNER"
	        fi
            fi
        fi

        # Check PG_VERSION
        if [ -f "$PGDATA/PG_VERSION" ]; then
            pgvers=$(< "$PGDATA/PG_VERSION")
            # As of PostgreSQL 10, replacing the dot in the major
            # version is no longer enough to do integer comparison, so
            # we append "00" to make it possible
            if [[ $pgvers =~ \. ]]; then
                pgvers=${pgvers//./0}
            else
                pgvers="${pgvers}00"
            fi
        else
            warn "PG_VERSION file is missing"
        fi
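        # A sketch of the normalisation above (version strings for illustration):
        #   "9.6" -> "906"    "8.4" -> "804"   (dot replaced by 0)
        #   "10"  -> "1000"   "11"  -> "1100"  (00 appended)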

        # Create a recovery.conf file in $PGDATA
        info "preparing recovery.conf file"
        echo "restore_command = '$RESTORE_COMMAND'" > "$PGDATA/recovery.conf"

        # Put the given target date in recovery.conf
        if [ -n "$recovery_target_time" ]; then
            echo "recovery_target_time = '$recovery_target_time'" >> "$PGDATA/recovery.conf"
        else
            echo "#recovery_target_time = ''	# e.g. '2004-07-14 22:39:00 EST'" >> "$PGDATA/recovery.conf"
        fi

        # Add all possible parameters for recovery, commented out.
        case $pgvers in
            802|803)
	        echo "#recovery_target_xid = ''		# 'number'"
	        echo "#recovery_target_inclusive = 'true'		# 'true' or 'false'"
	        echo "#recovery_target_timeline = ''		# number or 'latest'"
	        ;;
            804)
	        echo "#recovery_end_command = ''"
	        echo "#recovery_target_xid = ''		# 'number'"
	        echo "#recovery_target_inclusive = 'true'		# 'true' or 'false'"
	        echo "#recovery_target_timeline = ''		# number or 'latest'"
	        ;;
            901|902|903)
	        echo "#recovery_end_command = ''"
	        echo "#recovery_target_name = ''  # e.g. 'daily backup 2011-01-26'"
	        echo "#recovery_target_xid = ''"
	        echo "#recovery_target_inclusive = true"
	        echo "#recovery_target_timeline = 'latest'"
	        echo "#pause_at_recovery_target = true"
	        ;;
            904)
	        echo "#recovery_end_command = ''"
	        echo "#recovery_target_name = ''	# e.g. 'daily backup 2011-01-26'"
	        echo "#recovery_target_xid = ''"
	        echo "#recovery_target_inclusive = true"
	        echo "#recovery_target = 'immediate'"
	        echo "#recovery_target_timeline = 'latest'"
	        echo "#pause_at_recovery_target = true"
	        ;;
            905|906)
	        echo "#recovery_end_command = ''"
	        echo "#recovery_target_name = ''	# e.g. 'daily backup 2011-01-26'"
	        echo "#recovery_target_xid = ''"
	        echo "#recovery_target_inclusive = true"
	        echo "#recovery_target = 'immediate'"
	        echo "#recovery_target_timeline = 'latest'"
	        echo "#recovery_target_action = 'pause'"
	        ;;
            1000|1100)
	        echo "#recovery_end_command = ''"
	        echo "#recovery_target_name = ''	# e.g. 'daily backup 2011-01-26'"
	        echo "#recovery_target_xid = ''"
                echo "#recovery_target_lsn = ''	# e.g. '0/70006B8'"
	        echo "#recovery_target_inclusive = true"
	        echo "#recovery_target = 'immediate'"
	        echo "#recovery_target_timeline = 'latest'"
	        echo "#recovery_target_action = 'pause'"
                ;;
        esac >> "$PGDATA/recovery.conf"


        # Ensure recovery.conf has the correct owner so that PostgreSQL can
        # rename it at the end of the recovery
        if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
            if ! chown -- "$PGOWNER:" "$PGDATA/recovery.conf"; then
	        die "could not change owner of recovery.conf to $PGOWNER"
            fi
        fi

        # Generate a SQL file in PGDATA to update the catalog when tablespace
        # locations have changed. It is only needed when using PostgreSQL <=9.1
        updsql=$PGDATA/update_catalog_tablespaces.sql
        rm -f -- "$updsql"
        if (( $tspc_count > 0 )) && [ -n "$pgvers" ] && (( 10#$pgvers <= 901 )); then
            for (( i=0; i<$tspc_count; ++i )); do
	        if [ "${tspc_reloc[$i]}" = "yes" ]; then
	            echo "-- update location of ${tspc_name[$i]} to ${tspc_dir[$i]}" >> "$updsql"
	            printf "UPDATE pg_catalog.pg_tablespace SET spclocation = '%s' WHERE oid = %s;\n" "${tspc_dir[$i]}" "${tspc_oid[$i]}" >> "$updsql"
	        fi
            done

            if [ "`id -u`" = 0 ] && [ "`id -un`" != "$PGOWNER" ]; then
	        chown -- "$PGOWNER:" "$updsql" 2>/dev/null
            fi
        fi
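        # For example (OID and path hypothetical), a relocated tablespace
        # produces a line like:
        #   UPDATE pg_catalog.pg_tablespace SET spclocation = '/new/tbs/path' WHERE oid = 16385;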

        # Generate a SQL file in PGDATA to let the user recreate the
        # replication slots existing at backup time
        replslots_sql=$PGDATA/restore_replication_slots.sql
        rm -f -- "$replslots_sql"
        if [ -n "$pgvers" ] && (( 10#$pgvers >= 904 )); then
            replslot_list_file="$backup_dir/replslot_list"
            if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                replslot_list_file="${replslot_list_file}.gpg"
            fi

            if [ "$backup_local" = "yes" ]; then
	        if [ -f "$replslot_list_file" ]; then
                    if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                        replslot_list=$("${gpg_command[@]}" < "$replslot_list_file") || die "Failed to decrypt $replslot_list_file"
                    else
	                replslot_list=$(< "$replslot_list_file") || die "Failed to read $replslot_list_file"
                    fi
                fi
            else
	        rfile=$(qw "$replslot_list_file")
                if [ "$BACKUP_ENCRYPT" = "yes" ]; then
                    if ssh -n -- "$backup_ssh_target" "[ -f $rfile ]"; then
                        replslot_list=$(ssh -n -- "$backup_ssh_target" "cat -- $rfile" | "${gpg_command[@]}") ||
                            die "Failed to read $backup_ssh_target:$replslot_list_file"
                    fi
                else
	            replslot_list=$(ssh -n -- "$backup_ssh_target" "[ ! -f $rfile ] || cat -- $rfile") ||
                        die "Failed to read $backup_ssh_target:$replslot_list_file"
                fi
            fi

            while read -r l; do
	        rs_name=$(cut -d '|' -f 1 <<< "$l")
	        rs_plugin=$(cut -d '|' -f 2 <<< "$l")
	        rs_type=$(cut -d '|' -f 3 <<< "$l")
	        rs_db=$(cut -d '|' -f 4 <<< "$l")

	        case $rs_type in
	            "physical")
		        echo "SELECT pg_create_physical_replication_slot('$rs_name');"
		        ;;
	            "logical")
		        echo "\connect $rs_db"
		        echo "SELECT pg_create_logical_replication_slot('$rs_name', '$rs_plugin');"
		        ;;
	        esac >> "$replslots_sql"
            done <<< "$replslot_list"
        fi

        info "done"
        info
        if [ -d "$restored_conf" ]; then
            info "saved configuration files have been restored to:"
            info "  $restored_conf"
            info
        fi
        info "please check directories and recovery.conf before starting the cluster"
        info "and do not forget to update the configuration of pitrery if needed:"
        info "  $PGDATA/recovery.conf"
        info

        if [ -f "$replslots_sql" ]; then
            if (( $(wc -l < "$replslots_sql") > 0 )); then
                info "replication slots defined at the time of the backup can be restored"
                info "with the SQL commands from:"
                info "  $replslots_sql"
                info
            else
                rm -f -- "$replslots_sql"
            fi
        fi

        if [ -f "$updsql" ]; then
            warn "locations of tablespaces have changed, after recovery update the catalog with:"
            warn "  $updsql"
        fi

	;;

    purge)
        dry_run=""

	load_config

	# Parse args after action: they should take precedence over the configuration
	while getopts "m:d:a:NT?" arg 2>/dev/null; do
	    case $arg in
		m) PURGE_KEEP_COUNT=$OPTARG;;
		d) PURGE_OLDER_THAN=$OPTARG;;
                a) archive_path=$OPTARG;;
		N) dry_run="yes";;
		T) LOG_TIMESTAMP="yes";;

		"?") usage "purge";;
	    esac
	done

        parse_target_uri "${@:$OPTIND:1}" "$archive_path" || usage "purge"

        # Either -m or -d must be specified
        if [ -z "$PURGE_KEEP_COUNT" ] && [ -z "$PURGE_OLDER_THAN" ]; then
            echo "ERROR: missing purge condition. Use -m or -d." 1>&2
            usage "purge"
        fi

        PURGE_KEEP_COUNT=${PURGE_KEEP_COUNT:-0}
        PURGE_OLDER_THAN=${PURGE_OLDER_THAN:-0}
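
        # Example invocations (a sketch, using the usual command name):
        #   pitrery purge -m 2            # keep the 2 newest backups
        #   pitrery purge -d 7            # remove backups older than 7 days
        #   pitrery purge -N -m 2 -d 7    # dry run combining both rules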

        # Ensure failed globs will be empty, not left containing the literal glob pattern
        shopt -s nullglob

        # Get the list of backups
        info "searching backups"
        list_backups "$backup_dir"

        # Get the stop time timestamp of each backup; comparing timestamps is
        # more reliable than comparing directory names
        #
        # We store them in a (sparse) indexed array, which for our purposes here is
        # effectively an associative array, just with the keys automatically sorted
        # so that the oldest (numerically smallest) timestamps come first.  Things
        # would be a little bit simpler if they were sorted newest first, but not by
        # so much that it is worth going to the effort of manually sorting them in
        # the reverse order.
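        #
        # A small sketch of the idea (timestamps and names hypothetical):
        #   candidates[1514764800]="$backup_dir/2018.01.01-00.00.00"
        #   candidates[1517443200]="$backup_dir/2018.02.01-00.00.00"
        # "${!candidates[@]}" then yields the keys in ascending order.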
        candidates=()
        for dir in "${list[@]%/}"; do
            if [ "$backup_local" = "yes" ]; then
	        ts=$(< "$dir/backup_timestamp")
            else
	        ts=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$dir/backup_timestamp")")
            fi

            # The timestamp must be a string of (only) digits, we do arithmetic with it
            if [[ $ts =~ ^[[:digit:]]+$ ]]; then
	        candidates[$ts]=$dir
            else
	        # We shouldn't normally ever be here, but if we are it's probably one
	        # of two main reasons:
	        # - This is a dir that starts with a digit but isn't actually a backup.
	        # - It is a backup, but either we failed to put a valid backup_timestamp
	        #   into it, or it somehow got removed or corrupted afterwards.
	        #
	        # In the latter case we could try to reconstruct a timestamp here, but
	        # it's probably safer to just let the admin figure out what went wrong
	        # if this happens.  There are two consequences to that:
	        # - We won't consider this directory for automatic purge.
	        # - We can't rely on oldest_unpurged actually being the oldest remaining
	        #   backup (for the purpose of purging WAL files), since this one could
	        #   be older than that.
	        #
	        # We could do more trickery based on the dirname to try to guess if the
	        # latter case is true, but again, Something Went Wrong Somewhere, so
	        # just play safe until the admin figures out what that was.
	        have_unknown_timestamp="yes"
	        warn "Could not get backup_timestamp for '$dir', it will not be purged"
            fi
        done

        # If a minimum number of backups must be kept, remove the $PURGE_KEEP_COUNT
        # youngest backups from the list.
        if [ -n "$PURGE_KEEP_COUNT" ]; then
            [[ $PURGE_KEEP_COUNT =~ ^[[:digit:]]+$ ]] || error "PURGE_KEEP_COUNT '$PURGE_KEEP_COUNT' is not a number"

            if (( $PURGE_KEEP_COUNT > 0 )); then
	        if (( ${#candidates[@]} > $PURGE_KEEP_COUNT )); then
	            keys=( "${!candidates[@]}" )
	            for k in "${keys[@]:$((-$PURGE_KEEP_COUNT))}"; do
		        # The list of purge candidates is sorted oldest first, so capture
		        # the first one removed from it as the oldest backup that we'll keep.
		        [ -n "$oldest_unpurged" ] || oldest_unpurged=${candidates[$k]}
		        unset "candidates[$k]"
	            done
	        else
	            oldest_unpurged=${candidates[@]::1}
	            candidates=()
	        fi
            fi
        fi
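
        # e.g. with 5 candidates and PURGE_KEEP_COUNT=2, the negative offset
        # slice above covers the 2 youngest (largest) timestamps: they are
        # dropped from the purge candidates and the oldest of them is
        # remembered as oldest_unpurged.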

        # If older backups must be removed, filter the list by timestamp
        if [ -n "$PURGE_OLDER_THAN" ]; then
            [[ $PURGE_OLDER_THAN =~ ^[[:digit:]]+$ ]] || error "PURGE_OLDER_THAN '$PURGE_OLDER_THAN' is not a number"

            if (( $PURGE_OLDER_THAN > 0 )); then
	        limit_ts=$(($(date +%s) - 86400 * $PURGE_OLDER_THAN))
	        keys=( "${!candidates[@]}" )
	        for k in "${keys[@]}"; do
	            if (( $k >= $limit_ts )); then
		        [ -n "$oldest_unpurged_day" ] || oldest_unpurged_day=${candidates[$k]}
		        unset "candidates[$k]"
	            fi
	        done
            fi
        fi

        # Keep track of the oldest backup we will not purge; it is needed for
        # WAL expiry and for dry-run reporting
        [ -z "$oldest_unpurged_day" ] || oldest_unpurged=$oldest_unpurged_day

        # Purge the backups
        if (( ${#candidates[@]} > 0 )); then
            info "${dry_run:+Would be }purging the following backups:"
            for d in "${candidates[@]}"; do
	        info " $d"
            done

            if [ "$dry_run" != "yes" ]; then
	        if [ "$backup_local" = "yes" ]; then
	            rm -rf -- "${candidates[@]}" || die "Failed to remove all purge candidates"
	        else
	            # We can't preserve the word splitting behaviour of a quoted array across the
	            # call to ssh, so ensure each argument is properly shell quoted instead.
	            ssh -n -- "$backup_ssh_target" "rm -rf -- $(qw "${candidates[@]}")" ||
		        die "Failed to remove all purge candidates"
	        fi
            fi
        else
            info "there are no backups to purge"
        fi


        # To be able to purge the archived xlogs, the backup_label of the oldest backup
        # is needed to find the oldest xlog file to keep.

        # The easy case, where every directory had a valid backup_timestamp when we scanned them.
        # This is all we should ever need normally.
        get_oldest_unpurged_label() {
            if [ "$backup_local" = "yes" ]; then
	        backup_label=$(< "$oldest_unpurged/backup_label") ||
	            die "Unable to read '$oldest_unpurged/backup_label'"
            else
	        backup_label=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$oldest_unpurged/backup_label")") ||
	            die "Unable to read '$backup_ssh_target:$oldest_unpurged/backup_label'"
            fi
        }

        # The fallback case, where we have directories which matched the [0-9]* glob,
        # that _might_ be backups and have a backup_label in them, but didn't have a
        # valid backup_timestamp for some reason.
        find_oldest_backup_label() {
            if [ "$backup_local" = "yes" ]; then
	        blist=( "$backup_dir/"[0-9]*/backup_label )
	        if (( ${#blist[@]} > 0 )); then
	            backup_label=$(< "${blist[0]}") || die "Unable to read ${blist[0]}"
	        fi
            else
	        # It would probably be better to do something more like the local version above,
	        # but this should be portable regardless of the remote login shell, and gets it
	        # done with a single ssh connection.
	        backup_label=$(ssh -n -- "$backup_ssh_target" "f=\$(find $(qw "$backup_dir") -path $(qw "$backup_dir/[0-9]*/backup_label") -type f -print0 | sort -z | cut -d '' -f1) && [ -n \"\$f\" ] && cat -- \"\$f\"") || die "Unable to read the backup_label file of the oldest backup"
            fi
        }

        if [ "$dry_run" = "yes" ]; then
            # For a dry run we want the backup_label from the oldest one that we wouldn't have removed.

            if [ -n "$oldest_unpurged" ]; then
	        # We have timestamps for at least some directories, if not all of them,
	        # and we aren't purging every directory with a backup_timestamp file.
	        # Scanning for backup_label files would give the wrong answer (since we
	        # didn't actually delete anything this time), so just use the best answer
	        # we have, and warn if we can't be certain that it's 100% correct.
	        [ "$have_unknown_timestamp" != "yes" ] ||
	            warn "Some directories are missing a backup_timestamp.  Dry run report may not be correct."

	        get_oldest_unpurged_label

            elif (( ${#candidates[@]} == 0 )); then
	        # We found no directories with a backup_timestamp file (and so there were
	        # no candidates for purging).  If we do have some directories that did not
	        # have a backup_timestamp, we can 'safely' still scan those for backup_label
	        # files which we can use to expire old WAL segment files (and we should have
	        # errored out already before getting here if we don't have some of those).
	        if [ "$have_unknown_timestamp" = "yes" ]; then
	            warn "The backup_timestamp was missing from all directories.  Basing WAL expiry on the oldest backup_label found"
	            find_oldest_backup_label
	        fi
            else
	        # We would have purged all backups with a backup_timestamp file if this wasn't
	        # a dry run.  If there are directories without one, then we can't simply scan
	        # them here.  We could add even more logic to enumerate them based on whether
	        # they contain a backup_label and the stop time recorded in it, but this really
	        # shouldn't ever happen in normal use, so that seems like overkill if we aren't
	        # going to just do that always and get rid of the backup_timestamp files.
	        # If something is that messed up, best we just leave the admin to sort it out.
	        [ "$have_unknown_timestamp" != "yes" ] ||
	            warn "All directories with a backup_timestamp would be purged, but some directories without one would remain."
            fi

        else
            # If every directory had a backup_timestamp file, then we already know the oldest
            # one that we didn't purge (which we have to track to be able to do dry runs).
            # If there were some that didn't, then they may be older than it is, so scan the
            # remaining directories again looking for backup_label files, to ensure we don't
            # purge any WAL files which they would need.
            if [ "$have_unknown_timestamp" = "yes" ]; then
	        warn "Some directories are missing a backup_timestamp.  They have not been purged and WAL files they depend on will not be expired."
	        find_oldest_backup_label

            elif [ -n "$oldest_unpurged" ]; then
	        get_oldest_unpurged_label

                #else
	        # We have just purged all of the available backup directories, we have no
	        # remaining backup_label to use for WAL expiry.  We could purge all the WAL
	        # files up to what was needed for the most recent purged backup, or simply
	        # nuke them all - but leave this as an Admin Problem for now, if they really
	        # had intended to delete Everything, there's probably more afoot than simply
	        # pruning old files to make space.
            fi
        fi

        if [ -z "$backup_label" ]; then
            info "no backup found after purge. Please remove old archives by hand."
            exit 0
        fi


        # Extract the name of the start WAL file from the backup history
        # information recorded in the backup_label
        wal_file=$(awk '/^START WAL LOCATION/ { gsub(/[^0-9A-F]/,"",$6); print $6 }' <<< "$backup_label")

        # This must be only (hex) digits, or the arithmetic operations below will not do what we hope
        [[ $wal_file =~ ^[0-9A-F]{24}$ ]] || die "'$wal_file' does not appear to be a WAL segment file name"
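
        # WAL segment file names are 24 hex digits: 8 for the timeline, 8 for
        # the log, 8 for the segment; e.g. (hypothetical) 000000010000000200000043
        # is timeline 1, log 2, segment 0x43.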

        info "listing WAL files older than $(basename -- "$wal_file")"

        # List the WAL files and remove the old ones based on their names,
        # which sort in time order thanks to the naming scheme. The filter is
        # on the first nine chars so that history files are excluded.
        wal_list=()
        if [ "$archive_local" = "yes" ]; then
            wal_list=( "$archive_dir/"[0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F]* )
        else
            while read -r -d '' f; do
	        wal_list+=("$f")
            done < <(
	        ssh -n -- "$archive_ssh_target" "find $(qw "$archive_dir") -maxdepth 1 -mindepth 1 -name '[0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F]*' -type f -print0 | sort -z"
            )
        fi

        # Compare and remove files from the list
        wal_purge_list=()
        stop_wal_file_found="no"
        for wal in "${wal_list[@]}"; do
            # the WAL files come ordered; once the first one to keep is reached, our list is complete
            if [[ $wal =~ $wal_file ]]; then
                stop_wal_file_found="yes"
                break
            fi
            wal_purge_list+=( "$wal" )
        done

        if [ "$stop_wal_file_found" = "yes" ]; then
            info "${#wal_purge_list[@]} old WAL file(s) to remove"
            if (( ${#wal_purge_list[@]} > 0 )); then
                if [ "$dry_run" = "yes" ]; then
	            info "Would purge ${#wal_purge_list[@]} old WAL file(s):"
	            info " First: $(basename -- "${wal_purge_list[1]}")"
	            info " Last: $(basename -- "${wal_purge_list[@]:(-1)}")"
                else
	            info "purging old WAL files"

	            # This may look ugly, but otherwise it is very easy to build
	            # an rm command line with too many arguments.
	            if [ "$archive_local" = "yes" ]; then
	                for wal in "${wal_purge_list[@]}"; do
		            echo "rm -- $(qw "$wal")"
	                done | bash || die "unable to remove wal files"
	            else
	                for wal in "${wal_purge_list[@]}"; do
		            echo "rm -- $(qw "$wal")"
	                done | ssh -- "$archive_ssh_target" "cat | sh" || die "unable to remove wal files on $archive_host"
	            fi
                fi
            fi
        else
            warn "WAL file $wal_file not found in the archives"
            warn "could not create a list of WAL files to purge, not purging"
        fi
        info "done"


	;;

    check)
        check_conf="yes"
        check_backups="no"
        check_archives="no"
        output=

        # Parse args after action
	while getopts "C:Bg:m:Ac:a:n?" arg 2>/dev/null; do
	    case $arg in
                C) config="$OPTARG";;
                B) check_backups="yes"; check_conf="no";;
                g) check_max_age="$OPTARG";;
                m) check_min_count="$OPTARG";;
                A) check_archives="yes"; check_conf="no";;
                c) archive_compress_bin_override="yes"; CLI_ARCHIVE_UNCOMPRESS_BIN="$OPTARG";;
                a) archive_path="$OPTARG";;
                n) output="nagios";;
		"?") usage "check";;
	    esac
	done

        if [[ "$check_backups" == "yes" ]] && [[ "$check_archives" == "yes" ]] && [[ "$output" == "nagios" ]]; then
            die "cannot check backups and archives at the same time with nagios output"
        fi

        if [[ "$check_conf" = "yes" ]]; then
            # Some extra checks, since we are going to manipulate the
            # configuration file

            # Recheck if the config option is a path or just a name in the
            # configuration directory.  Prepend the configuration directory and
            # .conf when needed.
            if [[ $config != */* ]]; then
                config="$config_dir/$(basename -- "$config" .conf).conf"
            fi

            if [ -e "$config" ]; then
                [ -r "$config" ] || die "configuration file $config is not readable"

                info "checking $config"

	        load_config

                # Dump configuration file
                info "the configuration file contains:"
                grep -- '=' "$config" | grep -E '^[^[:space:]#]+'
                echo
            else
                warn "configuration file $config does not exist"
            fi
        else
            load_config
            [ -n "$archive_compress_bin_override" ] && ARCHIVE_UNCOMPRESS_BIN="$CLI_ARCHIVE_UNCOMPRESS_BIN"
        fi

        if [[ "$check_conf" = "yes" ]]; then
            info "==> checking the configuration for inconsistencies"

            conf_ok=1
            # Sanity checks on the configuration
            # Only tar or rsync are allowed as storage method
            if [ "$STORAGE" != "tar" ] && [ "$STORAGE" != "rsync" ]; then
                error "storage method (STORAGE) must be 'tar' or 'rsync'"
                conf_ok=0
            fi

            # At least one of PURGE_KEEP_COUNT and PURGE_OLDER_THAN must be
            # configured, otherwise purge won't work
            if [ -z "$PURGE_KEEP_COUNT" ] && [ -z "$PURGE_OLDER_THAN" ]; then
                error "purge not configured, either PURGE_KEEP_COUNT or PURGE_OLDER_THAN must be set"
                conf_ok=0
            fi

            if [[ $conf_ok == 1 ]]; then
                info "configuration seems correct"
            else
                warn "errors found in the configuration. The following tests may not be accurate"
            fi

            # Prepare psql command, it must be an array.
            if [ -n "$PGPSQL" ]; then
                psql_command=( "$PGPSQL" )
            fi

            info "==> checking backup configuration"

            parse_target_uri

            if [ "$backup_local" = "yes" ]; then
                info "backups are local, not checking SSH"

                check_local_directory "$backup_dir"
            else
                info "checking SSH connection for backups"
                if ! check_ssh "$backup_ssh_target"; then
	            die "the backup host must be reachable. Aborting"
                fi

                info "checking backup directory: $backup_dir"
                # check if the target directory exists or can be created: a
                # parent must exist and be writable. At this point we cannot
                # tell whether it is a proper mount point with enough space
                check_remote_directory "$backup_ssh_target" "$backup_dir"

                # Check if rsync is available on the remote host, it is only
                # needed by the rsync storage method
                if [[ $STORAGE == "rsync" ]]; then
	            info "checking rsync on the remote host: $backup_ssh_target"
	            if ! ssh -n -- "$backup_ssh_target" "which rsync" >/dev/null 2>&1; then
	                error "could not find rsync in the PATH on $backup_ssh_target"
	            else
	                info "rsync found on the remote host"
	            fi
                fi
            fi

            # Check the local rsync
            if [[ $STORAGE == "rsync" ]]; then
                info "checking rsync on the local host"
                if ! which rsync >/dev/null 2>&1; then
	            error "could not find rsync in the PATH of the local host"
                else
	            info "rsync found on the local host"
                fi
            fi

            info "==> checking WAL files archiving configuration"

            arch_ok="yes"
            if [ "$archive_local" = "yes" ]; then
                info "WAL archiving is local, not checking SSH"
                info "checking WAL archiving directory: $archive_dir"
                check_local_directory "$archive_dir" || arch_ok="no"
            else
                info "checking SSH connection for WAL archiving"
                check_ssh "$archive_ssh_target" || arch_ok="no"

                info "checking WAL archiving directory: $archive_dir"
                check_remote_directory "$archive_ssh_target" "$archive_dir" || arch_ok="no"

                # Check if rsync is installed on the remote host
                info "checking rsync on the remote host: $archive_ssh_target"
                if ! ssh -n -- "$archive_ssh_target" "which rsync" >/dev/null 2>&1; then
	            error "could not find rsync in the PATH on $archive_ssh_target"
	            arch_ok="no"
                else
	            info "rsync found on the remote host"
                fi
            fi

            # Check the local rsync for WAL archiving
            if [[ $STORAGE == "rsync" ]] || [[ $archive_local == "no" ]]; then
                info "checking rsync on the local host"
                if ! which rsync >/dev/null 2>&1; then
	            error "could not find rsync in the PATH of the local host"
	            arch_ok="no"
                else
	            info "rsync found on the local host"
                fi
            fi

            # Archiving with archive_xlog is not mandatory, so tell the user that
            # it may be unusable
            if [[ $arch_ok == "no" ]]; then
                error "archiving may not work with archive_xlog and the current configuration"
                error "please consider another way to archive WAL files"
            fi

            # Check access to postgres
            info "==> checking access to PostgreSQL"

            # Prepare psql command line. Starting from 9.6 .psqlrc is sourced with
            # psql -c or -f, so we force -X
            psql_command+=( "-X" )
            [ -n "$PGHOST" ] && psql_command+=( "-h" "$PGHOST" )
            [ -n "$PGPORT" ] && psql_command+=( "-p" "$PGPORT" )
            [ -n "$PGUSER" ] && psql_command+=( "-U" "$PGUSER" )

            psql_condb=${PGDATABASE:-postgres}

            check_psql_command

            # Show the command line of psql and the contents of the environment
            # variables starting with PG, since they may affect the behaviour of psql.
            info "psql command and connection options are: ${psql_command[*]}"
            info "connection database is: $psql_condb"
            info "environment variables (may be overwritten by the configuration file):"
            while read -r -d '' v; do
                grep -q "^PG" <<< "$v" || continue
                info "  $v"
            done < <(env -0 2>/dev/null || warn "could not read the environment: env -0 failed")
            
            check_postgresql_config || out_rc=1

            info "==> checking access to PGDATA"

            # Access to PGDATA
            if [ -n "$data_directory" ]; then
                test "$PGDATA" -ef "$data_directory"
                if [ $? != 0 ]; then
	            info "data_directory setting is: $data_directory"
	            info "PGDATA is: $PGDATA"
	            error "configured PGDATA is different than the data directory reported by PostgreSQL"
                else
	            info "PostgreSQL and the configuration reports the same PGDATA"
                fi
            fi

            # This test may seem redundant, but when we do not have access to
            # PostgreSQL, we only have the configuration, which may be
            # incorrect
            if [ ! -e "$PGDATA" ]; then
                error "$PGDATA does not exist"
                exit 1
            fi

            if [ ! -d "$PGDATA" ]; then
                error "$PGDATA is not a directory"
                exit 1
            fi

            dperms=$(stat -c %a -- "$PGDATA" 2>/dev/null) || error "Unable to get permissions of $PGDATA"
            downer=$(stat -c %U -- "$PGDATA" 2>/dev/null) || error "Unable to get owner of $PGDATA"

            if [[ "$dperms" == "700" ]]; then
                info "permissions of PGDATA ok"
            else
                warn "permissions of PGDATA are not 700: $dperms"
            fi

            # Do not run the owner test if run as root, superuser won't have
            # problems accessing the files
            if [ "$(id -u)" != 0 ]; then
                owner=${PGOWNER:-$(id -un)}
                if [[ "$owner" == "$downer" ]]; then
	            info "owner of PGDATA is the current user"
                else
	            warn "owner of PGDATA is not the current user: $downer"
                fi

                # To see if we can backup, just check if we can read the version
                # file
                if [[ -r "$PGDATA/PG_VERSION" ]]; then
	            info "access to the contents of PGDATA ok"
                else
	            error "cannot read $PGDATA/PG_VERSION, access to PGDATA may not be possible"
                fi
            else
                info "running as root, not checking access to the contents of PGDATA"
            fi
        fi

        if [[ "$check_backups" = "yes" ]]; then
            
            if [ -n "$check_max_age" ]; then
                # transform age to seconds so that we can compare with backup_timestamp
                if [[ "$check_max_age" =~ ^[[:digit:]]+(s|min|h|d)?$ ]]; then
                    val=$(sed -r -e 's/^([0-9]+)(s|min|h|d)?$/\1/g' <<< "$check_max_age")
                    unit=$(sed -r -e 's/^([0-9]+)(s|min|h|d)?$/\2/g' <<< "$check_max_age")

                    case $unit in
                        "s") age=$val;;
                        "min") age=$(($val * 60));;
                        "h") age=$(($val * 3600));;
                        # default unit is days
                        *) age=$(($val * 86400));;
                    esac
                else
                    die "invalid value for age: not a time, e.g. positive number with s,min,h,d units"
                fi
            else
                # fallback to PURGE_OLDER_THAN
                if [ -n "$PURGE_OLDER_THAN" ]; then
                    if [[ "$PURGE_OLDER_THAN" =~ ^[[:digit:]]+$ ]]; then
                        age=$(($PURGE_OLDER_THAN * 86400))
                    else
                        die "invalid value for PURGE_OLDER_THAN: not a positive number"
                    fi
                fi
            fi
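
            # e.g. -g 36h gives age=129600 seconds, -g 90min gives 5400,
            # and a bare -g 2 defaults to days, giving 172800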

            if [ -n "$check_min_count" ]; then
                if [[ "$check_min_count" =~ ^[[:digit:]]+$ ]]; then
                    min=$check_min_count
                else
                    die "invalid value for count: not a positive number"
                fi
            else
                if [ -n "$PURGE_KEEP_COUNT" ]; then
                    if [[ "$PURGE_KEEP_COUNT" =~ ^[[:digit:]]+$ ]]; then
                        min=$PURGE_KEEP_COUNT
                    else
                        die "invalid value for PURGE_KEEP_COUNT: not a positive number"
                    fi
                fi
            fi

            if [ -z "$age" ] && [ -z "$min" ]; then
                die "no limit set. -m or -g required"
            fi

            parse_target_uri "${@:$OPTIND:1}" "$archive_path"
            list_backups "$backup_dir"

            # Count the backups
            count=${#list[@]}

            # for max age, compute the timestamp limit used in the comparisons
            # below (also reported in the perfdata)
            [ -n "$age" ] && limit_ts=$(($(date +%s) - $age))

            # Get the backup_timestamp from the backup and compare
            newest_ts=0
            too_old="yes"
            for dir in "${list[@]%/}"; do
                if [ "$backup_local" = "yes" ]; then
	            ts=$(< "$dir/backup_timestamp")
                else
	            ts=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$dir/backup_timestamp")")
                fi

                # The timestamp must be a string of (only) digits, we do arithmetic with it
                if [[ $ts =~ ^[[:digit:]]+$ ]]; then

                    # remember the date of the newest backup
                    if (( $ts > $newest_ts )); then
                        newest_ts=$ts
                    fi

                    # just check the age and keep the information for the output part later
                    if [ -n "$age" ]; then
                        if (( $ts >= $limit_ts )); then
                            too_old="no"
                        fi
                    fi
                fi
            done

            # compute the age of the newest backup for logging
            nts_age=$(($(date +%s) - $newest_ts))
            nts_age_human="$(($nts_age/86400))d $(($nts_age%86400/3600))h $(($nts_age%86400%3600/60))min $(($nts_age%86400%3600%60))s"
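            # e.g. nts_age=90061 gives "1d 1h 1min 1s"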

            # Output the result
            if [[ "$output" == "nagios" ]]; then
                if [ -n "$age" ]; then
                    if [[ $too_old == "yes" ]]; then
                        out_rc=2
                    fi
                fi

                if [ -n "$min" ]; then
                    if (( $count < $min )); then
                        out_rc=2
                    fi
                fi

                case $out_rc in
                    0) status="OK";;
                    2) status="CRITICAL";;
                esac

                echo "PITRERY BACKUPS $status - count: $count, newest: $nts_age_human | count=${count};${min};${min} newest=${nts_age}s;${age};${age}"

            else
                if [[ "$backup_local" == "yes" ]]; then
                    info "checking local backups in $backup_dir"
                else
                    info "checking backups in $backup_host:$backup_dir"
                fi
                info "newest backup age: ${nts_age_human}"
                info "number of backups: $count"

                if [ -n "$age" ]; then
                    if [[ $too_old == "yes" ]]; then
                        error "backups are too old"
                    fi
                fi

                if [ -n "$min" ]; then
                    if (( $count < $min )); then
                        error "not enough backups, $min required"
                    fi
                fi

                [[ $out_rc = 0 ]] && info "backups policy checks ok"

            fi

        fi

        if [[ "$check_archives" == "yes" ]]; then

            _arg="${@:$OPTIND:1}"
            parse_target_uri "$_arg" "$archive_path"
            list_backups "$backup_dir"

            if [[ "$output" != "nagios" ]]; then
                if [[ "$archive_local" == "yes" ]]; then
                    info "checking local archives in $archive_dir"
                else
                    info "checking archives in $archive_host:$archive_dir"
                fi
            fi

            if [ -n "$wal_segsize" ]; then
                if ! [[ "$wal_segsize" =~ ^[0-9]+$ ]]; then
                    die "wal segment size must a number"
                fi
            fi

            # Find the oldest backup
            oldest_ts=
            oldest_dir=
            for dir in "${list[@]%/}"; do
                if [ "$backup_local" = "yes" ]; then
	            ts=$(< "$dir/backup_timestamp")
                else
	            ts=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$dir/backup_timestamp")")
                fi

                # The timestamp must be a string of (only) digits, we do arithmetic with it
                if [[ $ts =~ ^[[:digit:]]+$ ]]; then

                    # remember the date of the oldest backup
                    if [ -z "$oldest_ts" ]; then
                        oldest_ts=$ts
                        oldest_dir="$dir"
                    elif (( $ts < $oldest_ts )); then
                        oldest_ts=$ts
                        oldest_dir="$dir"
                    fi
                fi
            done

            if [ -n "$oldest_dir" ]; then
                [[ "$output" != "nagios" ]] && info "oldest backup is: $oldest_dir"

                # find the start wal
                if [ "$backup_local" = "yes" ]; then
                    backup_label=$(< "$oldest_dir/backup_label") ||
	                die "unable to read '$oldest_dir/backup_label'"
                else
	            backup_label=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$oldest_dir/backup_label")") ||
	            die "unable to read '$backup_ssh_target:$oldest_dir/backup_label'"
                fi

                # Extract the name of the WAL file from the backup
                start_wal_file=$(awk '/^START WAL LOCATION/ { gsub(/[^0-9A-F]/,"",$6); print $6 }' <<< "$backup_label")

                # This must be only (hex) digits, or the arithmetic operations below will not do what we hope
                [[ $start_wal_file =~ ^[0-9A-F]{24}$ ]] || die "start wal file '$start_wal_file' does not appear to be a WAL segment file name in backup_label"

                [[ "$output" != "nagios" ]] && info "start wal file is: $start_wal_file"
                start_wal_tln=$(( 0x$(cut -b 1-8 <<< "$start_wal_file") ))
                start_wal_wal=$(( 0x$(cut -b 9-16 <<< "$start_wal_file") ))
                start_wal_seg=$(( 0x$(cut -b 17-24 <<< "$start_wal_file") ))
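
                # e.g. (hypothetical) start_wal_file 00000002000000A10000003F
                # gives start_wal_tln=2, start_wal_wal=161 (0xA1) and
                # start_wal_seg=63 (0x3F)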

                # Get the version of PostgreSQL to determine whether WAL
                # files ending in FF exist (they are skipped before 9.3).
                if [ "$backup_local" = "yes" ]; then
                    pgvers=$(< "$oldest_dir/PG_VERSION") ||
                        die "unable to read '$oldest_dir/PG_VERSION'"
                else
                    pgvers=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$oldest_dir/PG_VERSION")") ||
                        die "unable to read '$backup_ssh_target:$oldest_dir/PG_VERSION'"
                fi

                # Ensure we can do integer comparison on the version,
                # even with the changing in numbering introduced with
                # PostgreSQL 10
                if [[ $pgvers =~ \. ]]; then
                    pgvers=${pgvers//./0}
                else
                    pgvers="${pgvers}00"
                fi

                [[ "$output" != "nagios" ]] && info "listing WAL files"

                # List the WAL files. The filter is on the first nine
                # chars so that history files are excluded.
                wal_list=()
                if [ "$archive_local" = "yes" ]; then
                    wal_list=( "$archive_dir/"[0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F]* )
                else
                    while read -r -d '' f; do
	                wal_list+=("$f")
                    done < <(
	                ssh -n -- "$archive_ssh_target" "find $(qw "$archive_dir") -maxdepth 1 -mindepth 1 -name '[0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F][0-9A-F]*' -type f -print0 | sort -z"
                    )
                fi

                # Versions before 9.3 skip the FF segment
                if (( $pgvers < 903 )); then
                    max_seg=254
                elif (( $pgvers < 11 )); then
                    max_seg=255
                else
                    # Starting from 11, the WAL segment size can be
                    # configured at cluster creation, using
                    # --wal-segsize of initdb.  This means we have to
                    # find the size of a segment to compute the
                    # maximum number of segments per log.

                    # First we check whether the wal_segsize file exists and
                    # contains the size of the segments. If that fails, we
                    # take the first segment, uncompress it and measure its
                    # size
                    [[ "$output" != "nagios" ]] && info "looking for WAL segment size"

                    if [ "$backup_local" = "yes" ]; then
                        wal_segsize=$(cat "$oldest_dir/wal_segsize" 2>/dev/null)
                    else
                        wal_segsize=$(ssh -n -- "$backup_ssh_target" "cat -- $(qw "$oldest_dir/wal_segsize")" 2>/dev/null)
                    fi

                    if [ -z "$wal_segsize" ]; then
                        for wal_file in "${wal_list[@]}"; do
                            [[ "$wal_file" =~ .*\.backup.* ]] && continue
                            first_wal_file="$wal_file"
                            break
                        done

                        tmpwal=$(mktemp -t pitr_wal.XXXXXXXXXX) ||
                            die "Failed to create temporary file"
                        if [ "$archive_local" = "yes" ]; then
                            if ! cp -- "$first_wal_file" "$tmpwal"; then
                                die "Could not copy archived wal file"
                            fi
                        else
                            if ! scp -- "${archive_ssh_target}:$(qw "$first_wal_file")" "$tmpwal" >/dev/null 2>&1; then
                                die "Could not copy archived wal file from ${archive_ssh_target}"
                            fi
                        fi

                        if [ -n "$ARCHIVE_UNCOMPRESS_BIN" ]; then
                            [[ "$output" != "nagios" ]] && info "uncompressing $first_wal_file to measure its size"

                            wal_segsize=$($ARCHIVE_UNCOMPRESS_BIN -c "$tmpwal" 2>/dev/null | wc -c)
                            if [[ -z "$wal_segsize" ]] || [[ "$wal_segsize" == 0 ]]; then
                                die "Could not uncompress the WAL to check its size"
                            fi
                        else
                            [[ "$output" != "nagios" ]] && info "assuming WAL segments are not compressed"
                            wal_segsize=$(cat -- "$tmpwal" 2>/dev/null | wc -c)
                            if [[ -z "$wal_segsize" ]] || [[ "$wal_segsize" == 0 ]]; then
                                die "Could not get the size of the WAL segment"
                            fi
                        fi

                        rm -f -- "$tmpwal"
                    fi

                    # wal_segsize is in bytes, compute the number of segments per log
                    max_seg=$(( 0xFFFFFFFF / $wal_segsize ))
                    segsize=$(( $wal_segsize / 1048576 ))
                    [[ "$output" != "nagios" ]] && info "WAL segment size: ${segsize}MB"
                fi
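
                # e.g. 16MB segments give max_seg = 0xFFFFFFFF / 16777216 = 255,
                # while 64MB segments (initdb --wal-segsize=64) give 63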

                [[ "$output" != "nagios" ]] && info "WAL segments per log: $max_seg"

                start_wal_file_found="no"

                # initialize the counters: mc for the missing count, chk_c and
                # ts_c for progress and speed reporting, tot_c for the total
                # number of segments to check
                mc=0
                chk_c=0
                tot_c="${#wal_list[@]}"
                ts_c=$(date +%s)
                for wal_file in "${wal_list[@]}"; do
                    # Skip archived backup_label files
                    [[ "$wal_file" =~ .*\.backup.* ]] && continue
                    wal_file="$(basename -- "$wal_file")"
                    if ! [[ $wal_file =~ ^[0-9A-F]{24}.*$ ]]; then
                        [[ "$output" != "nagios" ]] && warn "'$wal_file' does not appear to be a WAL segment file name"
                        continue
                    fi

                    # Initialize or update the counters
                    if [ -z "$cur_tln" ]; then
                        [[ "$output" != "nagios" ]] && info "first WAL file checked is: $wal_file"
                        cur_tln=$(( 0x$(cut -b 1-8 <<< "$wal_file") ))
                        cur_wal=$(( 0x$(cut -b 9-16 <<< "$wal_file") ))
                        cur_seg=$(( 0x$(cut -b 17-24 <<< "$wal_file") ))
                    else
                        if (( $cur_seg >= $max_seg )); then
                            cur_seg=0
                            cur_wal=$((cur_wal + 1))
                            if (( cur_wal % 4 == 0 )); then
                                if [[ $start_wal_file_found != "yes" ]]; then
                                    [[ "$output" != "nagios" ]] && info "looking for $(printf "%08X%08X%08X" "$cur_tln" "$cur_wal" "$cur_seg")"
                                else
                                    speed=0
                                    delta=$(( $(date +%s) - ts_c ))
                                    (( $delta > 0 )) && speed=$(( chk_c / delta ))
                                    [[ "$output" != "nagios" ]] && info "looking for $(printf "%08X%08X%08X" "$cur_tln" "$cur_wal" "$cur_seg")" \
                                                                        "(checked ${chk_c}/${tot_c}, $speed segs/s)"
                                fi
                            fi
                        else
                            cur_seg=$((cur_seg + 1))
                        fi
                    fi

                    # split the name to tln wal seg
                    tln=$(( 0x$(cut -b 1-8 <<< "$wal_file") ))
                    wal=$(( 0x$(cut -b 9-16 <<< "$wal_file") ))
                    seg=$(( 0x$(cut -b 17-24 <<< "$wal_file") ))

                    # check if we have reached the start wal file of the oldest backup
                    if ( (( $start_wal_tln == $tln )) && (( $start_wal_wal == $wal )) && (( $start_wal_seg == $seg )) ); then
                        [[ "$output" != "nagios" ]] && info "start WAL file found"
                        start_wal_file_found="yes"
                    fi
                    if [[ $start_wal_file_found != "yes" ]]; then
                        tot_c=$((tot_c - 1))
                        continue
                    else
                        chk_c=$((chk_c + 1))
                    fi

                    # compare to counters
                    while ! ( (( $cur_tln == $tln )) && (( $cur_wal == $wal )) && (( $cur_seg == $seg )) ); do
                        if (( $cur_tln != $tln )); then
                            # if the timeline has changed, reset the counters to
                            # continue the check on the new timeline from the
                            # current wal_file
                            cur_tln=$tln
                            cur_wal=$wal
                            cur_seg=$seg
                        else
                            # Only print the first missing file
                            if [ -z "$missing" ]; then
                                missing=$(printf "%08X%08X%08X" "$cur_tln" "$cur_wal" "$cur_seg")
                                [[ "$output" != "nagios" ]] && error "missing WAL file: $missing"
                            fi
                            mc=$((mc + 1))

                            if (( $cur_seg == $max_seg )); then
                                cur_seg=0
                                cur_wal=$((cur_wal + 1))
                            else
                                cur_seg=$((cur_seg + 1))
                            fi
                        fi
                    done

                    if [ -n "$missing" ]; then
                        [[ "$output" != "nagios" ]] && info "next found is: $(printf "%08X%08X%08X" "$cur_tln" "$cur_wal" "$cur_seg")"
                        missing=
                    fi
                done
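
                # A sketch of the detection above (names hypothetical): if the
                # archive holds ...000000A8 and then ...000000AB, the counters
                # step through ...000000A9 and ...000000AA, report the first
                # one as missing and count mc=2 before resynchronising.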

                [[ "$output" != "nagios" ]] && info "last WAL file checked is: $wal_file"

                if [[ $start_wal_file_found != "yes" ]]; then
                    [[ "$output" != "nagios" ]] && error "start WAL file of the oldest backup not found"
                    out_rc=1
                else
                    if (( $mc > 0 )); then
                        [[ "$output" != "nagios" ]] && info "missing count is: $mc"
                        out_rc=1
                    else
                        [[ "$output" != "nagios" ]] && info "all archived WAL files found"
                    fi
                fi
            else
                [[ "$output" != "nagios" ]] && info "no backup found"
                no_backup="yes"
                out_rc=1
            fi

            if [[ "$output" == "nagios" ]]; then
                if [[ "$no_backup" == "yes" ]]; then
                    out_rc=3
                else
                    if (( $mc > 0 )); then
                        out_rc=2
                    else
                        out_rc=0
                    fi

                    if [[ $start_wal_file_found != "yes" ]]; then
                        out_rc=2
                    fi
                fi

                case $out_rc in
                    0) status="OK";;
                    2) status="CRITICAL";;
                    3) status="UNKNOWN";;
                esac

                echo "PITRERY WAL ARCHIVES $status - total: ${#wal_list[@]}, missing: $mc | total=${#wal_list[@]};; missing=${mc};;"
            fi
        fi
	;;

    configure)
	# Here we do not load any configuration file so that nothing
	# changes behind the user's back. This is important for
	# PostgreSQL related environment variables that we do not want
	# to overwrite. This is why the following getopts loop only
	# parses what is passed on the command line.

        psql_command=( "psql" "-X" )
        storage="tar"
        connect="yes"
        overwrite_config="no"
        encrypt="no"

        while getopts "o:fCs:m:g:D:a:Er:P:h:p:U:d:?" opt; do
            case $opt in
	        o) output=$OPTARG;;
	        f) overwrite_config="yes";;
	        C) connect="no";;
	        s) storage=$OPTARG;;
	        m) max_count=$OPTARG;;
	        g) max_days=$OPTARG;;
	        D) pgdata=$(readlink -m -- "$OPTARG");;
	        a) archive_path="$OPTARG";;
                E) encrypt="yes";;
                r) gpg_encrypt_keys="$OPTARG";;

	        P) psql=$OPTARG; psql_command=( "$OPTARG" );;
                h) dbhost=$OPTARG;;
                p) dbport=$OPTARG;;
                U) dbuser=$OPTARG;;
                d) dbname=$OPTARG;;

	        "?") usage "configure";;
            esac
        done

        # target directory for backups must be provided
        if [ -z "${@:$OPTIND:1}" ]; then
            error "missing target backup directory"
            usage "configure"
        fi

        # Only tar and rsync are allowed as storage method
        if [ "$storage" != "tar" ] && [ "$storage" != "rsync" ]; then
            die "storage method must be 'tar' or 'rsync'"
        fi

        # Only tar can be encrypted
        if [ "$storage" != "tar" ] && [ "$encrypt" = "yes" ]; then
            die "encryption only available with tar storage"
        fi

        # we need recipients
        if [ "$encrypt" = "yes" ] && [ -z "$gpg_encrypt_keys" ]; then
            die "list of recipients for GPG encryption missing"
        fi

        parse_target_uri "${@:$OPTIND:1}" "$archive_path" || usage "configure"

        # the configuration for archive_xlog must be an absolute path or
        # relative to PGDATA. Just convert it to an absolute path if needed,
        # because we cannot guarantee that what the user provides is relative
        # to PGDATA.
        if [ -n "$output" ]; then
            if [[ "$output" == */* ]]; then
	        restore_xlog_config=$(readlink -m -- "$output")
            else
	        restore_xlog_config="$output"
            fi
        fi

        if [ "$connect" = "yes" ]; then
            # Check if PostgreSQL is available and check its configuration. The
            # purpose is to output what should be changed to configure WAL
            # archiving.
            info "==> checking access to PostgreSQL"

            # Starting from 9.6 .psqlrc is sourced with psql -c or -f, so we
            # force -X
            psql_command+=( "-X" )
            [ -n "$dbhost" ] && psql_command+=( "-h" "$dbhost" )
            [ -n "$dbport" ] && psql_command+=( "-p" "$dbport" )
            [ -n "$dbuser" ] && psql_command+=( "-U" "$dbuser" )
            [ -n "$dbname" ] && psql_condb="$dbname"

            check_postgresql_config

            if [ -n "$restore_xlog_config" ]; then
	        info "please ensure archive_command includes 'archive_xlog -C $restore_xlog_config %p'"
            else
	        info "please ensure archive_command includes a call to archive_xlog"
            fi
        fi

        # When the PostgreSQL check is disabled or the server could not be
        # reached, only output what should be changed in postgresql.conf
        if [ -z "$data_directory" ]; then
            info "==> PostgreSQL configuration to change in 'postgresql.conf':"
            info "  wal_level = archive # or higher (>= 9.0 and < 9.6), replica or higher >= 9.6"
            info "  archive_mode = on # (>= 8.3)"
            if [ -n "$restore_xlog_config" ]; then
	        info "  archive_command = 'archive_xlog -C $restore_xlog_config %p'"
            else
	        info "  archive_command = 'archive_xlog -C {your_conf} %p'"
            fi
        fi

        # Check whether PGDATA is consistent with the data_directory parameter
        # of PostgreSQL. It has its flaws, but it is a simple consistency test.
        info "==> checking \$PGDATA"
        if [ -n "$pgdata" ]; then
            if [ -n "$data_directory" ]; then
	        test "$pgdata" -ef "$data_directory"
	        if [ $? != 0 ]; then
	            info "data_directory setting is: $data_directory"
	            info "PGDATA is: $pgdata"
	            die "configured PGDATA is different than the data directory reported by PostgreSQL"
	        fi
            fi
        else
            pgdata="$data_directory"
        fi

        # When PGDATA is not provided on the command line and PostgreSQL is
        # unreachable, fallback on the PGDATA environment variable.
        if [ -z "$pgdata" ] && [ -z "$data_directory" ]; then
            if [ -n "$PGDATA" ]; then
	        pgdata="$PGDATA"
            else
	        die "could not find what PGDATA is. Use -D or -? for help."
            fi
        fi

        # This test may seem redundant, but when we do not have access to
        # PostgreSQL, we only have the configuration, which may be
        # incorrect
        [ -e "$pgdata" ] || die "$pgdata does not exist"
        [ -d "$pgdata" ] || die "$pgdata is not a directory"

        downer=$(stat -c %U -- "$pgdata" 2>/dev/null) || error "Unable to get owner of $pgdata"

        # Do not run the owner test when running as root: the superuser
        # won't have problems accessing the files
        if [ "$(id -u)" != 0 ]; then
            owner=$(id -un)
            if [[ "$owner" != "$downer" ]]; then
	        warn "owner of PGDATA is not the current user: $downer"
	        pgowner=$downer
            fi

            # To see if we can backup, just check if we can read the version
            # file
            if [ -r "$pgdata/PG_VERSION" ]; then
	        info "access to the contents of PGDATA ok"
            else
	        warn "cannot read $pgdata/PG_VERSION, access to PGDATA may not be possible"
            fi
        fi

        # Check that GPG is available and that encryption works
        if [[ "$encrypt" == "yes" ]]; then
            gpg_command=( "$GPG_BIN" "--batch" "--yes" "--encrypt" )

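            # Split the colon separated recipient list into --recipient
            # options; e.g. (hypothetical keys) "alice:bob" yields
            # "--recipient alice --recipient bob"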
            while read -r -d '' o; do
                gpg_command+=( "--recipient" "$o" )
            done < <(tr ':' '\0' <<< "${gpg_encrypt_keys}:")

            info "checking GPG encryption"
            echo "test" | "${gpg_command[@]}" > /dev/null
            rc=(${PIPESTATUS[*]})
            gpg_rc=${rc[1]}
            if [ "$gpg_rc" != 0 ]; then
                die "failed to check GPG encryption. command line:" "${gpg_command[@]}"
            fi
        fi

        info "==> contents of the configuration file"
        echo
        if [ -n "$output" ]; then
            # We want to create a configuration file with all the comments and
            # possible options so that it can be easily tuned afterwards.
	    conffile=$(mktemp -t pitr_config.XXXXXXXXXX) ||
                die "Failed to create temporary file for the new configuration file"

            # Ensure the configuration file has everything commented out, so
            # that we can uncomment only what is configured here
            cat > "$conffile" <<EOF
# pitrery.conf -- central configuration file for pitrery
#
# This file is intended to be copied for each backup set to be
# performed with pitrery.
#

####################
# Backup management
####################

# Data directory of the PostgreSQL cluster to backup.
#PGDATA="/var/lib/pgsql/data"

# Specify all information needed to be able to run SQL commands on the
# PostgreSQL cluster to backup.  These options may be commented out to
# let the scripts use the ones defined in the environment (be careful
# with cron, which may need them)
#PGUSER="postgres"
#PGPORT=5432
#PGHOST="/tmp"
#PGDATABASE="postgres"

# Command line to invoke psql.
#PGPSQL="$PGPSQL"

# Target owner of the files when restoring with a different user
# (e.g. one may need to restore as root to create some missing
# directories, then PGOWNER should be set to "postgres" or whatever
# unprivileged user is used to run PostgreSQL)
#PGOWNER=$PGUSER

# If the transaction logs directory is outside PGDATA (see -X option
# of initdb), specify its path here to recreate the symbolic link when
# restoring.
#PGXLOG=$PGXLOG

# Directory where to store the files.  Each backup has its own
# subdirectory named after the date and time it was launched.
#BACKUP_DIR="$BACKUP_DIR"

# When performing backups to a remote host, batch SSH access to the
# target host is needed (e.g. create passwordless SSH key pairs).  Be
# aware that commands will be run by the scripts through SSH, so this
# must be allowed. Keep this empty to perform local backups.
#BACKUP_HOST="$BACKUP_HOST"

# User to use when accessing BACKUP_HOST through SSH.  If blank, the
# login will be performed with the default SSH configuration.
#BACKUP_USER="$BACKUP_USER"

# After restoring from a backup, PostgreSQL will need a command to run
# to get archived WAL segments that need to be replayed on the
# cluster files.  This can be any valid command or script for the
# restore_command parameter of PostgreSQL (in recovery.conf).  The
# restore_xlog script is intended to be used here: leave empty to use it
#RESTORE_COMMAND=
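
# For example (hypothetical configuration name), to make restore_xlog
# read a specific configuration file:
#RESTORE_COMMAND="restore_xlog -C mybackup %f %p"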

# Old backups can be removed by setting one or both of the following
# parameters: keep at most PURGE_KEEP_COUNT backups, or keep only
# backups younger than PURGE_OLDER_THAN days. When both parameters are
# set, the age based purge keeps at least the given count.
#PURGE_KEEP_COUNT="$PURGE_KEEP_COUNT"
#PURGE_OLDER_THAN="$PURGE_OLDER_THAN"
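
# For example (hypothetical values), keep at least the 7 most recent
# backups while purging the ones older than 30 days:
#PURGE_KEEP_COUNT=7
#PURGE_OLDER_THAN=30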

# Hooks. Commands run before and after the backup action
#PRE_BACKUP_COMMAND="$PRE_BACKUP_COMMAND"
#POST_BACKUP_COMMAND="$POST_BACKUP_COMMAND"

# Backup storage method. "tar" creates one compressed tarball (with
# gzip) for PGDATA and each tablespace. "rsync" synchronizes PGDATA
# and each tablespace, and tries to optimize data transfer by
# hardlinking the files of the previous backup (provided it was done
# with the "rsync" method). Use "tar" if space for the backup matters
# more than speed, use "rsync" if speed is more important.
#STORAGE="$STORAGE"

# Timestamp the messages.
#LOG_TIMESTAMP="$LOG_TIMESTAMP"

# When naming the backup directory from the stop time of the backup,
# use ISO 8601 format. Defaults to "no" to keep the backward
# compatibility, as mixing formats of backup names would break the
# sorting of backups on restore.
#USE_ISO8601_TIMESTAMPS="$USE_ISO8601_TIMESTAMPS"

# rsync storage method options.
# Disable the rsync delta-transfer algorithm by adding --whole-file to
# the rsync command line. This may improve performance over NFS.
#RSYNC_WHOLEFILE="no"

# Limit the bandwidth usage of rsync. This is the value of the
# --bwlimit option of rsync; with no unit, it is in kB/s.
#RSYNC_BWLIMIT=""
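
# For example (hypothetical value), cap rsync at about 10 MB/s:
#RSYNC_BWLIMIT=10240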

####################
# WAL archiving
####################

# The host storing the archived WAL files. It must be left empty to
# make archive_xlog and restore_xlog work locally.
#ARCHIVE_HOST="$ARCHIVE_HOST"

# The user to log in as on ARCHIVE_HOST through SSH. If blank, the
# owner of the PostgreSQL server process is used
#ARCHIVE_USER="$ARCHIVE_USER"

# The directory where to look for archived WAL files on ARCHIVE_HOST.
#ARCHIVE_DIR="\$BACKUP_DIR/archived_xlog"

# Compress the WAL segment
#ARCHIVE_COMPRESS="$ARCHIVE_COMPRESS"

# Allow overwriting the file when it exists in the archive
# directory. Since checking if the file exists before archiving has a
# performance overhead, the option is enabled by default.
#ARCHIVE_OVERWRITE="$ARCHIVE_OVERWRITE"

# Compare the md5 checksum of the archived file to the md5 checksum of
# the original WAL file. Useful if you are paranoid or do not trust
# the reliability of the remote storage. Note that the archive cannot
# be checked if compression is enabled. If overwriting is disabled,
# the md5 check is enabled and the archive already exists, archiving
# returns success when the md5 check passes.
#ARCHIVE_CHECK="$ARCHIVE_CHECK"

# Force an immediate flush of the archived file to disk before
# returning success. This is often a life saver on power loss, to get
# ALL the archives back and clean.
#ARCHIVE_FLUSH="$ARCHIVE_FLUSH"

# Logging options.  When SYSLOG is no, messages are written to stderr,
# to allow the logging collector or redirection to catch them. These
# parameters are intended to match their counterparts in
# postgresql.conf. This only applies to WAL archiving.
#SYSLOG="$SYSLOG"
#SYSLOG_FACILITY="$SYSLOG_FACILITY"
#SYSLOG_IDENT="$SYSLOG_IDENT"

####################
# Compression
####################

# The following options allow customizing the compression tools for
# WAL archiving. The program used in ARCHIVE_COMPRESS_BIN must support
# a -c option to output to stdout and read data from stdin (tested
# with gzip, pigz, bzip2, pbzip2, xz).  Defaults to "gzip -f -4"
#ARCHIVE_COMPRESS_BIN="$ARCHIVE_COMPRESS_BIN"

# The suffix added by the compression tool (needed for
# decompression). Defaults to "gz"
#ARCHIVE_COMPRESS_SUFFIX="$ARCHIVE_COMPRESS_SUFFIX"

# Path to the decompression program; it must take the file to process
# as its first parameter. Defaults to "gunzip"
#ARCHIVE_UNCOMPRESS_BIN="$ARCHIVE_UNCOMPRESS_BIN"
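
# For example (alternative tools), parallel compression with pigz,
# which is command line compatible with gzip:
#ARCHIVE_COMPRESS_BIN="pigz -4"
#ARCHIVE_COMPRESS_SUFFIX="gz"
#ARCHIVE_UNCOMPRESS_BIN="unpigz"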

# The compression used with the "tar" storage method can be configured
# with these options. The command must work with pipes (tested with
# gzip, pigz, bzip2, pbzip2). Defaults to "gzip -4".
#BACKUP_COMPRESS_BIN="$BACKUP_COMPRESS_BIN"

# The suffix added by the compression tool (needed for
# decompression). Defaults to "gz".
#BACKUP_COMPRESS_SUFFIX="$BACKUP_COMPRESS_SUFFIX"

# Path to the decompression program for backups using the "tar"
# storage method; it must take the file to process as its first
# parameter, work with pipes and be able to output to stdout with -c.
# Defaults to "gunzip".
#BACKUP_UNCOMPRESS_BIN="$BACKUP_UNCOMPRESS_BIN"
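
# For example (alternative tools), trade speed for a better
# compression ratio with bzip2:
#BACKUP_COMPRESS_BIN="bzip2 -9"
#BACKUP_COMPRESS_SUFFIX="bz2"
#BACKUP_UNCOMPRESS_BIN="bunzip2"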

####################
# Encryption (gpg)
####################

# This only applies to the tar storage method, as encrypted files
# would not be synchronized efficiently by rsync. Note that
# compression options do not apply when encrypting files, as GPG
# already compresses the output file using zlib by default.
#BACKUP_ENCRYPT="no"

#ARCHIVE_ENCRYPT="no"

# When gpg encryption is enabled, specifies the recipients to encrypt
# data to, as a colon separated list of recipients. All public keys
# must be available in the keyring of the user running the PostgreSQL
# cluster and, if different, of the user who runs pitrery for base
# backups.
#GPG_ENCRYPT_KEYS=""
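
# For example (hypothetical recipients):
#GPG_ENCRYPT_KEYS="backup@example.com:0xDEADBEEF"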

EOF
        fi

        # Write all configured parameters
        output_param "PGDATA" "$pgdata" "$conffile"

        if [ -n "$dbuser" ]; then
            output_param "PGUSER" "$dbuser" "$conffile"
        elif [ -n "$PGUSER" ]; then
            output_param "PGUSER" "$PGUSER" "$conffile"
        fi

        if [ -n "$dbport" ]; then
            output_param "PGPORT" "$dbport" "$conffile"
        elif [ -n "$PGPORT" ]; then
            output_param "PGPORT" "$PGPORT" "$conffile"
        fi

        if [ -n "$dbhost" ]; then
            output_param "PGHOST" "$dbhost" "$conffile"
        elif [ -n "$PGHOST" ]; then
            output_param "PGHOST" "$PGHOST" "$conffile"
        fi

        if [ -n "$dbname" ]; then
            output_param "PGDATABASE" "$dbname" "$conffile"
        elif [ -n "$PGDATABASE" ]; then
            output_param "PGDATABASE" "$PGDATABASE" "$conffile"
        fi

        [ -n "$psql" ] && output_param "PGPSQL" "$psql" "$conffile"
        [ -n "$pgowner" ] && output_param "PGOWNER" "$pgowner" "$conffile"

        output_param "BACKUP_DIR" "$backup_dir" "$conffile"
        [ -n "$backup_host" ] && output_param "BACKUP_HOST" "$backup_host" "$conffile"
        [ -n "$backup_user" ] && output_param "BACKUP_USER" "$backup_user" "$conffile"

        [ -n "$max_count" ] && output_param "PURGE_KEEP_COUNT" "$max_count" "$conffile"
        [ -n "$max_days" ] && output_param "PURGE_OLDER_THAN" "$max_days" "$conffile"
        # Fall back on a keep count of 2, so that our configuration
        # passes the check action
        if [ -z "$max_count" ] && [ -z "$max_days" ]; then
            output_param "PURGE_KEEP_COUNT" 2 "$conffile"
        fi

        output_param "STORAGE" "$storage" "$conffile"

        [ -n "$archive_host" ] && output_param "ARCHIVE_HOST" "$archive_host" "$conffile"
        [ -n "$archive_user" ] && output_param "ARCHIVE_USER" "$archive_user" "$conffile"
        output_param "ARCHIVE_DIR" "$archive_dir" "$conffile"
        if [ "$syslog" = "t" ]; then
            output_param "SYSLOG" "yes" "$conffile"
            [ "$syslog_facility" != "local0" ] && output_param "SYSLOG_FACILITY" "$syslog_facility" "$conffile"
            [ "$syslog_ident" != "postgres" ] && output_param "SYSLOG_IDENT" "$syslog_ident" "$conffile"
        fi

        if [[ "$encrypt" == "yes" ]]; then
            output_param "BACKUP_ENCRYPT" "$encrypt" "$conffile"
            output_param "ARCHIVE_ENCRYPT" "$encrypt" "$conffile"
            output_param "GPG_ENCRYPT_KEYS" "$gpg_encrypt_keys" "$conffile"
        fi

        echo

        # Write the configuration file
        if [ -n "$output" ]; then
            # Check if the output config option is a path or just a name
            # in the configuration directory.  Prepend the configuration
            # directory and append .conf when needed.
            if [[ $output != */* ]]; then
	        output="$config_dir/$(basename -- "$output" .conf).conf"
            fi

            info "writing configuration file: $output"

            # Do not overwrite an existing configuration file
            if [ -f "$output" ] && [ $overwrite_config = "no" ]; then
	        die "target configuration file '$output' already exists"
            fi

            if [ -w "$(dirname -- "$output")" ]; then
	        if [ -n "$conffile" ]; then
	            cp -- "$conffile" "$output" || die "Could not write $output"
	            rm -- "$conffile"
	        fi
            else
	        die "Could not write $output: directory is not writable"
            fi
        else
            [ -n "$conffile" ] && rm -- $conffile
        fi
        ;;

    *)
	error "unknown action"
	;;
esac

exit $out_rc
