DB/PostgreSQL

PostgreSQL 완전 가이드

shaprimanDev 2025. 8. 25. 10:14
반응형

1. PostgreSQL 소개 및 역사

PostgreSQL의 탄생

PostgreSQL은 1986년 캘리포니아 대학교 버클리에서 Michael Stonebraker 교수가 시작한 POSTGRES 프로젝트에서 출발했습니다. 1996년 SQL 지원이 추가되면서 PostgreSQL로 이름이 변경되었고, 현재는 세계에서 가장 고급 오픈소스 관계형 데이터베이스로 인정받고 있습니다.

핵심 철학

  • 확장성 (Extensibility): 사용자가 새로운 데이터 타입, 함수, 연산자를 정의할 수 있음
  • 표준 준수: SQL 표준을 엄격히 따름
  • 안정성: ACID 속성을 완벽히 지원
  • 오픈소스: PostgreSQL License (BSD 스타일)로 자유롭게 사용 가능

2. PostgreSQL 고유 기능들

2.1 MVCC (Multi-Version Concurrency Control)

PostgreSQL의 핵심 아키텍처 중 하나입니다.

-- Understanding how MVCC works
-- Session 1
BEGIN;
SELECT * FROM accounts WHERE id = 1; -- balance: 1000
UPDATE accounts SET balance = 900 WHERE id = 1;
-- Not committed yet

-- Session 2 (run concurrently)
SELECT * FROM accounts WHERE id = 1; -- still sees 1000 (readers take no lock)

-- Session 1
COMMIT; -- now session 2 also sees 900

MVCC의 장점:

  • 읽기 작업이 쓰기 작업을 차단하지 않음
  • 높은 동시성 성능
  • 데드락 발생 가능성 감소

2.2 고급 데이터 타입들

-- 1. JSONB - binary JSON (optimized for querying)
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT,
    specs JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

INSERT INTO products (name, specs) VALUES 
('Laptop', '{"brand": "Apple", "ram": "16GB", "storage": "512GB", "ports": ["USB-C", "Thunderbolt"]}'),
('Phone', '{"brand": "Samsung", "ram": "8GB", "storage": "256GB", "camera": {"main": "108MP", "ultra": "12MP"}}');

-- JSONB query examples
-- 1) filter by a top-level key (->> extracts as text)
SELECT name FROM products WHERE specs->>'brand' = 'Apple';

-- 2) access a nested object (-> keeps JSONB, ->> returns text)
SELECT name FROM products WHERE specs->'camera'->>'main' = '108MP';

-- 3) array containment (@> is GIN-indexable)
SELECT name FROM products WHERE specs->'ports' @> '["USB-C"]';

-- 4) key-existence test
SELECT name FROM products WHERE specs ? 'camera';

-- 5) JSONB helper functions
SELECT 
    name,
    jsonb_object_keys(specs) as spec_keys,  -- set-returning: one row per key
    jsonb_typeof(specs->'ram') as ram_type
FROM products;
-- 2. Array types
CREATE TABLE articles (
    id SERIAL PRIMARY KEY,
    title TEXT,
    tags TEXT[],       -- list of tag strings
    ratings INTEGER[]  -- per-review scores
);

INSERT INTO articles (title, tags, ratings) VALUES 
('PostgreSQL Guide', ARRAY['database', 'postgresql', 'sql'], ARRAY[5, 4, 5, 3, 4]),
('Python Tutorial', ARRAY['programming', 'python'], ARRAY[4, 5, 4, 4]);

-- Array query examples
-- 1) search for a specific element
SELECT title FROM articles WHERE 'postgresql' = ANY(tags);

-- 2) array functions
SELECT title, array_length(tags, 1) as tag_count FROM articles;

-- 3) element access (PostgreSQL arrays are 1-based)
SELECT title, tags[1] as first_tag FROM articles;

-- 4) slicing
SELECT title, tags[1:2] as first_two_tags FROM articles;

-- 5) with aggregates: use an explicit LATERAL join rather than the
--    implicit comma join, which hides the row multiplication
SELECT 
    title,
    ROUND(AVG(rating), 2) as avg_rating
FROM articles
CROSS JOIN LATERAL unnest(ratings) as rating
GROUP BY title;
-- 3. Geometric types
CREATE TABLE locations (
    id SERIAL PRIMARY KEY,
    name TEXT,
    coordinates POINT,
    area POLYGON,
    route PATH
);

-- POLYGON(...) is not a constructor function in PostgreSQL;
-- build the polygon from a string literal cast instead
INSERT INTO locations (name, coordinates, area) VALUES 
('Seoul City Hall', POINT(126.978, 37.566), 
 '((126.97, 37.56), (126.98, 37.56), (126.98, 37.57), (126.97, 37.57))'::polygon);

-- Nearest-neighbor search: <-> is the geometric distance operator
SELECT 
    name,
    coordinates <-> POINT(126.980, 37.565) as distance
FROM locations
ORDER BY distance
LIMIT 5;
-- 4. Range types (time / numeric intervals)
CREATE TABLE reservations (
    id SERIAL PRIMARY KEY,
    room_id INT,
    time_range TSRANGE,
    price_range NUMRANGE
);

-- '[' = inclusive bound, ')' = exclusive bound
INSERT INTO reservations (room_id, time_range, price_range) VALUES 
(1, '[2024-08-09 09:00, 2024-08-09 17:00)', '[100000, 150000]'),
(2, '[2024-08-09 14:00, 2024-08-09 18:00)', '[80000, 120000]');

-- Range queries
-- 1) find overlapping reservations (&& = overlaps)
SELECT * FROM reservations 
WHERE time_range && '[2024-08-09 15:00, 2024-08-09 16:00)'::TSRANGE;

-- 2) reservations containing a given instant (@> = contains)
SELECT * FROM reservations 
WHERE time_range @> '2024-08-09 15:30'::TIMESTAMP;

-- 3) arithmetic on the range bounds
SELECT 
    room_id,
    upper(time_range) - lower(time_range) as duration,
    upper(price_range) - lower(price_range) as price_difference
FROM reservations;

2.3 전문 검색 (Full Text Search)

-- 1. Basic full-text search setup
CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title TEXT,
    content TEXT,
    search_vector TSVECTOR  -- precomputed search vector
);

-- GIN index speeds up @@ matching
CREATE INDEX idx_search_vector ON documents USING gin(search_vector);

-- Sample data
INSERT INTO documents (title, content) VALUES 
('PostgreSQL Advanced Features', 'PostgreSQL offers many advanced features like JSONB, arrays, and full text search'),
('Database Performance Tuning', 'Learn how to optimize your database queries for better performance');

-- Build the search vector after inserting; COALESCE keeps a NULL
-- title/content from nulling out the whole concatenation
UPDATE documents 
SET search_vector = to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content, ''));

-- 1) simple search
SELECT title FROM documents 
WHERE search_vector @@ to_tsquery('english', 'postgresql');

-- 2) compound queries (& = AND, | = OR, ! = NOT)
SELECT title FROM documents 
WHERE search_vector @@ to_tsquery('english', 'postgresql & advanced');

SELECT title FROM documents 
WHERE search_vector @@ to_tsquery('english', 'database | performance');

-- 3) ranking and highlighting
SELECT 
    title,
    ts_rank(search_vector, query) as rank,
    ts_headline('english', content, query) as snippet
FROM documents, to_tsquery('english', 'postgresql') as query
WHERE search_vector @@ query
ORDER BY rank DESC;

2.4 사용자 정의 타입 및 도메인

-- 1. Composite type definition
CREATE TYPE address AS (
    street TEXT,
    city TEXT,
    postal_code TEXT,
    country TEXT
);

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name TEXT,
    billing_address address,
    shipping_address address
);

INSERT INTO customers (name, billing_address, shipping_address) VALUES 
('김철수', ROW('강남대로 123', '서울', '06292', '한국')::address,
         ROW('판교로 456', '성남', '13494', '한국')::address);

-- Querying composite fields: the parentheses around the column are
-- required so the parser reads (column).field, not schema.column
SELECT 
    name,
    (billing_address).city as billing_city,
    (shipping_address).city as shipping_city
FROM customers;
-- 2. Domains (types with attached constraints)
CREATE DOMAIN email AS TEXT
CHECK (VALUE ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$');

CREATE DOMAIN positive_numeric AS NUMERIC
CHECK (VALUE > 0);

-- NOTE: the original declared "balance positive_numeric DEFAULT 0",
-- but 0 violates the domain's CHECK (VALUE > 0), so any INSERT that
-- omitted balance would fail at runtime. The default is removed.
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username TEXT UNIQUE NOT NULL,
    email_address email,
    balance positive_numeric
);

-- Exercising the domain constraints
INSERT INTO users (username, email_address, balance) VALUES 
('john_doe', 'john@example.com', 1000.50);

-- This fails: CHECK violation (invalid email, negative balance)
-- INSERT INTO users (username, email_address, balance) VALUES 
-- ('jane', 'invalid-email', -100);

3. 고급 쿼리 기능들

3.1 Window Functions (윈도우 함수)

-- Sample data
CREATE TABLE sales (
    id SERIAL PRIMARY KEY,
    employee_id INT,
    department TEXT,
    sale_date DATE,
    amount DECIMAL(10,2)
);

INSERT INTO sales (employee_id, department, sale_date, amount) VALUES 
(1, 'IT', '2024-01-15', 5000),
(2, 'IT', '2024-01-20', 3000),
(3, 'Sales', '2024-01-10', 8000),
(1, 'IT', '2024-02-15', 5500),
(4, 'Sales', '2024-02-20', 7200),
(2, 'IT', '2024-02-25', 3200);

-- 1) Ranking functions: ROW_NUMBER is always unique,
--    RANK leaves gaps on ties, DENSE_RANK does not
SELECT 
    employee_id,
    department,
    amount,
    ROW_NUMBER() OVER (PARTITION BY department ORDER BY amount DESC) as row_num,
    RANK() OVER (PARTITION BY department ORDER BY amount DESC) as rank,
    DENSE_RANK() OVER (PARTITION BY department ORDER BY amount DESC) as dense_rank,
    PERCENT_RANK() OVER (PARTITION BY department ORDER BY amount DESC) as percent_rank
FROM sales;

-- 2) Aggregate window functions
SELECT 
    employee_id,
    sale_date,
    amount,
    SUM(amount) OVER (PARTITION BY employee_id ORDER BY sale_date) as running_total,
    -- 3-row moving average: current row plus the two before it
    AVG(amount) OVER (PARTITION BY employee_id ORDER BY sale_date 
                      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) as moving_avg_3,
    COUNT(*) OVER (PARTITION BY employee_id) as total_sales_count
FROM sales;

-- 3) LAG/LEAD: previous / next row within the partition
SELECT 
    employee_id,
    sale_date,
    amount,
    LAG(amount, 1) OVER (PARTITION BY employee_id ORDER BY sale_date) as prev_sale,
    LEAD(amount, 1) OVER (PARTITION BY employee_id ORDER BY sale_date) as next_sale,
    amount - LAG(amount, 1) OVER (PARTITION BY employee_id ORDER BY sale_date) as growth
FROM sales;

-- 4) NTILE: split the rows into n roughly equal buckets
SELECT 
    employee_id,
    amount,
    NTILE(4) OVER (ORDER BY amount) as quartile,
    CASE NTILE(4) OVER (ORDER BY amount)
        WHEN 1 THEN 'Low Performer'
        WHEN 2 THEN 'Average'
        WHEN 3 THEN 'Good'
        WHEN 4 THEN 'Top Performer'
    END as performance_category
FROM sales;

3.2 Common Table Expressions (CTE)

-- 1. Recursive CTE - building an org chart
CREATE TABLE employees (
    id INT PRIMARY KEY,
    name TEXT,
    manager_id INT REFERENCES employees(id),  -- self-referencing FK
    department TEXT,
    salary DECIMAL(10,2)
);

INSERT INTO employees VALUES 
(1, 'CEO Kim', NULL, 'Executive', 10000000),
(2, 'CTO Lee', 1, 'Technology', 8000000),
(3, 'CFO Park', 1, 'Finance', 8000000),
(4, 'Dev Manager Choi', 2, 'Technology', 6000000),
(5, 'Senior Dev Jung', 4, 'Technology', 5000000),
(6, 'Junior Dev Yoon', 4, 'Technology', 3500000),
(7, 'Accountant Lim', 3, 'Finance', 4000000);

-- Walk the management hierarchy with a recursive CTE
WITH RECURSIVE org_chart AS (
    -- Base case: top-level managers (no manager)
    SELECT id, name, manager_id, department, salary, 0 as level, 
           name as path, ARRAY[id] as id_path
    FROM employees 
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursive case: direct reports of rows found so far
    SELECT e.id, e.name, e.manager_id, e.department, e.salary, 
           oc.level + 1,
           oc.path || ' -> ' || e.name,
           oc.id_path || e.id
    FROM employees e
    JOIN org_chart oc ON e.manager_id = oc.id
)
SELECT 
    REPEAT('  ', level) || name as hierarchy,  -- indent by depth
    department,
    salary,
    path
FROM org_chart
ORDER BY id_path;  -- sorting by the id path yields depth-first order

-- 2. Non-recursive CTEs - multi-step analysis
-- monthly_sales: one row per (month, department)
WITH monthly_sales AS (
    SELECT 
        DATE_TRUNC('month', sale_date) as month,
        department,
        SUM(amount) as total_amount,
        COUNT(*) as sale_count,
        AVG(amount) as avg_amount
    FROM sales 
    GROUP BY 1, 2
),
-- department_stats: mean and spread of each department's monthly totals
department_stats AS (
    SELECT 
        department,
        AVG(total_amount) as avg_monthly_sales,
        STDDEV(total_amount) as stddev_monthly_sales
    FROM monthly_sales
    GROUP BY department
)
-- Flag months more than one standard deviation from the mean
SELECT 
    ms.month,
    ms.department,
    ms.total_amount,
    ds.avg_monthly_sales,
    CASE 
        WHEN ms.total_amount > ds.avg_monthly_sales + ds.stddev_monthly_sales 
        THEN 'Above Average'
        WHEN ms.total_amount < ds.avg_monthly_sales - ds.stddev_monthly_sales 
        THEN 'Below Average'
        ELSE 'Normal'
    END as performance
FROM monthly_sales ms
JOIN department_stats ds ON ms.department = ds.department
ORDER BY ms.month, ms.department;

3.3 고급 집계 함수들

-- 1. GROUPING SETS, ROLLUP, CUBE
SELECT 
    department,
    DATE_TRUNC('month', sale_date) as month,
    COUNT(*) as sale_count,
    SUM(amount) as total_amount
FROM sales
GROUP BY GROUPING SETS (
    (department),                    -- per-department totals
    (DATE_TRUNC('month', sale_date)), -- per-month totals
    (department, DATE_TRUNC('month', sale_date)), -- department + month
    ()                              -- grand total
)
ORDER BY department, month;

-- ROLLUP: hierarchical subtotals (dept+employee, dept, grand total)
SELECT 
    department,
    employee_id,
    COUNT(*) as sale_count,
    SUM(amount) as total_amount
FROM sales
GROUP BY ROLLUP(department, employee_id)
ORDER BY department, employee_id;

-- 2. Statistical aggregates (ordered-set aggregates need WITHIN GROUP)
SELECT 
    department,
    COUNT(*) as count,
    AVG(amount) as mean,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) as median,
    MODE() WITHIN GROUP (ORDER BY amount) as mode,
    STDDEV_POP(amount) as std_dev,
    VAR_POP(amount) as variance
FROM sales
GROUP BY department;

-- 3. Array / string aggregation
SELECT 
    department,
    ARRAY_AGG(amount ORDER BY amount DESC) as amounts_desc,
    STRING_AGG(employee_id::TEXT, ', ' ORDER BY amount DESC) as top_employees
FROM sales
GROUP BY department;

4. 인덱스 전략

4.1 인덱스 타입들

-- 1. B-tree index (the default)
CREATE INDEX idx_sales_date ON sales(sale_date);
CREATE INDEX idx_sales_dept_amount ON sales(department, amount);

-- 2. Partial index (covers only rows matching the predicate)
CREATE INDEX idx_high_value_sales ON sales(sale_date) 
WHERE amount > 5000;

-- 3. Expression indexes (queries must repeat the same expression)
CREATE INDEX idx_sales_month ON sales(DATE_TRUNC('month', sale_date));
CREATE INDEX idx_employee_upper_name ON employees(UPPER(name));

-- 4. GIN indexes (arrays, JSONB, full-text search)
CREATE INDEX idx_products_specs ON products USING gin(specs);
CREATE INDEX idx_articles_tags ON articles USING gin(tags);

-- 5. GiST indexes (geometric data, range types)
CREATE INDEX idx_locations_coordinates ON locations USING gist(coordinates);
CREATE INDEX idx_reservations_time ON reservations USING gist(time_range);

-- 6. Hash index (equality comparisons only)
CREATE INDEX idx_products_id_hash ON products USING hash(id);

-- 7. BRIN index (large, naturally ordered data)
CREATE INDEX idx_sales_date_brin ON sales USING brin(sale_date);

4.2 인덱스 사용 분석

-- Analyze the execution plan
EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) 
SELECT * FROM sales 
WHERE department = 'IT' AND amount > 3000;

-- Per-index usage statistics.
-- pg_stat_user_indexes exposes relname/indexrelname, not
-- tablename/indexname — the original column names do not exist.
SELECT 
    schemaname,
    relname AS tablename,
    indexrelname AS indexname,
    idx_scan,        -- number of scans using this index
    idx_tup_read,    -- index entries returned
    idx_tup_fetch    -- live table rows fetched via this index
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC;

-- Find indexes that have never been used
SELECT 
    schemaname,
    relname AS tablename,
    indexrelname AS indexname,
    idx_scan
FROM pg_stat_user_indexes
WHERE idx_scan = 0 AND schemaname = 'public';

5. 성능 최적화

5.1 쿼리 최적화

-- 1. Use EXPLAIN to inspect the plan
EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT JSON)
SELECT s.*, e.name
FROM sales s
JOIN employees e ON s.employee_id = e.id
WHERE s.sale_date >= '2024-01-01' AND s.amount > 4000;

-- 2. Refresh planner statistics
ANALYZE sales;
ANALYZE employees;

-- Check when statistics were last gathered.
-- pg_stat_user_tables exposes the table name as relname, not tablename.
SELECT 
    schemaname,
    relname,
    last_analyze,
    last_autoanalyze,
    n_tup_ins,
    n_tup_upd,
    n_tup_del
FROM pg_stat_user_tables;

-- 3. Query rewrite example
-- EXISTS form (the planner normally turns this into a semi-join,
-- so performance is typically comparable to the join below)
SELECT * FROM sales s
WHERE EXISTS (
    SELECT 1 FROM employees e 
    WHERE e.id = s.employee_id AND e.department = 'IT'
);

-- Equivalent join form (only equivalent because employees.id is
-- unique — a non-unique join key would duplicate sales rows)
SELECT s.* FROM sales s
INNER JOIN employees e ON s.employee_id = e.id
WHERE e.department = 'IT';

5.2 파티셔닝

-- 1. Range partitioning (by date)
CREATE TABLE sales_partitioned (
    id SERIAL,
    employee_id INT,
    department TEXT,
    sale_date DATE,
    amount DECIMAL(10,2)
) PARTITION BY RANGE (sale_date);

-- Create the partitions (the TO bound is exclusive)
CREATE TABLE sales_2024_q1 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2024-01-01') TO ('2024-04-01');

CREATE TABLE sales_2024_q2 PARTITION OF sales_partitioned
    FOR VALUES FROM ('2024-04-01') TO ('2024-07-01');

-- 2. Hash partitioning (even data distribution)
CREATE TABLE customer_data (
    id SERIAL,
    customer_id INT,
    data JSONB
) PARTITION BY HASH (customer_id);

CREATE TABLE customer_data_0 PARTITION OF customer_data
    FOR VALUES WITH (modulus 4, remainder 0);

CREATE TABLE customer_data_1 PARTITION OF customer_data
    FOR VALUES WITH (modulus 4, remainder 1);

-- 3. List partitioning (explicit value sets)
CREATE TABLE sales_by_region (
    id SERIAL,
    region TEXT,
    amount DECIMAL
) PARTITION BY LIST (region);

CREATE TABLE sales_asia PARTITION OF sales_by_region
    FOR VALUES IN ('Korea', 'Japan', 'China');

CREATE TABLE sales_europe PARTITION OF sales_by_region
    FOR VALUES IN ('Germany', 'France', 'UK');

5.3 연결 풀링 및 성능 모니터링

-- 1. Inspect currently active connections
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    query_start,
    state_change,
    query
FROM pg_stat_activity
WHERE state = 'active';

-- 2. Find the slowest queries (requires the pg_stat_statements
--    extension). The column was renamed in PostgreSQL 13:
--    use total_time instead of total_exec_time on PG 12 and earlier.
SELECT 
    query,
    calls,
    total_exec_time,
    rows,
    100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;

-- 3. Table access statistics.
-- pg_stat_user_tables exposes the table name as relname, not tablename.
SELECT 
    schemaname,
    relname AS tablename,
    seq_scan,       -- full sequential scans
    seq_tup_read,
    idx_scan,       -- index scans
    idx_tup_fetch,
    n_tup_ins,
    n_tup_upd,
    n_tup_del
FROM pg_stat_user_tables
ORDER BY seq_scan DESC;

-- 4. Index efficiency analysis.
-- Fixes vs. the original: pg_tables has no indexrelname column (the
-- size expressions referenced t.indexrelname, which errors), and
-- "unique" is a reserved word so the alias must be quoted. Joining
-- pg_stat_user_indexes directly also removes the redundant
-- pg_indexes hop.
SELECT 
    t.tablename,
    ui.indexrelname AS indexname,
    c.reltuples AS num_rows,
    pg_size_pretty(pg_relation_size(quote_ident(t.schemaname)||'.'||quote_ident(t.tablename))) AS table_size,
    pg_size_pretty(pg_relation_size(quote_ident(t.schemaname)||'.'||quote_ident(ui.indexrelname))) AS index_size,
    CASE WHEN ix.indisunique THEN 'Y' ELSE 'N' END AS "unique",
    ui.idx_scan AS number_of_scans,
    ui.idx_tup_read AS tuples_read,
    ui.idx_tup_fetch AS tuples_fetched
FROM pg_tables t
LEFT JOIN pg_class c ON c.relname = t.tablename
LEFT JOIN pg_stat_user_indexes ui ON ui.relname = t.tablename
LEFT JOIN pg_index ix ON ix.indexrelid = ui.indexrelid
WHERE t.schemaname = 'public'
ORDER BY pg_relation_size(quote_ident(t.schemaname)||'.'||quote_ident(ui.indexrelname)) DESC;

6. 확장(Extension) 생태계

6.1 주요 확장들

-- 1. PostGIS (geospatial data)
CREATE EXTENSION postgis;

CREATE TABLE restaurants (
    id SERIAL PRIMARY KEY,
    name TEXT,
    location GEOGRAPHY(POINT, 4326)  -- WGS 84 lon/lat
);

INSERT INTO restaurants (name, location) VALUES 
('Pizza House', ST_GeogFromText('POINT(126.9780 37.5665)')),
('Burger King', ST_GeogFromText('POINT(126.9850 37.5700)'));

-- Distance search (for geography, ST_DWithin's radius is in meters)
SELECT name, ST_Distance(location, ST_GeogFromText('POINT(126.9800 37.5680)')) as distance
FROM restaurants
WHERE ST_DWithin(location, ST_GeogFromText('POINT(126.9800 37.5680)'), 1000)
ORDER BY distance;

-- 2. pg_stat_statements (query performance statistics)
CREATE EXTENSION pg_stat_statements;

-- 3. uuid-ossp (UUID generation)
CREATE EXTENSION "uuid-ossp";

CREATE TABLE orders (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    customer_id INT,
    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 4. ltree (hierarchical data)
CREATE EXTENSION ltree;

CREATE TABLE categories (
    id SERIAL PRIMARY KEY,
    path LTREE,  -- dotted label path, e.g. electronics.computers
    name TEXT
);

INSERT INTO categories (path, name) VALUES 
('electronics', 'Electronics'),
('electronics.computers', 'Computers'),
('electronics.computers.laptops', 'Laptops'),
('electronics.phones', 'Phones'),
('books', 'Books'),
('books.fiction', 'Fiction');

-- Hierarchy queries: <@ = is-descendant-of, ~ = lquery pattern match
SELECT * FROM categories WHERE path <@ 'electronics';
SELECT * FROM categories WHERE path ~ 'electronics.*';

-- 5. pg_trgm (trigram similarity search)
CREATE EXTENSION pg_trgm;

CREATE INDEX idx_products_name_trgm ON products USING gin(name gin_trgm_ops);

-- Fuzzy search
SELECT name, similarity(name, 'laptop') as sim
FROM products
WHERE similarity(name, 'laptop') > 0.3
ORDER BY sim DESC;

6.2 커스텀 함수 및 프로시저

-- 1. PL/pgSQL function: salary-tiered bonus for one employee.
-- Raises an exception if the employee id does not exist.
CREATE OR REPLACE FUNCTION calculate_bonus(emp_id INT)
RETURNS DECIMAL(10,2) AS $$
DECLARE
    emp_salary DECIMAL(10,2);
BEGIN
    -- Look up the employee's salary
    SELECT salary INTO emp_salary
    FROM employees
    WHERE id = emp_id;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'Employee not found: %', emp_id;
    END IF;

    -- Bonus rate grows with the salary tier
    RETURN emp_salary * CASE
        WHEN emp_salary >= 8000000 THEN 0.20
        WHEN emp_salary >= 5000000 THEN 0.15
        WHEN emp_salary >= 3000000 THEN 0.10
        ELSE 0.05
    END;
END;
$$ LANGUAGE plpgsql;

-- Using the function
SELECT name, salary, calculate_bonus(id) as bonus
FROM employees;

-- 2. Trigger function: keeps departments.employee_count in sync with
--    inserts/deletes on employees.
--    NOTE(review): assumes a departments(name, employee_count) table
--    exists elsewhere — not visible in this file; verify.
CREATE OR REPLACE FUNCTION update_employee_count()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE departments 
        SET employee_count = employee_count + 1 
        WHERE name = NEW.department;
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE departments 
        SET employee_count = employee_count - 1 
        WHERE name = OLD.department;
        RETURN OLD;
    END IF;
    RETURN NULL;  -- unreachable for an INSERT OR DELETE trigger
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trigger_employee_count
    AFTER INSERT OR DELETE ON employees
    FOR EACH ROW EXECUTE FUNCTION update_employee_count();

-- 3. Creating a custom aggregate: median.
-- BUG FIX: array_append (the SFUNC) collects values in input order,
-- so the original finalfunc indexed into an UNSORTED array and
-- returned an arbitrary element unless the caller happened to write
-- median(x ORDER BY x). The finalfunc now sorts before picking the
-- middle element(s).
CREATE OR REPLACE FUNCTION median_finalfunc(anyarray)
RETURNS ANYELEMENT AS $$
SELECT CASE
    WHEN cnt % 2 = 1 THEN
        sorted[(cnt + 1) / 2]
    ELSE
        -- NOTE: integer inputs use integer division here, matching
        -- the original's behavior for even-sized inputs
        (sorted[cnt / 2] + sorted[cnt / 2 + 1]) / 2
END
FROM (
    SELECT ARRAY(SELECT v FROM unnest($1) AS v ORDER BY v) AS sorted,
           coalesce(array_length($1, 1), 0) AS cnt
) s;
$$ LANGUAGE SQL IMMUTABLE;

CREATE AGGREGATE median(ANYELEMENT) (
    SFUNC = array_append,
    STYPE = anyarray,
    FINALFUNC = median_finalfunc,
    INITCOND = '{}'
);

-- Usage example
SELECT department, median(salary) as median_salary
FROM employees
GROUP BY department;

7. 백업 및 복구 전략

7.1 논리적 백업 (pg_dump/pg_restore)

# 1. Full database backup (-F t = tar format, -b = include blobs)
pg_dump -U postgres -h localhost -W -F t -b -v -f mydb_backup.tar mydb

# 2. Back up specific tables only
pg_dump -U postgres -h localhost -W -t employees -t sales -f specific_tables.sql mydb

# 3. Schema only (-s), no data
pg_dump -U postgres -h localhost -W -s -f schema_only.sql mydb

# 4. Data only (-a), no schema
pg_dump -U postgres -h localhost -W -a -f data_only.sql mydb

# 5. Compressed custom-format backup (-F c, max compression level 9)
pg_dump -U postgres -h localhost -W -F c -Z 9 -f compressed_backup.backup mydb
# Restore examples
# 1. Restore a tar-format backup into a new database
pg_restore -U postgres -h localhost -W -d restored_db -v mydb_backup.tar

# 2. Selective restore (specific tables only)
pg_restore -U postgres -h localhost -W -d mydb -t employees mydb_backup.tar

# 3. Schema only
pg_restore -U postgres -h localhost -W -d mydb -s mydb_backup.tar

# 4. Parallel restore (-j 4 uses four worker jobs)
pg_restore -U postgres -h localhost -W -d mydb -j 4 -v mydb_backup.tar

7.2 물리적 백업 (Point-in-Time Recovery)

-- 1. Check WAL archiving settings
SHOW wal_level;        -- must be 'replica' or higher
SHOW archive_mode;     -- must be 'on'
SHOW archive_command;  -- the command used to copy finished WAL segments

-- 2. Prerequisites for a base backup
-- postgresql.conf settings:
-- archive_mode = on
-- archive_command = 'cp %p /var/lib/postgresql/wal_archive/%f'
-- wal_level = replica
# Run the base backup (-P shows progress; the original passed -W twice)
pg_basebackup -U postgres -h localhost -W -D /backup/base_backup -P -v

# Point-in-Time Recovery procedure
# 1. Stop the server
systemctl stop postgresql

# 2. Preserve the existing data directory
mv /var/lib/postgresql/data /var/lib/postgresql/data_old

# 3. Restore the base backup
cp -r /backup/base_backup /var/lib/postgresql/data

# 4. Create recovery.conf (PostgreSQL 11 and earlier)
# or add recovery settings to postgresql.conf (PostgreSQL 12 and later)

7.3 연속 아카이빙 및 스트리밍 복제

-- Primary (master) server configuration
-- postgresql.conf
-- listen_addresses = '*'
-- wal_level = replica
-- max_wal_senders = 3
-- max_replication_slots = 3
-- synchronous_commit = on

-- Allow replication connections in pg_hba.conf
-- host replication replicator slave_ip/32 md5

-- Create the replication user
CREATE USER replicator REPLICATION LOGIN ENCRYPTED PASSWORD 'password';

-- Create a physical replication slot (optional; prevents WAL
-- needed by the standby from being recycled)
SELECT pg_create_physical_replication_slot('slave1_slot');
# Standby (replica) server setup
# 1. Initial sync via base backup from the primary
pg_basebackup -h master_ip -D /var/lib/postgresql/data -U replicator -P -v -W

# 2. recovery.conf (PG 11 and earlier) or postgresql.conf (PG 12+) settings
# NOTE(review): standby_mode was removed in PostgreSQL 12; newer
# versions use a standby.signal file instead — verify target version
# standby_mode = 'on'
# primary_conninfo = 'host=master_ip port=5432 user=replicator password=password'
# primary_slot_name = 'slave1_slot'

7.4 백업 검증 및 자동화

-- Backup verification script.
-- BUG FIX: the dollar-quote delimiter must be $$ — the original's
-- bare $ is a syntax error. The unused backup_size variable was
-- also removed.
DO $$
DECLARE
    table_count INT;
    expected_tables INT := 10; -- expected number of tables
BEGIN
    -- Count the restored tables
    SELECT COUNT(*) INTO table_count 
    FROM information_schema.tables 
    WHERE table_schema = 'public';

    IF table_count < expected_tables THEN
        RAISE EXCEPTION 'Backup verification failed: Expected % tables, found %', 
            expected_tables, table_count;
    END IF;

    -- Data consistency example: no negative salaries allowed
    PERFORM 1 FROM employees WHERE salary < 0;
    IF FOUND THEN
        RAISE EXCEPTION 'Data consistency check failed: Invalid salary values found';
    END IF;

    RAISE NOTICE 'Backup verification passed: % tables found', table_count;
END $$;

8. 보안 및 권한 관리

8.1 사용자 및 역할 관리

-- 1. Create roles
CREATE ROLE developer_role;
CREATE ROLE analyst_role;
CREATE ROLE admin_role;

-- 2. Grant privileges
GRANT SELECT, INSERT, UPDATE ON ALL TABLES IN SCHEMA public TO developer_role;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO analyst_role;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO admin_role;

-- 3. Create users and assign roles
CREATE USER john_doe LOGIN PASSWORD 'secure_password';
CREATE USER jane_analyst LOGIN PASSWORD 'another_password';
CREATE USER admin_user LOGIN PASSWORD 'admin_password' CREATEDB CREATEROLE;

GRANT developer_role TO john_doe;
GRANT analyst_role TO jane_analyst;
GRANT admin_role TO admin_user;

-- 4. Fine-grained access control
-- Column-level grant
GRANT SELECT (id, name, department) ON employees TO analyst_role;

-- Row Level Security: restrict visible rows per session
ALTER TABLE employees ENABLE ROW LEVEL SECURITY;

CREATE POLICY employee_policy ON employees
    FOR ALL TO developer_role
    USING (department = current_setting('app.current_department'));

-- Set the session variable the policy reads
SET app.current_department = 'IT';

8.2 데이터 암호화 및 보안

-- 1. pgcrypto extension
CREATE EXTENSION pgcrypto;

-- Password hashing
CREATE TABLE secure_users (
    id SERIAL PRIMARY KEY,
    username TEXT UNIQUE,
    password_hash TEXT,
    email TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Hash the password on insert ('bf' = bcrypt, cost factor 8)
INSERT INTO secure_users (username, password_hash, email) VALUES 
('testuser', crypt('mypassword', gen_salt('bf', 8)), 'test@example.com');

-- Login check: crypt() re-hashes the candidate with the stored salt
SELECT id, username 
FROM secure_users 
WHERE username = 'testuser' 
  AND password_hash = crypt('mypassword', password_hash);

-- 2. Symmetric encryption for sensitive columns
ALTER TABLE employees ADD COLUMN ssn_encrypted BYTEA;

-- Encrypt on write
UPDATE employees 
SET ssn_encrypted = pgp_sym_encrypt('123-45-6789', 'encryption_key')
WHERE id = 1;

-- Decrypt on read
SELECT id, name, 
       pgp_sym_decrypt(ssn_encrypted, 'encryption_key') as ssn
FROM employees 
WHERE id = 1;

-- 3. Audit log table
CREATE TABLE audit_log (
    id SERIAL PRIMARY KEY,
    table_name TEXT,
    operation TEXT,       -- INSERT / UPDATE / DELETE
    old_values JSONB,
    new_values JSONB,
    user_name TEXT DEFAULT current_user,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Audit trigger function.
-- BUG FIX: the dollar-quote delimiter must be $$ — the original's
-- bare $ is a syntax error.
CREATE OR REPLACE FUNCTION audit_trigger_function()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO audit_log (table_name, operation, old_values)
        VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD));
        RETURN OLD;
    ELSIF TG_OP = 'UPDATE' THEN
        INSERT INTO audit_log (table_name, operation, old_values, new_values)
        VALUES (TG_TABLE_NAME, TG_OP, row_to_json(OLD), row_to_json(NEW));
        RETURN NEW;
    ELSIF TG_OP = 'INSERT' THEN
        INSERT INTO audit_log (table_name, operation, new_values)
        VALUES (TG_TABLE_NAME, TG_OP, row_to_json(NEW));
        RETURN NEW;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Attach the audit trigger
CREATE TRIGGER audit_employees_trigger
    AFTER INSERT OR UPDATE OR DELETE ON employees
    FOR EACH ROW EXECUTE FUNCTION audit_trigger_function();

9. 고가용성 및 확장성

9.1 연결 풀링

# Python example - connection pooling in front of pgbouncer
import psycopg2
from psycopg2 import pool

# Create the pool (port 6432 targets pgbouncer, not PostgreSQL itself)
connection_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host='localhost',
    database='mydb',
    user='postgres',
    password='password',
    port=6432  # pgbouncer port
)

def get_db_connection():
    """Borrow a connection from the pool; pair with return_db_connection()."""
    return connection_pool.getconn()

def return_db_connection(conn):
    """Return a borrowed connection to the pool."""
    connection_pool.putconn(conn)

9.2 읽기 복제본 활용

-- Read/write splitting with read replicas
-- Application-level strategy: writes go to the primary, reads to replicas.

-- Primary only (write)
INSERT INTO sales (employee_id, department, sale_date, amount) 
VALUES (1, 'IT', CURRENT_DATE, 5000);

-- Safe to run on a read replica (read-only)
SELECT 
    department,
    COUNT(*) as sale_count,
    SUM(amount) as total_amount
FROM sales 
WHERE sale_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY department;

-- Replication-lag monitoring (run on the primary; one row per standby)
SELECT 
    application_name,
    client_addr,
    state,
    sync_state,  -- 'sync' / 'async' / 'quorum'
    pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) as pending_bytes,  -- WAL generated but not yet sent
    pg_wal_lsn_diff(sent_lsn, flush_lsn) as flush_lag_bytes            -- sent but not yet flushed on the standby
FROM pg_stat_replication;

9.3 파티셔닝 고급 활용

-- Time-based automatic partition management (pg_partman extension)
-- NOTE(review): pg_partman is usually installed into a dedicated schema
-- (CREATE SCHEMA partman; CREATE EXTENSION pg_partman SCHEMA partman;) --
-- confirm against your installation before running as-is.
CREATE EXTENSION pg_partman;

-- Range-partitioned parent table (monthly partitions)
CREATE TABLE sales_monthly (
    id BIGSERIAL,
    sale_date DATE NOT NULL,
    amount DECIMAL(10,2),
    employee_id INT
) PARTITION BY RANGE (sale_date);

-- Register the parent with pg_partman for automatic partition creation
-- NOTE(review): parameter names/values differ between pg_partman versions
-- (newer versions use p_type => 'native') -- verify against your version.
SELECT partman.create_parent(
    p_parent_table => 'public.sales_monthly',
    p_control => 'sale_date',
    p_type => 'range',
    p_interval => 'monthly',
    p_premake => 2  -- number of future partitions to pre-create
);

-- Verify partition pruning: the plan should touch only the January partition
EXPLAIN (ANALYZE, BUFFERS) 
SELECT * FROM sales_monthly 
WHERE sale_date BETWEEN '2024-01-01' AND '2024-01-31';

10. 모니터링 및 튜닝

10.1 성능 모니터링 쿼리들

-- 1. Active queries running longer than 5 minutes, and what they wait on
SELECT 
    pid,
    now() - pg_stat_activity.query_start AS duration,
    query,
    state,
    wait_event,
    wait_event_type
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '5 minutes'
  AND state = 'active';

-- 2. Who blocks whom: pairs every waiting (ungranted) lock with the lock
--    holding the same resource.  IS NOT DISTINCT FROM is used because most
--    lock-identity columns are NULL depending on the lock type, and
--    NULL = NULL would never match.
SELECT 
    blocked_locks.pid AS blocked_pid,
    blocked_activity.usename AS blocked_user,
    blocking_locks.pid AS blocking_pid,
    blocking_activity.usename AS blocking_user,
    blocked_activity.query AS blocked_statement,
    blocking_activity.query AS current_statement_in_blocking_process
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks 
    ON blocking_locks.locktype = blocked_locks.locktype
    AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
    AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
    AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
    AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
    AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
    AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
    AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
    AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
    AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
    AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.GRANTED;

-- 3. Table size and activity statistics, largest first.
-- Fixes: pg_stat_user_tables exposes the table name as relname (there is no
-- "tablename" column, so the original query errored), and relid is used
-- instead of concatenating schema||'.'||name, which breaks for identifiers
-- that need quoting.
SELECT 
    schemaname,
    relname AS tablename,
    pg_size_pretty(pg_total_relation_size(relid)) as size,
    pg_size_pretty(pg_relation_size(relid)) as table_size,
    -- total minus heap = indexes plus TOAST
    pg_size_pretty(pg_total_relation_size(relid) - pg_relation_size(relid)) as index_size,
    n_tup_ins,
    n_tup_upd,
    n_tup_del,
    n_live_tup,
    n_dead_tup,
    last_vacuum,
    last_autovacuum,
    last_analyze,
    last_autoanalyze
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(relid) DESC;

-- 4. Cache hit ratio, database-wide and per table.
-- Fixes: the database branch now guards against division by zero with
-- nullif (the table branch already did), and pg_statio_user_tables exposes
-- the table name as relname, not "tablename".
SELECT 
    'Database' as type,
    datname as name,
    numbackends as connections,
    round(100.0 * blks_hit / nullif(blks_hit + blks_read, 0), 2) as cache_hit_ratio
FROM pg_stat_database
WHERE datname = current_database()

UNION ALL

SELECT 
    'Table' as type,
    schemaname || '.' || relname as name,
    NULL as connections,
    round(100.0 * heap_blks_hit / nullif(heap_blks_hit + heap_blks_read, 0), 2) as cache_hit_ratio
FROM pg_statio_user_tables
WHERE heap_blks_read > 0
ORDER BY cache_hit_ratio;  -- applies to the whole UNION

-- 5. Most expensive statements (requires the pg_stat_statements extension)
-- Fix: round(double precision, int) has no overload in PostgreSQL; the
-- timing columns are double precision, so cast to numeric before rounding.
-- NOTE(review): on PostgreSQL 13+ the column is total_exec_time, not
-- total_time -- rename accordingly for your server version.
SELECT 
    query,
    calls,
    total_time,
    round((total_time / calls)::numeric, 2) as avg_time,
    rows,
    round(100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0), 2) as hit_percent
FROM pg_stat_statements
ORDER BY total_time DESC
LIMIT 10;

10.2 자동 튜닝 및 유지보수

-- 1. Inspect autovacuum/vacuum-related server configuration
SELECT 
    name,
    setting,
    unit,
    short_desc
FROM pg_settings 
WHERE name LIKE '%autovacuum%' OR name LIKE '%vacuum%'
ORDER BY name;

-- 2. Per-table autovacuum tuning for a write-heavy table
ALTER TABLE high_activity_table SET (
    autovacuum_vacuum_scale_factor = 0.1,  -- vacuum more often than the 0.2 default
    autovacuum_analyze_scale_factor = 0.05,
    autovacuum_vacuum_cost_delay = 10
);

-- 3. Raise the per-column statistics target for a heavily-filtered column
ALTER TABLE important_table ALTER COLUMN search_column SET STATISTICS 1000;
-- default is 100; larger targets give more accurate (but slower) ANALYZE

-- 4. Maintenance helper: report large tables with a high dead-tuple ratio.
-- Fixes relative to the original:
--   * the body must use $$ dollar-quoting (a single $ is a syntax error);
--   * pg_stat_user_tables exposes the table name as relname, not tablename;
--   * VACUUM cannot run inside a transaction block, and a DO block always
--     runs in one, so EXECUTE 'VACUUM ...' would fail -- instead we emit the
--     commands as NOTICEs for an external client/cron job to run.
DO $$
DECLARE
    r RECORD;
    table_size BIGINT;
    dead_tuple_ratio FLOAT;
BEGIN
    FOR r IN SELECT schemaname, relname FROM pg_stat_user_tables LOOP
        -- size and dead-tuple ratio for this table
        SELECT 
            pg_relation_size(format('%I.%I', r.schemaname, r.relname)::regclass),
            CASE 
                WHEN n_live_tup + n_dead_tup > 0 
                THEN n_dead_tup::FLOAT / (n_live_tup + n_dead_tup) 
                ELSE 0 
            END
        INTO table_size, dead_tuple_ratio
        FROM pg_stat_user_tables 
        WHERE schemaname = r.schemaname AND relname = r.relname;

        -- flag tables over 100 MB with more than 10% dead tuples
        IF table_size > 100 * 1024 * 1024 AND dead_tuple_ratio > 0.1 THEN
            RAISE NOTICE 'Run externally: VACUUM ANALYZE %;',
                format('%I.%I', r.schemaname, r.relname);
        END IF;
    END LOOP;
END $$;

11. 실제 사용 사례 및 패턴

11.1 시계열 데이터 처리

-- TimescaleDB-style time-series storage with native PostgreSQL partitioning.
-- Fix: the parent table must declare PARTITION BY RANGE (time); without it
-- the CREATE TABLE ... PARTITION OF statement below fails.
CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER NOT NULL,
    temperature DECIMAL(5,2),
    humidity DECIMAL(5,2),
    pressure DECIMAL(7,2)
) PARTITION BY RANGE (time);

-- Monthly partition; the range is half-open: [2024-01-01, 2024-02-01)
CREATE TABLE sensor_data_2024_01 PARTITION OF sensor_data
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Hourly per-sensor aggregates over the last 7 days
-- (time_bucket() is defined below; TimescaleDB ships it natively)
SELECT 
    time_bucket('1 hour', time) as hour,
    sensor_id,
    AVG(temperature) as avg_temp,
    MAX(temperature) as max_temp,
    MIN(temperature) as min_temp,
    STDDEV(temperature) as temp_stddev
FROM sensor_data
WHERE time >= NOW() - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1, 2;

-- time_bucket(): floors a timestamp to the start of its bucket
-- (TimescaleDB-compatible signature, plain-SQL implementation).
-- Fix: the function body must be wrapped in $$ ... $$ dollar-quoting.
CREATE OR REPLACE FUNCTION time_bucket(bucket_width INTERVAL, ts TIMESTAMPTZ)
RETURNS TIMESTAMPTZ AS $$
    SELECT TO_TIMESTAMP(FLOOR(EXTRACT(EPOCH FROM ts) / EXTRACT(EPOCH FROM bucket_width)) * EXTRACT(EPOCH FROM bucket_width))::TIMESTAMPTZ;
$$ LANGUAGE SQL IMMUTABLE;

-- Outlier detection (Z-score based)
-- Fix: STDDEV() is NULL for a single sample and 0 for constant readings;
-- NULLIF avoids a division-by-zero error (those rows then drop out of the
-- result because NULL > 2 is not true).
WITH stats AS (
    SELECT 
        sensor_id,
        AVG(temperature) as avg_temp,
        STDDEV(temperature) as stddev_temp
    FROM sensor_data
    WHERE time >= NOW() - INTERVAL '24 hours'
    GROUP BY sensor_id
)
SELECT 
    sd.time,
    sd.sensor_id,
    sd.temperature,
    ABS(sd.temperature - s.avg_temp) / NULLIF(s.stddev_temp, 0) as z_score
FROM sensor_data sd
JOIN stats s ON sd.sensor_id = s.sensor_id
WHERE ABS(sd.temperature - s.avg_temp) / NULLIF(s.stddev_temp, 0) > 2  -- beyond 2 standard deviations
  AND sd.time >= NOW() - INTERVAL '1 hour'
ORDER BY z_score DESC;

11.2 전자상거래 패턴

-- Cart and order system schema
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    price DECIMAL(10,2) NOT NULL,
    inventory_count INTEGER NOT NULL DEFAULT 0,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- NOTE(review): uuid_generate_v4() requires the uuid-ossp extension; on
-- PostgreSQL 13+ the built-in gen_random_uuid() needs no extension.
CREATE TABLE carts (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    customer_id INTEGER REFERENCES customers(id),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Composite PK prevents duplicate product lines within one cart
CREATE TABLE cart_items (
    cart_id UUID REFERENCES carts(id) ON DELETE CASCADE,
    product_id INTEGER REFERENCES products(id),
    quantity INTEGER NOT NULL CHECK (quantity > 0),
    added_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (cart_id, product_id)
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    status TEXT NOT NULL DEFAULT 'pending' CHECK (status IN ('pending', 'confirmed', 'shipped', 'delivered', 'cancelled')),
    total_amount DECIMAL(10,2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- unit_price is copied from products at checkout time so later price
-- changes do not rewrite order history
CREATE TABLE order_items (
    order_id INTEGER REFERENCES orders(id),
    product_id INTEGER REFERENCES products(id),
    quantity INTEGER NOT NULL,
    unit_price DECIMAL(10,2) NOT NULL,
    PRIMARY KEY (order_id, product_id)
);

-- Converts a cart into an order atomically (inventory check, order +
-- order_items creation, stock decrement, cart removal).
-- Fixes relative to the original:
--   * the body must use $$ dollar-quoting (a single $ is a syntax error);
--   * product rows are locked with FOR UPDATE while inventory is checked,
--     so two concurrent checkouts cannot both pass the check and oversell.
-- Returns the new order id; raises if the cart is missing or stock is short
-- (any raise rolls the whole checkout back).
CREATE OR REPLACE FUNCTION checkout_cart(p_cart_id UUID)
RETURNS INTEGER AS $$
DECLARE
    v_order_id INTEGER;
    v_customer_id INTEGER;
    v_total_amount DECIMAL(10,2) := 0;
    cart_item RECORD;
BEGIN
    -- Look up the cart owner
    SELECT customer_id INTO v_customer_id
    FROM carts WHERE id = p_cart_id;

    IF NOT FOUND THEN
        RAISE EXCEPTION 'Cart not found: %', p_cart_id;
    END IF;

    -- Check inventory and compute the total.  FOR UPDATE OF p holds a row
    -- lock on each product until COMMIT; ordering by product_id keeps the
    -- lock acquisition order consistent across concurrent checkouts.
    FOR cart_item IN 
        SELECT ci.product_id, ci.quantity, p.price, p.inventory_count
        FROM cart_items ci
        JOIN products p ON ci.product_id = p.id
        WHERE ci.cart_id = p_cart_id
        ORDER BY ci.product_id
        FOR UPDATE OF p
    LOOP
        IF cart_item.inventory_count < cart_item.quantity THEN
            RAISE EXCEPTION 'Insufficient inventory for product %', cart_item.product_id;
        END IF;

        v_total_amount := v_total_amount + (cart_item.quantity * cart_item.price);
    END LOOP;

    -- Create the order
    INSERT INTO orders (customer_id, total_amount)
    VALUES (v_customer_id, v_total_amount)
    RETURNING id INTO v_order_id;

    -- Create order items and decrement stock (rows are already locked)
    FOR cart_item IN 
        SELECT ci.product_id, ci.quantity, p.price
        FROM cart_items ci
        JOIN products p ON ci.product_id = p.id
        WHERE ci.cart_id = p_cart_id
    LOOP
        INSERT INTO order_items (order_id, product_id, quantity, unit_price)
        VALUES (v_order_id, cart_item.product_id, cart_item.quantity, cart_item.price);

        UPDATE products 
        SET inventory_count = inventory_count - cart_item.quantity
        WHERE id = cart_item.product_id;
    END LOOP;

    -- Remove the cart (cart_items cascade-delete)
    DELETE FROM carts WHERE id = p_cart_id;

    RETURN v_order_id;
END;
$$ LANGUAGE plpgsql;

-- Usage example
-- NOTE: the argument must be a valid UUID literal; this placeholder raises
-- "invalid input syntax for type uuid" if run as-is.
SELECT checkout_cart('cart-uuid-here');

11.3 분석 및 리포팅 패턴

-- Business-intelligence query examples
-- 1. Cohort analysis (customer retention)
-- Fix: the original divided by COUNT(...) FILTER (WHERE months_since_first = 0)
-- *inside* each (cohort, month) group -- that count is 0 for every month > 0,
-- so the query raised division-by-zero.  The denominator must be the
-- cohort's month-0 size, taken here via a window over the cohort partition.
WITH first_orders AS (
    -- each customer's first-order month defines their cohort
    SELECT 
        customer_id,
        MIN(DATE_TRUNC('month', created_at)) as first_order_month
    FROM orders
    GROUP BY customer_id
),
order_months AS (
    -- months elapsed between each order and the customer's first order
    SELECT 
        o.customer_id,
        fo.first_order_month,
        DATE_TRUNC('month', o.created_at) as order_month,
        EXTRACT(YEAR FROM AGE(DATE_TRUNC('month', o.created_at), fo.first_order_month)) * 12 + 
        EXTRACT(MONTH FROM AGE(DATE_TRUNC('month', o.created_at), fo.first_order_month)) as months_since_first
    FROM orders o
    JOIN first_orders fo ON o.customer_id = fo.customer_id
)
SELECT 
    first_order_month,
    months_since_first,
    COUNT(DISTINCT customer_id) as customers,
    -- month-0 row always exists per cohort (the first order itself), so
    -- FIRST_VALUE over months_since_first picks the cohort size
    ROUND(100.0 * COUNT(DISTINCT customer_id) /
          FIRST_VALUE(COUNT(DISTINCT customer_id)) OVER (
              PARTITION BY first_order_month ORDER BY months_since_first
          ), 2) as retention_rate
FROM order_months
WHERE first_order_month >= '2024-01-01'
GROUP BY first_order_month, months_since_first
ORDER BY first_order_month, months_since_first;

-- 2. RFM analysis (Recency, Frequency, Monetary)
-- Fix: the original NTILE orderings were inverted -- the most recent /
-- most frequent / highest-spending customers received score 1, so the
-- "Champions" bucket (scores >= 4) actually captured the worst customers.
-- Here score 5 is always best.
WITH customer_metrics AS (
    -- per-customer raw metrics over the trailing year
    SELECT 
        customer_id,
        CURRENT_DATE - MAX(created_at::DATE) as recency_days,
        COUNT(*) as frequency,
        SUM(total_amount) as monetary_value
    FROM orders
    WHERE created_at >= CURRENT_DATE - INTERVAL '1 year'
    GROUP BY customer_id
),
rfm_scores AS (
    SELECT 
        customer_id,
        recency_days,
        frequency,
        monetary_value,
        NTILE(5) OVER (ORDER BY recency_days DESC) as recency_score,   -- 5 = most recent
        NTILE(5) OVER (ORDER BY frequency) as frequency_score,         -- 5 = most orders
        NTILE(5) OVER (ORDER BY monetary_value) as monetary_score      -- 5 = highest spend
    FROM customer_metrics
)
SELECT 
    CASE 
        WHEN recency_score >= 4 AND frequency_score >= 4 AND monetary_score >= 4 THEN 'Champions'
        WHEN recency_score >= 3 AND frequency_score >= 3 AND monetary_score >= 3 THEN 'Loyal Customers'
        WHEN recency_score >= 3 AND frequency_score <= 2 THEN 'Potential Loyalists'
        WHEN recency_score <= 2 AND frequency_score >= 3 THEN 'At Risk'
        WHEN recency_score <= 2 AND frequency_score <= 2 THEN 'Lost Customers'
        ELSE 'Other'
    END as customer_segment,
    COUNT(*) as customer_count,
    ROUND(AVG(monetary_value), 2) as avg_monetary_value
FROM rfm_scores
GROUP BY 1
ORDER BY customer_count DESC;

-- 3. Revenue forecast via simple linear regression.
-- Fixes: the original nested window functions (AVG(...) OVER()) inside
-- SUM(), which PostgreSQL rejects, and applied EXTRACT(EPOCH FROM ...) to
-- an integer date difference.  PostgreSQL's built-in regression aggregates
-- regr_slope(Y, X) / regr_intercept(Y, X) replace the hand-rolled math,
-- and intercept + slope*x is the correct prediction form.
WITH daily_sales AS (
    SELECT 
        created_at::DATE as sale_date,
        SUM(total_amount) as daily_revenue
    FROM orders
    WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
    GROUP BY 1
),
regression_data AS (
    SELECT 
        sale_date,
        daily_revenue,
        -- day offset from the first observed day (date - date is an integer)
        sale_date - MIN(sale_date) OVER () as day_number
    FROM daily_sales
),
regression_stats AS (
    SELECT 
        COUNT(*) as n,
        AVG(daily_revenue) as avg_y,
        regr_slope(daily_revenue, day_number) as slope,
        regr_intercept(daily_revenue, day_number) as intercept
    FROM regression_data
)
SELECT 
    'Revenue Prediction' as metric,
    intercept + slope * (90 + 7) as predicted_revenue_week_ahead,  -- 7 days past the 90-day window
    slope as daily_growth_rate,
    avg_y as current_avg_daily_revenue
FROM regression_stats;

12. PostgreSQL 개발 모범 사례

12.1 스키마 설계 원칙

-- 1. Balancing normalization and denormalization
-- Normalized design (OLTP-optimized)
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    email TEXT UNIQUE NOT NULL,
    first_name TEXT NOT NULL,
    last_name TEXT NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE addresses (
    id SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    type TEXT CHECK (type IN ('billing', 'shipping')),
    street TEXT NOT NULL,
    city TEXT NOT NULL,
    country TEXT NOT NULL,
    postal_code TEXT,
    is_default BOOLEAN DEFAULT FALSE
);

-- Denormalized design (analytics-optimized; maintained by application code
-- or triggers, hence updated_at)
CREATE TABLE customer_summary (
    customer_id INTEGER PRIMARY KEY,
    email TEXT,
    full_name TEXT,
    total_orders INTEGER DEFAULT 0,
    total_spent DECIMAL(12,2) DEFAULT 0,
    last_order_date TIMESTAMP,
    preferred_shipping_address JSONB,
    customer_tier TEXT,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 2. Using constraints as executable documentation
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    sku TEXT UNIQUE NOT NULL CHECK (LENGTH(sku) >= 3),
    name TEXT NOT NULL CHECK (LENGTH(TRIM(name)) > 0),
    price DECIMAL(10,2) NOT NULL CHECK (price > 0),
    category_id INTEGER NOT NULL,
    status TEXT DEFAULT 'active' CHECK (status IN ('active', 'inactive', 'discontinued')),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- composite constraint: per-category price ranges
    CONSTRAINT valid_price_range CHECK (
        CASE 
            WHEN category_id = 1 THEN price BETWEEN 10 AND 10000  -- electronics
            WHEN category_id = 2 THEN price BETWEEN 5 AND 1000    -- books
            ELSE price > 0
        END
    )
);

-- 3. Choosing appropriate data types
-- Fix: PostgreSQL has no inline INDEX clause inside CREATE TABLE (that is
-- MySQL syntax); indexes must be created with separate CREATE INDEX
-- statements.
CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,  -- large row volume expected
    event_type SMALLINT NOT NULL,  -- small closed set of values
    user_id INTEGER,  -- ordinary surrogate id
    session_id UUID,  -- globally unique identifier
    timestamp TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,  -- keeps time-zone info
    properties JSONB,  -- semi-structured payload
    ip_address INET,  -- dedicated IP-address type
    user_agent TEXT  -- variable-length text
);

CREATE INDEX idx_events_user_timestamp ON events (user_id, timestamp);
CREATE INDEX idx_events_type_timestamp ON events (event_type, timestamp);
CREATE INDEX idx_events_properties ON events USING gin (properties);  -- JSONB containment queries

12.2 쿼리 최적화 기법

-- 1. Efficient JOIN pattern
-- Inefficient alternative (N+1 problem):
-- SELECT * FROM orders WHERE customer_id = 1;
-- SELECT * FROM order_items WHERE order_id = 123; (repeated per order)

-- Efficient: single query with JOIN + aggregation.
-- Fix: with a bare ARRAY_AGG, an order with no items produced a one-element
-- array holding an all-NULL object (the LEFT JOIN padding row).  FILTER
-- drops that row and COALESCE yields an empty array instead of NULL.
SELECT 
    o.id as order_id,
    o.total_amount,
    o.created_at,
    COALESCE(
        ARRAY_AGG(
            JSON_BUILD_OBJECT(
                'product_id', oi.product_id,
                'quantity', oi.quantity,
                'unit_price', oi.unit_price,
                'product_name', p.name
            )
        ) FILTER (WHERE oi.order_id IS NOT NULL),
        '{}'
    ) as items
FROM orders o
LEFT JOIN order_items oi ON o.id = oi.order_id
LEFT JOIN products p ON oi.product_id = p.id
WHERE o.customer_id = 1
GROUP BY o.id, o.total_amount, o.created_at
ORDER BY o.created_at DESC;

-- 2. Predicate (SARGability) optimization
-- Inefficient: wrapping the indexed column in a function defeats the index
-- SELECT * FROM orders WHERE EXTRACT(YEAR FROM created_at) = 2024;

-- Efficient: a half-open range predicate can use an index on created_at
SELECT * FROM orders 
WHERE created_at >= '2024-01-01' 
  AND created_at < '2025-01-01';

-- 3. EXISTS vs IN
-- EXISTS stops at the first match and has sane NULL semantics; usually the
-- better choice for correlated membership tests on large tables
SELECT c.id, c.email
FROM customers c
WHERE EXISTS (
    SELECT 1 FROM orders o 
    WHERE o.customer_id = c.id 
      AND o.created_at >= CURRENT_DATE - INTERVAL '30 days'
);

-- 4. LIMIT/OFFSET optimization
-- Inefficient: a large OFFSET still scans and discards every skipped row
-- SELECT * FROM orders ORDER BY created_at DESC LIMIT 20 OFFSET 10000;

-- Efficient: keyset (cursor) pagination seeks straight past the last page
-- NOTE(review): created_at is not necessarily unique; for a stable cursor,
-- page on (created_at, id) instead.
SELECT * FROM orders 
WHERE created_at < '2024-01-15 10:00:00'  -- timestamp of the last row already shown
ORDER BY created_at DESC 
LIMIT 20;

-- 5. Aggregate-query optimization
-- Partial index covering only the rows the hot query touches
CREATE INDEX idx_orders_recent_total ON orders(created_at, total_amount) 
WHERE created_at >= '2024-01-01';

-- Window function for per-customer ranking without a self-join
SELECT 
    customer_id,
    total_amount,
    created_at,
    RANK() OVER (PARTITION BY customer_id ORDER BY total_amount DESC) as amount_rank
FROM orders
WHERE created_at >= '2024-01-01';

12.3 트랜잭션 관리 패턴

-- 1. Choosing an isolation level
-- Read-only analytics
-- NOTE: PostgreSQL treats READ UNCOMMITTED as READ COMMITTED (dirty reads
-- are never possible), so this level buys no extra performance here.
BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
SELECT COUNT(*) FROM large_table WHERE status = 'active';
COMMIT;

-- Business logic that must see one stable snapshot across statements
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- complex business logic here
COMMIT;

-- 2. Using savepoints
-- Fix: the original wrapped the second INSERT in BEGIN ... EXCEPTION ... END,
-- which is PL/pgSQL block syntax and is invalid in a plain SQL script.
-- In plain SQL, partial rollback is done with SAVEPOINT / ROLLBACK TO,
-- issued by the client when a statement fails.
BEGIN;
    INSERT INTO customers (email, first_name, last_name) 
    VALUES ('test@example.com', 'Test', 'User');

    SAVEPOINT after_customer;

    INSERT INTO addresses (customer_id, type, street, city, country) 
    VALUES (currval('customers_id_seq'), 'billing', '123 Main St', 'City', 'Country');

    -- If the address INSERT fails, the client recovers the transaction with:
    --   ROLLBACK TO SAVEPOINT after_customer;
    -- The customer row survives even though the address insert was undone.

COMMIT;

-- 3. Deadlock-avoidance pattern: acquire row locks in a consistent order.
-- Fix: the function body must be wrapped in $$ ... $$ dollar-quoting.
-- Raises 'Insufficient funds' (rolling the transfer back) when the source
-- account cannot cover the amount.
CREATE OR REPLACE FUNCTION transfer_funds(
    from_account_id INTEGER,
    to_account_id INTEGER,
    amount DECIMAL(10,2)
) RETURNS VOID AS $$
DECLARE
    first_account INTEGER;
    second_account INTEGER;
BEGIN
    -- Always lock the smaller id first so two opposite transfers between
    -- the same pair of accounts cannot deadlock each other
    IF from_account_id < to_account_id THEN
        first_account := from_account_id;
        second_account := to_account_id;
    ELSE
        first_account := to_account_id;
        second_account := from_account_id;
    END IF;

    -- Acquire both row locks in that fixed order
    PERFORM balance FROM accounts WHERE id = first_account FOR UPDATE;
    PERFORM balance FROM accounts WHERE id = second_account FOR UPDATE;

    -- Both rows are now locked, so this balance check is race-free
    IF (SELECT balance FROM accounts WHERE id = from_account_id) < amount THEN
        RAISE EXCEPTION 'Insufficient funds';
    END IF;

    -- Move the funds
    UPDATE accounts SET balance = balance - amount WHERE id = from_account_id;
    UPDATE accounts SET balance = balance + amount WHERE id = to_account_id;

    -- Record the transfer
    INSERT INTO transactions (from_account_id, to_account_id, amount, transaction_type)
    VALUES (from_account_id, to_account_id, amount, 'transfer');
END;
$$ LANGUAGE plpgsql;

12.4 애플리케이션 통합 패턴

# Advanced PostgreSQL usage from Python
import psycopg2
import psycopg2.extras
import json
from contextlib import contextmanager

class DatabaseManager:
    """Thin psycopg2 wrapper: transactional connections plus JSONB and
    analytics helpers.  Opens a fresh connection per operation; combine
    with a pool for high-traffic use."""
    def __init__(self, connection_string):
        self.connection_string = connection_string

    @contextmanager
    def get_connection(self):
        """Yield a connection; commit on success, roll back on any
        exception, and always close."""
        conn = psycopg2.connect(self.connection_string)
        try:
            yield conn
        except Exception:
            conn.rollback()
            raise
        else:
            conn.commit()
        finally:
            conn.close()

    def execute_with_json(self, query, params=None):
        """Run a query and return all rows as dicts (JSONB-friendly)."""
        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(query, params)
                return cursor.fetchall()

    def bulk_insert_json(self, data_list):
        """Bulk-insert JSON payloads via a session-local staging table."""
        with self.get_connection() as conn:
            with conn.cursor() as cursor:
                # NOTE(review): re-running this on a reused connection would
                # fail because the temp table already exists in the session;
                # here each call opens a fresh connection, so it works.
                # Consider ON COMMIT DROP if connections are ever pooled.
                cursor.execute("""
                    CREATE TEMP TABLE temp_data (data JSONB)
                """)

                # Batched multi-row INSERT via psycopg2's execute_values
                # (this is not COPY, contrary to the original comment; for
                # the very largest loads, cursor.copy_expert would be faster)
                psycopg2.extras.execute_values(
                    cursor,
                    "INSERT INTO temp_data (data) VALUES %s",
                    [(json.dumps(data),) for data in data_list],
                    template=None,
                    page_size=1000
                )

                # Move staged rows into the destination table
                # NOTE(review): the events table defined earlier in this
                # guide names its time column "timestamp", not created_at --
                # confirm against the live schema.
                cursor.execute("""
                    INSERT INTO events (properties, created_at)
                    SELECT data, CURRENT_TIMESTAMP
                    FROM temp_data
                """)

    def get_customer_analytics(self, customer_id):
        """Return one dict of lifetime + 30-day order stats and a derived
        spend tier for the given customer."""
        # customer_id is bound twice because the query uses two placeholders
        query = """
        WITH customer_stats AS (
            SELECT 
                COUNT(*) as total_orders,
                SUM(total_amount) as total_spent,
                AVG(total_amount) as avg_order_value,
                MAX(created_at) as last_order_date
            FROM orders 
            WHERE customer_id = %s
        ),
        recent_activity AS (
            SELECT 
                COUNT(*) as recent_orders,
                SUM(total_amount) as recent_spent
            FROM orders 
            WHERE customer_id = %s 
              AND created_at >= CURRENT_DATE - INTERVAL '30 days'
        )
        SELECT 
            cs.*,
            ra.recent_orders,
            ra.recent_spent,
            CASE 
                WHEN cs.total_spent >= 10000 THEN 'VIP'
                WHEN cs.total_spent >= 5000 THEN 'Gold'
                WHEN cs.total_spent >= 1000 THEN 'Silver'
                ELSE 'Bronze'
            END as customer_tier
        FROM customer_stats cs
        CROSS JOIN recent_activity ra
        """

        with self.get_connection() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cursor:
                cursor.execute(query, (customer_id, customer_id))
                return cursor.fetchone()

# Usage example
db = DatabaseManager("postgresql://user:pass@localhost/dbname")

# JSONB containment query (@> can use a GIN index on properties)
results = db.execute_with_json("""
    SELECT id, properties->>'event_type' as event_type, created_at
    FROM events 
    WHERE properties @> %s
    ORDER BY created_at DESC
    LIMIT 10
""", [json.dumps({"user_id": 12345})])

# Customer analytics lookup
analytics = db.get_customer_analytics(12345)
print(f"Customer tier: {analytics['customer_tier']}")
print(f"Total spent: ${analytics['total_spent']}")

13. 문제 해결 및 디버깅

13.1 일반적인 성능 문제 진단

-- 1. Identifying slow queries
-- postgresql.conf settings:
-- log_min_duration_statement = 1000  -- log statements slower than 1s
-- log_statement = 'all'  -- log every statement (development only)

-- Currently-running statements older than one minute
SELECT 
    pid,
    now() - pg_stat_activity.query_start AS duration,
    query,
    state,
    wait_event,
    wait_event_type
FROM pg_stat_activity
WHERE (now() - pg_stat_activity.query_start) > interval '1 minute'
  AND state = 'active'
ORDER BY duration DESC;

-- 2. Lock contention: every lock currently being waited on, with the
--    statement of the waiting backend
SELECT 
    l.locktype,
    l.database,
    l.relation::regclass,
    l.page,
    l.tuple,
    l.pid,
    a.query,
    l.mode,
    l.granted
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE NOT l.granted
ORDER BY l.pid;

-- 3. Planner column statistics for a table
--    (note: this reads pg_stats, i.e. what ANALYZE collected for the
--    planner -- it is not an index *usage* report; see pg_stat_user_indexes
--    for that)
SELECT 
    schemaname,
    tablename,
    attname as column_name,
    n_distinct,
    correlation,
    most_common_vals,
    most_common_freqs
FROM pg_stats
WHERE schemaname = 'public'
  AND tablename = 'orders'
ORDER BY n_distinct DESC;

-- 4. Table bloat analysis
-- Fixes: pg_stat_user_tables exposes the table name as relname (there is no
-- "tablename" column, so the original errored), and relid replaces the
-- fragile schemaname||'.'||name concatenation.
SELECT 
    schemaname,
    relname AS tablename,
    pg_size_pretty(pg_total_relation_size(relid)) as total_size,
    pg_size_pretty(pg_relation_size(relid)) as table_size,
    n_dead_tup,
    n_live_tup,
    ROUND(100.0 * n_dead_tup / NULLIF(n_live_tup + n_dead_tup, 0), 2) as dead_tuple_percent,
    last_vacuum,
    last_autovacuum
FROM pg_stat_user_tables
WHERE n_dead_tup > 1000
ORDER BY dead_tuple_percent DESC;

13.2 메모리 및 캐시 최적화

-- 1. Buffer-cache contents per relation (requires the pg_buffercache
--    extension; 8192 is the default block size).
-- Fix: the original ORDER BY 2 sorted the pg_size_pretty() *text*
-- alphabetically ("120 kB" < "8 MB" as strings); sort by the underlying
-- buffer count instead.
SELECT 
    c.relname,
    pg_size_pretty(count(*) * 8192) as buffered,
    round(100.0 * count(*) / 
        (SELECT setting FROM pg_settings WHERE name='shared_buffers')::integer, 1) as buffer_percent,
    round(100.0 * count(*) * 8192 / pg_relation_size(c.oid), 1) as percent_of_relation
FROM pg_class c
INNER JOIN pg_buffercache b ON b.relfilenode = c.relfilenode
INNER JOIN pg_database d ON (b.reldatabase = d.oid AND d.datname = current_database())
GROUP BY c.oid, c.relname
ORDER BY count(*) DESC
LIMIT 20;

-- 2. Statements spilling to temp files (pg_stat_statements)
-- NOTE(review): on PostgreSQL 13+ the timing column is total_exec_time,
-- not total_time -- adjust for your server version.
SELECT 
    query,
    calls,
    total_time,
    rows,
    100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) AS hit_percent,
    pg_size_pretty(temp_blks_written * 8192) as temp_written
FROM pg_stat_statements
WHERE calls > 100
ORDER BY temp_blks_written DESC
LIMIT 20;

-- 3. work_mem tuning candidates: any temp block I/O means the statement
--    overflowed its work_mem budget (sorts/hashes spilled to disk)
SELECT 
    query,
    calls,
    total_time / calls as avg_time,
    temp_blks_written,
    temp_blks_read
FROM pg_stat_statements
WHERE temp_blks_written > 0
ORDER BY temp_blks_written DESC;

13.3 연결 및 리소스 관리

-- 1. Connections grouped by state
SELECT 
    state,
    COUNT(*) as connection_count,
    MAX(now() - state_change) as max_age
FROM pg_stat_activity
GROUP BY state
ORDER BY connection_count DESC;

-- 2. Long-idle connections (cleanup candidates)
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    state_change,
    now() - state_change as idle_duration
FROM pg_stat_activity
WHERE state = 'idle'
  AND (now() - state_change) > interval '30 minutes'
ORDER BY idle_duration DESC;

-- Force-terminate stale idle connections (use with caution)
-- SELECT pg_terminate_backend(pid) 
-- FROM pg_stat_activity 
-- WHERE state = 'idle' 
--   AND (now() - state_change) > interval '1 hour'
--   AND usename != 'postgres';

-- 3. Per-database resource-usage counters (cumulative since stats reset)
SELECT 
    datname,
    numbackends as active_connections,
    xact_commit,
    xact_rollback,
    blks_read,
    blks_hit,
    temp_files,
    temp_bytes,
    deadlocks
FROM pg_stat_database
WHERE datname = current_database();

14. 마이그레이션 및 업그레이드 전략

14.1 무중단 마이그레이션 기법

-- 1. Staged column-add migration
-- Step 1: add the column without NOT NULL (metadata-only; no table rewrite)
ALTER TABLE large_table ADD COLUMN new_status TEXT;

-- Step 2: backfill existing rows in batches.
-- Fix: the body must use $$ dollar-quoting (a single $ is a syntax error).
-- CAVEAT: a DO block runs as ONE transaction, so these "batches" do not
-- release locks or let autovacuum reclaim rows between iterations.  For a
-- genuinely online backfill, drive each batch as its own transaction from
-- an external script.
DO $$
DECLARE
    batch_size INTEGER := 1000;
    rows_updated INTEGER;
BEGIN
    LOOP
        UPDATE large_table 
        SET new_status = 'active'
        WHERE id IN (
            SELECT id FROM large_table 
            WHERE new_status IS NULL 
            LIMIT batch_size
        );

        GET DIAGNOSTICS rows_updated = ROW_COUNT;
        EXIT WHEN rows_updated = 0;

        -- brief pause to reduce pressure on concurrent sessions
        PERFORM pg_sleep(0.1);
    END LOOP;
END $$;

-- Step 3: once the backfill is complete, enforce NOT NULL
ALTER TABLE large_table ALTER COLUMN new_status SET NOT NULL;

-- Step 4: drop the old column once nothing reads it
-- ALTER TABLE large_table DROP COLUMN old_status;

-- 2. Build the index without blocking concurrent writes
CREATE INDEX CONCURRENTLY idx_large_table_new_status 
ON large_table (new_status);

-- A failed CONCURRENTLY build leaves an INVALID index behind; drop it with:
-- DROP INDEX CONCURRENTLY idx_large_table_new_status;

14.2 데이터 검증 및 무결성 체크

-- 1. Post-migration data validation.
-- Fixes relative to the original:
--   * the function body must use $$ dollar-quoting;
--   * the email-regex literal was missing its closing quote, so the whole
--     function failed to parse (same for the JSON-shape literal);
--   * the JSON check uses jsonb_typeof instead of a regex over ::TEXT.
CREATE OR REPLACE FUNCTION validate_migration()
RETURNS TABLE(
    check_name TEXT,
    status TEXT,
    details TEXT
) AS $$
BEGIN
    -- Record-count sanity check
    RETURN QUERY 
    SELECT 
        'record_count'::TEXT,
        CASE WHEN COUNT(*) > 0 THEN 'PASS' ELSE 'FAIL' END::TEXT,
        'Total records: ' || COUNT(*)::TEXT
    FROM customers;

    -- Foreign-key integrity: orders whose customer no longer exists
    RETURN QUERY
    SELECT 
        'foreign_key_integrity'::TEXT,
        CASE WHEN COUNT(*) = 0 THEN 'PASS' ELSE 'FAIL' END::TEXT,
        'Orphaned orders: ' || COUNT(*)::TEXT
    FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.id
    WHERE c.id IS NULL;

    -- Email format consistency
    RETURN QUERY
    SELECT 
        'data_consistency'::TEXT,
        CASE WHEN COUNT(*) = 0 THEN 'PASS' ELSE 'FAIL' END::TEXT,
        'Invalid email formats: ' || COUNT(*)::TEXT
    FROM customers
    WHERE email !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$';

    -- JSON shape check.  NOTE(review): a JSONB column always holds valid
    -- JSON; this only asserts the top-level value is an object.  Assumes a
    -- products.specifications column exists -- confirm against the schema.
    RETURN QUERY
    SELECT 
        'json_validity'::TEXT,
        CASE WHEN COUNT(*) = 0 THEN 'PASS' ELSE 'FAIL' END::TEXT,
        'Invalid JSON properties: ' || COUNT(*)::TEXT
    FROM products
    WHERE specifications IS NOT NULL 
      AND jsonb_typeof(specifications) <> 'object';

END;
$$ LANGUAGE plpgsql;

-- Run the validation
SELECT * FROM validate_migration();

14.3 버전 업그레이드 준비

-- 1. PostgreSQL compatibility check before the version upgrade
-- pg_upgrade dry run:
-- pg_upgrade --check -b /usr/lib/postgresql/13/bin -B /usr/lib/postgresql/14/bin -d /var/lib/postgresql/13/main -D /var/lib/postgresql/14/main

-- 2. Installed extensions (verify each has a build for the target version)
SELECT 
    extname,
    extversion,
    extrelocatable
FROM pg_extension
ORDER BY extname;

-- 3. User-defined (non-internal) functions that may need review after the
--    upgrade, with their languages and signatures
SELECT 
    n.nspname as schema_name,
    p.proname as function_name,
    pg_get_function_identity_arguments(p.oid) as arguments,
    l.lanname as language
FROM pg_proc p
JOIN pg_namespace n ON p.pronamespace = n.oid
JOIN pg_language l ON p.prolang = l.oid
WHERE n.nspname NOT IN ('information_schema', 'pg_catalog', 'pg_toast')
  AND l.lanname != 'internal'
ORDER BY n.nspname, p.proname;

-- 4. Back up column-statistics settings before the upgrade (restore after)
CREATE TABLE pg_stats_backup AS
SELECT * FROM pg_stats;

-- Generate restore statements for per-column statistics targets.
-- Fix: pg_stats.n_distinct is an *estimate of distinct values* (negative
-- values mean a fraction of the row count), NOT a statistics target, so the
-- original generated invalid/meaningless SET STATISTICS commands.  The
-- per-column target actually lives in pg_attribute.attstattarget.
-- (NOTE(review): on PostgreSQL 17+ attstattarget is NULL rather than -1
-- for the default -- adjust the filter accordingly.)
SELECT 
    'ALTER TABLE ' || quote_ident(n.nspname) || '.' || quote_ident(c.relname) ||
    ' ALTER COLUMN ' || quote_ident(a.attname) ||
    ' SET STATISTICS ' || a.attstattarget || ';'
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
  AND a.attnum > 0
  AND NOT a.attisdropped
  AND a.attstattarget <> -1;  -- only columns with a non-default target

15. 결론 및 향후 방향

PostgreSQL은 단순한 관계형 데이터베이스를 넘어서 현대적인 애플리케이션 개발에 필요한 거의 모든 기능을 제공하는 종합적인 데이터 플랫폼으로 발전했습니다.

주요 강점 요약:

  • 확장성: 사용자 정의 타입, 함수, 연산자를 통한 무한 확장 가능
  • 표준 준수: SQL 표준을 엄격히 따르면서도 혁신적인 기능 제공
  • 성능: MVCC, 고급 인덱싱, 쿼리 최적화를 통한 뛰어난 성능
  • 안정성: ACID 트랜잭션과 강력한 데이터 무결성 보장
  • 비용 효율성: 오픈소스로 제공되는 엔터프라이즈급 기능

학습 로드맵:

  1. 기초: SQL 문법, 기본 데이터 타입, 인덱스
  2. 중급: 고급 데이터 타입(JSONB, Arrays), 윈도우 함수, CTE
  3. 고급: 확장 개발, 성능 튜닝, 고가용성 구성
  4. 전문가: 커스텀 확장 개발, 대규모 시스템 아키텍처 설계

지속적인 발전:

PostgreSQL은 매년 새로운 기능과 성능 개선을 제공하며, 클라우드 네이티브 환경, 분산 시스템, AI/ML 워크로드 등 새로운 요구사항에 지속적으로 적응하고 있습니다.

이 가이드의 예제들을 실습해보고, 실제 프로젝트에 적용해보시기 바랍니다. PostgreSQL의 진정한 힘은 이론이 아닌 실무에서 발휘됩니다!

반응형