from time import time
from .path import single_source_shortest_path
from .classes import Column
from .consts import EPSILON, MAX_LABEL_COST, MIN_COL_VOL
__all__ = ['find_ue']
def _update_link_cost_array(spnetworks):
""" update generalized link costs for each SPNetwork """
for sp in spnetworks:
tau = sp.get_demand_period().get_id()
vot = sp.get_agent_type().get_vot()
for link in sp.get_links():
if not link.length:
break
sp.link_cost_array[link.get_seq_no()] = (
link.get_generalized_cost(tau, vot)
)
def _update_link_travel_time(links):
for link in links:
if not link.length:
break
link.calculate_td_vdf()
def _update_link_and_column_volume(column_pool, links, iter_num, reduce_path_vol = True):
if not iter_num:
return
for link in links:
if not link.length:
break
link.reset_period_flow_vol()
for k, cv in column_pool.items():
# k= (at, tau, oz_id, dz_id)
tau = k[1]
for col in cv.get_columns():
path_vol = col.get_volume()
for i in col.links:
# 04/10/22, remove it from now on as it is not in need
# pce_ratio = 1 and path_vol * pce_ratio
links[i].increase_period_flow_vol(tau, path_vol)
# 06/11/24: if we check cv.is_route_fixed() here,
# 1. shall we check it anywhere else??
# 2. is it really necessary?
# if reduce_path_vol and not cv.is_route_fixed():
if reduce_path_vol:
col.vol *= iter_num / (iter_num + 1)
def _update_column_gradient_cost_and_flow(column_pool, links, agent_types, iter_num):
total_gap = 0
total_sys_travel_time = 0
for k, cv in column_pool.items():
# k = (at, tau, oz_id, dz_id)
vot = agent_types[k[0]].get_vot()
tau = k[1]
least_gradient_cost = MAX_LABEL_COST
least_gradient_cost_path_id = -1
for col in cv.get_columns():
# i is link sequence no
path_gradient_cost = sum(
links[i].get_generalized_cost(tau, vot) for i in col.links
)
col.set_gradient_cost(path_gradient_cost)
# col.get_volume() >= EPSILON is the key to eliminate ultra-low-volume
# columns. along with the new_vol reset below if new_vol < MIN_COL_VOL,
# it enables shifting volume from the shortest path to a non-shortest path.
if path_gradient_cost < least_gradient_cost and col.get_volume() >= EPSILON:
least_gradient_cost = path_gradient_cost
least_gradient_cost_path_id = col.get_id()
total_switched_out_path_vol = 0
if cv.get_column_num() > 1:
for col in cv.get_columns():
if col.get_id() == least_gradient_cost_path_id:
continue
if not col.get_volume():
continue
col.update_gradient_cost_diffs(least_gradient_cost)
# with the forgoing col.get_volume() >= EPSILON,
# col.get_cap() could be negative
total_gap += col.get_gap()
total_sys_travel_time += col.get_sys_travel_time()
step_size = 1 / (iter_num + 2) * cv.get_od_volume()
cur_vol = col.get_volume()
new_vol = cur_vol - step_size * col.get_gradient_cost_rel_diff()
if new_vol < MIN_COL_VOL:
new_vol = 0
col.set_volume(new_vol)
total_switched_out_path_vol += cur_vol - new_vol
if least_gradient_cost_path_id != -1:
col = cv.get_column(least_gradient_cost_path_id)
total_sys_travel_time += col.get_sys_travel_time()
col.increase_volume(total_switched_out_path_vol)
rel_gap = total_gap / max(total_sys_travel_time, EPSILON)
print(f'current iteration number in column update: {iter_num}\n'
f'total gap: {total_gap:.4e}; relative gap: {rel_gap:.4%}')
return rel_gap
def _backtrace_shortest_path_tree(centroid,
centroids,
links,
node_preds,
link_preds,
at_id,
dp_id,
column_pool,
iter_num):
oz_id = centroid.get_zone_id()
k_path_prob = 1 / (iter_num + 1)
for c in centroids:
dz_id = c.get_zone_id()
if dz_id == oz_id:
continue
if (at_id, dp_id, oz_id, dz_id) not in column_pool:
continue
cv = column_pool[(at_id, dp_id, oz_id, dz_id)]
# disable it as it is always false
# if cv.is_route_fixed():
# continue
link_path = []
dist = 0
curr_node_no = c.get_node_no()
# retrieve the sequence backwards
while curr_node_no >= 0:
curr_link_no = link_preds[curr_node_no]
if curr_link_no >= 0:
link_path.append(curr_link_no)
dist += links[curr_link_no].length
curr_node_no = node_preds[curr_node_no]
# make sure this is a valid path
if not link_path:
continue
vol = k_path_prob * cv.get_od_volume()
existing = False
for col in cv.get_columns():
if col.get_distance() != dist:
continue
# the first and the last are connectors
if col.get_links() == link_path[1:-1]:
col.increase_volume(vol)
existing = True
break
if not existing:
path_id = cv.get_column_num()
col = Column(path_id)
col.set_volume(vol)
col.set_distance(dist)
col.links = [x for x in link_path[1:-1]]
cv.add_new_column(col)
def _update_column_attributes(column_pool, links, agent_types, spnetworks):
""" update toll and travel time for each column, and compute final UE convergency """
total_gap = 0
total_sys_travel_time = 0
total_min_sys_travel_time = 0
for k, cv in column_pool.items():
# k = (at, dp, oz, dz)
dp = k[1]
vot = agent_types[k[0]].get_vot()
least_gradient_cost = MAX_LABEL_COST
i = 0
for col in cv.get_columns():
if not col.get_volume():
del col
continue
# reset column id
col.set_id(i)
i += 1
nodes = []
path_toll = 0
travel_time = 0
path_gradient_cost = 0
for j in col.links:
link = links[j]
nodes.append(link.to_node_no)
travel_time += link.travel_time_by_period[dp]
path_toll += link.get_toll()
path_gradient_cost += link.get_generalized_cost(dp, vot)
# last node
nodes.append(links[col.links[-1]].from_node_no)
col.set_gradient_cost(path_gradient_cost)
col.set_travel_time(travel_time)
col.set_toll(path_toll)
col.nodes = [x for x in nodes]
if path_gradient_cost < least_gradient_cost:
least_gradient_cost = path_gradient_cost
total_min_sys_travel_time += least_gradient_cost * cv.get_od_volume()
# an equivalent but more efficient way to compute the total system travel
# time when the column pool is large in size.
for sp in spnetworks:
tau = sp.get_demand_period().get_id()
vot = sp.get_agent_type().get_vot()
for link in sp.get_links():
if not link.length:
break
total_sys_travel_time += (
link.get_generalized_cost(tau, vot) * link.get_period_flow_vol(tau)
)
total_gap = total_sys_travel_time - total_min_sys_travel_time
rel_gap = total_gap / max(total_sys_travel_time, EPSILON)
print('current iteration number in column update: postprocessing\n'
f'total gap: {total_gap:.4e}; relative gap: {rel_gap:.4%}\n')
return rel_gap
def _generate(spn, column_pool, iter_num):
for c in spn.get_orig_centroids():
single_source_shortest_path(spn, c.get_node_id())
_backtrace_shortest_path_tree(
c,
spn.get_centroids(),
spn.get_links(),
spn.get_node_preds(),
spn.get_link_preds(),
spn.get_agent_type().get_id(),
spn.get_demand_period().get_id(),
column_pool,
iter_num
)
def _generate_column_pool(spnetworks, column_pool, iter_num):
# single processing
# it could be multiprocessing
for spn in spnetworks:
_generate(spn, column_pool, iter_num)
def update_links_using_columns(network):
""" a helper function for load_columns() """
A = network._base_assignment
column_pool = A.get_column_pool()
links = A.get_links()
# do not update column volume
_update_link_and_column_volume(column_pool, links, 1, False)
_update_link_travel_time(links)
[docs]def find_ue(ui, column_gen_num, column_upd_num, rel_gap_tolerance=1e-04):
""" find user equilibrium (UE)
WARNING
-------
Only Path/Column-based User Equilibrium (UE) is implemented in Python.
If you need other assignment modes or dynamic traffic assignment (DTA),
please use DTALiteClassic()
Parameters
----------
ui
network object generated by pg.read_network()
column_gen_num
number of iterations to be performed on generating column pool
column pool. it also specifies the maximum number of routes / columns
for each OD pair.
column_upd_num
number of iterations to be performed on optimizing column pool
rel_gap_tolerance
target relative gap. find_ue() stops when either column_upd_num or
rel_gap_tolerance is reached.
Returns
-------
rel_gap
relative GAP to measure the UE convergency
Note
----
You will need to call output_columns() and output_link_performance() to
get the assignment results, i.e., paths/columns (in route_assignment.csv) and
volumes and other link attributes on each link (in link_performance.csv).
"""
# make sure iteration numbers are both non-negative
assert column_gen_num >= 0
assert column_upd_num >= 0
# base assignment
A = ui._base_assignment
# set up SPNetwork
A.setup_spnetwork(True)
ats = A.get_agent_types()
column_pool = A.get_column_pool()
links = A.get_links()
print('find user equilibrium (UE)')
st = time()
for i in range(column_gen_num):
print(f'current iteration number in column generation: {i}')
_update_link_and_column_volume(column_pool, links, i)
_update_link_travel_time(links)
# update generalized link cost before assignment
_update_link_cost_array(A.get_spnetworks())
# loop through all centroids on the base network
_generate_column_pool(A.get_spnetworks(), column_pool, i)
print(f'\nprocessing time of generating columns: {time()-st:.2f}s\n')
st = time()
i = 0
rel_gap = 1
while i < column_upd_num and rel_gap_tolerance < rel_gap:
_update_link_and_column_volume(column_pool, links, i, False)
_update_link_travel_time(links)
rel_gap = _update_column_gradient_cost_and_flow(column_pool, links, ats, i)
i += 1
# postprocessing
_update_link_and_column_volume(column_pool, links, column_gen_num, False)
_update_link_travel_time(links)
rel_gap = _update_column_attributes(column_pool, links, ats, A.get_spnetworks())
print(f'processing time of updating columns and postprocessing: {time()-st:.2f}s\n')
return rel_gap