# zig / app.py
import os
import importlib.util
import random
import re
import threading
import warnings
import inspect
from typing import List, Tuple
import gradio as gr
import spaces
import torch
from diffusers import AutoencoderKL, FlowMatchEulerDiscreteScheduler, ZImagePipeline
from diffusers.models.transformers.transformer_z_image import ZImageTransformer2DModel
from transformers import AutoModelForCausalLM, AutoTokenizer
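# Configuration via environment variables; defaults suit the hosted ZeroGPU Space.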
MODEL_PATH = os.environ.get("MODEL_PATH", "Tongyi-MAI/Z-Image-Turbo")
LORA_PATH = os.environ.get("LORA_PATH", os.path.join("lora", "zit-mystic-xxx.safetensors"))
HF_TOKEN = os.environ.get("HF_TOKEN")
ENABLE_COMPILE = os.environ.get("ENABLE_COMPILE", "true").lower() == "true"
ENABLE_WARMUP = os.environ.get("ENABLE_WARMUP", "false").lower() == "true"
ATTENTION_BACKEND = os.environ.get("ATTENTION_BACKEND", "flash_3")
OFFLOAD_TO_CPU_AFTER_RUN = os.environ.get("OFFLOAD_TO_CPU_AFTER_RUN", "false").lower() == "true"
ENABLE_AOTI = os.environ.get("ENABLE_AOTI", "true").lower() == "true"
AOTI_REPO = os.environ.get("AOTI_REPO", "zerogpu-aoti/Z-Image")
AOTI_VARIANT = os.environ.get("AOTI_VARIANT", "fa3")
AOTI_ALLOW_LORA = os.environ.get("AOTI_ALLOW_LORA", "false").lower() == "true"
DEFAULT_CFG = float(os.environ.get("DEFAULT_CFG", "0.0"))
SHOW_STATUS_PANEL = os.environ.get("DEBUG", "false").lower() == "true"
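# Example (hypothetical) local override:
#   MODEL_DTYPE=bf16 ENABLE_COMPILE=false DEBUG=true python app.py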
def resolve_model_dtype() -> torch.dtype:
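    """Resolve the model dtype: honor a MODEL_DTYPE override (bf16/fp16/fp32),
    otherwise prefer bf16 on GPUs that support it, fp16 on other GPUs, fp32 on CPU."""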
override = os.environ.get("MODEL_DTYPE")
if override:
key = override.strip().lower()
if key in {"bf16", "bfloat16"}:
return torch.bfloat16
if key in {"fp16", "float16", "half"}:
return torch.float16
if key in {"fp32", "float32"}:
return torch.float32
print(f"Unknown MODEL_DTYPE={override!r}; falling back to auto.")
if torch.cuda.is_available():
is_bf16_supported = getattr(torch.cuda, "is_bf16_supported", None)
if callable(is_bf16_supported) and is_bf16_supported():
return torch.bfloat16
return torch.float16
return torch.float32
def dtype_label(dtype: torch.dtype) -> str:
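    """Return a short label (bf16/fp16/fp32) for a torch dtype."""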
if dtype == torch.bfloat16:
return "bf16"
if dtype == torch.float16:
return "fp16"
if dtype == torch.float32:
return "fp32"
return str(dtype).replace("torch.", "")
def get_gpu_summary() -> str:
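    """Return the GPU name and compute capability, or "CPU" when CUDA is unavailable."""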
if not torch.cuda.is_available():
return "CPU"
try:
name = torch.cuda.get_device_name(0)
major, minor = torch.cuda.get_device_capability(0)
return f"{name} (cc {major}.{minor})"
except Exception:
return "CUDA"
MODEL_DTYPE = resolve_model_dtype()
MODEL_DTYPE_LABEL = dtype_label(MODEL_DTYPE)
GPU_SUMMARY = get_gpu_summary()
if torch.cuda.is_available():
torch.backends.cuda.matmul.allow_tf32 = True
torch.set_float32_matmul_precision("high")
warnings.filterwarnings("ignore")
os.environ["TOKENIZERS_PARALLELISM"] = "false"
RES_CHOICES = {
"1024": [
"1024x1024 ( 1:1 )",
"1152x896 ( 9:7 )",
"896x1152 ( 7:9 )",
"1152x864 ( 4:3 )",
"864x1152 ( 3:4 )",
"1248x832 ( 3:2 )",
"832x1248 ( 2:3 )",
"1280x720 ( 16:9 )",
"720x1280 ( 9:16 )",
"1344x576 ( 21:9 )",
"576x1344 ( 9:21 )",
],
"1280": [
"1280x1280 ( 1:1 )",
"1440x1120 ( 9:7 )",
"1120x1440 ( 7:9 )",
"1472x1104 ( 4:3 )",
"1104x1472 ( 3:4 )",
"1536x1024 ( 3:2 )",
"1024x1536 ( 2:3 )",
"1536x864 ( 16:9 )",
"864x1536 ( 9:16 )",
"1680x720 ( 21:9 )",
"720x1680 ( 9:21 )",
],
"1536": [
"1536x1536 ( 1:1 )",
"1728x1344 ( 9:7 )",
"1344x1728 ( 7:9 )",
"1728x1296 ( 4:3 )",
"1296x1728 ( 3:4 )",
"1872x1248 ( 3:2 )",
"1248x1872 ( 2:3 )",
"2048x1152 ( 16:9 )",
"1152x2048 ( 9:16 )",
"2016x864 ( 21:9 )",
"864x2016 ( 9:21 )",
],
}
RESOLUTION_SET: List[str] = []
for resolutions in RES_CHOICES.values():
RESOLUTION_SET.extend(resolutions)
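# Example prompts surfaced in the UI, kept in their original (mostly Chinese) form.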
EXAMPLE_PROMPTS = [
["一位男士和他的贵宾犬穿着配套的服装参加狗狗秀,室内灯光,背景中有观众。"],
[
"极具氛围感的暗调人像,一位优雅的中国美女在黑暗的房间里。一束强光通过遮光板,在她的脸上投射出一个清晰的闪电形状的光影,正好照亮一只眼睛。高对比度,明暗交界清晰,神秘感,莱卡相机色调。"
],
[
"一张中景手机自拍照片拍摄了一位留着长黑发的年轻东亚女子在灯光明亮的电梯内对着镜子自拍。她穿着一件带有白色花朵图案的黑色露肩短上衣和深色牛仔裤。她的头微微倾斜,嘴唇嘟起做亲吻状,非常可爱俏皮。她右手拿着一部深灰色智能手机,遮住了部分脸,后置摄像头镜头对着镜子"
],
[
"Young Chinese woman in red Hanfu, intricate embroidery. Impeccable makeup, red floral forehead pattern. Elaborate high bun, golden phoenix headdress, red flowers, beads. Holds round folding fan with lady, trees, bird. Neon lightning-bolt lamp (⚡️), bright yellow glow, above extended left palm. Soft-lit outdoor night background, silhouetted tiered pagoda (西安大雁塔), blurred colorful distant lights."
],
[
'''A vertical digital illustration depicting a serene and majestic Chinese landscape, rendered in a style reminiscent of traditional Shanshui painting but with a modern, clean aesthetic. The scene is dominated by towering, steep cliffs in various shades of blue and teal, which frame a central valley. In the distance, layers of mountains fade into a light blue and white mist, creating a strong sense of atmospheric perspective and depth. A calm, turquoise river flows through the center of the composition, with a small, traditional Chinese boat, possibly a sampan, navigating its waters. The boat has a bright yellow canopy and a red hull, and it leaves a gentle wake behind it. It carries several indistinct figures of people. Sparse vegetation, including green trees and some bare-branched trees, clings to the rocky ledges and peaks. The overall lighting is soft and diffused, casting a tranquil glow over the entire scene. Centered in the image is overlaid text. At the top of the text block is a small, red, circular seal-like logo containing stylized characters. Below it, in a smaller, black, sans-serif font, are the words "Zao-Xiang * East Beauty & West Fashion * Z-Image". Directly beneath this, in a larger, elegant black serif font, is the word "SHOW & SHARE CREATIVITY WITH THE WORLD". Among them, there are "SHOW & SHARE", "CREATIVITY", and "WITH THE WORLD"'''
],
[
"""一张虚构的英语电影《回忆之味》(The Taste of Memory)的电影海报。场景设置在一个质朴的19世纪风格厨房里。画面中央,一位红棕色头发、留着小胡子的中年男子(演员阿瑟·彭哈利根饰)站在一张木桌后,他身穿白色衬衫、黑色马甲和米色围裙,正看着一位女士,手中拿着一大块生红肉,下方是一个木制切菜板。在他的右边,一位梳着高髻的黑发女子(演员埃莉诺·万斯饰)倚靠在桌子上,温柔地对他微笑。她穿着浅色衬衫和一条上白下蓝的长裙。桌上除了放有切碎的葱和卷心菜丝的切菜板外,还有一个白色陶瓷盘、新鲜香草,左侧一个木箱上放着一串深色葡萄。背景是一面粗糙的灰白色抹灰墙,墙上挂着一幅风景画。最右边的一个台面上放着一盏复古油灯。海报上有大量的文字信息。左上角是白色的无衬线字体"ARTISAN FILMS PRESENTS",其下方是"ELEANOR VANCE"和"ACADEMY AWARD® WINNER"。右上角写着"ARTHUR PENHALIGON"和"GOLDEN GLOBE® AWARD WINNER"。顶部中央是圣丹斯电影节的桂冠标志,下方写着"SUNDANCE FILM FESTIVAL GRAND JURY PRIZE 2024"。主标题"THE TASTE OF MEMORY"以白色的大号衬线字体醒目地显示在下半部分。标题下方注明了"A FILM BY Tongyi Interaction Lab"。底部区域用白色小字列出了完整的演职员名单,包括"SCREENPLAY BY ANNA REID"、"CULINARY DIRECTION BY JAMES CARTER"以及Artisan Films、Riverstone Pictures和Heritage Media等众多出品公司标志。整体风格是写实主义,采用温暖柔和的灯光方案,营造出一种亲密的氛围。色调以棕色、米色和柔和的绿色等大地色系为主。两位演员的身体都在腰部被截断。"""
],
[
"""一张方形构图的特写照片,主体是一片巨大的、鲜绿色的植物叶片,并叠加了文字,使其具有海报或杂志封面的外观。主要拍摄对象是一片厚实、有蜡质感的叶子,从左下角到右上角呈对角线弯曲穿过画面。其表面反光性很强,捕捉到一个明亮的直射光源,形成了一道突出的高光,亮面下显露出平行的精细叶脉。背景由其他深绿色的叶子组成,这些叶子轻微失焦,营造出浅景深效果,突出了前景的主叶片。整体风格是写实摄影,明亮的叶片与黑暗的阴影背景之间形成高对比度。图像上有多处渲染文字。左上角是白色的衬线字体文字"PIXEL-PEEPERS GUILD Presents"。右上角同样是白色衬线字体的文字"[Instant Noodle] 泡面调料包"。左侧垂直排列着标题"Render Distance: Max",为白色衬线字体。左下角是五个硕大的白色宋体汉字"显卡在...燃烧"。右下角是较小的白色衬线字体文字"Leica Glow™ Unobtanium X-1",其正上方是用白色宋体字书写的名字"蔡几"。识别出的核心实体包括品牌像素偷窥者协会、其产品线泡面调料包、相机型号买不到™ X-1以及摄影师名字造相。"""
],
]
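# Lazily initialized global pipeline state; loading is guarded by pipe_lock.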
pipe: ZImagePipeline | None = None
lora_loaded: bool = False
lora_error: str | None = None
lora_adapter_name: str | None = None
pipe_lock = threading.Lock()
pipe_on_gpu: bool = False
aoti_loaded: bool = False
applied_attention_backend: str | None = None
attention_backend_error: str | None = None
aoti_error: str | None = None
transformer_compiled: bool = False
transformer_compile_attempted: bool = False
compile_error: str | None = None
inductor_configured: bool = False
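# Scheduler registry; FlowMatch Heun is offered only if the installed diffusers exposes it.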
SCHEDULERS = {"FlowMatch Euler": FlowMatchEulerDiscreteScheduler}
try:
from diffusers import FlowMatchHeunDiscreteScheduler # type: ignore
SCHEDULERS["FlowMatch Heun"] = FlowMatchHeunDiscreteScheduler
except Exception:
pass
def module_available(module_name: str) -> bool:
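    """Return True if `module_name` is importable, without actually importing it."""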
try:
return importlib.util.find_spec(module_name) is not None
except (ImportError, ValueError):
return False
def summarize_error(message: str, *, max_len: int = 120) -> str:
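    """Collapse an error message onto one line and truncate it for status display."""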
one_line = " ".join(str(message).split())
if len(one_line) <= max_len:
return one_line
return one_line[: max_len - 1] + "…"
def parse_resolution(resolution: str) -> Tuple[int, int]:
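    """Parse a "WIDTHxHEIGHT ( ratio )" string into (width, height); default to 1024x1024."""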
match = re.search(r"(\d+)\s*[×x]\s*(\d+)", resolution)
if match:
return int(match.group(1)), int(match.group(2))
return 1024, 1024
def set_attention_backend_safe(transformer, backend: str) -> str:
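    """Try the requested attention backend (with and without a leading underscore),
    then fall back to flash, xformers, and native; return the name that was applied."""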
global attention_backend_error
candidates: List[str] = []
if backend:
candidates.append(backend)
if backend.startswith("_"):
candidates.append(backend.lstrip("_"))
else:
candidates.append(f"_{backend}")
candidates.extend(["flash", "xformers", "native"])
attention_backend_error = None
errors: dict[str, Exception] = {}
last_exc: Exception | None = None
for name in candidates:
if not name:
continue
try:
transformer.set_attention_backend(name)
if backend and name != backend:
for key in (backend, backend.lstrip("_"), f"_{backend}"):
if key in errors:
attention_backend_error = str(errors[key])
break
if attention_backend_error is None and last_exc is not None:
attention_backend_error = str(last_exc)
if attention_backend_error:
print(
f"Requested attention backend {backend!r} failed; using {name!r} instead. "
f"Reason: {attention_backend_error}"
)
return name
except Exception as exc: # noqa: BLE001
last_exc = exc
errors[name] = exc
continue
raise RuntimeError(f"Failed to set attention backend (tried {candidates}): {last_exc}")
def attach_lora(pipeline: ZImagePipeline) -> Tuple[bool, str | None]:
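    """Load the LoRA weights at LORA_PATH onto the pipeline; return (loaded, error_message)."""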
global lora_adapter_name
if not LORA_PATH or not os.path.isfile(LORA_PATH):
return False, "LoRA file not found"
if not module_available("peft"):
return False, "PEFT backend is required for LoRA. Install `peft` and restart."
def extract_present_adapter_names(exc: Exception) -> List[str]:
msg = str(exc)
match = re.search(r"present adapters:\s*(\{[^}]*\})", msg)
if not match:
return []
return re.findall(r"'([^']+)'", match.group(1))
try:
folder, weight_name = os.path.split(LORA_PATH)
folder = folder or "."
preferred_adapter = os.environ.get("LORA_ADAPTER_NAME", "default")
lora_adapter_name = preferred_adapter
try:
pipeline.load_lora_weights(folder, weight_name=weight_name, adapter_name=preferred_adapter)
except TypeError:
pipeline.load_lora_weights(folder, weight_name=weight_name)
try:
set_lora_scale(pipeline, 1.0)
except Exception as exc: # noqa: BLE001
adapter_names = extract_present_adapter_names(exc)
if adapter_names:
lora_adapter_name = adapter_names[0]
set_lora_scale(pipeline, 1.0)
else:
raise
return True, None
except Exception as exc: # noqa: BLE001
lora_adapter_name = None
return False, f"Failed to load LoRA: {exc}"
def set_lora_scale(pipeline: ZImagePipeline, scale: float) -> None:
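    """Set the active LoRA adapter weight, temporarily unwrapping a torch.compile'd
    transformer while adapters are updated and recovering the adapter name if needed."""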
global lora_adapter_name
weight = max(float(scale), 0.0)
adapter = lora_adapter_name or "default"
compiled_transformer = None
try:
transformer = getattr(pipeline, "transformer", None)
if transformer is not None and hasattr(transformer, "_orig_mod"):
compiled_transformer = transformer
pipeline.transformer = transformer._orig_mod
try:
pipeline.set_adapters([adapter], adapter_weights=[weight])
except TypeError:
pipeline.set_adapters([adapter], weights=[weight])
except ValueError as exc:
msg = str(exc)
present_match = re.search(r"present adapters:\s*(\{[^}]*\}|set\([^)]*\))", msg)
if present_match:
present_names = re.findall(r"'([^']+)'", present_match.group(1))
else:
present_names = []
if present_names:
lora_adapter_name = present_names[0]
adapter = lora_adapter_name
try:
pipeline.set_adapters([adapter], adapter_weights=[weight])
except TypeError:
pipeline.set_adapters([adapter], weights=[weight])
else:
raise
finally:
if compiled_transformer is not None:
pipeline.transformer = compiled_transformer
def load_models() -> Tuple[ZImagePipeline, bool, str | None]:
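    """Load the full Z-Image pipeline (VAE, text encoder, tokenizer, transformer) onto the
    GPU from the Hub or a local directory, attach the LoRA, and cache it globally."""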
global pipe, lora_loaded, lora_error, pipe_on_gpu, applied_attention_backend, transformer_compiled, transformer_compile_attempted
if pipe is not None and getattr(pipe, "transformer", None) is not None:
return pipe, lora_loaded, lora_error
transformer_compiled = False
transformer_compile_attempted = False
    hf_kwargs = {"token": HF_TOKEN} if HF_TOKEN else {}
print(f"Loading Z-Image from {MODEL_PATH}...")
print(f"GPU: {GPU_SUMMARY} | dtype: {MODEL_DTYPE_LABEL}")
if not torch.cuda.is_available():
raise RuntimeError("CUDA is not available. This app requires a GPU.")
if not os.path.exists(MODEL_PATH):
vae = AutoencoderKL.from_pretrained(
MODEL_PATH,
subfolder="vae",
torch_dtype=MODEL_DTYPE,
**hf_kwargs,
).to("cuda", MODEL_DTYPE)
text_encoder = AutoModelForCausalLM.from_pretrained(
MODEL_PATH,
subfolder="text_encoder",
torch_dtype=MODEL_DTYPE,
**hf_kwargs,
).to("cuda", MODEL_DTYPE).eval()
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, subfolder="tokenizer", **hf_kwargs)
else:
vae = AutoencoderKL.from_pretrained(os.path.join(MODEL_PATH, "vae"), torch_dtype=MODEL_DTYPE).to(
"cuda", MODEL_DTYPE
)
text_encoder = AutoModelForCausalLM.from_pretrained(
os.path.join(MODEL_PATH, "text_encoder"),
torch_dtype=MODEL_DTYPE,
).to("cuda", MODEL_DTYPE).eval()
tokenizer = AutoTokenizer.from_pretrained(os.path.join(MODEL_PATH, "tokenizer"))
tokenizer.padding_side = "left"
pipeline = ZImagePipeline(scheduler=None, vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, transformer=None)
if not os.path.exists(MODEL_PATH):
transformer = ZImageTransformer2DModel.from_pretrained(
MODEL_PATH,
subfolder="transformer",
torch_dtype=MODEL_DTYPE,
**hf_kwargs,
)
else:
transformer = ZImageTransformer2DModel.from_pretrained(
os.path.join(MODEL_PATH, "transformer"),
torch_dtype=MODEL_DTYPE,
)
applied_attention_backend = set_attention_backend_safe(transformer, ATTENTION_BACKEND)
print(f"Attention backend: {applied_attention_backend}")
pipeline.transformer = transformer.to("cuda", MODEL_DTYPE)
pipeline.to("cuda", MODEL_DTYPE)
loaded, error = attach_lora(pipeline)
lora_loaded, lora_error = loaded, error
if lora_error:
print(lora_error)
else:
print(f"LoRA loaded: {lora_loaded} ({LORA_PATH})")
pipe = pipeline
pipe_on_gpu = True
return pipe, lora_loaded, lora_error
def ensure_models_loaded() -> Tuple[ZImagePipeline, bool, str | None]:
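    """Thread-safe lazy loader: return the cached pipeline or rebuild it under pipe_lock."""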
global pipe, pipe_on_gpu, transformer_compiled, transformer_compile_attempted
if pipe is not None and getattr(pipe, "transformer", None) is not None:
return pipe, lora_loaded, lora_error
with pipe_lock:
if pipe is not None and getattr(pipe, "transformer", None) is not None:
return pipe, lora_loaded, lora_error
pipe = None
pipe_on_gpu = False
transformer_compiled = False
transformer_compile_attempted = False
return load_models()
def configure_inductor_for_compile() -> None:
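    """Apply one-time TorchInductor tuning flags used by torch.compile (best effort)."""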
global inductor_configured
if inductor_configured:
return
try:
torch._inductor.config.conv_1x1_as_mm = True
torch._inductor.config.coordinate_descent_tuning = True
torch._inductor.config.epilogue_fusion = False
torch._inductor.config.coordinate_descent_check_all_directions = True
torch._inductor.config.max_autotune_gemm = True
torch._inductor.config.max_autotune_gemm_backends = "TRITON,ATEN"
torch._inductor.config.triton.cudagraphs = False
except Exception as exc: # noqa: BLE001
print(f"torch.compile inductor config failed (continuing): {exc}")
inductor_configured = True
def maybe_compile_transformer() -> None:
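    """Compile the transformer with torch.compile once, if enabled, recording any failure."""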
global transformer_compiled, transformer_compile_attempted, compile_error
if not ENABLE_COMPILE or transformer_compile_attempted:
return
if pipe is None or getattr(pipe, "transformer", None) is None:
return
transformer_compile_attempted = True
compile_error = None
configure_inductor_for_compile()
try:
torch._dynamo.config.suppress_errors = True
except Exception: # noqa: BLE001
pass
try:
if getattr(pipe, "vae", None) is not None and hasattr(pipe.vae, "disable_tiling"):
pipe.vae.disable_tiling()
except Exception: # noqa: BLE001
pass
try:
print("Compiling transformer (torch.compile)...")
pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune-no-cudagraphs", fullgraph=False)
transformer_compiled = True
except Exception as exc: # noqa: BLE001
transformer_compiled = False
compile_error = str(exc)
print(f"torch.compile failed (continuing without compile): {exc}")
def ensure_on_gpu() -> None:
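    """Ensure the pipeline is loaded and on the GPU, then attempt compilation if enabled."""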
global pipe_on_gpu
if pipe is None:
raise gr.Error("Model not loaded.")
if getattr(pipe, "transformer", None) is None:
raise gr.Error("Model init failed (transformer missing). Check startup logs.")
if not torch.cuda.is_available():
raise gr.Error("CUDA is not available. This Space requires a GPU.")
if not pipe_on_gpu:
print("Moving model to GPU...")
pipe.to("cuda", MODEL_DTYPE)
pipe_on_gpu = True
maybe_compile_transformer()
def offload_to_cpu() -> None:
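    """Move the pipeline back to the CPU and release cached CUDA memory."""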
global pipe_on_gpu
if pipe is None:
return
if not pipe_on_gpu:
return
print("Offloading model to CPU...")
pipe.to("cpu")
pipe_on_gpu = False
if torch.cuda.is_available():
torch.cuda.empty_cache()
def make_scheduler(scheduler_cls, **kwargs):
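    """Instantiate a scheduler, passing only kwargs its __init__ accepts (None values dropped)."""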
sig = inspect.signature(scheduler_cls.__init__)
accepted = set(sig.parameters.keys())
accepted.discard("self")
filtered = {k: v for k, v in kwargs.items() if k in accepted and v is not None}
return scheduler_cls(**filtered)
def set_scheduler(
pipeline: ZImagePipeline,
scheduler_name: str,
*,
num_train_timesteps: int,
shift: float,
use_dynamic_shifting: bool,
base_shift: float,
max_shift: float,
) -> None:
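    """Replace the pipeline's scheduler with a freshly configured instance."""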
scheduler_cls = SCHEDULERS.get(scheduler_name, FlowMatchEulerDiscreteScheduler)
scheduler = make_scheduler(
scheduler_cls,
num_train_timesteps=int(num_train_timesteps),
shift=float(shift),
use_dynamic_shifting=bool(use_dynamic_shifting),
base_shift=float(base_shift),
max_shift=float(max_shift),
)
pipeline.scheduler = scheduler
def generate_image(
pipeline: ZImagePipeline,
prompt: str,
resolution: str,
seed: int,
steps: int,
shift: float,
guidance_scale: float,
use_lora: bool,
lora_scale: float,
max_sequence_length: int,
scheduler_name: str,
num_train_timesteps: int,
use_dynamic_shifting: bool,
base_shift: float,
max_shift: float,
) -> Tuple[torch.Tensor, int]:
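    """Run one generation: configure the scheduler and LoRA scale, invoke the pipeline,
    and fall back to the uncompiled transformer if torch.compile fails at runtime."""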
width, height = parse_resolution(resolution)
set_scheduler(
pipeline,
str(scheduler_name),
num_train_timesteps=int(num_train_timesteps),
shift=float(shift),
use_dynamic_shifting=bool(use_dynamic_shifting),
base_shift=float(base_shift),
max_shift=float(max_shift),
)
if lora_loaded:
try:
if use_lora:
set_lora_scale(pipeline, float(lora_scale))
else:
set_lora_scale(pipeline, 0.0)
except Exception as exc: # noqa: BLE001
print(f"LoRA scale update failed (continuing without changing LoRA state): {exc}")
def run_pipeline() -> torch.Tensor:
generator = torch.Generator("cuda").manual_seed(seed)
return pipeline(
prompt=prompt,
height=height,
width=width,
guidance_scale=float(guidance_scale),
num_inference_steps=int(steps),
generator=generator,
max_sequence_length=int(max_sequence_length),
).images[0]
try:
with torch.inference_mode():
image = run_pipeline()
except Exception as exc: # noqa: BLE001
transformer = getattr(pipeline, "transformer", None)
message = str(exc)
is_dynamo_error = isinstance(exc, AssertionError) or "torch._dynamo" in message or "ConstantVariable" in message
if transformer is not None and hasattr(transformer, "_orig_mod") and is_dynamo_error:
global transformer_compiled, transformer_compile_attempted, compile_error
compile_error = message
transformer_compiled = False
transformer_compile_attempted = True
pipeline.transformer = transformer._orig_mod
print(f"torch.compile runtime failed; falling back to eager: {exc}")
with torch.inference_mode():
image = run_pipeline()
else:
raise
return image, seed
def warmup_model(pipeline: ZImagePipeline, resolutions: List[str]) -> None:
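    """Run a dummy generation at each resolution so kernels and caches are warm."""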
print("Warmup started...")
dummy_prompt = "warmup"
for res_str in resolutions:
try:
generate_image(
pipeline,
prompt=dummy_prompt,
resolution=res_str,
seed=42,
steps=9,
shift=3.0,
guidance_scale=0.0,
use_lora=False,
lora_scale=0.0,
max_sequence_length=512,
scheduler_name="FlowMatch Euler",
num_train_timesteps=1000,
use_dynamic_shifting=False,
base_shift=0.5,
max_shift=3.0,
)
except Exception as exc: # noqa: BLE001
print(f"Warmup failed for {res_str}: {exc}")
print("Warmup done.")
def init_app() -> None:
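    """Startup hook: load models and optionally apply AoTI blocks, compile, and warm up;
    failures are logged instead of raised."""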
global aoti_loaded, aoti_error, applied_attention_backend
try:
ensure_models_loaded()
if ENABLE_COMPILE and pipe is not None:
ensure_on_gpu()
if ENABLE_AOTI and not aoti_loaded and pipe is not None and getattr(pipe, "transformer", None) is not None:
if lora_loaded and not AOTI_ALLOW_LORA:
aoti_loaded = False
aoti_error = "disabled: AoTI blocks are incompatible with LoRA adapters"
print("AoTI disabled: LoRA adapters are loaded (AoTI blocks are incompatible with LoRA).")
else:
try:
pipe.transformer.layers._repeated_blocks = ["ZImageTransformerBlock"]
spaces.aoti_blocks_load(pipe.transformer.layers, AOTI_REPO, variant=AOTI_VARIANT)
aoti_loaded = True
aoti_error = None
print(f"AoTI loaded: {AOTI_REPO} (variant={AOTI_VARIANT})")
except Exception as exc: # noqa: BLE001
aoti_loaded = False
aoti_error = str(exc)
print(f"AoTI load failed (continuing without AoTI): {exc}")
try:
applied_attention_backend = set_attention_backend_safe(pipe.transformer, ATTENTION_BACKEND)
print(f"Attention backend (post-AoTI): {applied_attention_backend}")
except Exception as exc: # noqa: BLE001
print(f"Attention backend update failed (continuing): {exc}")
if ENABLE_WARMUP and pipe is not None:
ensure_on_gpu()
try:
all_resolutions: List[str] = []
for cat in RES_CHOICES.values():
all_resolutions.extend(cat)
warmup_model(pipe, all_resolutions)
finally:
if OFFLOAD_TO_CPU_AFTER_RUN:
offload_to_cpu()
except Exception as exc: # noqa: BLE001
print(f"Model init failed: {exc}")
@spaces.GPU
def generate(
prompt: str,
resolution: str = "1024x1024 ( 1:1 )",
seed: int = 42,
steps: int = 8,
shift: float = 3.0,
cfg: float = DEFAULT_CFG,
scheduler_name: str = "FlowMatch Euler",
num_train_timesteps: int = 1000,
use_dynamic_shifting: bool = False,
base_shift: float = 0.5,
max_shift: float = 3.0,
random_seed: bool = True,
use_lora: bool = True,
lora_scale: float = 1.0,
max_sequence_length: int = 512,
gallery_images=None,
progress=gr.Progress(track_tqdm=True),
):
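    """Gradio entry point: generate one image on the GPU worker and prepend it to the gallery."""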
ensure_models_loaded()
ensure_on_gpu()
new_seed = random.randint(1, 1_000_000) if random_seed or seed == -1 else int(seed)
try:
image = generate_image(
pipeline=pipe,
prompt=str(prompt),
resolution=str(resolution),
seed=new_seed,
steps=int(steps) + 1,
shift=float(shift),
guidance_scale=float(cfg),
use_lora=use_lora,
lora_scale=float(lora_scale),
max_sequence_length=int(max_sequence_length),
scheduler_name=str(scheduler_name),
num_train_timesteps=int(num_train_timesteps),
use_dynamic_shifting=bool(use_dynamic_shifting),
base_shift=float(base_shift),
max_shift=float(max_shift),
)[0]
finally:
if OFFLOAD_TO_CPU_AFTER_RUN:
offload_to_cpu()
if gallery_images is None:
gallery_images = []
gallery_images = [image] + gallery_images
return gallery_images, str(new_seed), int(new_seed)
init_app()
with gr.Blocks(title="Z-Image + LoRA") as demo:
pipe_status = "loaded (GPU)" if pipe and pipe_on_gpu else "loaded (CPU)" if pipe else "not loaded"
lora_file_status = "found" if os.path.isfile(LORA_PATH) else "missing"
if lora_loaded:
adapter = lora_adapter_name or "default"
lora_status = f"LoRA: loaded ({LORA_PATH}, adapter={adapter})"
elif lora_error:
lora_status = f"LoRA: not loaded ({lora_error})"
else:
lora_status = f"LoRA file: {LORA_PATH} ({lora_file_status})"
attention_status = applied_attention_backend or "unknown"
if attention_backend_error and ATTENTION_BACKEND and attention_status != ATTENTION_BACKEND:
attention_status = f"{attention_status} ({ATTENTION_BACKEND} unavailable: {summarize_error(attention_backend_error)})"
if aoti_loaded:
aoti_status = "loaded"
elif aoti_error:
lower = aoti_error.lower()
if "disabled" in lower:
label = "disabled"
elif "kernels" in lower:
label = "unavailable"
else:
label = "failed"
aoti_status = f"{label} ({summarize_error(aoti_error)})"
else:
aoti_status = "not loaded"
if not ENABLE_COMPILE:
compile_status = "off"
elif transformer_compiled:
compile_status = "on"
elif transformer_compile_attempted:
compile_status = "failed"
else:
compile_status = "pending"
gr.Markdown(
"""<div align="center">
# Z-Image with LoRA
LoRA: zit-mystic-xxx
</div>"""
)
status_md = (
f"Model: `{MODEL_PATH}` | {pipe_status} \n"
f"GPU: `{GPU_SUMMARY}` | dtype: `{MODEL_DTYPE_LABEL}` \n"
f"Attention: `{attention_status}` \n"
f"AoTI: `{aoti_status}` \n"
f"torch.compile: `{compile_status}` \n"
f"{lora_status}"
)
details_md_blocks: List[str] = []
if attention_backend_error:
details_md_blocks.append(f"**Attention backend error**\n```\n{attention_backend_error}\n```")
if aoti_error:
details_md_blocks.append(f"**AoTI error**\n```\n{aoti_error}\n```")
if compile_error:
details_md_blocks.append(f"**torch.compile error**\n```\n{compile_error}\n```")
if SHOW_STATUS_PANEL:
with gr.Column(elem_id="floating_status_panel"):
with gr.Accordion("Status / Debug", open=False):
gr.Markdown(status_md)
if details_md_blocks:
gr.Markdown("\n\n".join(details_md_blocks))
with gr.Row():
with gr.Column(scale=1):
prompt_input = gr.Textbox(label="Prompt", lines=3, placeholder="Enter your prompt here...")
with gr.Row():
choices = [int(k) for k in RES_CHOICES.keys()]
res_cat = gr.Dropdown(value=1024, choices=choices, label="Resolution Category")
resolution = gr.Dropdown(
value=RES_CHOICES["1024"][0],
choices=RESOLUTION_SET,
label="Width x Height (Ratio)",
)
with gr.Row():
seed = gr.Number(label="Seed", value=42, precision=0)
random_seed = gr.Checkbox(label="Random Seed", value=True)
with gr.Row():
steps = gr.Slider(label="Steps", minimum=1, maximum=100, value=8, step=1)
shift = gr.Slider(label="Time Shift", minimum=1.0, maximum=10.0, value=3.0, step=0.1)
with gr.Accordion("KSampler / Advanced", open=False):
cfg = gr.Slider(label="CFG", minimum=0.0, maximum=10.0, value=DEFAULT_CFG, step=0.1)
with gr.Row():
scheduler_name = gr.Dropdown(
label="Scheduler",
choices=list(SCHEDULERS.keys()),
value="FlowMatch Euler",
)
num_train_timesteps = gr.Slider(
label="num_train_timesteps",
minimum=100,
maximum=2000,
value=1000,
step=10,
)
with gr.Row():
use_dynamic_shifting = gr.Checkbox(label="use_dynamic_shifting", value=False)
max_seq = gr.Slider(label="Max Sequence Length", minimum=256, maximum=1024, value=512, step=16)
with gr.Row():
base_shift = gr.Slider(label="base_shift", minimum=0.0, maximum=10.0, value=0.5, step=0.1)
max_shift = gr.Slider(label="max_shift", minimum=0.0, maximum=10.0, value=3.0, step=0.1)
with gr.Row():
lora_controls_enabled = bool(lora_loaded)
use_lora = gr.Checkbox(label="Use LoRA", value=lora_controls_enabled, interactive=lora_controls_enabled)
lora_strength = gr.Slider(
label="LoRA Strength",
minimum=0.0,
maximum=1.5,
value=1.0,
step=0.05,
interactive=lora_controls_enabled,
)
generate_btn = gr.Button("Generate", variant="primary")
gr.Markdown("### 📝 Example Prompts")
gr.Examples(examples=EXAMPLE_PROMPTS, inputs=prompt_input, label=None)
with gr.Column(scale=1):
output_gallery = gr.Gallery(
label="Generated Images",
columns=2,
rows=2,
height=600,
object_fit="contain",
format="png",
interactive=False,
)
used_seed = gr.Textbox(label="Seed Used", interactive=False)
def update_res_choices(_res_cat):
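        """Refresh the resolution dropdown when the base-size category changes."""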
if str(_res_cat) in RES_CHOICES:
res_choices = RES_CHOICES[str(_res_cat)]
else:
res_choices = RES_CHOICES["1024"]
return gr.update(value=res_choices[0], choices=res_choices)
res_cat.change(update_res_choices, inputs=res_cat, outputs=resolution, api_visibility="private")
generate_btn.click(
generate,
inputs=[
prompt_input,
resolution,
seed,
steps,
shift,
cfg,
scheduler_name,
num_train_timesteps,
use_dynamic_shifting,
base_shift,
max_shift,
random_seed,
use_lora,
lora_strength,
max_seq,
output_gallery,
],
outputs=[output_gallery, used_seed, seed],
api_visibility="public",
)
css = """
.fillable{max-width: 1230px !important}
#floating_status_panel{
position: fixed !important;
right: 16px;
bottom: 16px;
width: min(420px, 92vw);
max-height: 70vh;
overflow: auto;
z-index: 1000;
background: rgba(255,255,255,0.9);
border: 1px solid rgba(0,0,0,0.12);
border-radius: 12px;
box-shadow: 0 10px 30px rgba(0,0,0,0.18);
padding: 8px 10px;
backdrop-filter: blur(8px);
}
"""
if __name__ == "__main__":
demo.launch(css=css)