皆様こんにちは。
AMDlabの齋藤です。
今回はGrasshopperのPython 3を用いてSciPy Optimizationによる最適化を、Rhino Grasshopper側がリアルタイムで動くように実装してみます。
具体的には以下のようなプログラムです。
藤田先生のポストから気になって実装できるか挑戦してみました。
scipyのSLSQP例を実装して動かす様子で、xが変数でobjが目的関数になります。
まだ内部に勾配や制約を入れたままで、GHをストップさせるのも入れなければいけませんが、とりあえずできるようです。 https://t.co/qZXZtXCq74 pic.twitter.com/AcPvaFVyNb— K. SAITO (@Saito40k) April 11, 2025
それでは始めましょう。
今回のデータはこちらに挙げております。
https://github.com/AMDlab/TechBlog-Grasshopper-Realtime-Optimization
SciPyの最適化
それではまず、今回行うSciPyで扱える最適化、SLSQP(逐次二次計画法)について簡単に説明します。
SLSQPではGrasshopperで使える遺伝的アルゴリズムのGalapagosやNSGA2のWallacei、SPEA2等のOctopus、さらにベイズ最適化等のTunnyとはまた違った最適化が行えます。
具体的には、上記は簡単に最適化を構成することができますが、それらに比べてSLSQPではより短時間で厳密な最適解を求めることができ、さらに、制約条件をペナルティ法等を利用せずに設定することができます。
その代わりに、感度解析と呼ばれる目的関数や制約条件の関数を設計変数で偏微分した関数を用意する必要があったり、設計変数は連続的な変数しかできない、さらに、最適解を出したとしても局所最適解へおちいっていることもあったりもします。
実際に建築実務で扱う場合、これらの条件を考えると、SLSQPよりカスタムコンポーネントで実装されているアルゴリズムの方が良かったりもしますが、今回はSLSQPを動かそうと思います。
今回の目的関数と制約条件
今回の最適化で扱う目的関数は、SciPyでチュートリアルで使われているローゼンブロック関数です。
https://docs.scipy.org/doc/scipy/tutorial/optimize.html#sequential-least-squares-programming-slsqp-algorithm-method-slsqp
ローゼンブロック関数はよく最適化のベンチマークとして利用されているようです。
制約条件は2次関数で、境界条件はx0が0~1の範囲、x1が-0.5~2の範囲です。
今回の最適化は、具体的には以下のような最適化となります。
x0をx、x1をyとした場合、コンター図に表すと以下のようになります。
コンター図の赤が目的関数の数値が大きく、青が小さいです。
制約条件は赤色で示されている範囲をNGとします。
コンポーネント内でSLSQPを実行
次にPython 3 Scriptのコンポーネント内でSLSQPを動かしてみます。
以下をPython 3 Scriptに入れて実行します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# r: numpy # venv: site-packages # r: scipy import numpy as np from scipy.optimize import minimize from scipy.optimize import Bounds # 目的関数 def rosen(x): """The Rosenbrock function""" return sum(100.0*(x[1:]-x[:-1]**2.0)**2.0 + (1-x[:-1])**2.0) # 目的関数の勾配 def rosen_der(x): xm = x[1:-1] xm_m1 = x[:-2] xm_p1 = x[2:] der = np.zeros_like(x) der[1:-1] = 200*(xm-xm_m1**2) - 400*(xm_p1 - xm**2)*xm - 2*(1-xm) der[0] = -400*x[0]*(x[1]-x[0]**2) - 2*(1-x[0]) der[-1] = 200*(x[-1]-x[-2]**2) return der # 制約 ineq_cons = {'type': 'ineq', 'fun' : lambda x: 1 - x[0]**2 - x[1], 'jac' : lambda x: np.array([-2*x[0], -1.0])} # 境界条件 bounds = Bounds([0, -0.5], [1.0, 2.0]) x0 = np.array([0.5, 0]) if run: # ここで最適化させる opt = minimize(rosen, x0, method='SLSQP', jac=rosen_der, constraints=ineq_cons, options={'ftol': 1e-9, 'disp': True}, bounds=bounds) print(opt) # 結果を出力 a = opt.x.tolist() |
以上を動かすことによって結果が出力されます。
しかし、このコンポーネント内で最適化を行うため、最適化がうまく動いているか、いつ最適化が終わるか、把握が難しいです。
そこで、最適化をリアルタイムで動かして、最適化が動いている様子を見てみようと思います。
リアルタイムに最適化を動かす
次は実際にGrasshopperへ設計変数を流して、最適化を動かすプログラムを作成します。
設計変数
まず、変数をNumber Sliderで作成します。
変数はそれぞれ0~1と-0.5~2の範囲でしたので、そのように設定します。
今回の変数は連続変数となるので、より細かい小数が扱えるようにしたいです。
そのようにする場合は、Number Sliderを数字で作るときに小数点以下のゼロをいくつも入れることで、より細かく刻むことのできるNumber Sliderを作ることができます。
目的関数、制約条件、感度解析
次に目的関数、制約条件、そしてそれぞれの感度解析のコンポーネントを作成します。
中のプログラムは以下へ記載します。
https://github.com/AMDlab/TechBlog-Grasshopper-Realtime-Optimization/tree/main/Method
x0 と
x1 のType hintはfloatとしています。
最適化コンポーネント
最後に最適化コンポーネントを作成します。
まずPython 3 Scriptの入出力を作成します。
入力は上から順に
- run , Type Hint: bool, Item Access, 解析を実行する関数
- x , Type Hint: (float), Tree Access, 設計変数(設計変数のNumber Sliderを取得するために繋ぐ)
- obj , Type Hint: float, Item Access, 目的関数の数値
- obj_der , Type Hint: float, List Access, 目的関数の感度解析の数値
- cons , Type Hint: float, Item Access, 制約条件の数値
- cons_jac , Type Hint: float, List Access, 制約条件の感度解析の数値
- reset , Type Hint: bool, Item Access, リセット
さらに、
x に関してはNumber Sliderをつなぐ順序があります。上から順に繋いでください。
出力は
h_obj 、
h_x でobj, xの履歴を保存します。
以上で入出力を作成できました。
次に中身を作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
# r: numpy # venv: site-packages # r: scipy import scriptcontext as sc from Grasshopper.Kernel.Special import GH_NumberSlider from System.Threading.Tasks import Task from System import Decimal import numpy as np from scipy.optimize import minimize from scipy.optimize import Bounds import time from ghpythonlib.treehelpers import list_to_tree main_key = "optimization" status_key = "status" objective_key = "objective" obj_der_key = "obj_der" cons_key = "cons" cjac_key = "cjac" params_key = "params" sliders_key = "sliders" result_key = "result" step_key = "step" history_key = "history" def run_process(x): """ ここでGrasshopperへ処理を流す """ # 変数が現在のものと一致するか for idx in range(len(x)): if sc.sticky[main_key][params_key][idx] != x[idx]: break else: # 一致する場合、終了する sc.sticky[main_key][status_key] = "processed" return # 一致しなかったとき処理を流す sc.sticky[main_key][status_key] = "processing" x_list = x.tolist() for idx in range(len(x)): sc.sticky[main_key][sliders_key][idx].SetSliderValue(Decimal(x_list[idx])) for idx in range(len(x)): sc.sticky[main_key][sliders_key][idx].ExpireSolution(True) # ここで結果を待つ for i in range(999): time.sleep(0.1) if sc.sticky[main_key][status_key] == "processed": break else: # 待てなかったらエラーとして出す sc.sticky[main_key][status_key] = "processed" raise Exception("loop end") sc.sticky[main_key][params_key] = np.array(x) def func(x): run_process(x) obj = sc.sticky[main_key][objective_key] sc.sticky[main_key][step_key] += 1 sc.sticky[main_key][history_key][0].append(obj) sc.sticky[main_key][history_key][1].append(x.tolist()) return obj def obj_der_func(x): run_process(x) sc.sticky[main_key][history_key][2].append(x) return sc.sticky[main_key][obj_der_key] def cons_func(x): run_process(x) sc.sticky[main_key][history_key][3].append(sc.sticky[main_key][cons_key]) return sc.sticky[main_key][cons_key] def cjac_func(x): run_process(x) sc.sticky[main_key][history_key][4].append(x) return sc.sticky[main_key][cjac_key] def optimization(): """ この関数を別タスクで開いて最適化を動かす。 """ # 制約 ineq_cons = {'type': 'ineq', 'fun' : lambda x: cons_func(x), 'jac' : lambda x: cjac_func(x)} # 境界条件 bounds = Bounds([0, -0.5], [1.0, 2.0]) x0 = np.array([0.5, 0]) # すぐに回し始めるとエラーになるため、遅延を挟む time.sleep(0.01) sc.sticky[main_key][result_key] = minimize(func, x0, method='SLSQP', jac=obj_der_func, constraints=[ineq_cons], options={'ftol': 1e-6, 'disp': True}, bounds=bounds) sc.sticky[main_key][status_key] = "complete" time.sleep(0.01) ghenv.Component.ExpireSolution(True) # 初回とResetが押されたときにリセット if not main_key in sc.sticky.keys() or reset: if not run: sc.sticky[main_key] = {} sc.sticky[main_key][step_key] = 0 # ステップ数 # 解析の状態を表す。 # stop: 待機 # processing: GHへ投げて結果待ち # processed: 結果がsc.sticky[main_key][objective_key]に反映された状態 # complete: 解析完了 sc.sticky[main_key][status_key] = "stop" sc.sticky[main_key][objective_key] = 0. sc.sticky[main_key][obj_der_key] = np.zeros(2) sc.sticky[main_key][cons_key] = 0. sc.sticky[main_key][cjac_key] = np.zeros(2) sc.sticky[main_key][params_key] = np.zeros(2) sc.sticky[main_key][sliders_key] = [slider for slider in ghenv.Component.Params.Input[1].Sources if isinstance(slider, GH_NumberSlider)] # 変更するNumberSliderを格納 sc.sticky[main_key][result_key] = None # 結果を格納する sc.sticky[main_key][history_key] = [[], [], [], [], []] else: print("turn off run.") # runがTrueの時はリセットされない if run: # processingの場合、数値を渡し、processedにする if sc.sticky[main_key][status_key] == "processing": time.sleep(0.01) sc.sticky[main_key][objective_key] = obj sc.sticky[main_key][obj_der_key] = np.array(obj_der) sc.sticky[main_key][cons_key] = cons sc.sticky[main_key][cjac_key] = np.array(cons_jac) sc.sticky[main_key][status_key] = "processed" # 実行されたとき解析を回す。 if sc.sticky[main_key][status_key] == "stop": sc.sticky[main_key][step_key] = 0 # ここで別タスクとして起動 Task.Factory.StartNew(optimization) # 解析完了時に結果を返す if sc.sticky[main_key][status_key] == "complete": print(sc.sticky[main_key][status_key]) print(sc.sticky[main_key][result_key]) print(sc.sticky[main_key][status_key]) print(sc.sticky[main_key][step_key]) h_obj = sc.sticky[main_key][history_key][0] h_x = list_to_tree(sc.sticky[main_key][history_key][1]) |
それでは説明していきます。
1 2 3 |
# r: numpy # venv: site-packages # r: scipy |
ここではNumPyとSciPyのライブラリをインストールします。
既にインストールした後であれば、必要はありません。
4 5 6 7 8 9 10 11 12 13 14 15 |
import scriptcontext as sc from Grasshopper.Kernel.Special import GH_NumberSlider from System.Threading.Tasks import Task from System import Decimal import numpy as np from scipy.optimize import minimize from scipy.optimize import Bounds import time from ghpythonlib.treehelpers import list_to_tree |
ライブラリの関数を使えるようにします。
NumPyとSciPyに加え、Number Sliderの判別や、最適化を別タスクで動かす、Number Sliderで読み取れる数値へ変換する、遅延させる、2次元配列をTreeへ置き換える等が行えるようにします。
17 18 19 20 21 22 23 24 25 26 27 |
main_key = "optimization" status_key = "status" objective_key = "objective" obj_der_key = "obj_der" cons_key = "cons" cjac_key = "cjac" params_key = "params" sliders_key = "sliders" result_key = "result" step_key = "step" history_key = "history" |
ここでは
sticky のkeyを作成します。
sticky はRhino上で保持されるPython辞書です。
ここで変数として作成しておけば、誤字をしたときにエラーの原因が特定しやすくなります。
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
def run_process(x): """ ここでGrasshopperへ処理を流す """ # 変数が現在のものと一致するか for idx in range(len(x)): if sc.sticky[main_key][params_key][idx] != x[idx]: break else: # 一致する場合、終了する sc.sticky[main_key][status_key] = "processed" return # 一致しなかったとき処理を流す sc.sticky[main_key][status_key] = "processing" x_list = x.tolist() for idx in range(len(x)): sc.sticky[main_key][sliders_key][idx].SetSliderValue(Decimal(x_list[idx])) for idx in range(len(x)): sc.sticky[main_key][sliders_key][idx].ExpireSolution(True) # ここで結果を待つ for i in range(999): time.sleep(0.1) if sc.sticky[main_key][status_key] == "processed": break else: # 待てなかったらエラーとして出す sc.sticky[main_key][status_key] = "processed" raise Exception("loop end") sc.sticky[main_key][params_key] = np.array(x) |
ここでGrasshopperへ処理を流します。
この処理のアイデアはTunnyから少し頂きました。
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
def func(x): run_process(x) obj = sc.sticky[main_key][objective_key] sc.sticky[main_key][step_key] += 1 sc.sticky[main_key][history_key][0].append(obj) sc.sticky[main_key][history_key][1].append(x.tolist()) return obj def obj_der_func(x): run_process(x) sc.sticky[main_key][history_key][2].append(x) return sc.sticky[main_key][obj_der_key] def cons_func(x): run_process(x) sc.sticky[main_key][history_key][3].append(sc.sticky[main_key][cons_key]) return sc.sticky[main_key][cons_key] def cjac_func(x): run_process(x) sc.sticky[main_key][history_key][4].append(x) return sc.sticky[main_key][cjac_key] |
ここでは、SciPyのOptimizationから投げられる命令を処理する関数です。
履歴が残るようにもしています。
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
def optimization(): """ この関数を別タスクで開いて最適化を動かす。 """ # 制約 ineq_cons = {'type': 'ineq', 'fun' : lambda x: cons_func(x), 'jac' : lambda x: cjac_func(x)} # 境界条件 bounds = Bounds([0, -0.5], [1.0, 2.0]) x0 = np.array([0.5, 0]) # すぐに回し始めるとエラーになるため、遅延を挟む time.sleep(0.01) sc.sticky[main_key][result_key] = minimize(func, x0, method='SLSQP', jac=obj_der_func, constraints=[ineq_cons], options={'ftol': 1e-6, 'disp': True}, bounds=bounds) sc.sticky[main_key][status_key] = "complete" time.sleep(0.01) ghenv.Component.ExpireSolution(True) |
ここでは最適化を動かします。
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# 初回とResetが押されたときにリセット if not main_key in sc.sticky.keys() or reset: if not run: sc.sticky[main_key] = {} sc.sticky[main_key][step_key] = 0 # ステップ数 # 解析の状態を表す。 # stop: 待機 # processing: GHへ投げて結果待ち # processed: 結果がsc.sticky[main_key][objective_key]に反映された状態 # complete: 解析完了 sc.sticky[main_key][status_key] = "stop" sc.sticky[main_key][objective_key] = 0. sc.sticky[main_key][obj_der_key] = np.zeros(2) sc.sticky[main_key][cons_key] = 0. sc.sticky[main_key][cjac_key] = np.zeros(2) sc.sticky[main_key][params_key] = np.zeros(2) sc.sticky[main_key][sliders_key] = [slider for slider in ghenv.Component.Params.Input[1].Sources if isinstance(slider, GH_NumberSlider)] # 変更するNumberSliderを格納 sc.sticky[main_key][result_key] = None # 結果を格納する sc.sticky[main_key][history_key] = [[], [], [], [], []] else: print("turn off run.") # runがTrueの時はリセットされない |
初回と
reset が押されたときにリセットします。
run が動いているときに
reset されると壊れるため、エラーを投げるようにもしています。
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
if run: # processingの場合、数値を渡し、processedにする if sc.sticky[main_key][status_key] == "processing": time.sleep(0.01) sc.sticky[main_key][objective_key] = obj sc.sticky[main_key][obj_der_key] = np.array(obj_der) sc.sticky[main_key][cons_key] = cons sc.sticky[main_key][cjac_key] = np.array(cons_jac) sc.sticky[main_key][status_key] = "processed" # 実行されたとき解析を回す。 if sc.sticky[main_key][status_key] == "stop": sc.sticky[main_key][step_key] = 0 # ここで別タスクとして起動 Task.Factory.StartNew(optimization) # 解析完了時に結果を返す if sc.sticky[main_key][status_key] == "complete": print(sc.sticky[main_key][status_key]) print(sc.sticky[main_key][result_key]) |
run が押されたとき、最適化の状態に応じて処理を実行ます。
解析を回す際は
System.Threading.Tasks の
Task.Factory.StartNew で別のタスクとして解析を回せるようにします。
これでプログラムが完成しました。
実際に動かしてみます。
ちゃんと最適解が求まりました。
コンポーネント内でSLSQPを実行した場合と解はある程度一致していそうです。
処理の仕組み
今回作成した処理はリアルタイムで動かすため、かなり煩雑な処理となっています。
最適化中は以下のような処理を行っていました。
設計変数がSciPyのSLSQPで作成され、別タスクで宣言している関数へ流れます。
その後、設計変数はPythonの外側のNumber Sliderへ渡され、処理が行われます。
その処理がPythonへ入り、sticky経由で関数へ返ります。それまではstatusが”processed”に変わるまで、待ちます。
そして結果がSLSQPへ送られます。
注意点
今回はGrasshopper上で最適化を行いましたが、Grasshopperでは取り扱える数値は少し癖があります。
数値をパラメーターからパラメーターへ渡すとき、GH_Gooでの変換が自動的に起こり、数値が少し変わることがあります。
具体的には0に近い数値が0として変換され、処理へ流れることがあります。
そのため、SLSQPのような連続変数最適化は実はGrasshopperで動かすのは向いていなかったりします。
実際に今回のプログラムでは
ftol の値を少し上げて、最適解となりやすいようにしています。
まとめ
今回はGrasshopperをリアルタイムで最適化させてみました。
この方法では最適解が厳密にならない等少し甘い部分がありましたが、この方法を利用してより良いものができれば良いなと思っています。
ぜひ皆さんも試してみてください。
AMDlabでは、開発に力を貸していただけるエンジニアさんを大募集しております。少しでもご興味をお持ちいただけましたら、カジュアルにお話するだけでも大丈夫ですのでお気軽にご連絡ください!
中途求人ページ: https://www.amd-lab.com/recruit-list/mid-career
カジュアル面談がエントリーフォームからできるようになりました。
採用種別を「カジュアル面談(オンライン)」にして必要事項を記載の上送信してください!
エントリーフォーム: https://www.amd-lab.com/entry
COMMENTS