# DPLib - Asynchronous bot framework for Digital Paint: Paintball 2 servers
# Copyright (C) 2017 Michał Rokita
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import select
from collections import OrderedDict
from enum import Enum
from subprocess import Popen
import asyncio
import os
from socket import socket, AF_INET, SOCK_DGRAM
from time import time
from dplib.parse import render_text, decode_ingame_text
[docs]class ServerEvent(Enum):
TIMEOUT = 0
CHAT = 1
ELIM = 2
RESPAWN = 3
MAPCHANGE = 4
DATE = 5
NAMECHANGE = 6
ENTRANCE = 7
FLAG_CAPTURED = 8
ELIM_TEAMS_FLAG = 9
ROUND_STARTED = 10
TEAM_SWITCHED = 11
DISCONNECT = 12
FLAG_GRAB = 13
FLAG_DROP = 14
ROUND_END = 15
GAMEMODE = 16
GAME_END = 17
[docs]class GameMode(Enum):
CTF = 'CTF'
ONE_FLAG = '1Flag'
ELIMINATION = 'Elim'
DEATHMATCH = 'DM'
SIEGE = 'Siege'
TDM = 'TDM'
KOTH = 'KOTH'
PONG = 'Pong'
[docs]class BadRconPasswordError(Exception):
pass
[docs]class SecurityCheckError(Exception):
pass
[docs]class MapNotFoundError(Exception):
pass
[docs]class ListenerType(Enum):
PERMANENT = 0
TRIGGER_ONCE = 1
REGEXPS = OrderedDict([
(re.compile('^\\[\d\d:\d\d:\d\d\\] (?:(?:\\[OBS\\] )|(?:\\[ELIM\\] ))?(.*?): (.*?)\r?\n'), ServerEvent.CHAT),
# [19:54:18] hTml: test
(re.compile(
'^\\[\d\d:\d\d:\d\d\\] \\*(.*?) (?:\\((.*?)\\) eliminated \\*(.*?) \\((.*?)\\)\\.\r?\n|'
'eliminated ((?:himself)|(?:herself)) with a paintgren\\.\r?\n)'), ServerEvent.ELIM),
# [18:54:24] *|ACEBot_1| (Spyder SE) eliminated *|herself| (Spyder SE).
# [12:25:44] *whoa eliminated herself with a paintgren.
# [12:26:09] *whoa eliminated himself with a paintgren.
(re.compile('^\\[\d\d:\d\d:\d\d\\] \\*(.*?)\\\'s (.*?) revived!\r?\n'), ServerEvent.RESPAWN),
# [19:03:57] *Red's ACEBot_6 revived!
(re.compile('^\\[\d\d:\d\d:\d\d\\] (.*?) entered the game \\((.*?)\\) \\[(.*?)\\]\r?\n'), ServerEvent.ENTRANCE),
# [19:03:57] mRokita entered the game (build 41)
(re.compile('^\\[\d\d:\d\d:\d\d\\] \\*(.*?)\\\'s (.*?) returned the(?: \\*(.*?))? flag!\r?\n'), ServerEvent.FLAG_CAPTURED),
# [18:54:24] *Red's hTml returned the *Blue flag!
(re.compile('^\\[\d\d:\d\d:\d\d\\] \\*(.*?)\\\'s (.*?) earned (\d+) points for possesion of eliminated teams flag!\r?\n'),
ServerEvent.ELIM_TEAMS_FLAG),
# [19:30:23] *Blue's mRokita earned 3 points for possesion of eliminated teams flag!
(re.compile('^\\[\d\d:\d\d:\d\d\\] Round started\\.\\.\\.\r?\n'), ServerEvent.ROUND_STARTED),
# [10:20:11] Round started...
(re.compile(
'(?:^\\[\d\d:\d\d:\d\d\\] (.*?) switched from \\*((?:Red)|(?:Purple)|(?:Blue)|(?:Yellow))'
' to \\*((?:Red)|(?:Purple)|(?:Blue)|(?:Yellow))\\.\r?\n)|'
'(?:^\\[\d\d:\d\d:\d\d\\] (.*?) joined the \\*((?:Red)|(?:Purple)|(?:Blue)|(?:Yellow)) team\\.\r?\n)|'
'(?:^\\[\d\d:\d\d:\d\d\\] (.*?) is now (observing)?\\.\r?\n)'), ServerEvent.TEAM_SWITCHED),
# [10:20:11] mRokita switched from Blue to Red.
# [10:20:11] mRokita is now observing.
# [10:20:11] mRokita is now observing.
(re.compile('^\\[\d\d:\d\d:\d\d\\] [\t|-]{2}GameEnd[\t-](.*?)\r?\n'), ServerEvent.GAME_END),
# [22:40:33] GameEnd 441.9 No winner
# [22:40:33] GameEnd 1032.6 Red:23,Blue:22
# [22:40:33] GameEnd 4.9 DPBot01 wins!
# [22:40:33] GameEnd 42.9 Yellow:5,Blue:0,Purple:0,Red:0
# [22:40:33] GameEnd 42.9 Yellow:5,Blue:12,Purple:7
(re.compile('^\\[\d\d:\d\d:\d\d\\] == Map Loaded: (.+) ==\r?\n'), ServerEvent.MAPCHANGE),
# [10:20:11] == Map Loaded: airtime ==
(re.compile('^\\[\d\d:\d\d:\d\d\\] (.*?) changed name to (.*?)\\.\r?\n'), ServerEvent.NAMECHANGE),
# [19:54:54] name1 changed name to name2.
(re.compile('^\\[\d\d:\d\d:\d\d\\] (.*?) disconnected\\.\r?\n'), ServerEvent.DISCONNECT),
# [19:03:57] whoa disconnected.
(re.compile('^\\[\d\d:\d\d:\d\d\\] \\*(.*?) got the(?: \\*(.*?))? flag\\!\r?\n'), ServerEvent.FLAG_GRAB),
# [19:03:57] *whoa got the *Red flag!
(re.compile('^\\[\d\d:\d\d:\d\d\\] \\*(.*?) dropped the flag\\!\r?\n'), ServerEvent.FLAG_DROP),
# [19:03:57] *whoa dropped the flag!
(re.compile('^\\[\d\d:\d\d:\d\d\\] (.*?) team wins the round\\!\r?\n'), ServerEvent.ROUND_END),
# [14:38:50] Blue team wins the round!
(re.compile('^\\[\d\d:\d\d:\d\d\\] === ((?:Deathmatch)|(?:Team Flag CTF)|(?:Single Flag CTF)|(?:Team Siege)|(?:Team Elim)|(?:Team Siege)|(?:Team Deathmatch)|(?:Team KOTH)|(?:Pong)) ===\r?\n'), ServerEvent.GAMEMODE),
# [09:58:11] === Team Flag CTF ===
# [13:16:19] === Team Siege ===
# [21:53:54] === Pong ===
# [12:21:05] === Deathmatch ===
])
[docs]class Player(object):
"""
Player info from sv players command
:Attributes:
* dplogin - dplogin.com account id, None when Player has no account
* nick - nickname:
* build - game build
* server - an instance of :class:`Server`
"""
def __init__(self, server, id, dplogin, nick, build):
self.server = server
self.id = id
self.dplogin = dplogin
self.nick = nick
self.build = build
[docs]class Server(object):
"""
Represents a DP:PB2 server
:param hostname: Server hostname, for example '127.0.0.1'
:type hostname: str
:param port: Server port, default 27910
:type port: int
:param logfile: Path to logfile
:param rcon_password: rcon password
:param pty_master: Master of the dp2 process (Linux only, useful only if you want to run the server from your Python script). Go to the getting started section for details.
:type pty_master: int
:param init_vars: Send come commands used for security
"""
def __init__(self, hostname, port=27910, logfile=None, rcon_password=None, pty_master=None, init_vars=True):
self.__rcon_password = rcon_password
self.__hostname = hostname
self.__init_vars = init_vars
self.__port = port
self.__log_file = None
self.__is_secure = False
self.__alive = False
self.__logfile_name = logfile if not pty_master else None
self.__pty_master = pty_master
self.handlers = {
ServerEvent.CHAT: 'on_chat',
ServerEvent.ELIM: 'on_elim',
ServerEvent.RESPAWN: 'on_respawn',
ServerEvent.ENTRANCE: 'on_entrance',
ServerEvent.FLAG_CAPTURED: 'on_flag_captured',
ServerEvent.ELIM_TEAMS_FLAG: 'on_elim_teams_flag',
ServerEvent.ROUND_STARTED: 'on_round_started',
ServerEvent.TEAM_SWITCHED: 'on_team_switched',
ServerEvent.GAME_END: 'on_game_end',
ServerEvent.MAPCHANGE: 'on_mapchange',
ServerEvent.NAMECHANGE: 'on_namechange',
ServerEvent.DISCONNECT: 'on_disconnect',
ServerEvent.FLAG_GRAB: 'on_flag_grab',
ServerEvent.FLAG_DROP: 'on_flag_drop',
ServerEvent.ROUND_END: 'on_round_end',
ServerEvent.GAMEMODE: 'gamemode',
}
self.__listeners = {
ServerEvent.CHAT: [],
ServerEvent.ELIM: [],
ServerEvent.RESPAWN: [],
ServerEvent.ENTRANCE: [],
ServerEvent.FLAG_CAPTURED: [],
ServerEvent.ELIM_TEAMS_FLAG: [],
ServerEvent.ROUND_STARTED: [],
ServerEvent.TEAM_SWITCHED: [],
ServerEvent.GAME_END: [],
ServerEvent.MAPCHANGE: [],
ServerEvent.NAMECHANGE: [],
ServerEvent.DISCONNECT: [],
ServerEvent.FLAG_GRAB: [],
ServerEvent.FLAG_DROP: [],
ServerEvent.ROUND_END: [],
ServerEvent.GAMEMODE: [],
}
self.loop = asyncio.get_event_loop()
[docs] def is_listening(self):
"""
Check if the main loop is running.
:rtype: bool
"""
return self.__alive
[docs] @asyncio.coroutine
def on_chat(self, nick, message):
"""
On chat, can be overridden using the :func:`.Server.event` decorator.
:param nick: Player's nick.
:type nick: str
:param message: Message.
:type message: str
"""
pass
[docs] @asyncio.coroutine
def on_flag_captured(self, team, nick, flag):
"""
On flag captured, can be overridden using the :func:`.Server.event` decorator.
:param team: Player's team.
:type team: str
:param nick: Player's nick.
:type nick: str
:param flag: Captured flag (Blue|Red|Yellow|Purple|White)
:type flag: str
"""
pass
[docs] @asyncio.coroutine
def on_team_switched(self, nick, old_team, new_team):
"""
On team switched, can be overridden using the :func:`.Server.event` decorator.
:param nick: Player's nick
:type nick: str
:param old_team: Old team (Blue|Red|Yellow|Purple|Observer)
:type old_team: str
:param new_team: New team (Blue|Red|Yellow|Purple|Observer)
:type new_team: str
"""
pass
[docs] @asyncio.coroutine
def on_round_started(self):
"""
On round started, can be overridden using the :func:`.Server.event` decorator.
"""
pass
[docs] @asyncio.coroutine
def on_elim_teams_flag(self, team, nick, points):
"""
On scored points for possession of eliminated teams flag, can be overridden using the :func:`.Server.event` decorator.
:param team: Player's team.
:type team: str
:param nick: Player's nick.
:type nick: str
:param points: Points earned.
:type points: int
"""
pass
[docs] @asyncio.coroutine
def on_entrance(self, nick, build, addr):
"""
On entrance, can be overriden using the :func:`.Server.event` decorator.
:param nick: Player's nick
:type nick: str
:param build: Player's game version ('build 41' for example
:type build: str
:param addr: Player's address, IP:PORT ('127.0.0.1:23414' for example)
:type addr: str
"""
pass
[docs] @asyncio.coroutine
def on_game_end(self, score_blue, score_red, score_yellow, score_purple):
"""
On game end, can be overriden using the :func:`.Server.event` decorator.
:param score_blue: Blue's score - None if there was no Blue team.
:param score_red: Red's score - None if there was no Red team.
:param score_yellow: Yellow's score - None if there was no Yellow team.
:param score_purple: Purple's score - None if there was no Purple team.
"""
pass
[docs] @asyncio.coroutine
def on_elim(self, killer_nick, killer_weapon, victim_nick, victim_weapon, suicide):
"""
On elim can be overridden using the :func:`.Server.event` decorator.
:param killer_nick: Killer's nick
:type killer_nick: str
:param killer_weapon: Killer's weapon
:type killer_weapon: str
:param victim_nick: Victim's nick
:type victim_nick: str
:param victim_weapon: Victim's weapon
:type victim_weapon: str
"""
pass
[docs] @asyncio.coroutine
def on_respawn(self, team, nick):
"""
On respawn, can be overridden using the :func:`.Server.event` decorator.
:param team: Player's team (Blue|Red|Yellow|Purple)
:type team: str
:param nick: Player's nick
:type nick: str
"""
pass
[docs] @asyncio.coroutine
def on_mapchange(self, mapname):
"""
On mapcange, can be overridden using the :func:`.Server.event` decorator.
:param mapname: Mapname
:type mapname: str
"""
pass
[docs] @asyncio.coroutine
def on_namechange(self, old_nick, new_nick):
"""
On name change, can be overridden using the :func:`.Server.event` decorator.
:param old_nick: Old nick
:type old_nick: str
:param new_nick: Old nick
:type new_nick: str
"""
pass
[docs] @asyncio.coroutine
def on_disconnect(self, nick):
"""
On disconnect, can be overridden using the :func:`.Server.event`decorator.
:param nick: Disconnected player's nick
:type nick: str
"""
pass
[docs] @asyncio.coroutine
def on_flag_grab(self, nick, flag):
"""
On flag grab, can be overridden using the :func:`.Server.event` decorator.
:param nick: Player's nick
:type nick: str
:param team: Flag color (Blue|Red|Yellow|Purple)
:type team: str
"""
pass
[docs] @asyncio.coroutine
def on_flag_drop(self, nick):
"""
On flag grab, can be overridden using the :func:`.Server.event` decorator.
:param nick: Player's nick
:type nick: str
:param team: Flag color (Blue|Red|Yellow|Purple)
:type team: str
"""
pass
[docs] @asyncio.coroutine
def on_round_end(self):
"""
Onround end, can be overridden using the :func:`.Server.event` decorator.
"""
pass
[docs] @asyncio.coroutine
def gamemode(self, gamemode):
"""
Onround end, can be overridden using the :func:`.Server.event` decorator.
:param gamemode: map's gamemode
:type gamemode: str
"""
pass
[docs] def event(self, func):
"""
Decorator, used for event registration.
:param func: function to register
:rtype: builtin_function_or_method
:example:
.. code-block:: python
:linenos:
>>> from dplib.server import Server
>>> s = Server(hostname='127.0.0.1', port=27910, logfile=r'qconsole27910.log', rcon_password='hello')
>>> @s.event
... def on_chat(nick, message):
... print((nick, message))
...
>>> s.run()
('mRokita', 'Hi')
"""
if func.__name__ in self.handlers.values():
setattr(self, func.__name__, asyncio.coroutine(func))
return func
else:
raise Exception('Event \'%s\' doesn\'t exist' % func.__name__)
[docs] def stop_listening(self):
"""
Stop the main loop
"""
self.__alive = False
def __perform_listeners(self, event_type, args, kwargs):
"""
Performs all pending listeners.
:param event_type: Event type, one of members :class:`ServerEvent`
:param args: Event info
:type args: tuple
:param kwargs: Event info
:type kwargs: dict
"""
to_remove = list()
for i, (check, future) in enumerate(self.__listeners[event_type]):
if not future.cancelled() and not future.done():
if check(*args):
future.set_result(kwargs)
else:
to_remove.append(i)
for i in reversed(to_remove):
self.__listeners[event_type].pop(i)
[docs] def nicks_valid(self, *nicks):
nicks_ingame = [p.nick for p in self.get_players()]
for nick in nicks:
if nick not in nicks_ingame:
return False
return True
@asyncio.coroutine
def __handle_event(self, event_type, args):
"""
Handles an event.
:param event_type: Event type, one of members :class:`ServerEvent`
:param args: Event info (re.findall() results)
"""
kwargs = dict()
if event_type == ServerEvent.CHAT:
if args[0] not in [p.nick for p in self.get_players()]:
return
kwargs = {
'nick': args[0],
'message': args[1],
}
self.__perform_listeners(ServerEvent.CHAT, args, kwargs)
elif event_type == ServerEvent.ELIM:
kwargs = {
'killer_nick': args[0],
'killer_weapon': args[1],
'victim_nick': args[2],
'victim_weapon': args[3],
'suicide': args[4],
}
self.__perform_listeners(ServerEvent.ELIM, args, kwargs)
elif event_type == ServerEvent.RESPAWN:
kwargs = {
'team': args[0],
'nick': args[1],
}
self.__perform_listeners(ServerEvent.RESPAWN, args, kwargs)
elif event_type == ServerEvent.ENTRANCE:
kwargs = {
'nick': args[0],
'build': args[1],
'addr': args[2],
}
self.__perform_listeners(ServerEvent.ENTRANCE, args, kwargs)
elif event_type == ServerEvent.FLAG_CAPTURED:
kwargs = {
'team': args[0],
'nick': args[1],
'flag': args[2],
}
self.__perform_listeners(ServerEvent.FLAG_CAPTURED, args, kwargs)
elif event_type == ServerEvent.ELIM_TEAMS_FLAG:
kwargs = {
'team': args[0],
'nick': args[1],
'points': int(args[2]),
}
self.__perform_listeners(ServerEvent.ELIM_TEAMS_FLAG, args, kwargs)
elif event_type == ServerEvent.ROUND_STARTED:
kwargs = dict()
self.__perform_listeners(ServerEvent.ROUND_STARTED, args, kwargs)
elif event_type == ServerEvent.TEAM_SWITCHED:
new_args = tuple([arg for arg in args if arg])
kwargs = {
'nick': new_args[0],
'old_team': new_args[1] if len(new_args) > 2 else 'Observer',
'new_team': new_args[2] if len(new_args) > 2 else new_args[1]
}
if kwargs['new_team'] == 'observing':
kwargs['new_team'] = 'Observer'
kwargs['old_team'] = None
self.__perform_listeners(ServerEvent.TEAM_SWITCHED, new_args, kwargs)
elif event_type == ServerEvent.GAME_END:
kwargs = {
'score_blue': None,
'score_red': None,
'score_purple': None,
'score_yellow': None,
}
teams = args.split(',')
for t in teams:
data = t.split(':')
if data[0] == 'Blue':
kwargs['score_blue'] = data[1]
elif data[0] == 'Red':
kwargs['score_red'] = data[1]
elif data[0] == 'Yellow':
kwargs['score_yellow'] = data[1]
elif data[0] == 'Purple':
kwargs['score_purple'] = data[1]
self.__perform_listeners(ServerEvent.GAME_END,
(kwargs['score_blue'],
kwargs['score_red'],
kwargs['score_yellow'],
kwargs['score_purple']), kwargs)
elif event_type == ServerEvent.MAPCHANGE:
kwargs = {
'mapname': args
}
self.__perform_listeners(ServerEvent.MAPCHANGE, (kwargs['mapname'],), kwargs)
elif event_type == ServerEvent.NAMECHANGE:
kwargs = {
'old_nick': args[0],
'new_nick': args[1]
}
self.__perform_listeners(ServerEvent.NAMECHANGE, (kwargs['old_nick'], kwargs['new_nick']), kwargs)
elif event_type == ServerEvent.DISCONNECT:
kwargs = {
'nick': args
}
self.__perform_listeners(ServerEvent.DISCONNECT, (kwargs['nick'],), kwargs)
elif event_type == ServerEvent.FLAG_GRAB:
kwargs = {
'nick': args[0],
'flag': args[1],
}
self.__perform_listeners(ServerEvent.FLAG_GRAB, (kwargs['nick'], kwargs['flag']), kwargs)
elif event_type == ServerEvent.FLAG_DROP:
kwargs = {
'nick': args
}
self.__perform_listeners(ServerEvent.FLAG_GRAB, (kwargs['nick'],), kwargs)
elif event_type == ServerEvent.ROUND_END:
kwargs = dict()
self.__perform_listeners(ServerEvent.ROUND_END, args, kwargs)
elif event_type == ServerEvent.GAMEMODE:
kwargs = {
'gamemode': args
}
self.__perform_listeners(ServerEvent.GAMEMODE, args, kwargs)
asyncio.ensure_future(self.get_event_handler(event_type)(**kwargs))
[docs] def get_event_handler(self, event_type):
return getattr(self, self.handlers[event_type])
@asyncio.coroutine
def __parse_line(self, line):
"""
Tries to match line with all event regexps.
:param line: Line from logs
"""
for r in REGEXPS:
results = r.findall(line)
e = REGEXPS[r]
for res in results:
if e == ServerEvent.CHAT: # For security reasons
if self.nicks_valid(res[0]):
yield from self.__handle_event(event_type=e, args=res)
return
else:
continue
yield from self.__handle_event(event_type=e, args=res)
[docs] def rcon(self, command, socket_timeout=3):
"""
Execute a console command using RCON.
:param command: Command
:param socket_timeout: Timeout for the UDP socket.
:return: Response from server
:rtype: str
:example:
.. code-block:: python
:linenos:
>>> from dplib.server import Server
>>> s = Server(hostname='127.0.0.1', port=27910, logfile=r'qconsole27910.log', rcon_password='hello')
>>> s.rcon('sv listuserip')
'ÿÿÿÿprint\\n mRokita [127.0.0.1:9419]\\nadmin is listing IP for mRokita [127.0.0.1:9419]\\n'
"""
sock = socket(AF_INET, SOCK_DGRAM)
sock.connect((self.__hostname, self.__port))
sock.settimeout(socket_timeout)
sock.send(bytes('\xFF\xFF\xFF\xFFrcon {} {}\n'.format(self.__rcon_password, command).encode('latin-1')))
ret = sock.recv(2048).decode('latin-1')
return ret
[docs] def status(self):
"""
Execute status query.
:return: Status string
:rtype: str
"""
sock = socket(AF_INET, SOCK_DGRAM)
sock.connect((self.__hostname, self.__port))
sock.settimeout(3)
sock.send(b'\xFF\xFF\xFF\xFFstatus\n')
return sock.recv(2048).decode('latin-1')
[docs] def new_map(self, map_name, gamemode=None):
"""
Changes the map using sv newmap <mapname> <gamemode>
:param map_name: map name, without .bsp
:param gamemode: Game mode
:type gamemode: GameMode
:return: Rcon response
:raises MapNotFoundError: When map is not found on the server
:rtype: str
"""
command = 'sv newmap {map}'
if gamemode:
command += ' {gamemode}'
res = self.rcon(command.format(map=map_name, gamemode=gamemode))
if 'Cannot find mapfile' in res or 'usage' in res:
raise MapNotFoundError
return res
[docs] def permaban(self, ip=None):
"""
Bans IP address or range of adresses and saves ban list to disk.
:param ip: IP address to ban
:return: Rcon response
:rtype: str
"""
if ip:
resp = self.rcon('addip %s' % ip)
resp += '\n' + self.rcon('writeban')
return resp
else:
raise TypeError('IP address is required.')
[docs] def remove_permaban(self, ip=None):
"""
Removes ban on IP address and saves ban list to disk.
:param ip: IP address to unban
:return: Rcon response
:rtype: str
"""
if ip:
resp = self.rcon('removeip %s' % ip)
resp += '\n' + self.rcon('writeban')
return resp
else:
raise TypeError('IP address is required.')
[docs] def tempoban(self, id=None, nick=None, duration=3):
"""
Temporarily bans a player with specified id using rcon
:param id: Player's id
:param nick: Player's nick
:param duration: Ban duration in minutes (defaults to 3)
:return: Rcon response
:rtype: str
"""
if type(duration) != int:
raise TypeError('Ban duration should be an integer, not a ' + str(type(duration)))
if nick:
id = self.get_ingame_info(nick).id
if id:
return self.rcon('tban %s %s' % (id, str(duration)))
else:
raise TypeError('Player id or nick is required.')
[docs] def remove_tempobans(self):
"""
Removes all temporary bans
:return: Rcon response
:rtype: str
"""
return self.rcon("removetbans")
[docs] def kick(self, id=None, nick=None):
"""
Kicks a player with id using rcon.
:param id: Player's id
:param nick: Player's nick
:return: Rcon response
:rtype: str
"""
if nick:
id = self.get_ingame_info(nick).id
if id:
return self.rcon('kick %s' % id)
else:
raise TypeError('Player id or nick is required.')
[docs] def say(self, message):
"""
Say a message
:param message: Text, can contain {C} - color char {U} - underline char {I} italic.
Remember to escape user input using :func:`dplib.parse.escape_braces`.
:rtype: str
:return: Rcon response
:example:
.. code-block:: python
:linenos:
>>> from dplib.server import Server
>>> s = Server(hostname='127.0.0.1', port=27910, logfile=r'qconsole27910.log', rcon_password='hello')
>>> s.say('{C}ARed text')
>>> s.say('{U}Underline{U}')
>>> s.say('{I}Italic{I}')
:ingame result:
.. image:: ..\..\doc\images\say_test.png
"""
return self.rcon('say "%s"' % render_text(message))
[docs] def cprint(self, message):
"""
Cprints a message.
:param message: Text, can contain {C} - color char {U} - underline char {I} italic.
Remember to escape user input using :func:`dplib.parse.escape_brac
:return: Rcon response
:rtype: str
"""
return self.rcon('sv cprint "%s"' % render_text(message))
[docs] def set_cvar(self, var, value):
"""
Set a server cvar
:param var: cvar name
:param value: value to set
:return: Rcon response
:rtype: str
"""
return self.rcon('set %s "%s"' % (var, value))
[docs] def get_cvar(self, var):
"""
Gets cvar value
:param var: Variable name
:type var: str
:return: Cvar value
:rtype: str
"""
res = self.rcon('"%s"' % var)
if re.match('^....print\\\nUnknown command \\"%s"\\.\\\n' % re.escape(var), res):
raise NameError('Cvar "%s" does not exist' % var)
return re.findall('^....print\\\n\\"%s\\" is \\"(.*?)\\"\\\n' % re.escape(var), res)[0]
@staticmethod
def __get_predicate(margs, check):
"""
Returns a comparator.
:param margs: Args to check
:param check: Check function
:return: Returns a function that compiles the check function and comparision strings
"""
def predicate(*args):
if len(args) != len(margs):
raise TypeError('predicate() takes %d positional arguments but %d were given' % (len(margs), len(args)))
result = True
for i, a in enumerate(margs):
if a:
result = result and a == args[i]
if callable(check):
result = result and check(*args)
return result
return predicate
[docs] @asyncio.coroutine
def wait_for_entrance(self, timeout=None, nick=None, build=None, addr=None, check=None):
"""
Waits for entrance.
:param timeout: Time to wait for entrance event, if exceeded, returns None.
:param nick: Player's nick.
:param build: Player's build.
:param addr: Player's address (IP:PORT)
:return:
"""
future = asyncio.Future(loop=self.loop)
margs = (nick, build, addr)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.ENTRANCE].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_respawn(self, timeout=None, team=None, nick=None, check=None):
"""
Waits for respawn event.
:param timeout: Time to wait for respawn event, if exceeded, returns None.
:param team: Player's team.
:param nick: Player's nick.
:param check: Check function, ignored if none.
:return: Returns message info dict keys: ('team', 'nick').
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (team, nick)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.RESPAWN].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_elim_teams_flag(self, timeout=None, team=None, nick=None, points=None, check=None):
"""
Waits for elim teams flag event.
:param timeout: Time to wait for event, if exceeded, returns None.
:param team: Player's team.
:param nick: Player's nick.
:param points: Points scored.
:type points: int
:param check: Check function, ignored if none.
:return: Returns message info dict keys: ('team', 'nick', 'points').
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (team, nick, points)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.ELIM_TEAMS_FLAG].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_team_switched(self, timeout=None, nick=None, old_team=None, new_team=None, check=None):
"""
Waits for team switch event.
:param timeout: Time to wait for event, if exceeded, returns None.
:param old_team: Player's old team.
:param new_team: Player's new team.
:param nick: Player's nick.
:param check: Check function, ignored if none.
:return: Returns message info dict keys: ('nick', 'old_team', 'new_nick').
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (nick, old_team, new_team)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.TEAM_SWITCHED].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_round_started(self, timeout=None, check=None):
"""
Waits for round start.
:param timeout: Time to wait for event, if exceeded, returns None.
:param check: Check function, ignored if none.
:return: Returns an empty dict.
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = tuple()
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.ROUND_STARTED].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_flag_captured(self, timeout=None, team=None, nick=None, flag=None, check=None):
"""
Waits for flag capture.
:param timeout: Time to wait for event, if exceeded, returns None.
:param team: Player's team.
:param nick: Player's nick.
:param flag: Captured flag.
:param check: Check function, ignored if none.
:return: Returns an empty dict.
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (team, nick, flag)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.FLAG_CAPTURED].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_game_end(self, timeout=None, score_blue=None, score_red=None, score_yellow=None, score_purple=None, check=None):
"""
Waits for game end.
:param timeout: Time to wait for event, if exceeded, returns None.
:param score_blue: Blue score
:param score_red: Red score.
:param score_yellow: Yellow score.
:param score_purple: Purple score.
:param check: Check function, ignored if none.
:return: Returns an empty dict.
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (score_blue, score_red, score_yellow, score_purple)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.GAME_END].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] @asyncio.coroutine
def wait_for_elim(self, timeout=None, killer_nick=None, killer_weapon=None, victim_nick=None, victim_weapon=None,
check=None):
"""
Waits for elimination event.
:param timeout: Time to wait for elimination event, if exceeded, returns None.
:param killer_nick: Killer's nick to match, ignored if None.
:param killer_weapon: Killer's weapon to match, ignored if None.
:param victim_nick: Victim's nick to match, ignored if None.
:param victim_weapon: Victim's weapon to match, ignored if None.
:param check: Check function, ignored if None.
:return: Returns message info dict keys: ('killer_nick', 'killer_weapon', 'victim_nick', 'victim_weapon')
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (killer_nick, killer_weapon, victim_nick, victim_weapon)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.ELIM].append((predicate, future))
try:
elim_info = yield from asyncio.wait_for(future, timeout, loop=self.loop)
except asyncio.TimeoutError:
elim_info = None
return elim_info
[docs] @asyncio.coroutine
def wait_for_mapchange(self, timeout=None, mapname=None, check=None):
"""
Waits for mapchange.
:param timeout: Time to wait for elimination event, if exceeded, returns None.
:param mapname: Killer's nick to match, ignored if None.
:param check: Check function, ignored if None.
:return: Returns message info dict keys: ('killer_nick', 'killer_weapon', 'victim_nick', 'victim_weapon')
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (mapname,)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.MAPCHANGE].append((predicate, future))
try:
mapchange_info = yield from asyncio.wait_for(future, timeout, loop=self.loop)
except asyncio.TimeoutError:
mapchange_info = None
return mapchange_info
[docs] @asyncio.coroutine
def wait_for_namechange(self, timeout=None, old_nick=None, new_nick=None, check=None):
"""
Waits for mapchange.
:param timeout: Time to wait for elimination event, if exceeded, returns None.
:param mapname: Killer's nick to match, ignored if None.
:param check: Check function, ignored if None.
:return: Returns message info dict keys: ('killer_nick', 'killer_weapon', 'victim_nick', 'victim_weapon')
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (old_nick, new_nick)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.NAMECHANGE].append((predicate, future))
try:
mapchange_info = yield from asyncio.wait_for(future, timeout, loop=self.loop)
except asyncio.TimeoutError:
mapchange_info = None
return mapchange_info
[docs] @asyncio.coroutine
def wait_for_message(self, timeout=None, nick=None, message=None, check=None):
"""
Waits for a message.
:param timeout: Time to wait for message, if exceeded, returns None.
:param nick: Player's nick to match, ignored if None
:type nick: str
:param message: Message text to match, ignored if None
:type message: str
:param check: Check function, ignored if None
:return: Returns message info dict keys: ('nick', 'message')
:rtype: dict
:example:
.. code-block:: python
:linenos:
@s.event
def on_chat(nick, message):
if message == '!start' and not elim_active:
msg = yield from s.wait_for_message(check=lambda n, m: m.startswith('!hi '))
s.say('Hi ' + msg['message'].split('!hi ')[1] + '!')
"""
future = asyncio.Future(loop=self.loop)
margs = (nick, message)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.CHAT].append((predicate, future))
try:
message = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
message = None
return message
[docs] @asyncio.coroutine
def wait_for_flag_drop(self, timeout=None, nick=None, check=None):
"""
Waits for flag drop.
:param timeout: Time to wait for event, if exceeded, returns None.
:param nick: Player's nick.
:param flag: dropped flag.
:param check: Check function, ignored if none.
:return: Returns an empty dict.
:rtype: dict
"""
future = asyncio.Future(loop=self.loop)
margs = (nick)
predicate = self.__get_predicate(margs, check)
self.__listeners[ServerEvent.FLAG_DROP].append((predicate, future))
try:
data = yield from asyncio.wait_for(future, timeout,
loop=self.loop)
except asyncio.TimeoutError:
data = None
return data
[docs] def start(self, scan_old=False, realtime=True, debug=False):
"""
Main loop.
:param scan_old: Scan present logfile data
:type scan_old: bool
:param realtime: Wait for incoming logfile data
:type realtime: bool
"""
if not (self.__logfile_name or self.__pty_master):
raise AttributeError("Logfile name or a Popen process is required.")
self.__alive = True
if self.__logfile_name:
self.__log_file = open(self.__logfile_name, 'rb')
if self.__log_file and scan_old:
self.__log_file.readlines()
buf = ''
if realtime:
while self.__alive:
try:
buf += self._read_log()
lines = buf.splitlines(True)
line = ''
for line in lines:
if debug:
print("[DPLib] %s" % line.strip())
yield from self.__parse_line(line)
if not line or line[-1] != '\n':
buf = line
else:
buf = ''
yield from asyncio.sleep(0.05)
except OSError as e:
raise e
if self.__log_file:
self.__log_file.close()
if self.__pty_master:
os.close(self.__pty_master)
def _read_log(self):
if self.__log_file:
return self.__log_file.readline().decode('latin-1')
elif self.__pty_master:
r, w, x = select.select([self.__pty_master], [], [], 0.01)
if r:
return os.read(self.__pty_master, 1024).decode('latin-1')
else:
return ''
[docs] def get_players(self):
"""
Gets playerlist.
:return: List of :class:`.Player` instances
:rtype: list
"""
response = self.rcon('sv players')
response = re.findall('(\d+) \\(?(.*?)\\)?\\] \\* (?:OP \d+, )?(.+) \\((b\d+)\\)', response)
players = list()
for p_data in response:
player = Player(nick=p_data[2],
id=p_data[0],
dplogin=p_data[1],
build=p_data[3],
server=self)
players.append(player)
return players
[docs] def get_simple_playerlist(self):
"""
Get a list of player names
:return: List of nicks
:rtype: list
"""
status = self.get_status()
players = status['players']
playerlist = []
for p in players:
playerlist.append(p['name'])
return playerlist
[docs] def get_status(self):
"""
Gets server status
:example:
.. code-block:: python
:linenos:
>>> s = Server(hostname='127.0.0.1', port=27910, logfile=r'C:\Games\Paintball2\pball\qconsole27910.log', rcon_password='hello')
>>> s.get_status()
{'players': [{'score': '0', 'ping': '13', 'name': 'mRokita'}], 'sv_certificated': '1', 'mapname': 'beta/wobluda_fix', 'TimeLeft': '20:00', '_scores': 'Red:0 Blue:0 ', 'gamename': 'Digital Paint Paintball 2 v1.930(186)', 'gameversion': 'DPPB2 v1.930(186)', 'sv_login': '1', 'needpass': '0', 'gamedate': 'Aug 10 2015', 'protocol': '34', 'version': '2.00 x86 Aug 10 2015 Win32 RELEASE (41)', 'hostname': 'asdfgh', 'elim': 'airtime', 'fraglimit': '50', 'timelimit': '20', 'gamedir': 'pball', 'game': 'pball', 'maxclients': '8'}
:return: status dict
:rtype: dict
"""
dictionary = {}
players = []
response = self.status().split('\n')[1:]
variables = response[0]
players_str = (response[1:])
for i in players_str:
if not i:
continue
temp_dict = {}
cleaned_name = decode_ingame_text(i)
separated = cleaned_name.split(' ')
temp_dict['score'] = separated[0]
temp_dict['ping'] = separated[1]
temp_dict['name'] = cleaned_name.split("%s %s " % (separated[0], separated[1]))[1][1:-1]
players.append(temp_dict)
dictionary['players'] = players
variables = variables.split('\\')[1:]
for i in range(0, len(variables), 2):
dictionary[variables[i]] = variables[i + 1]
return dictionary
[docs] def get_ingame_info(self, nick):
"""
Get ingame info about a player with nickname
:param nick: Nick
:return: An instance of :class:`.Player`
"""
players = self.get_players()
for p in players:
if p.nick == nick:
return p
return None
[docs] def make_secure(self, timeout=10):
"""
This function fixes some compatibility and security issues on DP server side
- Adds "mapchange" to sv_blockednames
- Sets sl_logging to 1
All variables are set using the rcon protocol, use this function if you want to wait for the server to start.
:param timeout: Timeout in seconds
"""
sl_logging_set = False
sv_blockednames_set = False
self.__is_secure = False
start_time = time()
while not (sl_logging_set and sv_blockednames_set) and time() - start_time < timeout:
try:
if not sl_logging_set:
sl_logging = self.get_cvar('sl_logging')
if sl_logging != '1':
self.set_cvar('sl_logging', '1')
else:
sl_logging_set = True
if not sv_blockednames_set:
blockednames = self.get_cvar('sv_blockednames')
if not 'maploaded' in blockednames:
self.set_cvar('sv_blockednames', ','.join([blockednames, 'maploaded']))
else:
sv_blockednames_set = True
except ConnectionError or timeout:
pass
if not (sl_logging_set and sv_blockednames_set):
raise SecurityCheckError(
"Configuring the DP server failed,"
" check if the server is running "
"and the rcon_password is correct.")
else:
self.__is_secure = True
[docs] def run(self, scan_old=False, realtime=True, debug=False, make_secure=True):
"""
Runs the main loop using asyncio.
:param scan_old: Scan present logfile data
:type scan_old: bool
:param realtime: Wait for incoming logfile data
:type realtime: bool
"""
if make_secure and not self.__rcon_password:
raise AttributeError(
"Setting the rcon_password is required to secure DPLib."
" You have to either set a rcon_password or add set"
" \"sl_logging 1; set sv_blockednames mapname\" "
"to your DP server config and use Server.run with"
" make_secure=False")
if make_secure:
self.make_secure()
self.loop.run_until_complete(self.start(scan_old, realtime, debug))