背景
![](https://img-blog.csdnimg.cn/img_convert/cdcb957513581d73943e044cd887e44d.png)
今天的话题会偏代码技巧一些,对于以前没有接触过代码的朋友或者接触代码开发经验较少的朋友会有些吃力。
上篇文章介绍了如何广视角的生成相对稳定的视频。昨天的实现相对简单,主要用的是UI界面来做生成。但是生成的效果其实也显而易见,不算太好。人物连贯性和动作连贯性是不够的。原因如下:
1.stablediffusion webui的batch 的image2image还无法真正做到image2image
2.控制其实是通过固定的文本prompt+多个controlnet来控制
然后如果希望画面能够稳定,其实image的控制信息和每张图用相对有差异的prompt生成的图片质量连贯性和稳定性会更好(image2image控制整体的风格和内容,controlnet控制细节,prompt可以控制一些内容差异)。
这篇文章不会跟大家分享,稳定图生成的具体细节。而是跟大家分享更重要的,如何用代码来实现webui的功能,如何用代码方式搭建更可控更高效的图生成链路。
内容
1.用代码搭建stablediffusion+controlnet生产流程
2.multi-control net生产流程搭建
3.diffuser没有的功能如何自己实现加入
用代码搭建stablediffusion+controlnet生产流程
安装diffuser包
pip install --upgrade diffusers accelerate transformers
#如果要安装最新diffuser可以执行下面指令
pip install git+https://github.com/huggingface/diffusers
! pip install controlnet_hinter==0.0.5
搭建stablediffusion+controlnet脚本
from diffusers import StableDiffusionControlNetPipeline, ControlNetModel
from diffusers.utils import load_image
import torch
import numpy as np
from PIL import Image
import cv2#加载测试图片
original_image = load_image("https://huggingface.co/datasets/diffusers/test-arrays/resolve/main/stable_diffusion_imgvar/input_image_vermeer.png")
original_image#设置controlnet+stablefiddufion流水线
controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny",cache_dir='./')
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()#抽取图片canny边界
canny_edged_image = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png")
canny_edged_image#canny controlnet做图片生成
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=canny_edged_image,num_inference_steps=30, generator=generator).images[0]
image#设置canny编辑过滤阀值
control_image = np.array(original_image)
low_threshold = 10 # default=100
high_threshold = 150 # default=200
control_image = cv2.Canny(control_image, low_threshold, high_threshold)
control_image = control_image[:, :, None]
control_image = np.concatenate([control_image, control_image, control_image], axis=2)
control_image = Image.fromarray(control_image)
control_image#canny controlnet做图片生成
generator = torch.Generator(device="cpu").manual_seed(3)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image,num_inference_steps=30, generator=generator).images[0]
image#pose的controlnet流水线
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose",cache_dir= './')
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()#pose的图片
pose_image = load_image('https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose.png')
pose_image#pose流水线生产图
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed, football, a boy", negative_prompt="lowres, bad anatomy, worst quality, low quality",image=pose_image, generator=generator,num_inference_steps=30).images[0]
image#用controlnet_hinter抽取pose控制特征
control_image = controlnet_hinter.hint_openpose(original_image)
control_image#pose流水线生产图
generator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image#深度图
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-depth")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None
).to('cuda')
pipe.enable_xformers_memory_efficient_attention()control_image = controlnet_hinter.hint_depth(original_image)
control_imagegenerator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image#轮廓图
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-scribble")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()scribble_image = load_image('https://github.com/lllyasviel/ControlNet/raw/main/test_imgs/user_1.png')
scribble_imagegenerator = torch.Generator(device="cpu").manual_seed(1)
image = pipe(prompt="a turtle, best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=scribble_image, generator=generator,num_inference_steps=30).images[0]
image#Segmentation
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-seg")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()
control_image = controlnet_hinter.hint_segmentation(original_image)
control_imagegenerator = torch.Generator(device="cpu").manual_seed(0)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image#Hough 建筑、大场景里用的多
controlnet = None
pipe = Nonecontrolnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-mlsd")
pipe = StableDiffusionControlNetPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", controlnet=controlnet, safety_checker=None).to('cuda')
pipe.enable_xformers_memory_efficient_attention()control_image = controlnet_hinter.hint_hough(original_image)
control_imagegenerator = torch.Generator(device="cpu").manual_seed(2)
image = pipe(prompt="best quality, extremely detailed", negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",image=control_image, generator=generator,num_inference_steps=30).images[0]
image
multi-control net生产流程搭建
diffuser包里面现在还没实现多control net控制图生成,需要使用multi-controlnet可以用以下代码。
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import inspect
from typing import Any, Callable, Dict, List, Optional, Union, Tupleimport numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizerfrom diffusers import AutoencoderKL, ControlNetModel, UNet2DConditionModel
from diffusers.schedulers import KarrasDiffusionSchedulers
from diffusers.utils import (PIL_INTERPOLATION,is_accelerate_available,is_accelerate_version,logging,randn_tensor,replace_example_docstring,
)
from diffusers.pipeline_utils import DiffusionPipeline
from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput, StableDiffusionSafetyChecker
from diffusers.models.controlnet import ControlNetOutputlogger = logging.get_logger(__name__) # pylint: disable=invalid-nameclass ControlNetProcessor(object):def __init__(self,controlnet: ControlNetModel,image: Union[torch.FloatTensor, PIL.Image.Image, List[torch.FloatTensor], List[PIL.Image.Image]],conditioning_scale: float = 1.0,):self.controlnet = controlnetself.image = imageself.conditioning_scale = conditioning_scaledef _default_height_width(self, height, width, image):if isinstance(image, list):image = image[0]if height is None:if isinstance(image, PIL.Image.Image):height = image.heightelif isinstance(image, torch.Tensor):height = image.shape[3]height = (height // 8) * 8 # round down to nearest multiple of 8if width is None:if isinstance(image, PIL.Image.Image):width = image.widthelif isinstance(image, torch.Tensor):width = image.shape[2]width = (width // 8) * 8 # round down to nearest multiple of 8return height, widthdef default_height_width(self, height, width):return self._default_height_width(height, width, self.image)def _prepare_image(self, image, width, height, batch_size, num_images_per_prompt, device, dtype):if not isinstance(image, torch.Tensor):if isinstance(image, PIL.Image.Image):image = [image]if isinstance(image[0], PIL.Image.Image):image = [np.array(i.resize((width, height), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]image = np.concatenate(image, axis=0)image = np.array(image).astype(np.float32) / 255.0image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image)elif isinstance(image[0], torch.Tensor):image = torch.cat(image, dim=0)image_batch_size = image.shape[0]if image_batch_size == 1:repeat_by = batch_sizeelse:# image batch size is the same as prompt batch sizerepeat_by = num_images_per_promptimage = image.repeat_interleave(repeat_by, dim=0)image = image.to(device=device, dtype=dtype)return imagedef _check_inputs(self, image, prompt, prompt_embeds):image_is_pil = isinstance(image, PIL.Image.Image)image_is_tensor = isinstance(image, torch.Tensor)image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)if not image_is_pil and not image_is_tensor and not image_is_pil_list and not image_is_tensor_list:raise TypeError("image must be passed and be one of PIL image, torch tensor, list of PIL images, or list of torch tensors")if image_is_pil:image_batch_size = 1elif image_is_tensor:image_batch_size = image.shape[0]elif image_is_pil_list:image_batch_size = len(image)elif image_is_tensor_list:image_batch_size = len(image)if prompt is not None and isinstance(prompt, str):prompt_batch_size = 1elif prompt is not None and isinstance(prompt, list):prompt_batch_size = len(prompt)elif prompt_embeds is not None:prompt_batch_size = prompt_embeds.shape[0]if image_batch_size != 1 and image_batch_size != prompt_batch_size:raise ValueError(f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}")def check_inputs(self, prompt, prompt_embeds):self._check_inputs(self.image, prompt, prompt_embeds)def prepare_image(self, width, height, batch_size, num_images_per_prompt, device, do_classifier_free_guidance):self.image = self._prepare_image(self.image, width, height, batch_size, num_images_per_prompt, device, self.controlnet.dtype)if do_classifier_free_guidance:self.image = torch.cat([self.image] * 2)def __call__(self,sample: torch.FloatTensor,timestep: Union[torch.Tensor, float, int],encoder_hidden_states: torch.Tensor,class_labels: Optional[torch.Tensor] = None,timestep_cond: Optional[torch.Tensor] = None,attention_mask: Optional[torch.Tensor] = None,cross_attention_kwargs: Optional[Dict[str, Any]] = None,return_dict: bool = True,) -> Tuple:down_block_res_samples, mid_block_res_sample = self.controlnet(sample=sample,controlnet_cond=self.image,timestep=timestep,encoder_hidden_states=encoder_hidden_states,class_labels=class_labels,timestep_cond=timestep_cond,attention_mask=attention_mask,cross_attention_kwargs=cross_attention_kwargs,return_dict=False,)down_block_res_samples = [down_block_res_sample * self.conditioning_scale for down_block_res_sample in down_block_res_samples]mid_block_res_sample *= self.conditioning_scalereturn (down_block_res_samples, mid_block_res_sample)EXAMPLE_DOC_STRING = """Examples:```py>>> # !pip install opencv-python transformers accelerate>>> from diffusers import StableDiffusionControlNetPipeline, ControlNetModel, UniPCMultistepScheduler>>> from diffusers.utils import load_image>>> import numpy as np>>> import torch>>> import cv2>>> from PIL import Image>>> # download an image>>> image = load_image(... "https://hf.co/datasets/huggingface/documentation-images/resolve/main/diffusers/input_image_vermeer.png"... )>>> image = np.array(image)>>> # get canny image>>> image = cv2.Canny(image, 100, 200)>>> image = image[:, :, None]>>> image = np.concatenate([image, image, image], axis=2)>>> canny_image = Image.fromarray(image)>>> # load control net and stable diffusion v1-5>>> controlnet = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny", torch_dtype=torch.float16)>>> pipe = StableDiffusionControlNetPipeline.from_pretrained(... "runwayml/stable-diffusion-v1-5", controlnet=controlnet, torch_dtype=torch.float16... )>>> # speed up diffusion process with faster scheduler and memory optimization>>> pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)>>> # remove following line if xformers is not installed>>> pipe.enable_xformers_memory_efficient_attention()>>> pipe.enable_model_cpu_offload()>>> # generate image>>> generator = torch.manual_seed(0)>>> image = pipe(... "futuristic-looking woman", num_inference_steps=20, generator=generator, image=canny_image... ).images[0]```
"""class StableDiffusionMultiControlNetPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""_optional_components = ["safety_checker", "feature_extractor"]def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()if safety_checker is None and requires_safety_checker:logger.warning(f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"" results in services or applications open to the public. Both the diffusers team and Hugging Face"" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"" it only for use-cases that involve analyzing network behavior or auditing its results. For more"" information, please have a look at https://github.com/huggingface/diffusers/pull/254 .")if safety_checker is not None and feature_extractor is None:raise ValueError("Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead.")self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.enable_vae_slicingdef enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.disable_vae_slicingdef disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae, controlnet, and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called.Note that offloading happens on a submodule basis. Memory savings are higher than with`enable_model_cpu_offload`, but performance is lower."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)def enable_model_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, reducing memory usage with a low impact on performance. Comparedto `enable_sequential_cpu_offload`, this method moves one whole model at a time to the GPU when its `forward`method is called, and the model remains in GPU until the next model runs. Memory savings are lower than with`enable_sequential_cpu_offload`, but performance is much better due to the iterative execution of the `unet`."""if is_accelerate_available() and is_accelerate_version(">=", "0.17.0.dev0"):from accelerate import cpu_offload_with_hookelse:raise ImportError("`enable_model_offload` requires `accelerate v0.17.0` or higher.")device = torch.device(f"cuda:{gpu_id}")hook = Nonefor cpu_offloaded_model in [self.text_encoder, self.unet, self.vae]:_, hook = cpu_offload_with_hook(cpu_offloaded_model, device, prev_module_hook=hook)if self.safety_checker is not None:# the safety checker can offload the vae again_, hook = cpu_offload_with_hook(self.safety_checker, device, prev_module_hook=hook)# control net hook has be manually offloaded as it alternates with unet# cpu_offload_with_hook(self.controlnet, device)# We'll offload the last model manually.self.final_offload_hook = hook@property# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._execution_devicedef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.device# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_promptdef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument."""if prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]if prompt_embeds is None:text_inputs = self.tokenizer(prompt,padding="max_length",max_length=self.tokenizer.model_max_length,truncation=True,return_tensors="pt",)text_input_ids = text_inputs.input_idsuntruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_idsif untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning("The following part of your input was truncated because CLIP can only handle sequences up to"f" {self.tokenizer.model_max_length} tokens: {removed_text}")if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = text_inputs.attention_mask.to(device)else:attention_mask = Noneprompt_embeds = self.text_encoder(text_input_ids.to(device),attention_mask=attention_mask,)prompt_embeds = prompt_embeds[0]prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)bs_embed, seq_len, _ = prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens = [""] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="f" {type(prompt)}.")elif isinstance(negative_prompt, str):uncond_tokens = [negative_prompt]elif batch_size != len(negative_prompt):raise ValueError(f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"" the batch size of `prompt`.")else:uncond_tokens = negative_promptmax_length = prompt_embeds.shape[1]uncond_input = self.tokenizer(uncond_tokens,padding="max_length",max_length=max_length,truncation=True,return_tensors="pt",)if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = uncond_input.attention_mask.to(device)else:attention_mask = Nonenegative_prompt_embeds = self.text_encoder(uncond_input.input_ids.to(device),attention_mask=attention_mask,)negative_prompt_embeds = negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len = negative_prompt_embeds.shape[1]negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embeds# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checkerdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept = Nonereturn image, has_nsfw_concept# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latentsdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return image# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargsdef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs = {}if accepts_eta:extra_step_kwargs["eta"] = eta# check if the scheduler accepts generatoraccepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs["generator"] = generatorreturn extra_step_kwargsdef check_inputs(self,prompt,height,width,callback_steps,negative_prompt=None,prompt_embeds=None,negative_prompt_embeds=None,):if height % 8 != 0 or width % 8 != 0:raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)):raise ValueError(f"`callback_steps` has to be a positive integer but is {callback_steps} of type"f" {type(callback_steps)}.")if prompt is not None and prompt_embeds is not None:raise ValueError(f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"" only forward one of the two.")elif prompt is None and prompt_embeds is None:raise ValueError("Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined.")elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"f" {negative_prompt_embeds}. Please make sure to only forward one of the two.")if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape != negative_prompt_embeds.shape:raise ValueError("`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"f" {negative_prompt_embeds.shape}.")# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latentsdef prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) != batch_size:raise ValueError(f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"f" size of {batch_size}. Make sure the batch size matches the length of the generators.")if latents is None:latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)else:latents = latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents = latents * self.scheduler.init_noise_sigmareturn latents@torch.no_grad()@replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,processors: List[ControlNetProcessor],prompt: Union[str, List[str]] = None,height: Optional[int] = None,width: Optional[int] = None,num_inference_steps: int = 50,guidance_scale: float = 7.5,negative_prompt: Optional[Union[str, List[str]]] = None,num_images_per_prompt: Optional[int] = 1,eta: float = 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,latents: Optional[torch.FloatTensor] = None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,output_type: Optional[str] = "pil",return_dict: bool = True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,callback_steps: int = 1,cross_attention_kwargs: Optional[Dict[str, Any]] = None,):r"""Function invoked when calling the pipeline for generation.Args:prompt (`str` or `List[str]`, *optional*):The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.instead.height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (`int`, *optional*, defaults to 50):The number of denoising steps. More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (`float`, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).`guidance_scale` is defined as `w` of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,usually at the expense of lower image quality.negative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).num_images_per_prompt (`int`, *optional*, defaults to 1):The number of images to generate per prompt.eta (`float`, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[`schedulers.DDIMScheduler`], will be ignored for others.generator (`torch.Generator` or `List[torch.Generator]`, *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (`torch.FloatTensor`, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random `generator`.prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument.output_type (`str`, *optional*, defaults to `"pil"`):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.return_dict (`bool`, *optional*, defaults to `True`):Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of aplain tuple.callback (`Callable`, *optional*):A function that will be called every `callback_steps` steps during inference. The function will becalled with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.callback_steps (`int`, *optional*, defaults to 1):The frequency at which the `callback` function will be called. If not specified, the callback will becalled at every step.cross_attention_kwargs (`dict`, *optional*):A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under`self.processor` in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).Examples:Returns:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"(nsfw) content, according to the `safety_checker`."""# 0. Default height and width to unetheight, width = processors[0].default_height_width(height, width)# 1. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)for processor in processors:processor.check_inputs(prompt, prompt_embeds)# 2. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]device = self._execution_device# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`# corresponds to doing no classifier free guidance.do_classifier_free_guidance = guidance_scale > 1.0# 3. Encode input promptprompt_embeds = self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embeds=prompt_embeds,negative_prompt_embeds=negative_prompt_embeds,)# 4. Prepare imagefor processor in processors:processor.prepare_image(width=width,height=height,batch_size=batch_size * num_images_per_prompt,num_images_per_prompt=num_images_per_prompt,device=device,do_classifier_free_guidance=do_classifier_free_guidance,)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, device=device)timesteps = self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents = self.unet.in_channelslatents = self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)# 8. Denoising loopnum_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(total=num_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input = self.scheduler.scale_model_input(latent_model_input, t)# controlnet inferencefor i, processor in enumerate(processors):down_samples, mid_sample = processor(latent_model_input,t,encoder_hidden_states=prompt_embeds,return_dict=False,)if i == 0:down_block_res_samples, mid_block_res_sample = down_samples, mid_sampleelse:down_block_res_samples = [d_prev + d_curr for d_prev, d_curr in zip(down_block_res_samples, down_samples)]mid_block_res_sample = mid_block_res_sample + mid_sample# predict the noise residualnoise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,down_block_additional_residuals=down_block_res_samples,mid_block_additional_residual=mid_block_res_sample,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t -> x_t-1latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):progress_bar.update()if callback is not None and i % callback_steps == 0:callback(i, t, latents)# If we do sequential model offloading, let's offload unet and controlnet# manually for max memory savingsif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.unet.to("cpu")torch.cuda.empty_cache()if output_type == "latent":image = latentshas_nsfw_concept = Noneelif output_type == "pil":# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage = self.numpy_to_pil(image)else:# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# Offload last model to CPUif hasattr(self, "final_offload_hook") and self.final_offload_hook is not None:self.final_offload_hook.offload()if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)# demo & simple test
def main():from diffusers.utils import load_imagepipe = StableDiffusionMultiControlNetPipeline.from_pretrained("./model", safety_checker=None, torch_dtype=torch.float16).to("cuda")pipe.enable_xformers_memory_efficient_attention()controlnet_canny = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-canny",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")controlnet_pose = ControlNetModel.from_pretrained("lllyasviel/sd-controlnet-openpose",cache_dir='./controlmodel', torch_dtype=torch.float16).to("cuda")canny_left = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_left.png")canny_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_right.png")pose_right = load_image("https://huggingface.co/takuma104/controlnet_dev/resolve/main/pose_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left),ControlNetProcessor(controlnet_canny, canny_right),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_right.png")image = pipe(prompt="best quality, extremely detailed",negative_prompt="monochrome, lowres, bad anatomy, worst quality, low quality",processors=[ControlNetProcessor(controlnet_canny, canny_left,0.5),ControlNetProcessor(controlnet_pose, pose_right,1.6),],generator=torch.Generator(device="cpu").manual_seed(0),num_inference_steps=30,width=512,height=512,).images[0]image.save("./canny_left_pose_right.png")if __name__ == "__main__":main()
diffuser没有的功能如何自己实现加入
假设我们要自己实现一个stablediffusion+controlnet+inpaint的功能,该如何实现。这个任务大部分是在生产流程上做串接,所以代码基本可以定位在pipline模块stablediffusion。
假设我们代码实现如下(代码在特定版本有效,这个后面会升级,大家可以不急着用。只是跟大家讲解diffuser的代码框架,以及改动一个模块该如何融入diffuser中供自己使用)
#代码文件名:pipeline_stable_diffusion_controlnet_inpaint.py
# Copyright 2023 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.import inspect
from typing import Any, Callable, Dict, List, Optional, Unionimport numpy as np
import PIL.Image
import torch
from transformers import CLIPFeatureExtractor, CLIPTextModel, CLIPTokenizerfrom ...models import AutoencoderKL, UNet2DConditionModel
from ...schedulers import KarrasDiffusionSchedulers
from ...utils import is_accelerate_available, logging, randn_tensor, replace_example_docstring
from ..pipeline_utils import DiffusionPipeline
from . import StableDiffusionPipelineOutput
from .safety_checker import StableDiffusionSafetyCheckerlogger = logging.get_logger(__name__) # pylint: disable=invalid-nameEXAMPLE_DOC_STRING = """Examples:```py>>> from diffusers import StableDiffusionControlNetPipeline>>> from diffusers.utils import load_image>>> # Canny edged image for control>>> canny_edged_image = load_image(... "https://huggingface.co/takuma104/controlnet_dev/resolve/main/vermeer_canny_edged.png"... )>>> pipe = StableDiffusionControlNetPipeline.from_pretrained("takuma104/control_sd15_canny").to("cuda")>>> image = pipe(prompt="best quality, extremely detailed", controlnet_hint=canny_edged_image).images[0]```
"""def prepare_mask_and_masked_image(image, mask):"""Prepares a pair (image, mask) to be consumed by the Stable Diffusion pipeline. This means that those inputs will beconverted to ``torch.Tensor`` with shapes ``batch x channels x height x width`` where ``channels`` is ``3`` for the``image`` and ``1`` for the ``mask``.The ``image`` will be converted to ``torch.float32`` and normalized to be in ``[-1, 1]``. The ``mask`` will bebinarized (``mask > 0.5``) and cast to ``torch.float32`` too.Args:image (Union[np.array, PIL.Image, torch.Tensor]): The image to inpaint.It can be a ``PIL.Image``, or a ``height x width x 3`` ``np.array`` or a ``channels x height x width````torch.Tensor`` or a ``batch x channels x height x width`` ``torch.Tensor``.mask (_type_): The mask to apply to the image, i.e. regions to inpaint.It can be a ``PIL.Image``, or a ``height x width`` ``np.array`` or a ``1 x height x width````torch.Tensor`` or a ``batch x 1 x height x width`` ``torch.Tensor``.Raises:ValueError: ``torch.Tensor`` images should be in the ``[-1, 1]`` range. ValueError: ``torch.Tensor`` maskshould be in the ``[0, 1]`` range. ValueError: ``mask`` and ``image`` should have the same spatial dimensions.TypeError: ``mask`` is a ``torch.Tensor`` but ``image`` is not(ot the other way around).Returns:tuple[torch.Tensor]: The pair (mask, masked_image) as ``torch.Tensor`` with 4dimensions: ``batch x channels x height x width``."""if isinstance(image, torch.Tensor):if not isinstance(mask, torch.Tensor):raise TypeError(f"`image` is a torch.Tensor but `mask` (type: {type(mask)} is not")# Batch single imageif image.ndim == 3:assert image.shape[0] == 3, "Image outside a batch should be of shape (3, H, W)"image = image.unsqueeze(0)# Batch and add channel dim for single maskif mask.ndim == 2:mask = mask.unsqueeze(0).unsqueeze(0)# Batch single mask or add channel dimif mask.ndim == 3:# Single batched mask, no channel dim or single mask not batched but channel dimif mask.shape[0] == 1:mask = mask.unsqueeze(0)# Batched masks no channel dimelse:mask = mask.unsqueeze(1)assert image.ndim == 4 and mask.ndim == 4, "Image and Mask must have 4 dimensions"assert image.shape[-2:] == mask.shape[-2:], "Image and Mask must have the same spatial dimensions"assert image.shape[0] == mask.shape[0], "Image and Mask must have the same batch size"# Check image is in [-1, 1]if image.min() < -1 or image.max() > 1:raise ValueError("Image should be in [-1, 1] range")# Check mask is in [0, 1]if mask.min() < 0 or mask.max() > 1:raise ValueError("Mask should be in [0, 1] range")# Binarize maskmask[mask < 0.5] = 0mask[mask >= 0.5] = 1# Image as float32image = image.to(dtype=torch.float32)elif isinstance(mask, torch.Tensor):raise TypeError(f"`mask` is a torch.Tensor but `image` (type: {type(image)} is not")else:# preprocess imageif isinstance(image, (PIL.Image.Image, np.ndarray)):image = [image]if isinstance(image, list) and isinstance(image[0], PIL.Image.Image):image = [np.array(i.convert("RGB"))[None, :] for i in image]image = np.concatenate(image, axis=0)elif isinstance(image, list) and isinstance(image[0], np.ndarray):image = np.concatenate([i[None, :] for i in image], axis=0)image = image.transpose(0, 3, 1, 2)image = torch.from_numpy(image).to(dtype=torch.float32) / 127.5 - 1.0# preprocess maskif isinstance(mask, (PIL.Image.Image, np.ndarray)):mask = [mask]if isinstance(mask, list) and isinstance(mask[0], PIL.Image.Image):mask = np.concatenate([np.array(m.convert("L"))[None, None, :] for m in mask], axis=0)mask = mask.astype(np.float32) / 255.0elif isinstance(mask, list) and isinstance(mask[0], np.ndarray):mask = np.concatenate([m[None, None, :] for m in mask], axis=0)mask[mask < 0.5] = 0mask[mask >= 0.5] = 1mask = torch.from_numpy(mask)masked_image = image * (mask < 0.5)return mask, masked_imageclass StableDiffusionControlNetInpaintPipeline(DiffusionPipeline):r"""Pipeline for text-to-image generation using Stable Diffusion with ControlNet guidance.This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods thelibrary implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)Args:vae ([`AutoencoderKL`]):Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.text_encoder ([`CLIPTextModel`]):Frozen text-encoder. Stable Diffusion uses the text portion of[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specificallythe [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant.tokenizer (`CLIPTokenizer`):Tokenizer of class[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).unet ([`UNet2DConditionModel`]): Conditional U-Net architecture to denoise the encoded image latents.controlnet ([`UNet2DConditionModel`]):[ControlNet](https://arxiv.org/abs/2302.05543) architecture to generate guidance.scheduler ([`SchedulerMixin`]):A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].safety_checker ([`StableDiffusionSafetyChecker`]):Classification module that estimates whether generated images could be considered offensive or harmful.Please, refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for details.feature_extractor ([`CLIPFeatureExtractor`]):Model that extracts features from generated images to be used as inputs for the `safety_checker`."""def __init__(self,vae: AutoencoderKL,text_encoder: CLIPTextModel,tokenizer: CLIPTokenizer,unet: UNet2DConditionModel,controlnet: UNet2DConditionModel,scheduler: KarrasDiffusionSchedulers,safety_checker: StableDiffusionSafetyChecker,feature_extractor: CLIPFeatureExtractor,requires_safety_checker: bool = True,):super().__init__()self.register_modules(vae=vae,text_encoder=text_encoder,tokenizer=tokenizer,unet=unet,controlnet=controlnet,scheduler=scheduler,safety_checker=safety_checker,feature_extractor=feature_extractor,)self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)self.register_to_config(requires_safety_checker=requires_safety_checker)def enable_vae_slicing(self):r"""Enable sliced VAE decoding.When this option is enabled, the VAE will split the input tensor in slices to compute decoding in severalsteps. This is useful to save some memory and allow larger batch sizes."""self.vae.enable_slicing()def disable_vae_slicing(self):r"""Disable sliced VAE decoding. If `enable_vae_slicing` was previously invoked, this method will go back tocomputing decoding in one step."""self.vae.disable_slicing()def enable_sequential_cpu_offload(self, gpu_id=0):r"""Offloads all models to CPU using accelerate, significantly reducing memory usage. When called, unet,text_encoder, vae and safety checker have their state dicts saved to CPU and then are moved to a`torch.device('meta') and loaded to GPU only when their specific submodule has its `forward` method called."""if is_accelerate_available():from accelerate import cpu_offloadelse:raise ImportError("Please install accelerate via `pip install accelerate`")device = torch.device(f"cuda:{gpu_id}")for cpu_offloaded_model in [self.unet, self.text_encoder, self.vae]:cpu_offload(cpu_offloaded_model, device)if self.safety_checker is not None:cpu_offload(self.safety_checker, execution_device=device, offload_buffers=True)@propertydef _execution_device(self):r"""Returns the device on which the pipeline's models will be executed. After calling`pipeline.enable_sequential_cpu_offload()` the execution device can only be inferred from Accelerate's modulehooks."""if self.device != torch.device("meta") or not hasattr(self.unet, "_hf_hook"):return self.devicefor module in self.unet.modules():if (hasattr(module, "_hf_hook")and hasattr(module._hf_hook, "execution_device")and module._hf_hook.execution_device is not None):return torch.device(module._hf_hook.execution_device)return self.devicedef _encode_prompt(self,prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt=None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,):r"""Encodes the prompt into text encoder hidden states.Args:prompt (`str` or `List[str]`, *optional*):prompt to be encodeddevice: (`torch.device`):torch devicenum_images_per_prompt (`int`):number of images that should be generated per promptdo_classifier_free_guidance (`bool`):whether to use classifier free guidance or notnegative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument."""if prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]if prompt_embeds is None:text_inputs = self.tokenizer(prompt,padding="max_length",max_length=self.tokenizer.model_max_length,truncation=True,return_tensors="pt",)text_input_ids = text_inputs.input_idsuntruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_idsif untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer.model_max_length - 1 : -1])logger.warning("The following part of your input was truncated because CLIP can only handle sequences up to"f" {self.tokenizer.model_max_length} tokens: {removed_text}")if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = text_inputs.attention_mask.to(device)else:attention_mask = Noneprompt_embeds = self.text_encoder(text_input_ids.to(device),attention_mask=attention_mask,)prompt_embeds = prompt_embeds[0]prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)bs_embed, seq_len, _ = prompt_embeds.shape# duplicate text embeddings for each generation per prompt, using mps friendly methodprompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)# get unconditional embeddings for classifier free guidanceif do_classifier_free_guidance and negative_prompt_embeds is None:uncond_tokens: List[str]if negative_prompt is None:uncond_tokens = [""] * batch_sizeelif type(prompt) is not type(negative_prompt):raise TypeError(f"`negative_prompt` should be the same type to `prompt`, but got {type(negative_prompt)} !="f" {type(prompt)}.")elif isinstance(negative_prompt, str):uncond_tokens = [negative_prompt]elif batch_size != len(negative_prompt):raise ValueError(f"`negative_prompt`: {negative_prompt} has batch size {len(negative_prompt)}, but `prompt`:"f" {prompt} has batch size {batch_size}. Please make sure that passed `negative_prompt` matches"" the batch size of `prompt`.")else:uncond_tokens = negative_promptmax_length = prompt_embeds.shape[1]uncond_input = self.tokenizer(uncond_tokens,padding="max_length",max_length=max_length,truncation=True,return_tensors="pt",)if hasattr(self.text_encoder.config, "use_attention_mask") and self.text_encoder.config.use_attention_mask:attention_mask = uncond_input.attention_mask.to(device)else:attention_mask = Nonenegative_prompt_embeds = self.text_encoder(uncond_input.input_ids.to(device),attention_mask=attention_mask,)negative_prompt_embeds = negative_prompt_embeds[0]if do_classifier_free_guidance:# duplicate unconditional embeddings for each generation per prompt, using mps friendly methodseq_len = negative_prompt_embeds.shape[1]negative_prompt_embeds = negative_prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)# For classifier free guidance, we need to do two forward passes.# Here we concatenate the unconditional and text embeddings into a single batch# to avoid doing two forward passesprompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])return prompt_embedsdef run_safety_checker(self, image, device, dtype):if self.safety_checker is not None:safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)image, has_nsfw_concept = self.safety_checker(images=image, clip_input=safety_checker_input.pixel_values.to(dtype))else:has_nsfw_concept = Nonereturn image, has_nsfw_conceptdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return imagedef prepare_extra_step_kwargs(self, generator, eta):# prepare extra kwargs for the scheduler step, since not all schedulers have the same signature# eta (η) is only used with the DDIMScheduler, it will be ignored for other schedulers.# eta corresponds to η in DDIM paper: https://arxiv.org/abs/2010.02502# and should be between [0, 1]accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())extra_step_kwargs = {}if accepts_eta:extra_step_kwargs["eta"] = eta# check if the scheduler accepts generatoraccepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())if accepts_generator:extra_step_kwargs["generator"] = generatorreturn extra_step_kwargsdef decode_latents(self, latents):latents = 1 / self.vae.config.scaling_factor * latentsimage = self.vae.decode(latents).sampleimage = (image / 2 + 0.5).clamp(0, 1)# we always cast to float32 as this does not cause significant overhead and is compatible with bfloat16image = image.cpu().permute(0, 2, 3, 1).float().numpy()return imagedef check_inputs(self,prompt,height,width,callback_steps,negative_prompt=None,prompt_embeds=None,negative_prompt_embeds=None,):if height % 8 != 0 or width % 8 != 0:raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")if (callback_steps is None) or (callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)):raise ValueError(f"`callback_steps` has to be a positive integer but is {callback_steps} of type"f" {type(callback_steps)}.")if prompt is not None and prompt_embeds is not None:raise ValueError(f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"" only forward one of the two.")elif prompt is None and prompt_embeds is None:raise ValueError("Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined.")elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")if negative_prompt is not None and negative_prompt_embeds is not None:raise ValueError(f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"f" {negative_prompt_embeds}. Please make sure to only forward one of the two.")if prompt_embeds is not None and negative_prompt_embeds is not None:if prompt_embeds.shape != negative_prompt_embeds.shape:raise ValueError("`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"f" {negative_prompt_embeds.shape}.")def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):shape = (batch_size, num_channels_latents, height // self.vae_scale_factor, width // self.vae_scale_factor)if isinstance(generator, list) and len(generator) != batch_size:raise ValueError(f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"f" size of {batch_size}. Make sure the batch size matches the length of the generators.")if latents is None:latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)else:latents = latents.to(device)# scale the initial noise by the standard deviation required by the schedulerlatents = latents * self.scheduler.init_noise_sigmareturn latentsdef controlnet_hint_conversion(self, controlnet_hint, height, width, num_images_per_prompt):channels = 3if isinstance(controlnet_hint, torch.Tensor):# torch.Tensor: acceptble shape are any of chw, bchw(b==1) or bchw(b==num_images_per_prompt)shape_chw = (channels, height, width)shape_bchw = (1, channels, height, width)shape_nchw = (num_images_per_prompt, channels, height, width)if controlnet_hint.shape in [shape_chw, shape_bchw, shape_nchw]:controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)if controlnet_hint.shape != shape_nchw:controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)return controlnet_hintelse:raise ValueError(f"Acceptble shape of `controlnet_hint` are any of ({channels}, {height}, {width}),"+ f" (1, {channels}, {height}, {width}) or ({num_images_per_prompt}, "+ f"{channels}, {height}, {width}) but is {controlnet_hint.shape}")elif isinstance(controlnet_hint, np.ndarray):# np.ndarray: acceptable shape is any of hw, hwc, bhwc(b==1) or bhwc(b==num_images_per_promot)# hwc is opencv compatible image format. Color channel must be BGR Format.if controlnet_hint.shape == (height, width):controlnet_hint = np.repeat(controlnet_hint[:, :, np.newaxis], channels, axis=2) # hw -> hwc(c==3)shape_hwc = (height, width, channels)shape_bhwc = (1, height, width, channels)shape_nhwc = (num_images_per_prompt, height, width, channels)if controlnet_hint.shape in [shape_hwc, shape_bhwc, shape_nhwc]:controlnet_hint = torch.from_numpy(controlnet_hint.copy())controlnet_hint = controlnet_hint.to(dtype=self.controlnet.dtype, device=self.controlnet.device)controlnet_hint /= 255.0if controlnet_hint.shape != shape_nhwc:controlnet_hint = controlnet_hint.repeat(num_images_per_prompt, 1, 1, 1)controlnet_hint = controlnet_hint.permute(0, 3, 1, 2) # b h w c -> b c h wreturn controlnet_hintelse:raise ValueError(f"Acceptble shape of `controlnet_hint` are any of ({width}, {channels}), "+ f"({height}, {width}, {channels}), "+ f"(1, {height}, {width}, {channels}) or "+ f"({num_images_per_prompt}, {channels}, {height}, {width}) but is {controlnet_hint.shape}")elif isinstance(controlnet_hint, PIL.Image.Image):if controlnet_hint.size == (width, height):controlnet_hint = controlnet_hint.convert("RGB") # make sure 3 channel RGB formatcontrolnet_hint = np.array(controlnet_hint) # to numpycontrolnet_hint = controlnet_hint[:, :, ::-1] # RGB -> BGRreturn self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)else:raise ValueError(f"Acceptable image size of `controlnet_hint` is ({width}, {height}) but is {controlnet_hint.size}")else:raise ValueError(f"Acceptable type of `controlnet_hint` are any of torch.Tensor, np.ndarray, PIL.Image.Image but is {type(controlnet_hint)}")def prepare_mask_latents(self, mask, masked_image, batch_size, height, width, dtype, device, generator, do_classifier_free_guidance):# resize the mask to latents shape as we concatenate the mask to the latents# we do that before converting to dtype to avoid breaking in case we're using cpu_offload# and half precisionmask = torch.nn.functional.interpolate(mask, size=(height // self.vae_scale_factor, width // self.vae_scale_factor))mask = mask.to(device=device, dtype=dtype)masked_image = masked_image.to(device=device, dtype=dtype)# encode the mask image into latents space so we can concatenate it to the latentsif isinstance(generator, list):masked_image_latents = [self.vae.encode(masked_image[i : i + 1]).latent_dist.sample(generator=generator[i])for i in range(batch_size)]masked_image_latents = torch.cat(masked_image_latents, dim=0)else:masked_image_latents = self.vae.encode(masked_image).latent_dist.sample(generator=generator)masked_image_latents = self.vae.config.scaling_factor * masked_image_latents# duplicate mask and masked_image_latents for each generation per prompt, using mps friendly methodif mask.shape[0] < batch_size:if not batch_size % mask.shape[0] == 0:raise ValueError("The passed mask and the required batch size don't match. Masks are supposed to be duplicated to"f" a total batch size of {batch_size}, but {mask.shape[0]} masks were passed. Make sure the number"" of masks that you pass is divisible by the total requested batch size.")mask = mask.repeat(batch_size // mask.shape[0], 1, 1, 1)if masked_image_latents.shape[0] < batch_size:if not batch_size % masked_image_latents.shape[0] == 0:raise ValueError("The passed images and the required batch size don't match. Images are supposed to be duplicated"f" to a total batch size of {batch_size}, but {masked_image_latents.shape[0]} images were passed."" Make sure the number of images that you pass is divisible by the total requested batch size.")masked_image_latents = masked_image_latents.repeat(batch_size // masked_image_latents.shape[0], 1, 1, 1)mask = torch.cat([mask] * 2) if do_classifier_free_guidance else maskmasked_image_latents = (torch.cat([masked_image_latents] * 2) if do_classifier_free_guidance else masked_image_latents)# aligning device to prevent device errors when concating it with the latent model inputmasked_image_latents = masked_image_latents.to(device=device, dtype=dtype)return mask, masked_image_latents@torch.no_grad()@replace_example_docstring(EXAMPLE_DOC_STRING)def __call__(self,prompt: Union[str, List[str]] = None,height: Optional[int] = None,width: Optional[int] = None,num_inference_steps: int = 50,guidance_scale: float = 7.5,negative_prompt: Optional[Union[str, List[str]]] = None,num_images_per_prompt: Optional[int] = 1,eta: float = 0.0,generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,latents: Optional[torch.FloatTensor] = None,prompt_embeds: Optional[torch.FloatTensor] = None,negative_prompt_embeds: Optional[torch.FloatTensor] = None,output_type: Optional[str] = "pil",return_dict: bool = True,callback: Optional[Callable[[int, int, torch.FloatTensor], None]] = None,callback_steps: Optional[int] = 1,cross_attention_kwargs: Optional[Dict[str, Any]] = None,controlnet_hint: Optional[Union[torch.FloatTensor, np.ndarray, PIL.Image.Image]] = None,image: Union[torch.FloatTensor, PIL.Image.Image] = None,mask_image: Union[torch.FloatTensor, PIL.Image.Image] = None,):r"""Function invoked when calling the pipeline for generation.Args:prompt (`str` or `List[str]`, *optional*):The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`.instead.height (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The height in pixels of the generated image.width (`int`, *optional*, defaults to self.unet.config.sample_size * self.vae_scale_factor):The width in pixels of the generated image.num_inference_steps (`int`, *optional*, defaults to 50):The number of denoising steps. More denoising steps usually lead to a higher quality image at theexpense of slower inference.guidance_scale (`float`, *optional*, defaults to 7.5):Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).`guidance_scale` is defined as `w` of equation 2. of [ImagenPaper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >1`. Higher guidance scale encourages to generate images that are closely linked to the text `prompt`,usually at the expense of lower image quality.negative_prompt (`str` or `List[str]`, *optional*):The prompt or prompts not to guide the image generation. If not defined, one has to pass`negative_prompt_embeds`. instead. If not defined, one has to pass `negative_prompt_embeds`. instead.Ignored when not using guidance (i.e., ignored if `guidance_scale` is less than `1`).num_images_per_prompt (`int`, *optional*, defaults to 1):The number of images to generate per prompt.eta (`float`, *optional*, defaults to 0.0):Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to[`schedulers.DDIMScheduler`], will be ignored for others.generator (`torch.Generator` or `List[torch.Generator]`, *optional*):One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)to make generation deterministic.latents (`torch.FloatTensor`, *optional*):Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for imagegeneration. Can be used to tweak the same generation with different prompts. If not provided, a latentstensor will ge generated by sampling using the supplied random `generator`.prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If notprovided, text embeddings will be generated from `prompt` input argument.negative_prompt_embeds (`torch.FloatTensor`, *optional*):Pre-generated negative text embeddings. Can be used to easily tweak text inputs, *e.g.* promptweighting. If not provided, negative_prompt_embeds will be generated from `negative_prompt` inputargument.output_type (`str`, *optional*, defaults to `"pil"`):The output format of the generate image. Choose between[PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.return_dict (`bool`, *optional*, defaults to `True`):Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of aplain tuple.callback (`Callable`, *optional*):A function that will be called every `callback_steps` steps during inference. The function will becalled with the following arguments: `callback(step: int, timestep: int, latents: torch.FloatTensor)`.callback_steps (`int`, *optional*, defaults to 1):The frequency at which the `callback` function will be called. If not specified, the callback will becalled at every step.cross_attention_kwargs (`dict`, *optional*):A kwargs dictionary that if specified is passed along to the `AttnProcessor` as defined under`self.processor` in[diffusers.cross_attention](https://github.com/huggingface/diffusers/blob/main/src/diffusers/models/cross_attention.py).controlnet_hint (`torch.FloatTensor`, `np.ndarray` or `PIL.Image.Image`, *optional*):ControlNet input embedding. ControlNet generates guidances using this input embedding. If the type isspecified as `torch.FloatTensor`, it is passed to ControlNet as is. If the type is `np.ndarray`, it isassumed to be an OpenCV compatible image format. PIL.Image.Image` can also be accepted as an image. Thesize of all these types must correspond to the output image size.Examples:Returns:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`:[`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple.When returning a tuple, the first element is a list with the generated images, and the second element is alist of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work"(nsfw) content, according to the `safety_checker`."""# 0. Default height and width to unetheight = height or self.unet.config.sample_size * self.vae_scale_factorwidth = width or self.unet.config.sample_size * self.vae_scale_factor# 1. Control Embedding check & conversionif controlnet_hint is not None:controlnet_hint = self.controlnet_hint_conversion(controlnet_hint, height, width, num_images_per_prompt)# 2. Check inputs. Raise error if not correctself.check_inputs(prompt, height, width, callback_steps, negative_prompt, prompt_embeds, negative_prompt_embeds)# 3. Define call parametersif prompt is not None and isinstance(prompt, str):batch_size = 1elif prompt is not None and isinstance(prompt, list):batch_size = len(prompt)else:batch_size = prompt_embeds.shape[0]device = self._execution_device# here `guidance_scale` is defined analog to the guidance weight `w` of equation (2)# of the Imagen paper: https://arxiv.org/pdf/2205.11487.pdf . `guidance_scale = 1`# corresponds to doing no classifier free guidance.do_classifier_free_guidance = guidance_scale > 1.0# 4. Encode input promptprompt_embeds = self._encode_prompt(prompt,device,num_images_per_prompt,do_classifier_free_guidance,negative_prompt,prompt_embeds=prompt_embeds,negative_prompt_embeds=negative_prompt_embeds,)mask, masked_image = prepare_mask_and_masked_image(image, mask_image)# 5. Prepare timestepsself.scheduler.set_timesteps(num_inference_steps, device=device)timesteps = self.scheduler.timesteps# 6. Prepare latent variablesnum_channels_latents = self.unet.in_channelslatents = self.prepare_latents(batch_size * num_images_per_prompt,num_channels_latents,height,width,prompt_embeds.dtype,device,generator,latents,)mask, masked_image_latents = self.prepare_mask_latents(mask,masked_image,batch_size * num_images_per_prompt,height,width,prompt_embeds.dtype,device,generator,do_classifier_free_guidance,)num_channels_mask = mask.shape[1]num_channels_masked_image = masked_image_latents.shape[1]# 7. Prepare extra step kwargs. TODO: Logic should ideally just be moved out of the pipelineextra_step_kwargs = self.prepare_extra_step_kwargs(generator, eta)# 8. Denoising loopnum_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.orderwith self.progress_bar(total=num_inference_steps) as progress_bar:for i, t in enumerate(timesteps):# expand the latents if we are doing classifier free guidancelatent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latentslatent_model_input = self.scheduler.scale_model_input(latent_model_input, t)if controlnet_hint is not None:# ControlNet predict the noise residualcontrol = self.controlnet(latent_model_input, t, encoder_hidden_states=prompt_embeds, controlnet_hint=controlnet_hint)control = [item for item in control]latent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)noise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,control=control,).sampleelse:# predict the noise residuallatent_model_input = torch.cat([latent_model_input, mask, masked_image_latents], dim=1)noise_pred = self.unet(latent_model_input,t,encoder_hidden_states=prompt_embeds,cross_attention_kwargs=cross_attention_kwargs,).sample# perform guidanceif do_classifier_free_guidance:noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)# compute the previous noisy sample x_t -> x_t-1latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs).prev_sample# call the callback, if providedif i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):progress_bar.update()if callback is not None and i % callback_steps == 0:callback(i, t, latents)if output_type == "latent":image = latentshas_nsfw_concept = Noneelif output_type == "pil":# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)# 10. Convert to PILimage = self.numpy_to_pil(image)else:# 8. Post-processingimage = self.decode_latents(latents)# 9. Run safety checkerimage, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)if not return_dict:return (image, has_nsfw_concept)return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
把文件放置在:
![](https://img-blog.csdnimg.cn/img_convert/9e2695ea736eb314f79e74a9b834248b.png)
以下项目下的每个__init__.py需要把新加的类添加进去
![](https://img-blog.csdnimg.cn/img_convert/cc49602055e5fb666619a8db20de9f63.png)
添加信息可以参考文件里面的写法,一般如下:
![](https://img-blog.csdnimg.cn/img_convert/a938fd194169466321fbc7375866f43f.png)