Source code for path4gmns.simulation

from math import ceil


__all__ = ['perform_simple_simulation']


[docs]def perform_simple_simulation(ui, loading_profile='uniform'): """ perform simple traffic simulation using point queue model WARNING ------- The underlying routing decision is made through find_path_for_agents(), which must be called before this function. Parameters ---------- ui network object generated by pg.read_network() loading_profile demand loading profile, i.e., how agents are loaded to network with respect to their departure times. Three loading profiles are supported, which are uniform, random, and constant. With uniform loading profile, agents will be uniformly distributed between [simulation start time, simulation start time + simulation duration). With random loading profile, the departure time of each agent is randomly set up between [simulation start time, simulation start time + simulation duration). With constant loading profile, the departure times of all agents are the same, which are simulation start time. Returns ------- None Note ---- You will need to call output_agent_trajectory() to get the simulation results, i.e., trajectory of each agent (in trajectory.csv). """ A = ui._base_assignment A.initialize_simulation(loading_profile) if not A.get_agents(): return print('conduct dynamic traffic assignment (DTA)') links = A.get_links() nodes = A.get_nodes() cum_arr = cum_dep = 0 # number of simulation intervals in one minute (60s) num = A.cast_minute_to_interval(1) for i in range(A.get_total_simu_intervals()): if i % num == 0: print(f'simu time = {i/num} min, CA = {cum_arr}, CD = {cum_dep}') # comment out from now on as link.cum_arr and link.cum_dep are not used # for critical calculation # if i > 0: # for link in links: # link.cum_arr[i] = link.cum_arr[i-1] # link.cum_dep[i] = link.cum_dep[i-1] if A.have_dep_agents(i): for a_no in A.get_td_agents(i): a = A.get_agent(a_no) if a.link_path is None: continue # retrieve the first link given link path is in reverse order link_no = a.link_path[-1] link = links[link_no] # link.cum_arr[i] += 1 link.entr_queue.append(a_no) cum_arr += 1 for link in links: while link.entr_queue: a_no = link.entr_queue.popleft() agent = A.get_agent(a_no) link.exit_queue.append(a_no) intvl = A.cast_minute_to_interval(link.get_period_fftt(0)) agent.update_dep_interval(intvl) for node in nodes: m = node.get_incoming_link_num() for j in range(m): # randomly select the first link pos = (i + j) % m link = node.incoming_links[pos] while link.outflow_cap[i] and link.exit_queue: a_no = link.exit_queue[0] agent = A.get_agent(a_no) if agent.get_curr_dep_interval() > i: break if agent.reached_last_link(): # link.cum_dep[i] += 1 cum_dep += 1 else: link_no = agent.get_next_link_no() next_link = links[link_no] next_link.entr_queue.append(a_no) agent.set_dep_interval(i) # set up arrival time for the next link, i.e., next_link agent.set_arr_interval(i, 1) travel_intvl = i - agent.get_arr_interval() waiting_intvl = travel_intvl - A.cast_minute_to_interval(link.get_period_fftt(0)) # arrival time in minutes arr_minute = A.cast_interval_to_minute(agent.get_arr_interval()) link.update_waiting_time(arr_minute, waiting_intvl) # link.cum_dep[i] += 1 # next_link.cum_arr[i] += 1 agent.increment_link_pos() # remove agent from exit queue link.exit_queue.popleft() link.outflow_cap[i] -= 1