# happyme531's picture
# Upload 12 files
# 69499b6 verified
#!/usr/bin/env python
# coding: utf-8
import numpy as np
from rknn.api import RKNN
from math import exp
from sys import exit
import onnx
import onnxscript
batch_size = 1
encoder_seq_len_list = [13]
decoder_seq_len = 1
# set current directory to the directory of this file
import os
os.chdir(os.path.dirname(os.path.abspath(__file__)))
import subprocess
import select
def run_python_code(code):
    """Run *code* in a fresh Python subprocess and stream its output live.

    stdout lines are echoed as-is; stderr lines are printed with an
    "Error: " prefix. Blocks until the child process exits.
    Unix-only: uses select() on pipe file descriptors.

    Args:
        code: Python source passed to the interpreter via ``-c``.
    """
    import sys
    process = subprocess.Popen(
        # Bug fix: use the running interpreter instead of a hard-coded
        # 'python', which may not be on PATH (e.g. python3-only systems).
        [sys.executable, '-c', code],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    out_fd = process.stdout.fileno()
    err_fd = process.stderr.fileno()
    # Read the child's stdout/stderr in real time until it exits.
    while True:
        ready, _, _ = select.select([out_fd, err_fd], [], [])
        for fd in ready:
            if fd == out_fd:
                line = process.stdout.readline()
                if line:
                    print(line.strip())
            if fd == err_fd:
                line = process.stderr.readline()
                if line:
                    print(f"Error: {line.strip()}")
        if process.poll() is not None:
            # Bug fix: drain anything still buffered in the pipes after the
            # child exits, so trailing output is not silently dropped.
            for line in process.stdout:
                print(line.strip())
            for line in process.stderr:
                print(f"Error: {line.strip()}")
            break
def convert_decoder():
    """Convert decoder_model.onnx to decoder_model.rknn for the RK3588 NPU."""
    def _require(ret, message):
        # Abort with the RKNN error code when a conversion stage fails.
        if ret != 0:
            print(message)
            exit(ret)

    onnx_path = "decoder_model.onnx"
    rknn_path = onnx_path.replace(".onnx", ".rknn")
    dataset = "dataset.txt"
    quantize = False
    # One [encoder hidden states, decoder hidden states] shape pair per
    # supported encoder sequence length.
    shape_variants = [
        [[batch_size, enc_len, 768], [batch_size, decoder_seq_len, 768]]
        for enc_len in encoder_seq_len_list
    ]

    rknn = RKNN(verbose=True)
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3,
                dynamic_input=shape_variants)
    print('done')

    print('--> Loading model')
    _require(rknn.load_onnx(model=onnx_path), 'Load model failed!')
    print('done')

    print('--> Building model')
    _require(rknn.build(do_quantization=quantize, dataset=dataset,
                        rknn_batch_size=None),
             'Build model failed!')
    print('done')

    print('--> Export RKNN model')
    _require(rknn.export_rknn(rknn_path), 'Export RKNN model failed!')
    print('done')
def convert_decoder_2():
    """Convert decoder_model_merged.onnx, pinning use_cache_branch to True.

    The merged decoder exposes a boolean ``use_cache_branch`` input that
    selects between the no-cache and KV-cache branches; it is folded to a
    constant True so the graph keeps only the cached branch. A matching
    np_true.npy is saved for the inference side.
    """
    import onnx_graphsurgeon as gs
    ONNX_MODEL = "decoder_model_merged.onnx"
    graph = gs.import_onnx(onnx.load(ONNX_MODEL))
    inp = graph.inputs[27]  # use_cache_branch
    inp.to_constant(np.array([True], dtype=np.bool_))
    # (Bug fix: removed a stray no-op bare `ONNX_MODEL` expression here.)
    onnx.save(gs.export_onnx(graph), "new_model.onnx")
    # Constant feed kept around for the runtime that drives this model.
    np_true = np.array([True], dtype=np.bool_)
    np.save("np_true.npy", np_true)

    rknn = RKNN(verbose=True)
    # NOTE(review): the constant-folded graph is written to new_model.onnx,
    # but load_onnx below is given the ORIGINAL merged model — confirm
    # whether it should load "new_model.onnx" instead.
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    DATASET = "dataset.txt"
    QUANTIZE = False
    # One [encoder hidden states, decoder hidden states] shape pair per
    # supported encoder sequence length.
    input_shapes = [[
        [batch_size, encoder_seq_len, 768],
        [batch_size, decoder_seq_len, 768]] for encoder_seq_len in encoder_seq_len_list]

    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3,
                dynamic_input=input_shapes)
    print('done')

    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    print('--> Building model')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')
def convert_encoder():
    """Convert encoder_model.onnx to encoder_model.rknn for the RK3588 NPU."""
    model_path = "encoder_model.onnx"
    export_path = model_path.replace(".onnx", ".rknn")
    dataset_path = "dataset.txt"
    do_quant = False
    # [hidden states, attention mask] shapes for every supported length.
    shapes = []
    for seq_len in encoder_seq_len_list:
        shapes.append([[batch_size, seq_len, 768], [batch_size, seq_len]])

    rknn = RKNN(verbose=True)
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3,
                dynamic_input=shapes)
    print('done')

    print('--> Loading model')
    ret = rknn.load_onnx(model=model_path)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    print('--> Building model')
    ret = rknn.build(do_quantization=do_quant, dataset=dataset_path,
                     rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    print('--> Export RKNN model')
    ret = rknn.export_rknn(export_path)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')
def convert_vision():
    """Two-stage conversion of vision_encoder.onnx to vision_encoder.rknn.

    Stage 1 builds the model once so the toolkit dumps its optimized
    intermediate graph; that graph is then rewritten to replace a
    Transpose/Reshape/Transpose/Reshape/Transpose chain with an equivalent
    Reshape/Transpose/Reshape, and stage 2 builds and exports the result.
    """
    ONNX_MODEL = "vision_encoder.onnx"
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    DATASET = "dataset.txt"
    QUANTIZE = False

    ##### Build stage 1
    rknn = RKNN(verbose=True)
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3)
    print('done')
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL,
                         inputs=["pixel_values"],
                         input_size_list=[[batch_size, 3, 64, 64]])
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')
    print('--> Building model stage 1')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')
    print("Build stage 1 done")
    del rknn

    # NOTE(review): assumes the toolkit drops this intermediate graph into
    # the current directory during build — confirm for the rknn-toolkit2
    # version in use.
    intermidiate_model = onnx.load("check3_fuse_ops.onnx")

    # fuse ops
    from onnxscript.rewriter import pattern
    import onnx.numpy_helper as onh

    def tp_rs_tp_rs_tp_pattern(op, input1, perm1, shape2, perm3, shape4, perm5):
        # Matches Transpose -> Reshape -> Transpose -> Reshape -> Transpose.
        i1 = op.Transpose(input1, perm=perm1)
        i2 = op.Reshape(i1, shape2)
        i3 = op.Transpose(i2, perm=perm3)
        i4 = op.Reshape(i3, shape4)
        i5 = op.Transpose(i4, perm=perm5)
        return i5

    def fused_pattern(op, input1, perm1, shape2, perm3, shape4, perm5):
        # Replacement: a single Reshape -> Transpose -> Reshape with shapes
        # computed from the matched input's static shape.
        rs1_shape = op.Constant(value=onh.from_array(np.array(
            [input1.shape[0] * 3, input1.shape[1] // 3,
             input1.shape[2], input1.shape[3]], dtype=np.int64)))
        fi1 = op.Reshape(input1, rs1_shape)
        fi2 = op.Transpose(fi1, perm=[0, 2, 1, 3])
        elems = input1.shape[0] * input1.shape[1] * input1.shape[2] * input1.shape[3]
        # Bug fix: use integer (floor) division; the original float division
        # only worked because the int64 cast silently truncated the result.
        rs4_shape = op.Constant(value=onh.from_array(np.array(
            [elems // (32 * 144), 32, 1, 144], dtype=np.int64)))
        fi3 = op.Reshape(fi2, rs4_shape)
        return fi3

    rewrite_rule = pattern.RewriteRule(tp_rs_tp_rs_tp_pattern, fused_pattern)
    rewrite_rule_set = pattern.RewriteRuleSet([rewrite_rule], commute=True)
    fused_model = onnxscript.rewriter.rewrite(
        intermidiate_model,
        pattern_rewrite_rules=rewrite_rule_set
    )
    onnx.save(fused_model, "vision_encoder_optimized.onnx")
    ONNX_MODEL = "vision_encoder_optimized.onnx"
    # RKNN_MODEL keeps pointing at vision_encoder.rknn.
    del intermidiate_model
    del fused_model

    ##### Build stage 2 from the rewritten graph
    rknn = RKNN(verbose=True)
    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3)
    print('done')
    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL)
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')
    print('--> Building model stage 2')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')
    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')
    os.remove("vision_encoder_optimized.onnx")
def check_vision_model(vision_size=(64, 64)):
    """Build vision_encoder.onnx and run RKNN accuracy analysis on target.

    Args:
        vision_size: (height, width) of the pixel_values input. Defaults to
            the 64x64 resolution convert_vision() uses. (Bug fix: the
            original read an undefined global ``vision_size`` and raised
            NameError; it is now a keyword parameter with a safe default.)
    """
    rknn = RKNN(verbose=True)
    ONNX_MODEL = "vision_encoder.onnx"
    RKNN_MODEL = ONNX_MODEL.replace(".onnx", ".rknn")
    DATASET = "dataset.txt"
    QUANTIZE = False

    print('--> Config model')
    rknn.config(quantized_algorithm='normal', quantized_method='channel',
                target_platform='rk3588', optimization_level=3)
    print('done')

    print('--> Loading model')
    ret = rknn.load_onnx(model=ONNX_MODEL,
                         inputs=["pixel_values"],
                         input_size_list=[[batch_size, 3, vision_size[0], vision_size[1]]])
    if ret != 0:
        print('Load model failed!')
        exit(ret)
    print('done')

    print('--> Building model')
    ret = rknn.build(do_quantization=QUANTIZE, dataset=DATASET, rknn_batch_size=None)
    if ret != 0:
        print('Build model failed!')
        exit(ret)
    print('done')

    print('--> Export RKNN model')
    ret = rknn.export_rknn(RKNN_MODEL)
    if ret != 0:
        print('Export RKNN model failed!')
        exit(ret)
    print('done')

    print('--> Init runtime environment')
    ret = rknn.init_runtime(target='rk3588')
    if ret != 0:
        print('Init runtime environment failed!')
        exit(ret)
    print('done')

    print('--> Precision check')
    ret = rknn.accuracy_analysis(inputs=["lena.png"], target='rk3588')
    if ret != 0:
        print('Precision check failed!')
        exit(ret)
    print('done')
import argparse

# Usage: python convert.py <decoder|encoder|vision|all> [--check]
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("model", type=str, help="Model to convert")
    parser.add_argument("--check", action="store_true", help="Check model")
    args = parser.parse_args()

    target = args.model
    if target == "decoder":
        convert_decoder()
    elif target == "encoder":
        convert_encoder()
    # elif target == "embed":  # embed is faster with cpu
    #     convert_embed()
    elif target == "vision":
        # --check runs the on-target accuracy analysis instead of converting.
        if args.check:
            check_vision_model()
        else:
            convert_vision()
    elif target == "all":
        convert_decoder()
        convert_encoder()
        # convert_embed()
        convert_vision()
    else:
        print("Invalid model")
        exit(1)