티스토리 뷰
import os
import numpy as np
import torch
from torch.utils.data import DataLoader
from .dataset import Dataset
from .models import EdgeModel, InpaintingModel
from .utils import Progbar, create_dir, stitch_images, imsave
from .metrics import PSNR, EdgeAccuracy
class EdgeConnect():
def __init__(self, config):
self.config = config
# config.MODEL에 따라 어떤 것을 수행할지 결정.
if config.MODEL == 1:
model_name = 'edge'
elif config.MODEL == 2:
model_name = 'inpaint'
elif config.MODEL == 3:
model_name = 'edge_inpaint'
elif config.MODEL == 4:
model_name = 'joint'
self.debug = False # 디버그 모드 비활성화
self.model_name = model_name # 인스턴스 변수로 설정
self.edge_model = EdgeModel(config).to(config.DEVICE) # 엣지 모델 생성
self.inpaint_model = InpaintingModel(config).to(config.DEVICE) # 인페인팅 모델 생성
self.psnr = PSNR(255.0).to(config.DEVICE) # 메트릭 생성
self.edgeacc = EdgeAccuracy(config.EDGE_THRESHOLD).to(config.DEVICE) # 메트릭 생성
# test mode
if self.config.MODE == 2: # 테스트 데이터셋 불러오기
self.test_dataset = Dataset(config, config.TEST_FLIST, config.TEST_EDGE_FLIST, config.TEST_MASK_FLIST, augment=False, training=False)
else: # 데이터셋 불러오기
self.train_dataset = Dataset(config, config.TRAIN_FLIST, config.TRAIN_EDGE_FLIST, config.TRAIN_MASK_FLIST, augment=True, training=True)
self.val_dataset = Dataset(config, config.VAL_FLIST, config.VAL_EDGE_FLIST, config.VAL_MASK_FLIST, augment=False, training=True)
# 검증 데이터에 대한 반복자 생성 (이건 밑에 sample()함수에서 사용됨)
self.sample_iterator = self.val_dataset.create_iterator(config.SAMPLE_SIZE)
self.samples_path = os.path.join(config.PATH, 'samples') # 샘플 이미지를 저장할 경로 지정
self.results_path = os.path.join(config.PATH, 'results') # 실험 결과를 저장할 경로 지정
if config.RESULTS is not None: # 사용자 정의 경로가 주어진 경우
self.results_path = os.path.join(config.RESULTS) # 그 곳으로 디렉토리 지정
if config.DEBUG is not None and config.DEBUG != 0: # 디버그 모드가 활성화된 경우
self.debug = True # 해당 변수를 True로 설정
self.log_file = os.path.join(config.PATH, 'log_' + model_name + '.dat')
# 모델 가중치 업데이트 하는 함수
def load(self):
if self.config.MODEL == 1:
self.edge_model.load() # 엣지 모델의 가중치만 로드
elif self.config.MODEL == 2:
self.inpaint_model.load() # 인페인팅 모델의 가중치만 로드
else:
self.edge_model.load() # 에지, 인페인팅 모델 가중치 모두 로드
self.inpaint_model.load()
def save(self):
if self.config.MODEL == 1:
self.edge_model.save() # 엣지 모델 세이브
elif self.config.MODEL == 2 or self.config.MODEL == 3:
self.inpaint_model.save() # 인페인트 모델 세이브
else: # 모두 세이브
self.edge_model.save()
self.inpaint_model.save()
# 모델 훈련을 돌리는 함수
def train(self):
train_loader = DataLoader(
dataset=self.train_dataset,
batch_size=self.config.BATCH_SIZE,
num_workers=4,
drop_last=True,
shuffle=True
)
epoch = 0
keep_training = True
model = self.config.MODEL
max_iteration = int(float((self.config.MAX_ITERS)))
total = len(self.train_dataset) # 전체 훈련 데이터 개수
print("max_iteration: ", max_iteration)
if total == 0:
print('No training data was provided! Check \'TRAIN_FLIST\' value in the configuration file.')
return
while(keep_training):
epoch += 1
print('\n\nTraining epoch: %d' % epoch)
progbar = Progbar(total, width=20, stateful_metrics=['epoch', 'iter'])
for items in train_loader: # 배치 단위로 데이터 가져오기
self.edge_model.train() # 에지 모델을 훈련 모드로 설정
self.inpaint_model.train() # 인페인트 모델을 훈련 모드로 설정
images, images_gray, edges, masks = self.cuda(*items) # items는 dataset의 __getitem__() 메서드에 의해 이미지, 흑백 이미지, 엣지, 마스크를 텐서로 불러온다.
# edge model
if model == 1:
# train
outputs, gen_loss, dis_loss, logs = self.edge_model.process(images_gray, edges, masks)
# metrics
precision, recall = self.edgeacc(edges * masks, outputs * masks)
logs.append(('precision', precision.item()))
logs.append(('recall', recall.item()))
# backward
self.edge_model.backward(gen_loss, dis_loss)
iteration = self.edge_model.iteration
# inpaint model
elif model == 2:
# train
outputs, gen_loss, dis_loss, logs = self.inpaint_model.process(images, edges, masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
# metrics
psnr = self.psnr(self.postprocess(images), self.postprocess(outputs_merged))
mae = (torch.sum(torch.abs(images - outputs_merged)) / torch.sum(images)).float()
logs.append(('psnr', psnr.item()))
logs.append(('mae', mae.item()))
# backward
self.inpaint_model.backward(gen_loss, dis_loss)
iteration = self.inpaint_model.iteration
# inpaint with edge model
elif model == 3:
# train
if True or np.random.binomial(1, 0.5) > 0: # 이게 항상 참이다 사실.
outputs = self.edge_model(images_gray, edges, masks) # 우선 엣지맵을 생성
outputs = outputs * masks + edges * (1 - masks) # 생성된 엦시를 마스크 된 영역에 적용하고, 나머지 영역은 원래 엣지를 사용.
else: # 이 부분은 고려할 필요가 없음.
outputs = edges
outputs, gen_loss, dis_loss, logs = self.inpaint_model.process(images, outputs.detach(), masks) # edtach는 생성된 에지가 그래디언트 계산에서 제외되록 하는 것.
outputs_merged = (outputs * masks) + (images * (1 - masks)) # 복원된 이미지를 마스크 된 영역에 적용하고, 나머지 영역은 원본 이미지를 사용.
# metrics
psnr = self.psnr(self.postprocess(images), self.postprocess(outputs_merged))
mae = (torch.sum(torch.abs(images - outputs_merged)) / torch.sum(images)).float()
logs.append(('psnr', psnr.item()))
logs.append(('mae', mae.item()))
# backward
self.inpaint_model.backward(gen_loss, dis_loss)
iteration = self.inpaint_model.iteration
# joint model
else:
# train
e_outputs, e_gen_loss, e_dis_loss, e_logs = self.edge_model.process(images_gray, edges, masks)
e_outputs = e_outputs * masks + edges * (1 - masks)
i_outputs, i_gen_loss, i_dis_loss, i_logs = self.inpaint_model.process(images, e_outputs, masks)
outputs_merged = (i_outputs * masks) + (images * (1 - masks))
# metrics
psnr = self.psnr(self.postprocess(images), self.postprocess(outputs_merged))
mae = (torch.sum(torch.abs(images - outputs_merged)) / torch.sum(images)).float()
precision, recall = self.edgeacc(edges * masks, e_outputs * masks)
e_logs.append(('pre', precision.item()))
e_logs.append(('rec', recall.item()))
i_logs.append(('psnr', psnr.item()))
i_logs.append(('mae', mae.item()))
logs = e_logs + i_logs
# backward
self.inpaint_model.backward(i_gen_loss, i_dis_loss)
self.edge_model.backward(e_gen_loss, e_dis_loss)
iteration = self.inpaint_model.iteration
if iteration >= max_iteration:
keep_training = False
break
logs = [
("epoch", epoch),
("iter", iteration),
] + logs
progbar.add(len(images), values=logs if self.config.VERBOSE else [x for x in logs if not x[0].startswith('l_')])
# log model at checkpoints
if self.config.LOG_INTERVAL and iteration % self.config.LOG_INTERVAL == 0:
self.log(logs) # 로그 기록
# sample model at checkpoints
if self.config.SAMPLE_INTERVAL and iteration % self.config.SAMPLE_INTERVAL == 0:
self.sample() # 샘플 저장
# evaluate model at checkpoints
if self.config.EVAL_INTERVAL and iteration % self.config.EVAL_INTERVAL == 0:
print('\nstart eval...\n')
self.eval() # 모델 평가
# save model at checkpoints
if self.config.SAVE_INTERVAL and iteration % self.config.SAVE_INTERVAL == 0:
self.save() # 모델 상태 저장
print('\nEnd training....')
def eval(self):
val_loader = DataLoader(
dataset=self.val_dataset,
batch_size=self.config.BATCH_SIZE,
drop_last=True,
shuffle=True
)
# <train()에만 있던 변수>
# epoch = 0
# keep_training = True
# max_iteration = int(float((self.config.MAX_ITERS)))
model = self.config.MODEL
total = len(self.val_dataset)
self.edge_model.eval()
self.inpaint_model.eval()
progbar = Progbar(total, width=20, stateful_metrics=['it'])
iteration = 0
for items in val_loader:
iteration += 1
images, images_gray, edges, masks = self.cuda(*items)
# edge model
if model == 1:
# eval
outputs, gen_loss, dis_loss, logs = self.edge_model.process(images_gray, edges, masks)
# metrics
precision, recall = self.edgeacc(edges * masks, outputs * masks)
logs.append(('precision', precision.item()))
logs.append(('recall', recall.item()))
# inpaint model
elif model == 2:
# eval
outputs, gen_loss, dis_loss, logs = self.inpaint_model.process(images, edges, masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
# metrics
psnr = self.psnr(self.postprocess(images), self.postprocess(outputs_merged))
mae = (torch.sum(torch.abs(images - outputs_merged)) / torch.sum(images)).float()
logs.append(('psnr', psnr.item()))
logs.append(('mae', mae.item()))
# inpaint with edge model
elif model == 3:
# eval
outputs = self.edge_model(images_gray, edges, masks)
outputs = outputs * masks + edges * (1 - masks)
outputs, gen_loss, dis_loss, logs = self.inpaint_model.process(images, outputs.detach(), masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
# metrics
psnr = self.psnr(self.postprocess(images), self.postprocess(outputs_merged))
mae = (torch.sum(torch.abs(images - outputs_merged)) / torch.sum(images)).float()
logs.append(('psnr', psnr.item()))
logs.append(('mae', mae.item()))
# joint model
else:
# eval
e_outputs, e_gen_loss, e_dis_loss, e_logs = self.edge_model.process(images_gray, edges, masks)
e_outputs = e_outputs * masks + edges * (1 - masks)
i_outputs, i_gen_loss, i_dis_loss, i_logs = self.inpaint_model.process(images, e_outputs, masks)
outputs_merged = (i_outputs * masks) + (images * (1 - masks))
# metrics
psnr = self.psnr(self.postprocess(images), self.postprocess(outputs_merged))
mae = (torch.sum(torch.abs(images - outputs_merged)) / torch.sum(images)).float()
precision, recall = self.edgeacc(edges * masks, e_outputs * masks)
e_logs.append(('pre', precision.item()))
e_logs.append(('rec', recall.item()))
i_logs.append(('psnr', psnr.item()))
i_logs.append(('mae', mae.item()))
logs = e_logs + i_logs
logs = [("it", iteration), ] + logs
progbar.add(len(images), values=logs)
def test(self):
self.edge_model.eval()
self.inpaint_model.eval()
model = self.config.MODEL
create_dir(self.results_path)
test_loader = DataLoader(
dataset=self.test_dataset,
batch_size=1,
)
index = 0 # 테스트에서는 랜덤 마스크가 들어가지 않으므로 index를 추가적으로 사용한다.
for items in test_loader:
name = self.test_dataset.load_name(index)
images, images_gray, edges, masks = self.cuda(*items) # 배치 사이즈가 1이므로 images라고 적어도 1개이다.
index += 1
# edge model
if model == 1:
outputs = self.edge_model(images_gray, edges, masks)
outputs_merged = (outputs * masks) + (edges * (1 - masks))
# inpaint model
elif model == 2:
outputs = self.inpaint_model(images, edges, masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
# inpaint with edge model / joint model
else:
edges = self.edge_model(images_gray, edges, masks).detach() # 결과물 텐서를 다른 모델에 집어넣을 때는 항상 detach() 한다.
outputs = self.inpaint_model(images, edges, masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
output = self.postprocess(outputs_merged)[0]
path = os.path.join(self.results_path, name)
print(index, name)
imsave(output, path)
if self.debug:
edges = self.postprocess(1 - edges)[0]
masked = self.postprocess(images * (1 - masks) + masks)[0]
fname, fext = name.split('.')
imsave(edges, os.path.join(self.results_path, fname + '_edge.' + fext))
imsave(masked, os.path.join(self.results_path, fname + '_masked.' + fext))
print('\nEnd test....')
# 검증 데이터셋에서 샘플을 생성하고 저장
def sample(self, it=None): # 선택적으로 반복 횟수 인자인 it를 받을 수 있음.
# do not sample when validation set is empty
if len(self.val_dataset) == 0:
return
self.edge_model.eval() # 참고로 eval은 드롭아웃과 배치 정규화를 평가 모드로 전환
self.inpaint_model.eval()
model = self.config.MODEL
items = next(self.sample_iterator) # 검증 데이터셋의 샘플 반복자에서 다음 샘플을 가져옴.
images, images_gray, edges, masks = self.cuda(*items) # 가져온 샘플을 CUDA 디바이스로 이동
# edge model
if model == 1:
iteration = self.edge_model.iteration
inputs = (images_gray * (1 - masks)) + masks
outputs = self.edge_model(images_gray, edges, masks)
outputs_merged = (outputs * masks) + (edges * (1 - masks))
# inpaint model
elif model == 2:
iteration = self.inpaint_model.iteration
inputs = (images * (1 - masks)) + masks
outputs = self.inpaint_model(images, edges, masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
# inpaint with edge model / joint model
else:
iteration = self.inpaint_model.iteration
inputs = (images * (1 - masks)) + masks
outputs = self.edge_model(images_gray, edges, masks).detach()
edges = (outputs * masks + edges * (1 - masks)).detach()
outputs = self.inpaint_model(images, edges, masks)
outputs_merged = (outputs * masks) + (images * (1 - masks))
if it is not None:
iteration = it
image_per_row = 2
if self.config.SAMPLE_SIZE <= 6:
image_per_row = 1
images = stitch_images(
self.postprocess(images),
self.postprocess(inputs),
self.postprocess(edges),
self.postprocess(outputs),
self.postprocess(outputs_merged),
img_per_row = image_per_row
)
path = os.path.join(self.samples_path, self.model_name)
name = os.path.join(path, str(iteration).zfill(5) + ".png")
create_dir(path)
print('\nsaving sample ' + name)
images.save(name)
def log(self, logs): # 로그 데이터를 파일에 기록
with open(self.log_file, 'a') as f:
f.write('%s\n' % ' '.join([str(item[1]) for item in logs]))
def cuda(self, *args): # 입력된 아이템들을 CUDA 디바이스로 이동
return (item.to(self.config.DEVICE) for item in args)
def postprocess(self, img): # 이미지 데이터를 후처리
# [0, 1] => [0, 255]
img = img * 255.0
img = img.permute(0, 2, 3, 1)
return img.int()
'코드 분석 > Edge-Connect' 카테고리의 다른 글
Edge-Connect: 모듈 분석 (0) | 2024.01.30 |
---|---|
Edge-Connect: models.py (1) | 2024.01.27 |
Edge-Connect: dataset.py (0) | 2024.01.25 |
Edge-Connect: utils.py (1) | 2024.01.25 |
Edge-Connect: main.py (0) | 2024.01.25 |