Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions sc2reader/events/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@ def __init__(self, frame, pid):
self.name = self.__class__.__name__

def _str_prefix(self):
if self.player:
player_name = self.player.name if getattr(self, 'pid', 16) != 16 else "Global"
if getattr(self, 'pid', 16) == 16:
player_name = "Global"
elif self.player and not self.player.name:
player_name = "Player {0} - ({1})".format(self.player.pid, self.player.play_race)
elif self.player:
player_name = self.player.name
else:
player_name = "no name"
return "{0}\t{1:<15} ".format(Length(seconds=int(self.frame / 16)), player_name)
Expand Down
128 changes: 80 additions & 48 deletions sc2reader/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,19 @@ def __init__(self, replay_file, filename=None, load_level=4, engine=sc2reader.en
self.length = self.game_length = self.real_length = utils.Length(seconds=int(self.frames/fps))

# Load basic details if requested
# .backup files are read in case the main files are missing or removed
if load_level >= 1:
self.load_level = 1
for data_file in ['replay.initData', 'replay.details', 'replay.attributes.events']:
files = [
'replay.initData.backup',
'replay.details.backup',
'replay.attributes.events',
'replay.initData',
'replay.details'
]
for data_file in files:
self._read_data(data_file, self._get_reader(data_file))
self.load_details()
self.load_all_details()
self.datapack = self._get_datapack()

# Can only be effective if map data has been loaded
Expand Down Expand Up @@ -311,18 +319,24 @@ def __init__(self, replay_file, filename=None, load_level=4, engine=sc2reader.en

engine.run(self)

def load_details(self):
def load_init_data(self):
if 'replay.initData' in self.raw_data:
initData = self.raw_data['replay.initData']
options = initData['game_description']['game_options']
self.amm = options['amm']
self.ranked = options['ranked']
self.competitive = options['competitive']
self.practice = options['practice']
self.cooperative = options['cooperative']
self.battle_net = options['battle_net']
self.hero_duplicates_allowed = options['hero_duplicates_allowed']
elif 'replay.initData.backup' in self.raw_data:
initData = self.raw_data['replay.initData.backup']
else:
return

options = initData['game_description']['game_options']
self.amm = options['amm']
self.ranked = options['ranked']
self.competitive = options['competitive']
self.practice = options['practice']
self.cooperative = options['cooperative']
self.battle_net = options['battle_net']
self.hero_duplicates_allowed = options['hero_duplicates_allowed']

def load_attribute_events(self):
if 'replay.attributes.events' in self.raw_data:
# Organize the attribute data to be useful
self.attributes = defaultdict(dict)
Expand All @@ -337,57 +351,75 @@ def load_details(self):
self.is_ladder = (self.category == "Ladder")
self.is_private = (self.category == "Private")

def load_details(self):
if 'replay.details' in self.raw_data:
details = self.raw_data['replay.details']
elif 'replay.details.backup' in self.raw_data:
details = self.raw_data['replay.details.backup']
else:
return

self.map_name = details['map_name']
self.region = details['cache_handles'][0].server.lower()
self.map_hash = details['cache_handles'][-1].hash
self.map_file = details['cache_handles'][-1]

# Expand this special case mapping
if self.region == 'sg':
self.region = 'sea'

self.map_name = details['map_name']
dependency_hashes = [d.hash for d in details['cache_handles']]
if hashlib.sha256('Standard Data: Void.SC2Mod'.encode('utf8')).hexdigest() in dependency_hashes:
self.expansion = 'LotV'
elif hashlib.sha256('Standard Data: Swarm.SC2Mod'.encode('utf8')).hexdigest() in dependency_hashes:
self.expansion = 'HotS'
elif hashlib.sha256('Standard Data: Liberty.SC2Mod'.encode('utf8')).hexdigest() in dependency_hashes:
self.expansion = 'WoL'
else:
self.expansion = ''

self.region = details['cache_handles'][0].server.lower()
self.map_hash = details['cache_handles'][-1].hash
self.map_file = details['cache_handles'][-1]
self.windows_timestamp = details['file_time']
self.unix_timestamp = utils.windows_to_unix(self.windows_timestamp)
self.end_time = datetime.utcfromtimestamp(self.unix_timestamp)

# Expand this special case mapping
if self.region == 'sg':
self.region = 'sea'
# The utc_adjustment is either the adjusted windows timestamp OR
# the value required to get the adjusted timestamp. We know the upper
# limit for any adjustment number so use that to distinguish between
# the two cases.
if details['utc_adjustment'] < 10**7*60*60*24:
self.time_zone = details['utc_adjustment']/(10**7*60*60)
else:
self.time_zone = (details['utc_adjustment']-details['file_time'])/(10**7*60*60)

dependency_hashes = [d.hash for d in details['cache_handles']]
if hashlib.sha256('Standard Data: Void.SC2Mod'.encode('utf8')).hexdigest() in dependency_hashes:
self.expansion = 'LotV'
elif hashlib.sha256('Standard Data: Swarm.SC2Mod'.encode('utf8')).hexdigest() in dependency_hashes:
self.expansion = 'HotS'
elif hashlib.sha256('Standard Data: Liberty.SC2Mod'.encode('utf8')).hexdigest() in dependency_hashes:
self.expansion = 'WoL'
else:
self.expansion = ''

self.windows_timestamp = details['file_time']
self.unix_timestamp = utils.windows_to_unix(self.windows_timestamp)
self.end_time = datetime.utcfromtimestamp(self.unix_timestamp)

# The utc_adjustment is either the adjusted windows timestamp OR
# the value required to get the adjusted timestamp. We know the upper
# limit for any adjustment number so use that to distinguish between
# the two cases.
if details['utc_adjustment'] < 10**7*60*60*24:
self.time_zone = details['utc_adjustment']/(10**7*60*60)
else:
self.time_zone = (details['utc_adjustment']-details['file_time'])/(10**7*60*60)
self.game_length = self.length
self.real_length = utils.Length(seconds=int(self.length.seconds/GAME_SPEED_FACTOR[self.expansion][self.speed]))
self.start_time = datetime.utcfromtimestamp(self.unix_timestamp-self.real_length.seconds)
self.date = self.end_time # backwards compatibility

self.game_length = self.length
self.real_length = utils.Length(seconds=int(self.length.seconds/GAME_SPEED_FACTOR[self.expansion][self.speed]))
self.start_time = datetime.utcfromtimestamp(self.unix_timestamp-self.real_length.seconds)
self.date = self.end_time # backwards compatibility
def load_all_details(self):
self.load_init_data()
self.load_attribute_events()
self.load_details()

def load_map(self):
self.map = self.factory.load_map(self.map_file, **self.opt)

def load_players(self):
# If we don't at least have details and attributes_events we can go no further
if 'replay.details' not in self.raw_data:
# We can use the backup detail files if the main files have been removed
if 'replay.details' in self.raw_data:
details = self.raw_data['replay.details']
elif 'replay.details.backup' in self.raw_data:
details = self.raw_data['replay.details.backup']
else:
return
if 'replay.attributes.events' not in self.raw_data:
return
if 'replay.initData' not in self.raw_data:
if 'replay.initData' in self.raw_data:
initData = self.raw_data['replay.initData']
elif 'replay.initData.backup' in self.raw_data:
initData = self.raw_data['replay.initData.backup']
else:
return

self.clients = list()
Expand All @@ -397,8 +429,6 @@ def load_players(self):
# information. detail_id marks the current index into this data.
detail_id = 0
player_id = 1
details = self.raw_data['replay.details']
initData = self.raw_data['replay.initData']

# Assume that the first X map slots starting at 1 are player slots
# so that we can assign player ids without the map
Expand Down Expand Up @@ -568,6 +598,8 @@ def register_default_readers(self):
"""Registers factory default readers."""
self.register_reader('replay.details', readers.DetailsReader(), lambda r: True)
self.register_reader('replay.initData', readers.InitDataReader(), lambda r: True)
self.register_reader('replay.details.backup', readers.DetailsReader(), lambda r: True)
self.register_reader('replay.initData.backup', readers.InitDataReader(), lambda r: True)
self.register_reader('replay.tracker.events', readers.TrackerEventsReader(), lambda r: True)
self.register_reader('replay.message.events', readers.MessageEventsReader(), lambda r: True)
self.register_reader('replay.attributes.events', readers.AttributesEventsReader(), lambda r: True)
Expand Down
Binary file added test_replays/4.1.2.60604/1.SC2Replay
Binary file not shown.
42 changes: 42 additions & 0 deletions test_replays/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import sc2reader
from sc2reader.exceptions import CorruptTrackerFileError
from sc2reader.events.game import GameEvent
from sc2reader.objects import Player

sc2reader.log_utils.log_to_console("INFO")

Expand Down Expand Up @@ -603,6 +605,41 @@ def test_70154(self):
factory = sc2reader.factories.SC2Factory()
replay = factory.load_replay(replayfilename)

def test_anonymous_replay(self):
replayfilename = "test_replays/4.1.2.60604/1.SC2Replay"
factory = sc2reader.factories.SC2Factory()
replay = factory.load_replay(replayfilename)

def test_game_event_string(self):
time = "00.01"
# Global
player = MockPlayer()
player.name = "TestPlayer"
player.play_race = "TestRace"
event = GameEvent(16, 16)
event.player = player
self.assertEqual("{0}\t{1:<15} ".format(time, "Global"), event._str_prefix())

# Player with name
player = MockPlayer()
player.name = "TestPlayer"
player.play_race = "TestRace"
event = GameEvent(16, 1)
event.player = player
self.assertEqual("{0}\t{1:<15} ".format(time, player.name), event._str_prefix())

# No Player
player = MockPlayer()
event = GameEvent(16, 1)
self.assertEqual("{0}\t{1:<15} ".format(time, "no name"), event._str_prefix())

# Player without name
player = MockPlayer()
player.play_race = "TestRace"
player.pid = 1
event = GameEvent(16, 1)
event.player = player
self.assertEqual("{0}\tPlayer {1} - ({2}) ".format(time, player.pid, player.play_race), event._str_prefix())

class TestGameEngine(unittest.TestCase):
class TestEvent(object):
Expand Down Expand Up @@ -658,6 +695,11 @@ def test_plugin1(self):
self.assertEqual(replay.plugin_result['TestPlugin1'], (1, dict(msg="Fail!")))
self.assertEqual(replay.plugin_result['TestPlugin2'], (0, dict()))

class MockPlayer(object):
def __init__(self):
self.name = None
self.play_race = None
self.pid = None

if __name__ == '__main__':
unittest.main()