diff --git a/sc2reader/engine/plugins/__init__.py b/sc2reader/engine/plugins/__init__.py index d506b69b..be6d155f 100644 --- a/sc2reader/engine/plugins/__init__.py +++ b/sc2reader/engine/plugins/__init__.py @@ -5,3 +5,5 @@ from sc2reader.engine.plugins.selection import SelectionTracker from sc2reader.engine.plugins.context import ContextLoader from sc2reader.engine.plugins.gameheart import GameHeartNormalizer +from sc2reader.engine.plugins.supply import SupplyTracker +from sc2reader.engine.plugins.creeptracker import CreepTracker diff --git a/sc2reader/engine/plugins/creeptracker.py b/sc2reader/engine/plugins/creeptracker.py new file mode 100644 index 00000000..42eee635 --- /dev/null +++ b/sc2reader/engine/plugins/creeptracker.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, unicode_literals, division + +from itertools import dropwhile +from sets import Set +from Image import open as PIL_open +from Image import ANTIALIAS +from StringIO import StringIO +from collections import defaultdict +from sc2reader.engine import PluginExit + + +# The creep tracker plugin +class CreepTracker(object): + ''' + The Creep tracker populates player.max_creep_spread and + player.creep_spread by minute + This uses the creep_tracker class to calculate the features + ''' + name = "CreepTracker" + + def handleInitGame(self, event, replay): + if len(replay.tracker_events) == 0 : + yield PluginExit(self, code=0, details={}) + + if replay.map is None: + replay.load_map() + self.creepTracker = creep_tracker(replay) + for player in replay.players: + self.creepTracker.init_cgu_lists(player.pid) + + def handleUnitDiedEvent(self, event, replay): + self.creepTracker.remove_from_list(event.unit_id,event.second) + + def handleUnitInitEvent(self,event,replay): + if event.unit_type_name in ["CreepTumor", "Hatchery","NydusCanal"] : + self.creepTracker.add_to_list(event.control_pid,event.unit_id,\ + (event.x, event.y), event.unit_type_name,event.second) + + def handleUnitBornEvent(self,event,replay): + if event.unit_type_name== "Hatchery": + self.creepTracker.add_to_list(event.control_pid, event.unit_id,\ + (event.x,event.y),event.unit_type_name,event.second) + + def handleEndGame(self, event, replay): + for player_id in replay.player: + self.creepTracker.reduce_cgu_per_minute(player_id) + for player in replay.players: + player.creep_spread_by_minute = self.creepTracker.get_creep_spread_area(player.pid) + for player in replay.players: + if player.creep_spread_by_minute: + player.max_creep_spread = max(player.creep_spread_by_minute.items(),key=lambda x:x[1]) + else: + ## Else statement is for players with no creep spread(ie: not Zerg) + player.max_creep_spread =0 + +## The class used to used to calculate the creep spread +class creep_tracker(): + def __init__(self,replay): + #if the debug option is selected, minimaps will be printed to a file + ##and a stringIO containing the minimap image will be saved for + ##every minite in the game and the minimap with creep highlighted + ## will be printed out. + self.debug = replay.opt.debug + ##This list contains creep spread area for each player + self.creep_spread_by_minute = dict() + ## this list contains a minimap highlighted with creep spread for each player + if self.debug: + self.creep_spread_image_by_minute = dict() + ## This list contains all the active cgus in every time frame + self.creep_gen_units = dict() + ## Thist list corresponds to creep_gen_units storing the time of each CGU + self.creep_gen_units_times= dict() + ## convert all possible cgu radii into a sets of coordinates centred around the origin, + ## in order to use this with the CGUs, the centre point will be + ## subtracted with coordinates of cgus created in game + self.unit_name_to_radius={'CreepTumor': 10,"Hatchery":8,"NydusCanal": 5} + self.radius_to_coordinates= dict() + for x in self.unit_name_to_radius: + self.radius_to_coordinates[self.unit_name_to_radius[x]] =\ + self.radius_to_map_positions(self.unit_name_to_radius[x]) + + #Get map information + replayMap = replay.map + # extract image from replay package + mapsio = StringIO(replayMap.minimap) + im = PIL_open(mapsio) + ##remove black box around minimap + cropped = im.crop(im.getbbox()) + cropsize = cropped.size + self.map_height = 100.0 + # resize height to MAPHEIGHT, and compute new width that + # would preserve aspect ratio + self.map_width = int(cropsize[0] * (float(self.map_height) / cropsize[1])) + self.mapSize =self.map_height * self.map_width + ## the following parameters are only needed if minimaps have to be printed + minimapSize = ( self.map_width,int(self.map_height) ) + self.minimap_image = cropped.resize(minimapSize, ANTIALIAS) + mapOffsetX= replayMap.map_info.camera_left + mapOffsetY = replayMap.map_info.camera_bottom + mapCenter = [mapOffsetX + cropsize[0]/2.0, mapOffsetY + cropsize[1]/2.0] + # this is the center of the minimap image, in pixel coordinates + imageCenter = [(self.map_width/2), self.map_height/2] + # this is the scaling factor to go from the SC2 coordinate + # system to pixel coordinates + self.image_scale = float(self.map_height) / cropsize[0] + self.transX =imageCenter[0] + self.image_scale * (mapCenter[0]) + self.transY = imageCenter[1] + self.image_scale * (mapCenter[1]) + + def radius_to_map_positions(self,radius): + ## this function converts all radius into map coordinates + ## centred around the origin that the creep can exist + ## the cgu_radius_to_map_position function will simply + ## substract every coordinate with the centre point of the tumour + output_coordinates = list() + # Sample a square area using the radius + for x in range (-radius,radius): + for y in range (-radius, radius): + if (x**2 + y**2) <= (radius * radius): + output_coordinates.append((x,y)) + return output_coordinates + + def init_cgu_lists(self, player_id): + self.creep_spread_by_minute[player_id] = defaultdict(int) + if self.debug: + self.creep_spread_image_by_minute[player_id] = defaultdict(StringIO) + self.creep_gen_units[player_id] = list() + self.creep_gen_units_times[player_id] = list() + + def add_to_list(self,player_id,unit_id,unit_location,unit_type,event_time): + # This functions adds a new time frame to creep_generating_units_list + # Each time frame contains a list of all CGUs that are alive + length_cgu_list = len(self.creep_gen_units[player_id]) + if length_cgu_list==0: + self.creep_gen_units[player_id].append([(unit_id, unit_location,unit_type)]) + self.creep_gen_units_times[player_id].append(event_time) + else: + #if the list is not empty, take the previous time frame, + # add the new CGU to it and append it as a new time frame + previous_list = self.creep_gen_units[player_id][length_cgu_list-1][:] + previous_list.append((unit_id, unit_location,unit_type)) + self.creep_gen_units[player_id].append(previous_list) + self.creep_gen_units_times[player_id].append(event_time) + + def remove_from_list(self,unit_id,time_frame): + ## This function searches is given a unit ID for every unit who died + ## the unit id will be searched in cgu_gen_units for matches + ## if there are any, that unit will be removed from active CGUs + ## and appended as a new time frame + for player_id in self.creep_gen_units: + length_cgu_list = len(self.creep_gen_units[player_id]) + if length_cgu_list ==0: + break + cgu_per_player = self.creep_gen_units[player_id]\ + [length_cgu_list-1] + creep_generating_died = dropwhile(lambda x: x[0] != \ + unit_id, cgu_per_player) + for creep_generating_died_unit in creep_generating_died: + cgu_per_player.remove(creep_generating_died_unit) + self.creep_gen_units[player_id].append(cgu_per_player) + self.creep_gen_units_times[player_id].append(time_frame) + + def reduce_cgu_per_minute(self,player_id): + #the creep_gen_units_lists contains every single time frame + #where a CGU is added, + #To reduce the calculations required, the time frame containing + #the most cgus every minute will be used to represent that minute + last_minute_found = 0 + cgu_per_player_new = list() + cgu_time_per_player_new = list() + cgu_last_minute_list = list() + for index,cgu_time in enumerate(self.creep_gen_units_times[player_id]): + cgu_last_minute_list.append(self.creep_gen_units[player_id][index]) + if (cgu_time//60) ==0: + cgu_per_player_new.append(self.creep_gen_units[player_id][0]) + cgu_time_per_player_new.append(0) + cgu_last_minute_list = list() + if (cgu_time//60)>last_minute_found: + cgu_max_in_min = max(cgu_last_minute_list,key = len) + cgu_per_player_new.append(cgu_max_in_min) + cgu_max_in_min_index = self.creep_gen_units[player_id].index(cgu_max_in_min) + cgu_time_per_player_new.append(self.creep_gen_units_times[player_id][cgu_max_in_min_index]) + last_minute_found = cgu_time//60 + cgu_last_minute_list=list() + + self.creep_gen_units[player_id] = cgu_per_player_new + self.creep_gen_units_times[player_id] = cgu_time_per_player_new + + def get_creep_spread_area(self,player_id): + ## iterates through all cgus and and calculate the area + for index,cgu_per_player in enumerate(self.creep_gen_units[player_id]): + # convert cgu list into centre of circles and radius + cgu_radius = map(lambda x: (x[1], self.unit_name_to_radius[x[2]]), cgu_per_player) + # convert event coords to minimap coords + cgu_radius = self.convert_cgu_radius_event_to_map_coord(cgu_radius) + creep_area_positions = self.cgu_radius_to_map_positions(cgu_radius,self.radius_to_coordinates) + cgu_last_event_time = self.creep_gen_units_times[player_id][index]//60 + if self.debug: + self.print_image(creep_area_positions,player_id,cgu_last_event_time) + creep_area = len(creep_area_positions) + self.creep_spread_by_minute[player_id][cgu_last_event_time]=float(creep_area)/self.mapSize*100 + return self.creep_spread_by_minute[player_id] + + def cgu_radius_to_map_positions(self,cgu_radius,radius_to_coordinates): + ## This function uses the output of radius_to_map_positions + total_points_on_map = Set() + if len(cgu_radius)==0: + return [] + for cgu in cgu_radius: + point = cgu[0] + radius = cgu[1] + ## subtract all radius_to_coordinates with centre of + ## cgu radius to change centre of circle + cgu_map_position = map( lambda x:(x[0]+point[0],x[1]+point[1])\ + ,self.radius_to_coordinates[radius]) + total_points_on_map= total_points_on_map | Set(cgu_map_position) + return total_points_on_map + + def print_image(self,total_points_on_map,player_id,time_stamp): + minimap_copy = self.minimap_image.copy() + # Convert all creeped points to white + for points in total_points_on_map: + x = points[0] + y = points[1] + x,y = self.check_image_pixel_within_boundary(x,y) + minimap_copy.putpixel((x,y) , (255, 255, 255)) + creeped_image = minimap_copy + # write creeped minimap image to a string as a png + creeped_imageIO = StringIO() + creeped_image.save(creeped_imageIO, "png") + self.creep_spread_image_by_minute[player_id][time_stamp]=creeped_imageIO + ##debug for print out the images + f = open(str(player_id)+'image'+str(time_stamp)+'.png','w') + f.write(creeped_imageIO.getvalue()) + creeped_imageIO.close() + f.close() + + def check_image_pixel_within_boundary(self,pointX, pointY): + pointX = 0 if pointX <0 else pointX + pointY=0 if pointY <0 else pointY + # put a minus 1 to make sure the pixel is not directly on the edge + pointX = int(self.map_width-1 if pointX >= self.map_width else pointX) + pointY = int(self.map_height-1 if pointY >= self.map_height else pointY) + return pointX,pointY + + def convert_cgu_radius_event_to_map_coord(self,cgu_radius): + cgu_radius_new = list() + for cgu in cgu_radius: + x = cgu[0][0] + y = cgu[0][1] + (x,y) = self.convert_event_coord_to_map_coord(x,y) + cgu = ((x,y),cgu[1]) + cgu_radius_new.append(cgu) + return cgu_radius_new + + def convert_event_coord_to_map_coord(self,x,y): + imageX = int(self.map_height - self.transX + self.image_scale * x) + imageY = int(self.transY - self.image_scale * y) + return imageX, imageY + + + diff --git a/sc2reader/engine/plugins/supply.py b/sc2reader/engine/plugins/supply.py new file mode 100644 index 00000000..394a3a64 --- /dev/null +++ b/sc2reader/engine/plugins/supply.py @@ -0,0 +1,109 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, print_function, unicode_literals, division + +from collections import defaultdict + +class SupplyTracker(object): + def add_to_units_alive(self,event,replay): + unit_name = event.unit_type_name + if unit_name in self.unit_name_to_supply: + supplyCount = self.unit_name_to_supply[event.unit_type_name][0] + buildTime = self.unit_name_to_supply[event.unit_type_name][1] + time_built = event.second - buildTime + time_built= 0 if time_built < 0 else time_built + new_unit = (supplyCount, event.unit_id) + self.units_alive[event.control_pid].append(new_unit) + total_supply = sum([x[0] for x in self.units_alive[event.control_pid]]) + replay.players[event.control_pid-1].current_food_used[time_built]= total_supply + print("Second",time_built,replay.players[event.control_pid-1],"SUPPLY",replay.players[event.control_pid-1].current_food_used[time_built]) + + elif unit_name in self.supply_gen_unit: + ## see if the unit provides supply + supply_gen_count = self.supply_gen_unit[event.unit_type_name][0] + build_time = self.supply_gen_unit[event.unit_type_name][1] + time_complete = event.second+ build_time + supply_gen_unit = (supply_gen_count,event.unit_id) + self.supply_gen[event.control_pid].append(supply_gen_unit) + total_supply_gen = sum([x[0] for x in self.supply_gen[event.control_pid]]) + replay.players[event.control_pid-1].current_food_made[time_complete]= total_supply_gen + print("Second",time_complete, replay.players[event.control_pid-1],"Built",replay.players[event.control_pid-1].current_food_made[time_complete]) + else: + print("Unit name {0} does not exist".format(event.unit_type_name)) + return + + def remove_from_units_alive(self,event,replay): + died_unit_id = event.unit_id + for player in replay.player: + dead_unit = filter(lambda x:x[1]==died_unit_id,self.units_alive[player]) + if dead_unit: + self.units_alive[player].remove(dead_unit[0]) + total_supply = sum([x[0] for x in self.units_alive[player]]) + + replay.players[player-1].current_food_used[event.second] = total_supply + print("Second", event.second, "Killed", event.unit.name,"SUPPLY",replay.players[player-1].current_food_used[event.second]) + + dead_supply_gen=filter(lambda x:x[1]==died_unit_id, self.supply_gen[player]) + if dead_supply_gen: + self.supply_gen[player].remove(dead_supply_gen[0]) + total_supply_gen = sum([x[0] for x in self.supply_gen[player]]) + replay.players[player-1].current_food_made[event.second] = total_supply_gen + print("Second", event.second, "Killed", event.unit.name,"SUPPLY",replay.players[player-1].current_food_made[event.second]) + + def handleInitGame(self, event, replay): + ## This dictionary contains te supply of every unit + self.unit_name_to_supply = { + #Zerg + "Drone":(1,17),"Zergling":(1,25),"Baneling":(0,20),"Queen":(2,50),\ + "Hydralisk":(2,33),"Roach":(2,27),"Infestor":(2,50),"Mutalisk":(2,33),\ + "Corruptor":(2,40),"Utralisk":(6,55),"Broodlord":(2,34),\ + "SwarmHost":(3,40), "Viper":(3,40),\ + #Terran + "SCV":(1,17),"Marine":(1,25),"Marauder":(2,30),"SiegeTank":(2,45),\ + "Reaper":(1,45),"Ghost":(2,40),"Hellion":(2,30),"Thor":(6,60),\ + "Viking":(2,42),"Medivac":(2,42),"Raven":(2,60), "Banshee":(3,60),\ + "Battlecruiser":(6,90), "Hellbat":(2,30),"WidowMine":(2,40),\ + #Protoss + "Probe":(1,17),"Zealot":(2,38),"Stalker":(2,42),"Sentry":(2,42),\ + "Observer":(1,30), "Immortal":(4,55),"WarpPrism":(2,50),\ + "Colossus":(6,75), "Phoenix":(2,35),"VoidRay":(4,60), \ + "HighTemplar":(2,55),"DarkTemplar":(2,55), "Archon":(4,12),\ + "Carrier":(6,120), "Mothership":(6,100),"MothershipCore":(2,30),\ + "Oracle":(3,50),"Tempest":(4,60)} + + self.supply_gen_unit = { + #overlord build time is zero because event for units are made when + # it is born not when it's created + "Overlord":(8,0),"Hatchery":(2,100), \ + "SupplyDepot":(8,30),"CommandCenter":(11,100),\ + "Pylon":(8,25),"Nexus":(10,100) + } + ## This list contains a turple of the units supply and unit ID. + ## the purpose of the list is to know which user owns which unit + ## so that when a unit dies, that + self.units_alive = dict() + ## + self.supply_gen = dict() + for player in replay.players: + self.supply_gen[player.pid] = list() + self.units_alive[player.pid] = list() + player.current_food_used = defaultdict(int) + player.current_food_made = defaultdict(int) + player.time_supply_capped = int() + + def handleUnitInitEvent(self,event,replay): + #print ("Init",event.unit_type_name, event.unit_id) + self.add_to_units_alive(event,replay) + + def handleUnitBornEvent(self,event,replay): + #print ("Born",event.unit_type_name,event.unit_id) + self.add_to_units_alive(event,replay) + + def handleUnitDiedEvent(self,event,replay): + if event.unit.name not in self.unit_name_to_supply: + return + self.remove_from_units_alive(event,replay) + + def handleEndGame(self, event, replay): + for player in replay.players: + player.current_food_used = sorted(player.current_food_used.iteritems(), key=lambda x: x[0]) + player.current_food_made = sorted(player.current_food_made.iteritems(), key=lambda x:x[0]) diff --git a/sc2reader/factories/plugins/replay.py b/sc2reader/factories/plugins/replay.py index 81a05c54..8267dff3 100644 --- a/sc2reader/factories/plugins/replay.py +++ b/sc2reader/factories/plugins/replay.py @@ -120,7 +120,6 @@ def APMTracker(replay): return replay - @plugin def SelectionTracker(replay): debug = replay.opt.debug diff --git a/test_replays/2.0.8.25605/ggtracker_3621322.SC2Replay b/test_replays/2.0.8.25605/ggtracker_3621322.SC2Replay new file mode 100644 index 00000000..3ee9b5ea Binary files /dev/null and b/test_replays/2.0.8.25605/ggtracker_3621322.SC2Replay differ diff --git a/test_replays/2.0.8.25605/ggtracker_3621402.SC2Replay b/test_replays/2.0.8.25605/ggtracker_3621402.SC2Replay new file mode 100644 index 00000000..a7e1f666 Binary files /dev/null and b/test_replays/2.0.8.25605/ggtracker_3621402.SC2Replay differ diff --git a/test_replays/2.0.8.25605/ggtracker_3663861.SC2Replay b/test_replays/2.0.8.25605/ggtracker_3663861.SC2Replay new file mode 100644 index 00000000..ca6356de Binary files /dev/null and b/test_replays/2.0.8.25605/ggtracker_3663861.SC2Replay differ diff --git a/test_replays/2.0.8.25605/ggtracker_3695400.SC2Replay b/test_replays/2.0.8.25605/ggtracker_3695400.SC2Replay new file mode 100644 index 00000000..96f80983 Binary files /dev/null and b/test_replays/2.0.8.25605/ggtracker_3695400.SC2Replay differ diff --git a/test_replays/test_all.py b/test_replays/test_all.py index d0af7009..e682eec0 100644 --- a/test_replays/test_all.py +++ b/test_replays/test_all.py @@ -1,4 +1,4 @@ -# -*- coding: UTF-8 -*- +# -*- coding: utf-8 -*- from __future__ import unicode_literals import datetime @@ -362,5 +362,27 @@ def test_replay_event_order(self): replay = sc2reader.load_replay("test_replays/event_order.SC2Replay") + def test_creepTracker(self): + from sc2reader.engine.plugins import CreepTracker + pluginEngine = sc2reader.engine.GameEngine(plugins=[CreepTracker()]) + + for replayfilename in [ + "test_replays/2.0.8.25605/ggtracker_3621322.SC2Replay", + "test_replays/2.0.8.25605/ggtracker_3621402.SC2Replay", + "test_replays/2.0.8.25605/ggtracker_3663861.SC2Replay", + "test_replays/2.0.8.25605/ggtracker_3695400.SC2Replay", + ]: + + replay = sc2reader.load_replay(replayfilename, engine=pluginEngine, load_map=True) + for player_id in replay.player: + if replay.player[player_id].play_race == "Zerg": + self.assertTrue(replay.player[player_id].max_creep_spread > 0) + self.assertTrue(replay.player[player_id].creep_spread_by_minute) + + replay = sc2reader.load_replay("test_replays/2.0.8.25605/ggtracker_3621402.SC2Replay", load_map=True, engine=pluginEngine) + self.assertEqual(replay.player[2].max_creep_spread, (14,22.95)) + self.assertEqual(replay.player[2].creep_spread_by_minute[7], 8.21) + self.assertEqual(replay.player[2].creep_spread_by_minute[13], 22.42) + if __name__ == '__main__': unittest.main()