-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtwostage_demo.py
More file actions
152 lines (124 loc) · 5.25 KB
/
twostage_demo.py
File metadata and controls
152 lines (124 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import ets_fiber_assigner.netflow as nf
import numpy as np
from collections import defaultdict
from getBench import getBench
# make runs reproducible
np.random.seed(20)
# define locations of the input files
catalog_path = "data/"
fscience_targets = catalog_path+"pfs_preliminary_target_cosmology.dat"
# So far, we only have test data for targets.
# Once we have files for calibration stars and sky locations, we can add them
# here.
#fcal_stars = catalog_path+"pfs_preliminary_target_cosmology_fcstars.dat"
#fsky_pos = catalog_path+"pfs_preliminary_target_cosmology_sky.dat"
# read all targets into a single list, giving them their proper types
tgt = nf.readScientificFromFile(fscience_targets, "sci")
# move some science targets to the second stage
for x in tgt[::4]:
x.stage=1
# add calibration targets
#tgt += nf.readCalibrationFromFile(fcal_stars, "cal")
#tgt += nf.readCalibrationFromFile(fsky_pos, "sky")
bench = getBench()
# point the telescope at the center of all science targets
raTel, decTel = nf.telescopeRaDecFromFile(fscience_targets)
posang = 0.
otime = "2016-04-03T08:00:00Z"
telescopes = []
# number of distinct observations
nvisit = 2
# generate randomly jittered telescope pointings for every observation
for _ in range(nvisit):
telescopes.append(nf.Telescope(raTel+np.random.normal()*1e-2,
decTel+np.random.normal()*1e-2, posang, otime))
# get focal plane positions for all targets and all visits
tpos = [tele.get_fp_positions(tgt) for tele in telescopes]
# create the dictionary containing the costs and constraints for all classes
# of targets
classdict = {}
classdict["sci_P1"] = {"nonObservationCost": 100,
"partialObservationCost": 1e9, "calib": False}
classdict["sci_P2"] = {"nonObservationCost": 90,
"partialObservationCost": 1e9, "calib": False}
classdict["sci_P3"] = {"nonObservationCost": 80,
"partialObservationCost": 1e9, "calib": False}
classdict["sci_P4"] = {"nonObservationCost": 70,
"partialObservationCost": 1e9, "calib": False}
classdict["sci_P5"] = {"nonObservationCost": 60,
"partialObservationCost": 1e9, "calib": False}
classdict["sci_P6"] = {"nonObservationCost": 50,
"partialObservationCost": 1e9, "calib": False}
classdict["sci_P7"] = {"nonObservationCost": 40,
"partialObservationCost": 1e9, "calib": False}
classdict["sky"] = {"numRequired": 240,
"nonObservationCost": 1e6, "calib": True}
classdict["cal"] = {"numRequired": 40,
"nonObservationCost": 1e6, "calib": True}
# durations of the individual observations in seconds
t_obs = [150, 750]
time_budget={("sci_P1",): 10.}
gurobiOptions = dict(seed=0, presolve=1, method=4, degenmoves=0,
heuristics=0.8, mipfocus=0, mipgap=1.0e-04)
# first stage
# compute observation strategy
prob = nf.buildProblem(bench, tgt, tpos, classdict, t_obs,
collision_distance=2., elbow_collisions=True,
gurobi=False, gurobiOptions=gurobiOptions,
obsprog_time_budget=time_budget)
print("solving the problem")
prob.solve()
# extract solution
res = [{} for _ in range(nvisit)]
for k1, v1 in prob._vardict.items():
if k1.startswith("Tv_Cv_"):
visited = prob.value(v1) > 0
if visited:
_, _, tidx, cidx, ivis = k1.split("_")
res[int(ivis)][int(tidx)] = int(cidx)
for i, vis in enumerate(res):
print("exposure {}:".format(i+1))
print(" assigned Cobras: {}".format(len(vis)))
tdict = defaultdict(int)
for tidx, cidx in vis.items():
tdict[tgt[tidx].targetclass] += 1
for cls, num in tdict.items():
print(" {}: {}".format(cls, num))
# Write the problem into a file for analysis
#prob.dump("dump0")
# create the list of preassigned cobras/targets
preassigned_list = [{} for _ in range(nvisit)] #list (dict(TargetID: Cobra index))
for i, vis in enumerate(res):
for tidx, cidx in vis.items():
if tgt[tidx].stage != 0:
print("Should not happen")
preassigned_list[i][tgt[tidx].ID] = cidx
# second stage
print("\nSECOND STAGE\n")
print(preassigned_list)
# compute observation strategy, now with preassigned list and stage=1
prob = nf.buildProblem(bench, tgt, tpos, classdict, t_obs,
collision_distance=2., elbow_collisions=True,
gurobi=False, gurobiOptions=gurobiOptions,
stage=1, preassigned=preassigned_list)
print("solving the problem")
prob.solve()
prob.dump("dump")
# extract solution
res = [{} for _ in range(nvisit)]
for k1, v1 in prob._vardict.items():
if k1.startswith("Tv_Cv_"):
visited = prob.value(v1) > 0
if visited:
_, _, tidx, cidx, ivis = k1.split("_")
if int(tidx) in res[int(ivis)]:
raise RuntimeError("oops")
res[int(ivis)][int(tidx)] = int(cidx)
for i, (vis, tp, tel) in enumerate(zip(res, tpos, telescopes)):
print("exposure {}:".format(i+1))
print(" assigned Cobras: {}".format(len(vis)))
tdict = defaultdict(int)
for tidx, cidx in vis.items():
tdict[tgt[tidx].targetclass] += 1
for cls, num in tdict.items():
print(" {}: {}".format(cls, num))