format code to pass CI

Signed-off-by: Neo Xu <neo.xu1990@gmail.com>
This commit is contained in:
Neo Xu 2024-11-19 09:45:04 +08:00 committed by Xiang Xiao
parent 96a3bc2b5c
commit edc410f26f
7 changed files with 428 additions and 197 deletions

View file

@ -1,6 +1,8 @@
############################################################################
# tools/gdb/gcore.py
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The

View file

@ -255,16 +255,14 @@ class Nxlistforentry(gdb.Command):
argv = gdb.string_to_argv(arg)
if len(argv) != 3:
gdb.write("listforentry takes three arguments"
"head, type, member\n")
gdb.write("listforentry takes three arguments" "head, type, member\n")
gdb.write("eg: listforentry &g_list 'struct type' 'node '\n")
return
i = 0
for entry in list_for_each_entry(
gdb.parse_and_eval(argv[0]),
gdb.lookup_type(argv[1]).pointer(),
argv[2]):
gdb.parse_and_eval(argv[0]), gdb.lookup_type(argv[1]).pointer(), argv[2]
):
gdb.write(f"{i}: ({argv[1]} *){entry}\n")
gdb.execute(f"print *({argv[1]} *){entry}")
i += 1

View file

@ -41,22 +41,60 @@ import subprocess
import tempfile
PUNCTUATORS = [
"\[", "\]", "\(", "\)", "\{", "\}", "\?", ";", ",", "~",
"\.\.\.", "\.",
"\-\>", "\-\-", "\-\=", "\-",
"\+\+", "\+\=", "\+",
"\*\=", "\*",
"\!\=", "\!",
"\&\&", "\&\=", "\&",
"\/\=", "\/",
"\%\>", "%:%:", "%:", "%=", "%",
"\^\=", "\^",
"\#\#", "\#",
"\:\>", "\:",
"\|\|", "\|\=", "\|",
"<<=", "<<", "<=", "<:", "<%", "<",
">>=", ">>", ">=", ">",
"\=\=", "\=",
"\[",
"\]",
"\(",
"\)",
"\{",
"\}",
"\?",
";",
",",
"~",
"\.\.\.",
"\.",
"\-\>",
"\-\-",
"\-\=",
"\-",
"\+\+",
"\+\=",
"\+",
"\*\=",
"\*",
"\!\=",
"\!",
"\&\&",
"\&\=",
"\&",
"\/\=",
"\/",
"\%\>",
"%:%:",
"%:",
"%=",
"%",
"\^\=",
"\^",
"\#\#",
"\#",
"\:\>",
"\:",
"\|\|",
"\|\=",
"\|",
"<<=",
"<<",
"<=",
"<:",
"<%",
"<",
">>=",
">>",
">=",
">",
"\=\=",
"\=",
]
@ -92,10 +130,8 @@ def fetch_macro_info(file):
# # os.system(f"readelf -wm {file} > {output}")
process = subprocess.Popen(
f"readelf -wm {file}",
shell=True,
stdout=f1,
stderr=subprocess.STDOUT)
f"readelf -wm {file}", shell=True, stdout=f1, stderr=subprocess.STDOUT
)
process.communicate()
errcode = process.returncode
@ -108,11 +144,12 @@ def fetch_macro_info(file):
p = re.compile(".*macro[ ]*:[ ]*([\S]+\(.*?\)|[\w]+)[ ]*(.*)")
macros = {}
with open(f1.name, 'rb') as f2:
with open(f1.name, "rb") as f2:
for line in f2.readlines():
line = line.decode("utf-8")
if not line.startswith(" DW_MACRO_define") and \
not line.startswith(" DW_MACRO_undef"):
if not line.startswith(" DW_MACRO_define") and not line.startswith(
" DW_MACRO_undef"
):
continue
if not parse_macro(line, macros, p):
@ -123,8 +160,9 @@ def fetch_macro_info(file):
def split_tokens(expr):
p = "(" + "|".join(PUNCTUATORS) + ")"
res = list(filter(lambda e : e != "",
map(lambda e: e.rstrip().lstrip(), re.split(p, expr))))
res = list(
filter(lambda e: e != "", map(lambda e: e.rstrip().lstrip(), re.split(p, expr)))
)
return res

View file

@ -1,6 +1,8 @@
############################################################################
# tools/gdb/memdump.py
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
@ -81,10 +83,7 @@ def mm_dumpnode(node, count, align, simple, detail, alive):
mm_nodesize(node["size"]),
node["seqno"],
align,
(int)(
charnode
+ gdb.lookup_type("struct mm_allocnode_s").sizeof
),
(int)(charnode + gdb.lookup_type("struct mm_allocnode_s").sizeof),
)
)
@ -92,7 +91,7 @@ def mm_dumpnode(node, count, align, simple, detail, alive):
max = node["backtrace"].type.range()[1]
firstrow = True
for x in range(0, max):
if (int(node["backtrace"][x]) == 0):
if int(node["backtrace"][x]) == 0:
break
if simple:
@ -107,10 +106,13 @@ def mm_dumpnode(node, count, align, simple, detail, alive):
gdb.write(
" [%0#*x] %-20s %s:%d\n"
% (
align, int(node["backtrace"][x]),
node["backtrace"][x].format_string(raw=False, symbols=True, address=False),
align,
int(node["backtrace"][x]),
node["backtrace"][x].format_string(
raw=False, symbols=True, address=False
),
gdb.find_pc_line(node["backtrace"][x]).symtab,
gdb.find_pc_line(node["backtrace"][x]).line
gdb.find_pc_line(node["backtrace"][x]).line,
)
)
@ -121,10 +123,7 @@ def mm_dumpnode(node, count, align, simple, detail, alive):
% (
mm_nodesize(node["size"]),
align,
(int)(
charnode
+ gdb.lookup_type("struct mm_allocnode_s").sizeof
),
(int)(charnode + gdb.lookup_type("struct mm_allocnode_s").sizeof),
)
)
@ -152,7 +151,10 @@ def mempool_realblocksize(pool):
mempool_align = 2 * gdb.lookup_type("size_t").sizeof
if get_symbol_value("CONFIG_MM_BACKTRACE") >= 0:
return align_up(pool["blocksize"] + gdb.lookup_type("struct mempool_backtrace_s").sizeof, mempool_align)
return align_up(
pool["blocksize"] + gdb.lookup_type("struct mempool_backtrace_s").sizeof,
mempool_align,
)
else:
return pool["blocksize"]
@ -162,7 +164,7 @@ def get_backtrace(node):
backtrace_list = []
max = node["backtrace"].type.range()[1]
for x in range(0, max):
if (node["backtrace"][x] != 0):
if node["backtrace"][x] != 0:
backtrace_list.append(int(node["backtrace"][x]))
else:
break
@ -187,14 +189,14 @@ def record_backtrace(node, size, backtrace_dict):
def get_count(element):
return element['count']
return element["count"]
def mempool_foreach(pool):
"""Iterate over all block in a mempool"""
blocksize = mempool_realblocksize(pool)
if (pool["ibase"] != 0):
if pool["ibase"] != 0:
nblk = pool["interruptsize"] / blocksize
while nblk > 0:
bufaddr = gdb.Value(pool["ibase"] + nblk * blocksize + pool["blocksize"])
@ -205,7 +207,9 @@ def mempool_foreach(pool):
entry = sq_queue.get_type().pointer()
for entry in sq_for_every(pool["equeue"], entry):
nblk = (pool["expandsize"] - gdb.lookup_type("sq_entry_t").sizeof) / blocksize
base = gdb.Value(entry).cast(gdb.lookup_type("char").pointer()) - nblk * blocksize
base = (
gdb.Value(entry).cast(gdb.lookup_type("char").pointer()) - nblk * blocksize
)
while nblk > 0:
nblk -= 1
bufaddr = gdb.Value(base + nblk * blocksize + pool["blocksize"])
@ -214,9 +218,7 @@ def mempool_foreach(pool):
def mempool_dumpbuf(buf, blksize, count, align, simple, detail, alive):
charnode = gdb.Value(buf).cast(
gdb.lookup_type("char").pointer()
)
charnode = gdb.Value(buf).cast(gdb.lookup_type("char").pointer())
if not alive:
# if pid is not alive put a red asterisk.
@ -240,7 +242,7 @@ def mempool_dumpbuf(buf, blksize, count, align, simple, detail, alive):
max = buf["backtrace"].type.range()[1]
firstrow = True
for x in range(0, max):
if (buf["backtrace"][x] == 0):
if buf["backtrace"][x] == 0:
break
if simple:
@ -255,10 +257,13 @@ def mempool_dumpbuf(buf, blksize, count, align, simple, detail, alive):
gdb.write(
" [%0#*x] %-20s %s:%d\n"
% (
align, int(buf["backtrace"][x]),
buf["backtrace"][x].format_string(raw=False, symbols=True, address=False),
align,
int(buf["backtrace"][x]),
buf["backtrace"][x].format_string(
raw=False, symbols=True, address=False
),
gdb.find_pc_line(buf["backtrace"][x]).symtab,
gdb.find_pc_line(buf["backtrace"][x]).line
gdb.find_pc_line(buf["backtrace"][x]).line,
)
)
@ -291,26 +296,50 @@ class Nxmemdump(gdb.Command):
self.uordblks += pool["blocksize"]
else:
for buf in mempool_foreach(pool):
if (pid == buf["pid"] or pid == PID_MM_ALLOC) and (
buf["seqno"] >= seqmin and buf["seqno"] < seqmax
) and buf["magic"] == MEMPOOL_MAGIC_ALLOC:
if (
(pid == buf["pid"] or pid == PID_MM_ALLOC)
and (buf["seqno"] >= seqmin and buf["seqno"] < seqmax)
and buf["magic"] == MEMPOOL_MAGIC_ALLOC
):
charnode = gdb.Value(buf).cast(
gdb.lookup_type("char").pointer()
)
if detail:
mempool_dumpbuf(buf, pool["blocksize"], 1, self.align, simple, detail,
self.check_alive(buf["pid"]))
mempool_dumpbuf(
buf,
pool["blocksize"],
1,
self.align,
simple,
detail,
self.check_alive(buf["pid"]),
)
else:
self.backtrace_dict = record_backtrace(buf, pool["blocksize"], self.backtrace_dict)
if address \
and (address < int(charnode)
and address >= (int)(charnode - pool["blocksize"])):
self.backtrace_dict = record_backtrace(
buf, pool["blocksize"], self.backtrace_dict
)
if address and (
address < int(charnode)
and address >= (int)(charnode - pool["blocksize"])
):
mempool_dumpbuf(buf, pool["blocksize"], 1, self.align, simple, detail,
self.check_alive(buf["pid"]))
gdb.write("\nThe address 0x%x found belongs to"
"the mempool node with base address 0x%x\n" % (address, charnode))
print_node = "p *(struct mempool_backtrace_s *)0x%x" % (charnode)
mempool_dumpbuf(
buf,
pool["blocksize"],
1,
self.align,
simple,
detail,
self.check_alive(buf["pid"]),
)
gdb.write(
"\nThe address 0x%x found belongs to"
"the mempool node with base address 0x%x\n"
% (address, charnode)
)
print_node = "p *(struct mempool_backtrace_s *)0x%x" % (
charnode
)
gdb.write(print_node + "\n")
gdb.execute(print_node)
return True
@ -321,7 +350,9 @@ class Nxmemdump(gdb.Command):
def memdump(self, pid, seqmin, seqmax, address, simple, detail):
"""Dump the heap memory"""
if pid >= PID_MM_ALLOC:
gdb.write("Dump all used memory node info, use '\x1b[33;1m*\x1b[m' mark pid is not exist:\n")
gdb.write(
"Dump all used memory node info, use '\x1b[33;1m*\x1b[m' mark pid is not exist:\n"
)
if not detail:
gdb.write("%6s" % ("CNT"))
@ -335,7 +366,9 @@ class Nxmemdump(gdb.Command):
heap = gdb.parse_and_eval("g_mmheap")
if heap.type.has_key("mm_mpool"):
if self.mempool_dump(heap["mm_mpool"], pid, seqmin, seqmax, address, simple, detail):
if self.mempool_dump(
heap["mm_mpool"], pid, seqmin, seqmax, address, simple, detail
):
return
for node in mm_foreach(heap):
@ -345,16 +378,40 @@ class Nxmemdump(gdb.Command):
or (pid == PID_MM_ALLOC and node["pid"] != PID_MM_MEMPOOL)
) and (node["seqno"] >= seqmin and node["seqno"] < seqmax):
if detail:
mm_dumpnode(node, 1, self.align, simple, detail, self.check_alive(node["pid"]))
mm_dumpnode(
node,
1,
self.align,
simple,
detail,
self.check_alive(node["pid"]),
)
else:
self.backtrace_dict = record_backtrace(node, mm_nodesize(node["size"]), self.backtrace_dict)
self.backtrace_dict = record_backtrace(
node, mm_nodesize(node["size"]), self.backtrace_dict
)
charnode = gdb.Value(node).cast(gdb.lookup_type("char").pointer())
if address and (address < int(charnode + node["size"])
and address >= (int)(charnode + gdb.lookup_type("struct mm_allocnode_s").sizeof)):
mm_dumpnode(node, 1, self.align, simple, detail, self.check_alive(node["pid"]))
gdb.write("\nThe address 0x%x found belongs to"
"the memory node with base address 0x%x\n" % (address, charnode))
if address and (
address < int(charnode + node["size"])
and address
>= (int)(
charnode + gdb.lookup_type("struct mm_allocnode_s").sizeof
)
):
mm_dumpnode(
node,
1,
self.align,
simple,
detail,
self.check_alive(node["pid"]),
)
gdb.write(
"\nThe address 0x%x found belongs to"
"the memory node with base address 0x%x\n"
% (address, charnode)
)
print_node = "p *(struct mm_allocnode_s *)0x%x" % (charnode)
gdb.write(print_node + "\n")
gdb.execute(print_node)
@ -363,7 +420,14 @@ class Nxmemdump(gdb.Command):
self.uordblks += mm_nodesize(node["size"])
else:
if pid == PID_MM_FREE:
mm_dumpnode(node, 1, self.align, simple, detail, self.check_alive(node["pid"]))
mm_dumpnode(
node,
1,
self.align,
simple,
detail,
self.check_alive(node["pid"]),
)
self.aordblks += 1
self.uordblks += mm_nodesize(node["size"])
@ -374,11 +438,28 @@ class Nxmemdump(gdb.Command):
output.sort(key=get_count, reverse=True)
for node in output:
if node["node"].type == gdb.lookup_type("struct mm_allocnode_s").pointer():
mm_dumpnode(node["node"], node["count"], self.align, simple, detail, self.check_alive(node["pid"]))
if (
node["node"].type
== gdb.lookup_type("struct mm_allocnode_s").pointer()
):
mm_dumpnode(
node["node"],
node["count"],
self.align,
simple,
detail,
self.check_alive(node["pid"]),
)
else:
mempool_dumpbuf(node["node"], node["size"], node["count"], self.align, simple, detail,
self.check_alive(node["pid"]))
mempool_dumpbuf(
node["node"],
node["size"],
node["count"],
self.align,
simple,
detail,
self.check_alive(node["pid"]),
)
gdb.write("%12s%12s\n" % ("Total Blks", "Total Size"))
gdb.write("%12d%12d\n" % (self.aordblks, self.uordblks))
@ -387,31 +468,45 @@ class Nxmemdump(gdb.Command):
return gdb.COMPLETE_SYMBOL
def parse_arguments(self, argv):
parser = argparse.ArgumentParser(description='memdump command')
parser.add_argument('-p', "--pid", type=str, help='Thread PID')
parser.add_argument('-a', "--addr", type=str, help='Query memory address')
parser.add_argument('-i', "--min", type=str, help='Minimum value')
parser.add_argument('-x', "--max", type=str, help='Maximum value')
parser.add_argument('--used', action='store_true', help='Used flag')
parser.add_argument('--free', action='store_true', help='Free flag')
parser.add_argument('-d', '--detail', action='store_true', help='Output details of each node', default=False)
parser.add_argument('-s', '--simple', action='store_true', help='Simplified Output', default=False)
parser = argparse.ArgumentParser(description="memdump command")
parser.add_argument("-p", "--pid", type=str, help="Thread PID")
parser.add_argument("-a", "--addr", type=str, help="Query memory address")
parser.add_argument("-i", "--min", type=str, help="Minimum value")
parser.add_argument("-x", "--max", type=str, help="Maximum value")
parser.add_argument("--used", action="store_true", help="Used flag")
parser.add_argument("--free", action="store_true", help="Free flag")
parser.add_argument(
"-d",
"--detail",
action="store_true",
help="Output details of each node",
default=False,
)
parser.add_argument(
"-s",
"--simple",
action="store_true",
help="Simplified Output",
default=False,
)
if argv[0] == '':
if argv[0] == "":
argv = None
try:
args = parser.parse_args(argv)
except SystemExit:
return None
return {'pid': int(args.pid, 0) if args.pid else None,
'seqmin': int(args.min, 0) if args.min else 0,
'seqmax': int(args.max, 0) if args.max else 0xFFFFFFFF,
'used': args.used,
'free': args.free,
'addr': int(args.addr, 0) if args.addr else None,
'simple': args.simple,
'detail': args.detail}
return {
"pid": int(args.pid, 0) if args.pid else None,
"seqmin": int(args.min, 0) if args.min else 0,
"seqmax": int(args.max, 0) if args.max else 0xFFFFFFFF,
"used": args.used,
"free": args.free,
"addr": int(args.addr, 0) if args.addr else None,
"simple": args.simple,
"detail": args.detail,
}
def invoke(self, args, from_tty):
if gdb.lookup_type("size_t").sizeof == 4:
@ -425,21 +520,23 @@ class Nxmemdump(gdb.Command):
return
pid = PID_MM_ALLOC
if arg['used']:
if arg["used"]:
pid = PID_MM_ALLOC
elif arg['free']:
elif arg["free"]:
pid = PID_MM_FREE
elif arg['pid']:
pid = arg['pid']
elif arg["pid"]:
pid = arg["pid"]
if get_symbol_value("CONFIG_MM_BACKTRACE") <= 0:
arg['detail'] = True
arg["detail"] = True
self.aordblks = 0
self.uordblks = 0
self.backtrace_dict = {}
self.npidhash = gdb.parse_and_eval("g_npidhash")
self.pidhash = gdb.parse_and_eval("g_pidhash")
self.memdump(pid, arg['seqmin'], arg['seqmax'], arg['addr'], arg['simple'], arg['detail'])
self.memdump(
pid, arg["seqmin"], arg["seqmax"], arg["addr"], arg["simple"], arg["detail"]
)
Nxmemdump()

View file

@ -1,6 +1,8 @@
############################################################################
# tools/gdb/stack.py
#
# SPDX-License-Identifier: Apache-2.0
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership. The
@ -44,8 +46,7 @@ class Stack(object):
def _sanity_check(self):
# do some basic sanity checking to make sure we have a sane stack object
if self._stack_base < self._stack_alloc or not self._stack_size:
raise gdb.GdbError(
"Inconsistant stack size...Maybe memory corruption?")
raise gdb.GdbError("Inconsistant stack size...Maybe memory corruption?")
# TODO: check if stack ptr is located at a sane address range!
@ -56,7 +57,8 @@ class Stack(object):
gdb.write("An overflow detected, dumping the stack:\n")
ptr_4bytes = gdb.Value(self._stack_base).cast(
gdb.lookup_type("unsigned int").pointer())
gdb.lookup_type("unsigned int").pointer()
)
for i in range(0, self._stack_size // 4):
if i % 8 == 0:
@ -68,14 +70,18 @@ class Stack(object):
gdb.write("\n")
gdb.write("\n")
raise gdb.GdbError("pls check your stack size! @ {0} sp:{1:x} base:{2:x}".format(
self._thread_name, self._cur_sp, self._stack_base))
raise gdb.GdbError(
"pls check your stack size! @ {0} sp:{1:x} base:{2:x}".format(
self._thread_name, self._cur_sp, self._stack_base
)
)
return usage
def check_max_usage(self):
ptr_4bytes = gdb.Value(self._stack_base).cast(
gdb.lookup_type("unsigned int").pointer())
gdb.lookup_type("unsigned int").pointer()
)
spare = 0
@ -117,8 +123,10 @@ def fetch_stacks():
stacks = dict()
for tcb in utils.get_tcbs():
if tcb["task_state"] == gdb.parse_and_eval("TSTATE_TASK_RUNNING") \
and not utils.in_interrupt_context():
if (
tcb["task_state"] == gdb.parse_and_eval("TSTATE_TASK_RUNNING")
and not utils.in_interrupt_context()
):
sp = utils.get_sp()
else:
sp = utils.get_sp(tcb=tcb)
@ -126,7 +134,7 @@ def fetch_stacks():
try:
stacks[int(tcb["pid"])] = Stack(
tcb["name"].string(),
hex(tcb["entry"]['pthread']), # should use main?
hex(tcb["entry"]["pthread"]), # should use main?
int(tcb["stack_base_ptr"]),
int(tcb["stack_alloc_ptr"]),
int(tcb["adj_stack_size"]),
@ -140,14 +148,16 @@ def fetch_stacks():
return stacks
class StackUsage (gdb.Command):
class StackUsage(gdb.Command):
"""Display the stack usage of each thread, similar to cat /proc/<pid>/stack"""
def __init__(self):
super(StackUsage, self).__init__("stack-usage", gdb.COMMAND_USER)
self._stacks = []
# format template
self._fmt = "{0: <4} | {1: <10} | {2: <10} | {3: <20} | {4: <10} | {5: <10} | {6: <10}"
self._fmt = (
"{0: <4} | {1: <10} | {2: <10} | {3: <20} | {4: <10} | {5: <10} | {6: <10}"
)
def format_print(self, pid, stack):
def gen_info_str(x):
@ -157,12 +167,17 @@ class StackUsage (gdb.Command):
res += "!"
return res
gdb.write(self._fmt.format(
pid, stack._thread_name[:10], stack._thread_entry,
hex(stack._stack_base), stack._stack_size,
gen_info_str(stack.cur_usage()),
gen_info_str(stack.max_usage())
))
gdb.write(
self._fmt.format(
pid,
stack._thread_name[:10],
stack._thread_entry,
hex(stack._stack_base),
stack._stack_size,
gen_info_str(stack.cur_usage()),
gen_info_str(stack.max_usage()),
)
)
gdb.write("\n")
def invoke(self, args, from_tty):
@ -172,8 +187,11 @@ class StackUsage (gdb.Command):
pids = stacks.keys() if len(args) == 0 else args
gdb.write(self._fmt.format('Pid', 'Name', 'Entry',
'Base', 'Size', 'CurUsage', 'MaxUsage'))
gdb.write(
self._fmt.format(
"Pid", "Name", "Entry", "Base", "Size", "CurUsage", "MaxUsage"
)
)
gdb.write("\n")
for pid in pids:

View file

@ -20,10 +20,11 @@
#
############################################################################
from enum import Enum, auto
import gdb
import utils
from stack import Stack
from enum import Enum, auto
UINT16_MAX = 0xFFFF
SEM_TYPE_MUTEX = 4
@ -135,7 +136,10 @@ class Nxinfothreads(gdb.Command):
% ("Index", "Tid", "Pid", "Cpu", "Thread", "Info", "Frame")
)
else:
gdb.write("%-5s %-4s %-4s %-21s %-80s %-30s\n" % ("Index", "Tid", "Pid", "Thread", "Info", "Frame"))
gdb.write(
"%-5s %-4s %-4s %-21s %-80s %-30s\n"
% ("Index", "Tid", "Pid", "Thread", "Info", "Frame")
)
for i in range(0, npidhash):
tcb = pidhash[i]
@ -165,11 +169,14 @@ class Nxinfothreads(gdb.Command):
try:
"""Maybe tcb not have name member, or name is not utf-8"""
info = "(Name: \x1b[31;1m%s\x1b[m, State: %s, Priority: %d, Stack: %d)" % (
tcb["name"].string(),
statename,
tcb["sched_priority"],
tcb["adj_stack_size"],
info = (
"(Name: \x1b[31;1m%s\x1b[m, State: %s, Priority: %d, Stack: %d)"
% (
tcb["name"].string(),
statename,
tcb["sched_priority"],
tcb["adj_stack_size"],
)
)
except gdb.error and UnicodeDecodeError:
info = "(Name: Not utf-8, State: %s, Priority: %d, Stack: %d)" % (
@ -193,10 +200,14 @@ class Nxinfothreads(gdb.Command):
if utils.is_target_smp():
cpu = f"{tcb['cpu']}"
gdb.write(
"%-5s %-4s %-4s %-4s %-21s %-80s %-30s\n" % (index, tid, pid, cpu, thread, info, frame)
"%-5s %-4s %-4s %-4s %-21s %-80s %-30s\n"
% (index, tid, pid, cpu, thread, info, frame)
)
else:
gdb.write("%-5s %-4s %-4s %-21s %-80s %-30s\n" % (index, tid, pid, thread, info, frame))
gdb.write(
"%-5s %-4s %-4s %-21s %-80s %-30s\n"
% (index, tid, pid, thread, info, frame)
)
class Nxthread(gdb.Command):
@ -264,7 +275,9 @@ class Nxthread(gdb.Command):
else:
if arg[0].isnumeric() and pidhash[int(arg[0])] != 0:
if pidhash[int(arg[0])]["task_state"] == gdb.parse_and_eval("TSTATE_TASK_RUNNING"):
if pidhash[int(arg[0])]["task_state"] == gdb.parse_and_eval(
"TSTATE_TASK_RUNNING"
):
restore_regs()
else:
gdb.execute("nxsetregs g_pidhash[%s]->xcp.regs" % arg[0])
@ -289,16 +302,19 @@ class Nxstep(gdb.Command):
restore_regs()
gdb.execute("step")
class TaskType(Enum):
TASK = 0
PTHREAD = 1
KTHREAD = 2
class TaskSchedPolicy(Enum):
FIFO = 0
RR = 1
SPORADIC = 2
class TaskState(Enum):
Invalid = 0
Waiting_Unlock = auto()
@ -309,8 +325,9 @@ class TaskState(Enum):
Inactive = auto()
Waiting_Semaphore = auto()
Waiting_Signal = auto()
if not utils.get_symbol_value("CONFIG_DISABLE_MQUEUE") or \
not utils.get_symbol_value("CONFIG_DISABLE_MQUEUE_SYSV"):
if not utils.get_symbol_value(
"CONFIG_DISABLE_MQUEUE"
) or not utils.get_symbol_value("CONFIG_DISABLE_MQUEUE_SYSV"):
Waiting_MQEmpty = auto()
Waiting_MQFull = auto()
if utils.get_symbol_value("CONFIG_PAGING"):
@ -318,6 +335,7 @@ class TaskState(Enum):
if utils.get_symbol_value("CONFIG_SIG_SIGSTOP_ACTION"):
Stopped = auto()
class Ps(gdb.Command):
def __init__(self):
super(Ps, self).__init__("ps", gdb.COMMAND_USER)
@ -326,57 +344,86 @@ class Ps(gdb.Command):
self._fmt_wx = "{0: >{width}}"
def parse_and_show_info(self, tcb):
get_macro = lambda x : utils.get_symbol_value(x)
eval2str = lambda cls, x : cls(int(x)).name
cast2ptr = lambda x, t : x.cast(gdb.lookup_type(t).pointer())
def get_macro(x):
return utils.get_symbol_value(x)
def eval2str(cls, x):
return cls(int(x)).name
def cast2ptr(x, t):
return x.cast(utils.lookup_type(t).pointer())
pid = int(tcb["pid"])
group = int(tcb["group"]["tg_pid"])
priority = int(tcb["sched_priority"])
policy = eval2str(TaskSchedPolicy, (tcb["flags"] & get_macro("TCB_FLAG_POLICY_MASK")) \
>> get_macro("TCB_FLAG_POLICY_SHIFT"))
policy = eval2str(
TaskSchedPolicy,
(tcb["flags"] & get_macro("TCB_FLAG_POLICY_MASK"))
>> get_macro("TCB_FLAG_POLICY_SHIFT"),
)
task_type = eval2str(TaskType, (tcb["flags"] & get_macro("TCB_FLAG_TTYPE_MASK")) \
>> get_macro("TCB_FLAG_TTYPE_SHIFT"))
task_type = eval2str(
TaskType,
(tcb["flags"] & get_macro("TCB_FLAG_TTYPE_MASK"))
>> get_macro("TCB_FLAG_TTYPE_SHIFT"),
)
npx = 'P' if (tcb["flags"] & get_macro("TCB_FLAG_EXIT_PROCESSING")) else '-'
npx = "P" if (tcb["flags"] & get_macro("TCB_FLAG_EXIT_PROCESSING")) else "-"
waiter = str(int(cast2ptr(tcb["waitobj"], "mutex_t")["holder"])) \
if tcb["waitobj"] and cast2ptr(tcb["waitobj"], "sem_t")["flags"] & get_macro("SEM_TYPE_MUTEX") else ""
state_and_event = eval2str(TaskState, (tcb["task_state"])) + ("@Mutex_Holder: " + waiter if waiter else "")
waiter = (
str(int(cast2ptr(tcb["waitobj"], "mutex_t")["holder"]))
if tcb["waitobj"]
and cast2ptr(tcb["waitobj"], "sem_t")["flags"] & get_macro("SEM_TYPE_MUTEX")
else ""
)
state_and_event = eval2str(TaskState, (tcb["task_state"])) + (
"@Mutex_Holder: " + waiter if waiter else ""
)
state_and_event = state_and_event.split("_")
# Append a null str here so we don't need to worry
# about the number of elements as we only want the first two
state, event = state_and_event if len(state_and_event) > 1 else state_and_event + [""]
state, event = (
state_and_event if len(state_and_event) > 1 else state_and_event + [""]
)
sigmask = "{0:#0{1}x}".format( \
sum([int(tcb["sigprocmask"]['_elem'][i] << i) for i in range(get_macro("_SIGSET_NELEM"))]), \
get_macro("_SIGSET_NELEM") * 8 + 2 \
)[2:] # exclude "0x"
sigmask = "{0:#0{1}x}".format(
sum(
[
int(tcb["sigprocmask"]["_elem"][i] << i)
for i in range(get_macro("_SIGSET_NELEM"))
]
),
get_macro("_SIGSET_NELEM") * 8 + 2,
)[
2:
] # exclude "0x"
st = Stack(
tcb["name"].string(),
hex(tcb["entry"]['pthread']), # should use main?
hex(tcb["entry"]["pthread"]), # should use main?
int(tcb["stack_base_ptr"]),
int(tcb["stack_alloc_ptr"]),
int(tcb["adj_stack_size"]),
utils.get_sp(tcb),
4,)
4,
)
stacksz = st._stack_size
used = st.max_usage()
filled = "{0:.2%}".format(st.max_usage() / st._stack_size)
cpu = tcb['cpu'] if get_macro("CONFIG_SMP") else 0
cpu = tcb["cpu"] if get_macro("CONFIG_SMP") else 0
# For a task we need to display its cmdline arguments, while for a thread we display
# pointers to its entry and argument
cmd = ""
name = tcb['name'].string()
name = tcb["name"].string()
if int(tcb["flags"] & get_macro("TCB_FLAG_TTYPE_MASK")) == int(get_macro("TCB_FLAG_TTYPE_PTHREAD")):
if int(tcb["flags"] & get_macro("TCB_FLAG_TTYPE_MASK")) == int(
get_macro("TCB_FLAG_TTYPE_PTHREAD")
):
entry = tcb["entry"]["main"]
ptcb = cast2ptr(tcb, "struct pthread_tcb_s")
arg = ptcb["arg"]
@ -386,49 +433,82 @@ class Ps(gdb.Command):
args = []
parg = argv
while (parg.dereference()):
while parg.dereference():
args.append(parg.dereference().string())
parg += 1
cmd = " ".join([name] + args)
if not utils.get_symbol_value("CONFIG_SCHED_CPULOAD_NONE"):
load = "{0:.1%}".format(int(tcb['ticks']) / int(gdb.parse_and_eval("g_cpuload_total")))
load = "{0:.1%}".format(
int(tcb["ticks"]) / int(gdb.parse_and_eval("g_cpuload_total"))
)
else:
load = "Dis."
gdb.write(" ".join((
self._fmt_wx.format(pid, width=5), self._fmt_wx.format(group, width=5), \
self._fmt_wx.format(cpu, width=3), \
self._fmt_wx.format(priority, width=3), self._fmt_wxl.format(policy, width=8), self._fmt_wxl.format(task_type, width=7), \
self._fmt_wx.format(npx, width=3), self._fmt_wxl.format(state, width=8), self._fmt_wxl.format(event, width=9), \
self._fmt_wxl.format(sigmask, width=8), \
self._fmt_wx.format(stacksz, width=7), self._fmt_wx.format(used, width=7), self._fmt_wx.format(filled, width=6), \
self._fmt_wx.format(load, width=6), cmd,
)))
gdb.write(
" ".join(
(
self._fmt_wx.format(pid, width=5),
self._fmt_wx.format(group, width=5),
self._fmt_wx.format(cpu, width=3),
self._fmt_wx.format(priority, width=3),
self._fmt_wxl.format(policy, width=8),
self._fmt_wxl.format(task_type, width=7),
self._fmt_wx.format(npx, width=3),
self._fmt_wxl.format(state, width=8),
self._fmt_wxl.format(event, width=9),
self._fmt_wxl.format(sigmask, width=8),
self._fmt_wx.format(stacksz, width=7),
self._fmt_wx.format(used, width=7),
self._fmt_wx.format(filled, width=6),
self._fmt_wx.format(load, width=6),
cmd,
)
)
)
gdb.write("\n")
def invoke(self, args, from_tty):
gdb.write(" ".join((
self._fmt_wx.format("PID", width=5), self._fmt_wx.format("GROUP", width=5), \
self._fmt_wx.format("CPU", width=3), \
self._fmt_wx.format("PRI", width=3), self._fmt_wxl.format("POLICY", width=8), self._fmt_wxl.format("TYPE", width=7), \
self._fmt_wx.format("NPX", width=3), self._fmt_wxl.format("STATE", width=8), self._fmt_wxl.format("EVENT", width=9), \
self._fmt_wxl.format("SIGMASK", width=utils.get_symbol_value("_SIGSET_NELEM") * 8), \
self._fmt_wx.format("STACK", width=7), self._fmt_wx.format("USED", width=7), self._fmt_wx.format("FILLED", width=3), \
self._fmt_wx.format("LOAD", width=6), "COMMAND",
)))
gdb.write(
" ".join(
(
self._fmt_wx.format("PID", width=5),
self._fmt_wx.format("GROUP", width=5),
self._fmt_wx.format("CPU", width=3),
self._fmt_wx.format("PRI", width=3),
self._fmt_wxl.format("POLICY", width=8),
self._fmt_wxl.format("TYPE", width=7),
self._fmt_wx.format("NPX", width=3),
self._fmt_wxl.format("STATE", width=8),
self._fmt_wxl.format("EVENT", width=9),
self._fmt_wxl.format(
"SIGMASK", width=utils.get_symbol_value("_SIGSET_NELEM") * 8
),
self._fmt_wx.format("STACK", width=7),
self._fmt_wx.format("USED", width=7),
self._fmt_wx.format("FILLED", width=3),
self._fmt_wx.format("LOAD", width=6),
"COMMAND",
)
)
)
gdb.write("\n")
for tcb in utils.get_tcbs():
self.parse_and_show_info(tcb)
# We can't use a user command to rename continue it will recursion
gdb.execute("define c\n nxcontinue \n end\n")
gdb.execute("define s\n nxstep \n end\n")
gdb.write("\n\x1b[31;1m if use thread command, please don't use 'continue', use 'c' instead !!!\x1b[m\n")
gdb.write("\x1b[31;1m if use thread command, please don't use 'step', use 's' instead !!!\x1b[m\n")
gdb.write(
"\n\x1b[31;1m if use thread command, please don't use 'continue', use 'c' instead !!!\x1b[m\n"
)
gdb.write(
"\x1b[31;1m if use thread command, please don't use 'step', use 's' instead !!!\x1b[m\n"
)
Nxsetregs()
Nxinfothreads()

View file

@ -21,6 +21,7 @@
############################################################################
import re
import gdb
from macros import fetch_macro_info, try_expand
@ -40,8 +41,7 @@ class CachedType:
if self._type is None:
self._type = gdb.lookup_type(self._name)
if self._type is None:
raise gdb.GdbError(
"cannot resolve type '{0}'".format(self._name))
raise gdb.GdbError("cannot resolve type '{0}'".format(self._name))
if hasattr(gdb, "events") and hasattr(gdb.events, "new_objfile"):
gdb.events.new_objfile.connect(self._new_objfile_handler)
return self._type
@ -60,7 +60,7 @@ class MacroCtx:
"""
def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'instance'):
if not hasattr(cls, "instance"):
cls.instance = super(MacroCtx, cls).__new__(cls)
return cls.instance
@ -117,8 +117,7 @@ class ContainerOf(gdb.Function):
def invoke(self, ptr, typename, elementname):
return container_of(
ptr, gdb.lookup_type(
typename.string()).pointer(), elementname.string()
ptr, gdb.lookup_type(typename.string()).pointer(), elementname.string()
)
@ -151,13 +150,14 @@ def hexdump(address, size):
mem = inf.read_memory(address, size)
bytes = mem.tobytes()
for i in range(0, len(bytes), 16):
chunk = bytes[i:i + 16]
chunk = bytes[i : i + 16]
gdb.write(f"{i + address:08x} ")
hex_values = " ".join(f"{byte:02x}" for byte in chunk)
hex_display = f"{hex_values:<47}"
gdb.write(hex_display)
ascii_values = "".join(chr(byte) if 32 <= byte
<= 126 else "." for byte in chunk)
ascii_values = "".join(
chr(byte) if 32 <= byte <= 126 else "." for byte in chunk
)
gdb.write(f" {ascii_values} \n")
@ -179,7 +179,7 @@ class Hexdump(gdb.Command):
argv = args.split(" ")
address = 0
size = 0
if (argv[0] == ""):
if argv[0] == "":
gdb.write("Usage: hexdump address/symbol <size>\n")
return
@ -187,7 +187,7 @@ class Hexdump(gdb.Command):
address = int(argv[0], 0)
size = int(argv[1], 0)
else:
var = gdb.parse_and_eval(f'{argv[0]}')
var = gdb.parse_and_eval(f"{argv[0]}")
address = int(var.address)
size = int(var.type.sizeof)
gdb.write(f"{argv[0]} {hex(address)} {int(size)}\n")
@ -230,7 +230,7 @@ def read_memoryview(inf, start, length):
def read_u16(buffer, offset):
"""Read a 16-bit unsigned integer from a buffer"""
buffer_val = buffer[offset: offset + 2]
buffer_val = buffer[offset : offset + 2]
value = [0, 0]
if type(buffer_val[0]) is str:
@ -285,8 +285,7 @@ def is_target_arch(arch, exact=False):
if hasattr(gdb.Frame, "architecture"):
archname = gdb.newest_frame().architecture().name()
return arch in archname \
if not exact else arch == archname
return arch in archname if not exact else arch == archname
else:
global target_arch
if target_arch is None:
@ -301,8 +300,7 @@ def is_target_arch(arch, exact=False):
else:
target_arch = candidate
return arch in target_arch \
if not exact else arch == target_arch
return arch in target_arch if not exact else arch == target_arch
# Kernel Specific Helper Functions
@ -322,8 +320,8 @@ def in_interrupt_context(cpuid=0):
frame = gdb.selected_frame()
if is_target_arch("arm"):
xpsr = int(frame.read_register('xpsr'))
return xpsr & 0xf
xpsr = int(frame.read_register("xpsr"))
return xpsr & 0xF
else:
# TODO: figure out a more proper way to detect if
# we are in an interrupt context