Kaggle中常用的调试脚本

5/6/2022 kaggle

# 动态执行语句

注意：这条语句必须直接写在当前文件（notebook）中，不能放到其他文件里再导入——`exec` 使用的是调用处的全局命名空间。

exec(compile(open("动态执行的代码", "rb").read(), "tmp.py", 'exec'))
1

# Imports

import os
import warnings
from pprint import pprint
from tqdm import tqdm

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torchvision.transforms as T

from torchvision.io import read_image
from torch.utils.data import DataLoader
from datasets import Dataset,DatasetDict

import tensorflow as tf
import gc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# v2ray代理

用于解决 Hugging Face、wandb 等的网络访问问题（HTTP 代理端口 10809）。IDEA 系列 IDE 的代理也使用这个端口；Proxychains 使用的是 SOCKS5 的 10808 端口。

os.environ['http_proxy'] = "http://127.0.0.1:10809" 
os.environ['https_proxy'] = "http://127.0.0.1:10809"
1
2

# g-Config 常用函数

from __future__ import print_function
import inspect, re,os,pickle

from contextlib import contextmanager
import time,re

os.environ["WANDB__SERVICE_WAIT"] = "300"
# os.environ["WANDB_DISABLED"] = "true"

class g:
    """Namespace of global debug switches and the shared random seed.

    ``debug`` is the master switch. ``d1`` .. ``d21`` are individual
    toggles; write what each one enables in the comment beside it. When
    ``debug`` is off, every ``dN`` toggle is forced to ``False``.
    """

    # Master switch: flip to True to let the individual toggles take effect.
    debug = False

    d1 = True   # fast start
    d2 = True   # explanation ->
    d3 = True   # explanation ->
    d4 = True   # explanation ->
    d5 = True   # explanation ->
    d6 = True   # explanation ->
    d7 = True   # explanation ->
    d8 = True   # explanation ->
    d9 = True   # explanation ->
    d10 = True  # explanation ->
    d11 = True  # explanation ->
    d12 = True  # explanation ->
    d13 = True  # explanation ->
    d14 = True  # explanation ->
    d15 = True  # explanation ->
    d16 = True  # explanation ->
    d17 = True  # explanation ->
    d18 = True  # explanation ->
    d19 = True  # explanation ->
    d20 = True  # explanation ->
    d21 = True  # explanation ->

    # With debugging disabled, switch every individual toggle off in one go.
    if not debug:
        d1 = d2 = d3 = d4 = d5 = d6 = d7 = False
        d8 = d9 = d10 = d11 = d12 = d13 = d14 = False
        d15 = d16 = d17 = d18 = d19 = d20 = d21 = False

    seed = 42
    
import uuid

class EnvHelper:
    """Stash module-level globals under generated shadow names and restore them later.

    ``save`` moves each named global to a uniquely named shadow global and
    sets the original name to ``None``; ``restore`` moves the most recently
    saved value back. Saves of the same name nest like a stack.

    The generated statements run through ``exec`` with ``global``
    declarations, so they act on the globals of the module in which this
    class is defined.
    """

    def new_name(self):
        """Return a fresh identifier-safe suffix (a UUID4 with '-' replaced by '_')."""
        return str(uuid.uuid4()).replace('-', '_')

    def __init__(self):
        # Maps each saved name to the stack of shadow-global names holding its values.
        self.data = {}

    def save(self, *names):
        """Move each global in *names* to a shadow global and set the original to None."""
        commands = []
        for n in names:
            commands.append(f'global {n}')
            if n not in self.data:
                print(n)  # printed the first time a name is registered
                self.data[n] = []
        for n in names:
            new_name = n + self.new_name()
            self.data[n].append(new_name)
            commands.append(f'global {new_name}')
            commands.append(f'{new_name} = {n}')
            commands.append(f'{n} = None')
        exec(compile('\n'.join(commands), "tmp.py", 'exec'))

    def restore(self, *names):
        """Restore the most recently saved value of each global in *names*.

        Raises:
            KeyError: if a name was never passed to ``save``.
            IndexError: if every saved value of a name was already restored.
        """
        commands = []
        # Validate everything first so a bad name cannot leave a half-restored state.
        for n in names:
            if n not in self.data:
                # The original raised a plain string, which is itself a TypeError.
                raise KeyError(f"{n} not saved !")
            if not self.data[n]:
                raise IndexError(f"{n} has no saved value left to restore")
            commands.append(f'global {n}')
        for n in names:
            shadow_name = self.data[n].pop()
            commands.append(f'global {shadow_name}')
            commands.append(f'{n} = {shadow_name}')
            # Drop the shadow global so it no longer keeps the object alive.
            commands.append(f'del {shadow_name}')
        exec(compile('\n'.join(commands), "tmp.py", 'exec'))
    
def dbg(*values, **print_options):
    """Debug print: forwards all arguments unchanged to ``print``.

    To silence debug output everywhere, add an early ``return`` here.
    """
    print(*values, **print_options)

class NamePrinter:
    """Debug printer that also shows the name of the variable passed at the call site.

    Args:
        funcname: the name the bound ``adb`` method is called by in the
            caller's source; used to find the argument name on that line.
        print_fun: callable used for output (defaults to ``print``).
        argprint_lambda: transformation applied to the value before it is
            printed (for example ``lambda x: x.shape``).
    """

    def __init__(self, funcname, print_fun=print, argprint_lambda=lambda x: x):
        self.funcname = funcname
        self.print_fun = print_fun
        self.argprint_lambda = argprint_lambda

    def adb(self, p=''):
        """Print *p* together with the identifier used for it in the caller's source.

        The identifier is recovered by matching ``funcname(<identifier>)``
        against the caller's source line. If the source is unavailable (for
        example in ``exec``-ed code) or the argument is not a bare
        identifier, ``None`` is printed as the name.
        """
        caller = inspect.currentframe().f_back
        # code_context is None when the caller's source cannot be located.
        context_lines = inspect.getframeinfo(caller)[3] or []
        pattern = r'\b%s\s*\(\s*([A-Za-z_][A-Za-z0-9_]*)\s*\)' % self.funcname

        argument_real_name = None
        for line in context_lines:
            m = re.search(pattern, line)
            if m:
                argument_real_name = m.group(1)
                break

        self.print_fun('\n>>>', argument_real_name, f'\n{self.argprint_lambda(p)}',)
        self.print_fun('<<< --------------', '\n')


# adb prints the value itself, sdb prints its .shape, and tdb sleeps for one
# second before printing the value.
adb = NamePrinter('adb').adb
sdb = NamePrinter('sdb', argprint_lambda=lambda x: x.shape).adb
tdb = NamePrinter('tdb', argprint_lambda=lambda x: (time.sleep(1), x)[1]).adb

# Register the custom magics only when running under IPython/Jupyter, i.e.
# when ``get_ipython`` is present in the current namespace.
if locals().get('get_ipython'):

    from IPython.core.magic import Magics, magics_class, line_magic, cell_magic

    @magics_class
    class MyMagics(Magics):
        """Custom cell magics for notebooks."""

        @cell_magic
        def loop(self, line, cell):
            """``%%loop N``: run the cell body N times, announcing each pass."""
            repeat_text = line.split(' ')[0]
            for pass_no in range(1, int(repeat_text) + 1):
                print('>>> loop', pass_no, 'of', repeat_text)
                self.shell.run_cell(cell, store_history=False)

    get_ipython().register_magics(MyMagics)


class classproperty(property):
    """A ``property`` whose getter receives the owning class instead of an instance."""

    def __get__(self, cls, owner):
        # Bind the getter to the owner class as a classmethod, then call it.
        bound_getter = classmethod(self.fget).__get__(None, owner)
        return bound_getter()
    
    
def monkeypatch_method_to_class(cls):
    """Return a decorator that attaches the decorated function to *cls*.

    Intended for adding helper methods to existing classes such as torch
    tensors. The function is stored on *cls* under its own ``__name__`` and
    is returned unchanged.
    """
    def attach(func):
        method_name = func.__name__
        setattr(cls, method_name, func)
        return func

    return attach


# 时间测试工具
@contextmanager
def timed(label="NoLabel",enabled = True):
    """Context manager that reports the wall-clock duration of its ``with`` body.

    When *enabled* is false nothing is printed. The duration is reported
    even if the body raises.
    """
    began_at = time.time()
    if enabled:
        print(f"[{label}] time benchmark started")
    try:
        yield
    finally:
        if enabled:
            elapsed = time.time() - began_at
            print(f"[{label}] used {elapsed} s")

def write_obj(path, obj3):
    """Pickle *obj3* and write the resulting bytes to *path*."""
    # Serialize before opening, so a pickling error leaves any existing file untouched.
    payload = pickle.dumps(obj3)
    with open(path, 'wb') as sink:
        sink.write(payload)

def read_obj(path,default=''):
    """Load a pickled object from *path*, falling back to *default*.

    Args:
        path: file to read.
        default: value returned when the file is missing or cannot be
            unpickled.

    Returns:
        The unpickled object, or *default*.

    NOTE: unpickling can execute arbitrary code; only read files you trust.
    """
    if not os.path.exists(path):
        return default
    # The context manager closes the file on every path.
    with open(path, "rb") as f:
        try:
            return pickle.load(f)
        except Exception:
            # Best-effort read: a corrupt or truncated file yields the default.
            # Unlike a bare ``except``, KeyboardInterrupt/SystemExit still propagate.
            return default

class HitTest:
    """Counter that reports a hit on every *n*-th call to ``hit_test``."""

    def __init__(self,n,enable=True) -> None:
        self.n = n
        self.enable = enable
        self.count = 0

    def hit_test(self):
        """Count one call; return True on every n-th call, False otherwise.

        While disabled, nothing is counted and the result is always False.
        """
        if self.enable:
            self.count += 1
            return self.count % self.n == 0
        return False
        
def iv(func,*args,**kwargs):
    """Call *func* with the given arguments and return its result.

    A thin indirection point: add tracing (e.g. printing ``func.__name__``)
    here when needed.
    """
    outcome = func(*args, **kwargs)
    return outcome


def _coerce_arg(raw, template):
    """Convert the string *raw* to the type of *template*.

    The literals 'True' and 'False' become booleans first. A ``None``
    template carries no type information, so the value is returned as is.
    """
    value = {'True': True, 'False': False}.get(raw, raw)
    if template is None:
        return value
    return type(template)(value)


def parse_args(kv_spliter = ':',default_args = None,from_string=None,override_default=True,type_template=None):
    """Parse ``key<sep>value`` tokens from the command line into a dict.

    Args:
        kv_spliter: separator between key and value inside one token.
        default_args: mapping of argument name to default value; the type of
            each default is used to convert the supplied string.
        from_string: if given (and non-empty), parse this whitespace-separated
            string instead of ``sys.argv``.
        override_default: when False, keys present in *default_args* keep
            their default even if supplied.
        type_template: mapping whose value types are used to convert keys
            that are not in *default_args*; defaults to *default_args*.

    Returns:
        A dict holding ``version_code`` (trailing digits/dashes of the first
        token before ``.py``, or 0), ``cmd`` (the joined tokens), every
        default, and every parsed key. Keys found in neither mapping are
        kept as plain strings.
    """
    import sys
    default_args = {} if default_args is None else default_args
    argv = from_string.split() if from_string else sys.argv
    type_template = default_args if type_template is None else type_template
    cmd = ' '.join(argv)
    version_matches = re.findall(r'[\-\d]+(?=\.py$)', argv[0] if argv else '')
    version_code = version_matches[0] if version_matches else 0
    rt = {'version_code': version_code, 'cmd': cmd, **default_args}
    for token in argv:
        if kv_spliter not in token:
            continue
        # Split on the first separator only, so values may contain it
        # (URLs, Windows paths, times).
        k, v = token.split(kv_spliter, 1)
        if k in default_args:
            rt[k] = _coerce_arg(v, default_args[k]) if override_default else default_args[k]
        elif k in type_template:
            # Key appears only in type_template.
            rt[k] = _coerce_arg(v, type_template[k])
        else:
            # Unknown keys stay plain strings.
            rt[k] = v
    print('[args received]',rt)
    return rt



def build_default_args():
    # write your args here
	
    # then paste this in your CFG
    '''
    ***no dependency variables***
    
    for __ in ARGUMENTS:
        exec(f'{__} = ARGUMENTS["{__}"]')
        print(__,'-->',ARGUMENTS[__])
        
    ***dependency variables***
    '''    
    print('sample cmd:',' '.join([f'{k}:{v}' for k,v in locals().items() ]))
    return locals().copy()
ARGUMENTS = parse_args(default_args=\
                    build_default_args())


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290

# 需要pip安装的工具

!pip install py-heat-magic 

1
2

# colabcode远程连接

import time
from threading import Thread
import os,re
! pip install git+https://github.com/kuro7766/colabcode.git
!ngrok authtoken 27xqdUZec8gJCpJ2g8maHKgQAuA_6uT52UntAv25GP48JzA4?
from colabcode import ColabCode
import random
def a():
    ColabCode(port=random.randint(10000,12000),lab=True)
Thread(target=a,name='a').start()
import pickle,re
while not os.path.exists("_ng_url.pkl"):
    time.sleep(1)
    print('waiting for url')
with open("_ng_url.pkl", "rb") as f:
    url = pickle.load(f)
    print('>>> url')
    print(re.findall('https://.*.ngrok.io',str(url))[0]+'/?token=123456')
os.remove('_ng_url.pkl')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# Numpy拼接

import numpy as np


class GrowableNumpyArray:
    """Append-only 1-D NumPy buffer that grows geometrically.

    Args:
        dtype: element type of the buffer. Defaults to the builtin ``float``;
            the ``np.float`` alias used previously was removed in NumPy 1.24.
        grow_speed: factor by which the capacity is multiplied when the
            buffer is full.
    """

    def __init__(self, dtype=float, grow_speed=4):
        self.data = np.zeros((100,), dtype=dtype)
        self.capacity = 100
        self.size = 0
        self.grow_speed = grow_speed

    def update(self, row):
        """Append every element of the iterable *row*."""
        for r in row:
            self.add(r)

    def add(self, x):
        """Append a single element, enlarging the buffer first if it is full."""
        if self.size == self.capacity:
            self.capacity *= self.grow_speed
            # Keep the original dtype; without it the buffer silently became
            # float64 on the first growth.
            newdata = np.zeros((self.capacity,), dtype=self.data.dtype)
            newdata[:self.size] = self.data
            self.data = newdata

        self.data[self.size] = x
        self.size += 1

    def finalize(self):
        """Return the filled part of the buffer (a view, not a copy)."""
        return self.data[:self.size]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 可以计数的上下文

import sys
import inspect

class CounterExec(object):
    """Context manager that can skip its ``with`` body based on an entry counter.

    Every ``__enter__`` increments a class-level counter. Depending on the
    mode, the body then either runs normally or is skipped by installing a
    trace function that raises as soon as the body starts executing;
    ``__exit__`` swallows that exception.
    """
    # Number of times a context has been entered; stored on the class.
    counter = 0
    
    def __init__(self,enabled=False,every=50):
        """
        Store the mode and the period.

        enabled == 0 / False: the body always runs.
        enabled == 1 / True: the body runs only when the counter is a
        multiple of ``every``; otherwise it is skipped.
        Any other value: the body is always skipped.
        """
        self.mode=enabled
        self.every = every
    def __enter__(self):
        self.__class__.counter += 1
        
        # Decide whether the body should run on this entry.
        exec_flag = False
        if self.mode == 1:
            if self.__class__.counter%self.every==0:
                exec_flag = True
        elif self.mode ==0:
            exec_flag=True
            
        if exec_flag:
            # Run the body normally; __enter__ returns None implicitly.
            pass
        else:
            print('Skipping Context ... ')
            # Tracing must be switched on globally before a per-frame
            # f_trace hook takes effect; the global hook itself does nothing.
            sys.settrace(lambda *args, **keys: None)
            # Frame of the code containing the ``with`` statement.
            frame = sys._getframe(1)
            # self.trace raises on the next trace event in that frame.
            frame.f_trace = self.trace
            # NOTE(review): nothing in this class calls sys.settrace(None)
            # afterwards, so the no-op global hook appears to stay installed.
            return 'SET BY TRICKY CONTEXT MANAGER!!'
    def trace(self, frame, event, arg):
        # Bare ``raise`` with no active exception raises RuntimeError, which
        # aborts the ``with`` body.
        raise
    def __exit__(self, type, value, traceback):
        print('Exiting context ...')
        # Returning True suppresses ANY exception from the body, including
        # genuine errors raised when the body does run.
        return True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 随机数&可复现性

def set_seed(seed = 42):
    '''Seed Python, NumPy and PyTorch so results are the same on every run.

    Args:
        seed: integer seed applied to every random number generator.
    '''
    # Imported here because the file's import section does not import
    # ``random``; the name was only available if another cell had run first.
    import random

    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    # Seed every CUDA device, not only the current one.
    torch.cuda.manual_seed_all(seed)
    # When running on the CuDNN backend, two further options must be set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed. This only affects interpreters
    # started afterwards (e.g. worker subprocesses), not the running one.
    os.environ['PYTHONHASHSEED'] = str(seed)
    print('> SEEDING DONE')
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 调参用

def param_picker(dict_of_list,constrant = lambda c: True):
    """Randomly pick one value per key until the combination satisfies a constraint.

    Args:
        dict_of_list: mapping of parameter name to a non-empty list of
            candidate values.
        constrant: predicate receiving the candidate dict; the candidate is
            returned once it yields a truthy result.

    Returns:
        A dict with one randomly chosen value for each key.

    Raises:
        AssertionError: if no acceptable combination is found in 1000 tries.
    """
    import random  # hoisted out of the retry loop

    max_tries = 1000
    for _ in range(max_tries):
        params = {key: random.choice(candidates) for key, candidates in dict_of_list.items()}
        if constrant(params):
            return params
    # Raised explicitly: an ``assert`` statement is stripped under ``python -O``,
    # which would turn an unsatisfiable constraint into an infinite loop.
    raise AssertionError('param_picker retry too many times with failure')
{**param_picker({'a':[1,2,3],'b':[4,5,6]}),**{'ext_dict':10}}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# Kaggle初始化&文件上传

!pip install kaggle
!mkdir .kaggle
!mkdir ~/.kaggle/
import json
token = {"username":"galegale05","key":"keykey"}
with open('kaggle.json', 'w') as file:
    json.dump(token, file)
!cp kaggle.json ~/.kaggle/kaggle.json
!chmod 600 /root/.kaggle/kaggle.json
!kaggle config set -n path -v ./autodl-tmp
!kaggle datasets list
1
2
3
4
5
6
7
8
9
10
11
upd = 'path-to-dir'
!kaggle datasets init -p {upd}
with open(f'{upd}/dataset-metadata.json','w') as f:
  f.write('''
  {
    "title": "%s",
    "id": "galegale05/%s",
    "licenses": [
      {
        "name": "CC0-1.0"
      }
    ]
  }
  ''' % (upd,upd))

!cat {upd}/dataset-metadata.json
!kaggle datasets create -p {upd} --dir-mode zip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

1
最后更新时间: 3/7/2024, 1:47:24 AM