"""
Graph processing utilities for BFS and DFS on adjacency list
representations of graphs.
"""

import os
from sys import maxsize as Inf
from collections import deque, defaultdict
from typing import Dict, List, Optional, Tuple, Any


def loadGraph(G: Dict[str, List[str]], edgesFname: str):
    """
    Loads an undirected graph from a file containing pairs of vertices (edges).

    Each non-blank line must hold exactly two whitespace-separated vertex
    names ``u v``; the edge is recorded in both adjacency lists. The graph is
    represented as an adjacency list and G is updated in place.

    Raises:
        ValueError: if a non-blank line does not contain exactly two tokens.
    """
    with open(edgesFname, 'r') as f:
        for lineno, line in enumerate(f, start=1):
            toks = line.split()
            if not toks:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            if len(toks) != 2:
                raise ValueError(
                    f"{edgesFname}:{lineno}: expected 2 vertices per line, "
                    f"got {len(toks)}"
                )
            u, v = toks
            # setdefault works for both plain dicts and defaultdicts.
            G.setdefault(u, []).append(v)
            G.setdefault(v, []).append(u)


def loadGraphDisconnected(G: Dict[str, List[str]], verticesFname: str,
                          edgesFname: str):
    """
    Loads a (potentially disconnected) graph.

    Vertices are loaded from verticesFname (one name per line, blank lines
    ignored) so that isolated vertices are present in G with an empty
    adjacency list; edges are then loaded from edgesFname via loadGraph.
    """
    with open(verticesFname, 'r') as f:
        for line in f:
            v = line.strip()
            # Skip blank lines so that no vertex named '' is created.
            if v and v not in G:
                G[v] = []
    loadGraph(G, edgesFname)


def loadGraphFromAdjList(G: Dict[str, List[str]], fname: str):
    """
    Loads a graph from a file in adjacency list format.

    Each non-blank line is ``vertex neighbor1 neighbor2 ...``. The first token
    becomes a key of G and the remaining tokens replace its adjacency list.
    """
    with open(fname, 'r') as f:
        for line in f:
            # split() without arguments ignores repeated/trailing whitespace,
            # so no empty-string vertices or neighbors are produced.
            toks = line.split()
            if not toks:
                continue
            G[toks[0]] = toks[1:]


def saveGraphAsAdjList(G: Dict[str, List[str]], fname: str):
    """
    Saves a graph to a file in adjacency list format.

    One line per vertex, in sorted vertex order: the vertex name followed by
    its neighbors, separated by single spaces.
    """
    with open(fname, 'w') as f:
        for u in sorted(G):
            f.write(' '.join([u, *G[u]]) + '\n')


def bfs(G: Dict[str, List[str]], s: str) -> List[Tuple[str, int, str]]:
    """
    Performs Breadth-First Search on a graph G starting from vertex s.

    Returns a list of tuples (vertex, distance, predecessor), ordered by
    increasing distance from s. Vertices that are not reachable from s keep
    distance ``Inf`` (sys.maxsize) and predecessor None; the start vertex has
    distance 0 and predecessor None.
    """
    Q = deque([s])
    dist = {x: Inf for x in G}
    prev = {x: None for x in G}
    dist[s] = 0
    visited = {s}

    while Q:
        v = Q.popleft()
        for neighbor in G[v]:
            if neighbor not in visited:
                visited.add(neighbor)
                dist[neighbor] = dist[v] + 1
                prev[neighbor] = v
                Q.append(neighbor)

    # For a connected graph every vertex is reached. This is only a warning,
    # not an error: unreachable vertices are still reported with distance Inf.
    if any(d == Inf for d in dist.values()):
        print("Warning: Graph may be disconnected, not all nodes reachable from start node.")

    return [(k, dist[k], prev[k]) for k in sorted(dist, key=dist.get)]


def dfs_explore(G: Dict[str, List[str]], u: str, visited: Dict[str, bool],
                pre: Optional[Dict[str, int]] = None,
                post: Optional[Dict[str, int]] = None,
                cc: Optional[Dict[str, int]] = None,
                clock: Optional[List[int]] = None,
                ccnum: Optional[List[int]] = None):
    """
    Explores everything reachable from u, depth first.

    Marks each reached vertex in visited and calls dfs_previsit when a vertex
    is first entered and dfs_postvisit when all of its neighbors have been
    handled. Neighbors are tried in adjacency-list order, so the pre/post
    numbering is the same as for the textbook recursive formulation.

    Implemented with an explicit stack so that long paths do not exceed
    Python's recursion limit.
    """
    visited[u] = True
    dfs_previsit(u, pre, cc, clock, ccnum)
    # Each stack frame is (vertex, iterator over its not-yet-tried neighbors).
    stack = [(u, iter(G[u]))]

    while stack:
        node, neighbors = stack[-1]
        for v in neighbors:
            if not visited[v]:
                visited[v] = True
                dfs_previsit(v, pre, cc, clock, ccnum)
                stack.append((v, iter(G[v])))
                # Descend into v; node's iterator resumes where it left off.
                break
        else:
            # All neighbors of node are done: leave the vertex.
            stack.pop()
            dfs_postvisit(node, post, clock)


def dfs_previsit(u: str, pre: Optional[Dict[str, int]],
                 cc: Optional[Dict[str, int]],
                 clock: Optional[List[int]],
                 ccnum: Optional[List[int]]):
    """
    Pre-visit actions for DFS.

    If pre is given, records the current clock value for u and advances the
    clock. If cc is given, labels u with the current component number.
    """
    if pre is not None:
        pre[u] = clock[0]
        clock[0] += 1
    if cc is not None:
        cc[u] = ccnum[0]


def dfs_postvisit(u: str, post: Optional[Dict[str, int]],
                  clock: Optional[List[int]]):
    """
    Post-visit actions for DFS.

    If post is given, records the current clock value for u and advances the
    clock.
    """
    if post is not None:
        post[u] = clock[0]
        clock[0] += 1


def dfs_main_connected(G: Dict[str, List[str]],
                       s: str) -> Tuple[Dict[str, int], Dict[str, int]]:
    """
    Main driver for DFS on a connected graph.

    Runs a DFS from s and returns the tuple ``(post, pre)`` -- note the
    order -- mapping every vertex to its post-visit and pre-visit clock value.

    Raises:
        AssertionError: if some vertex of G was not reached from s, or if a
            recorded clock value lies outside ``[0, 2 * len(G))``.
    """
    visited = {x: False for x in G}
    clock = [0]  # a 1-element list to work around call by value for int
    pre = {x: -1 for x in G}
    post = {x: -1 for x in G}

    dfs_explore(G, s, visited, pre=pre, post=post, clock=clock)

    upper = 2 * len(G)
    assert all(visited.values()), "not every vertex is reachable from s"
    assert all(0 <= x < upper for x in pre.values()), "pre number out of range"
    assert all(0 <= x < upper for x in post.values()), "post number out of range"
    return post, pre


def dfs_main_disconnected(G: Dict[str, List[str]]) -> Dict[str, int]:
    """
    Main driver for DFS on a possibly disconnected graph to find connected
    components.

    Vertices are tried as DFS roots in sorted order; every vertex reached
    from the same root receives the same component id. Returns a dict mapping
    each vertex to its component id (0, 1, 2, ...). Prints a progress line
    for every new root.
    """
    visited = {x: False for x in G}
    cc = {x: -1 for x in G}
    ccnum = [0]  # 1-element list so dfs_previsit sees the current value

    for v in sorted(G):
        if not visited[v]:
            print(f"Calling dfs_explore from {v} ccnum={ccnum[0]}")
            dfs_explore(G, v, visited, cc=cc, ccnum=ccnum)
            ccnum[0] += 1
    return cc


def main():
    """Main function to run graph processing."""
    # All results are written below output/; make sure it exists.
    os.makedirs('output', exist_ok=True)

    # --- US contiguity dataset processing ---
    G1 = defaultdict(list)
    loadGraph(G1, 'data/contiguous-usa.dat')
    # Saved for understanding and debugging purposes
    saveGraphAsAdjList(G1, 'output/adjacency-computed-usa.dat')

    # Start BFS from each of the below states
    for s in ['NY', 'CA', 'NE', 'ME']:
        with open('output/bfs_' + s + '.txt', 'w') as f:
            f.write('state,hops,pred\n')
            for state, dist, pred in bfs(G1, s):
                f.write(','.join([str(state), str(dist), str(pred)]) + '\n')

    # --- World Correlates of War dataset processing ---
    G2 = defaultdict(list)
    loadGraphDisconnected(G2, 'data/all-countries-iso3.dat',
                          'data/country-borders-landorriver-iso3.dat')
    # Saved for understanding and debugging purposes
    saveGraphAsAdjList(G2, 'output/adjacency-computed-world.dat')

    # identify the connected components and write results
    cc = dfs_main_disconnected(G2)
    with open('output/cc_world.txt', 'w') as f:
        f.write('country,ccid\n')
        for country in sorted(cc, key=cc.get):
            f.write(country + ',' + str(cc[country]) + '\n')


if __name__ == "__main__":
    main()