Building a Reliable SQL Server to PostgreSQL Migration Pipeline with Python

Chijioke Ani
Jan 25, 2026
10 min read

Learn how to build a production-ready data migration pipeline from SQL Server to PostgreSQL using Python. Covers schema discovery, chunked loads, validation, and audit logging.

Anyone can move data from point A to point B. The real challenge is building a migration pipeline you can trust—one that's repeatable, auditable, and doesn't leave you wondering whether that critical customer table actually made it across intact.

I recently built a migration pipeline to transition a production workload from SQL Server to PostgreSQL using Python and SQL. This post breaks down the architecture, the decisions behind it, and the lessons learned along the way.

Why Migrate from SQL Server to PostgreSQL?

Before diving into the how, let's address the why. SQL Server is a solid database, but there are compelling reasons organisations are making the switch:

Cost reduction is often the primary driver. SQL Server licensing fees add up quickly, especially at scale. PostgreSQL is open-source with no per-core licensing costs.

Cloud flexibility matters more than ever. PostgreSQL runs natively on AWS (RDS, Aurora), Google Cloud SQL, Azure, DigitalOcean, and virtually every cloud provider. You're not locked into a specific ecosystem.

Feature parity has reached the point where PostgreSQL handles most enterprise workloads comfortably—JSON support, full-text search, advanced indexing, and extensions like PostGIS for geospatial data.

Community and ecosystem around PostgreSQL is thriving. The tooling, documentation, and community support have matured significantly.

That said, migration isn't trivial. The databases handle data types differently, stored procedures need rewriting, and you need confidence that every row arrived correctly.

Pipeline Architecture Overview

Rather than a one-shot migration script, I designed a modular pipeline with distinct stages. Each stage is independently testable and logged.

[Figure: SQL Server to PostgreSQL migration pipeline]

The pipeline breaks down into these components:

  1. Schema Discovery – Programmatically extract source schema metadata

  2. Schema Conversion – Map SQL Server types to PostgreSQL equivalents

  3. Data Extraction – Pull data in manageable chunks

  4. Data Loading – Insert into PostgreSQL with transaction control

  5. Validation – Verify counts, keys, and data integrity

  6. Audit Logging – Track everything for troubleshooting and compliance
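Stitched together, a top-level driver might look like the sketch below. Every stage function here is a placeholder for the components covered in the rest of this post; names and signatures are illustrative.

```python
import uuid

# Placeholder stage functions standing in for the real components
# described in the rest of this post.
def discover_schema(src):
    return {'tables': {'orders': {}}}

def convert_schema(schema):
    return schema

def migrate_table(src, tgt, table):
    return 0  # rows migrated

def validate_table(src, tgt, table):
    return True

def log_audit(run_id, table, status):
    pass

def run_migration(source_conn, target_conn):
    """Drive the pipeline end to end, table by table."""
    run_id = uuid.uuid4()  # groups this execution's audit records
    schema = convert_schema(discover_schema(source_conn))
    results = {}
    for table in schema['tables']:
        migrate_table(source_conn, target_conn, table)
        ok = validate_table(source_conn, target_conn, table)
        log_audit(run_id, table, 'success' if ok else 'failed')
        results[table] = ok
    return results
```

The point of the driver is that each stage is swappable and every table's outcome is recorded, which is what makes the pipeline independently testable.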

Let's walk through each component.

Schema Discovery: Know What You're Moving

You can't migrate what you don't understand. The first step is programmatically discovering the source schema rather than relying on documentation that may be outdated.

In SQL Server, the INFORMATION_SCHEMA views are your friend:

SELECT 
    TABLE_SCHEMA,
    TABLE_NAME,
    COLUMN_NAME,
    DATA_TYPE,
    CHARACTER_MAXIMUM_LENGTH,
    NUMERIC_PRECISION,
    NUMERIC_SCALE,
    IS_NULLABLE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA NOT IN ('sys', 'INFORMATION_SCHEMA')
ORDER BY TABLE_SCHEMA, TABLE_NAME, ORDINAL_POSITION;

On the Python side, I used pyodbc for SQL Server connections and built a discovery module that outputs a structured JSON representation of every table, column, constraint, and index.
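Opening that connection might look like the sketch below; the driver name and credentials are placeholders for your environment, and the import is deferred so the string builder is usable without the ODBC driver installed.

```python
def build_conn_str(server: str, database: str, user: str, password: str) -> str:
    """Assemble an ODBC connection string for the source SQL Server."""
    # The driver name varies by installation; 'ODBC Driver 17 for
    # SQL Server' is a common choice but check what is installed.
    return (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={server};DATABASE={database};"
        f"UID={user};PWD={password}"
    )

def connect_source(server, database, user, password):
    """Open the pyodbc connection the discovery module runs against."""
    import pyodbc  # imported lazily so the module loads without the driver
    return pyodbc.connect(build_conn_str(server, database, user, password))
```

The discovery function below then executes its metadata queries over this connection.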

def discover_schema(connection) -> dict:
    """Extract complete schema metadata from SQL Server."""
    schema = {
        'tables': {},
        'primary_keys': {},
        'foreign_keys': {},
        'indexes': {}
    }
    # Query INFORMATION_SCHEMA and sys catalog views,
    # then build the comprehensive schema dictionary.
    return schema

This schema dictionary becomes the source of truth for everything that follows.

Data Type Mapping: Where Things Get Tricky

SQL Server and PostgreSQL don't speak exactly the same language when it comes to data types. Some mappings are straightforward; others require decisions.

Here's the mapping table I used:

| SQL Server Type | PostgreSQL Type | Notes |
| --- | --- | --- |
| INT | INTEGER | Direct mapping |
| BIGINT | BIGINT | Direct mapping |
| BIT | BOOLEAN | Convert 0/1 to false/true |
| DATETIME | TIMESTAMP | Watch for precision differences |
| DATETIME2 | TIMESTAMP | PostgreSQL handles microseconds |
| NVARCHAR(MAX) | TEXT | No length limit needed |
| NVARCHAR(n) | VARCHAR(n) | Preserve length constraints |
| UNIQUEIDENTIFIER | UUID | Native UUID support in PostgreSQL |
| MONEY | NUMERIC(19,4) | Explicit precision is safer |
| IMAGE / VARBINARY | BYTEA | Binary data handling differs |

The tricky cases:

DATETIME precision: SQL Server's DATETIME has a precision of approximately 3.33 milliseconds. PostgreSQL's TIMESTAMP is more precise. This usually isn't a problem, but if you're doing exact datetime comparisons in validation, be aware.

BIT to BOOLEAN: SQL Server stores BIT as 0 or 1. You need explicit conversion when inserting into PostgreSQL.
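A hypothetical helper for that conversion, applied per row before insert (the column names in the usage are illustrative):

```python
def bit_to_bool(value):
    """Convert a SQL Server BIT value (0, 1, or NULL) to a Python bool."""
    return None if value is None else bool(value)

def convert_row(row: tuple, bit_columns: set, column_names: list) -> tuple:
    """Apply BIT-to-BOOLEAN conversion to the flagged columns of one row."""
    return tuple(
        bit_to_bool(value) if name in bit_columns else value
        for name, value in zip(column_names, row)
    )
```

Which columns need converting falls straight out of the schema dictionary built during discovery, so there is no per-table hand-listing.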

IDENTITY columns: SQL Server uses IDENTITY; PostgreSQL uses SERIAL or GENERATED AS IDENTITY. I opted for BIGSERIAL for primary keys and reset sequences after data load.

I built a mapping function that handles these conversions:

TYPE_MAPPING = {
    'int': 'INTEGER',
    'bigint': 'BIGINT',
    'bit': 'BOOLEAN',
    'datetime': 'TIMESTAMP',
    'datetime2': 'TIMESTAMP',
    'nvarchar': 'VARCHAR',
    'uniqueidentifier': 'UUID',
    # ... complete mapping
}

def map_column_type(sql_server_type: str, length: int = None) -> str:
    """Convert a SQL Server type to its PostgreSQL equivalent."""
    base_type = TYPE_MAPPING.get(sql_server_type.lower())
    if base_type == 'VARCHAR' and length:
        # NVARCHAR(MAX) is reported with a length of -1, so map it
        # (and very long columns) to TEXT; otherwise keep the constraint.
        return f'VARCHAR({length})' if 0 < length < 8000 else 'TEXT'
    return base_type
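With the mapping in place, generating target DDL is mostly string assembly. A minimal sketch, assuming each column arrives from discovery as a (name, type, length, nullable) tuple; the trimmed mapping here stands in for the full one above so the example is self-contained:

```python
# Trimmed copy of the mapping above, for a self-contained example.
TYPE_MAPPING = {'int': 'INTEGER', 'bit': 'BOOLEAN', 'nvarchar': 'VARCHAR'}

def map_column_type(sql_server_type, length=None):
    base_type = TYPE_MAPPING.get(sql_server_type.lower())
    if base_type == 'VARCHAR' and length:
        return f'VARCHAR({length})' if 0 < length < 8000 else 'TEXT'
    return base_type

def build_create_table(table: str, columns: list) -> str:
    """Render a CREATE TABLE statement from discovered column metadata."""
    defs = []
    for name, ss_type, length, nullable in columns:
        col = f"{name} {map_column_type(ss_type, length)}"
        if not nullable:
            col += " NOT NULL"
        defs.append(col)
    return f"CREATE TABLE {table} (\n    " + ",\n    ".join(defs) + "\n);"
```

Generating DDL from the schema dictionary rather than hand-writing it keeps the conversion repeatable across test runs.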

Chunked Loading: Handling Large Tables Without Pain

Attempting to migrate a 50-million-row table in a single transaction is a recipe for disaster: memory exhaustion, transaction timeouts, and zero visibility into progress.

Chunked loading solves this:

def migrate_table_chunked(
    source_conn,
    target_conn,
    table_name: str,
    chunk_size: int = 10000,
    order_by: str = 'id'
):
    """Migrate table data in manageable chunks."""
    offset = 0
    total_migrated = 0

    while True:
        # Extract the next chunk from the source
        query = f"""
            SELECT * FROM {table_name}
            ORDER BY {order_by}
            OFFSET {offset} ROWS
            FETCH NEXT {chunk_size} ROWS ONLY
        """
        rows = source_conn.execute(query).fetchall()

        if not rows:
            break

        # Load the chunk into the target
        insert_rows(target_conn, table_name, rows)

        total_migrated += len(rows)
        offset += chunk_size

        logger.info(f"{table_name}: Migrated {total_migrated} rows")

    return total_migrated

Chunk size tuning matters. Too small and you're making excessive round trips. Too large and you risk memory issues. I found 10,000–50,000 rows per chunk works well for most tables, but tables with large TEXT or BYTEA columns need smaller chunks.

Order consistency is critical. Always use a deterministic ORDER BY clause—typically the primary key. Without it, you might get duplicate or missing rows across chunks.
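If OFFSET pagination proves slow on very large tables (the database rescans every skipped row on each chunk), keyset pagination is a common alternative: seek past the last key seen instead of skipping rows. A sketch of the query builder, with illustrative table and key names:

```python
def keyset_chunk_query(table: str, pk: str, last_key, chunk_size: int) -> str:
    """Build a source query that seeks past the last key instead of using OFFSET."""
    # On the first chunk there is no last key, so no WHERE clause.
    where = f"WHERE {pk} > {last_key} " if last_key is not None else ""
    return (
        f"SELECT * FROM {table} {where}"
        f"ORDER BY {pk} OFFSET 0 ROWS FETCH NEXT {chunk_size} ROWS ONLY"
    )
```

After each chunk you record the maximum key fetched and feed it into the next query, so every chunk is an index seek rather than a scan.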

Resumability: For very large tables, I added checkpoint logging. If the migration fails mid-table, you can resume from the last successful chunk rather than starting over.
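A checkpoint can be as simple as a JSON file recording the last committed offset per table; a hypothetical sketch:

```python
import json
import os

def load_checkpoints(path: str) -> dict:
    """Return saved per-table offsets, or an empty dict on a fresh run."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return json.load(f)

def save_checkpoint(path: str, table: str, offset: int) -> None:
    """Record the last successfully committed offset for a table."""
    state = load_checkpoints(path)
    state[table] = offset
    with open(path, 'w') as f:
        json.dump(state, f)

def resume_offset(path: str, table: str) -> int:
    """Offset to resume from; 0 if the table has no checkpoint yet."""
    return load_checkpoints(path).get(table, 0)
```

The loader then starts each table from `resume_offset(...)` instead of 0 and calls `save_checkpoint(...)` after every committed chunk.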

Audit Logging: Track Everything

Every migration decision and action gets logged to an audit table:

CREATE TABLE migration_audit (
    id SERIAL PRIMARY KEY,
    run_id UUID NOT NULL,
    table_name VARCHAR(255),
    operation VARCHAR(50),
    rows_affected INTEGER,
    status VARCHAR(20),
    error_message TEXT,
    started_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP,
    metadata JSONB
);

The run_id groups all operations from a single migration execution. The metadata JSONB column captures anything else useful—chunk numbers, source queries, configuration parameters.

This audit trail is invaluable when something goes wrong three weeks later and you need to understand exactly what happened.
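Writing an audit row is one parameterised INSERT per operation. A sketch assuming psycopg2-style %s placeholders:

```python
import json
import uuid
from datetime import datetime, timezone

AUDIT_SQL = """
    INSERT INTO migration_audit
        (run_id, table_name, operation, rows_affected, status,
         error_message, completed_at, metadata)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""

def audit_params(run_id, table, operation, rows, status,
                 error=None, metadata=None):
    """Build the parameter tuple for one migration_audit row."""
    return (
        str(run_id), table, operation, rows, status, error,
        datetime.now(timezone.utc),
        json.dumps(metadata or {}),  # JSONB column accepts a JSON string
    )
```

Usage is then `cursor.execute(AUDIT_SQL, audit_params(run_id, 'orders', 'load_chunk', 10000, 'success', metadata={'chunk': 42}))`, committed alongside the chunk it describes.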

Validation Checks: Trust but Verify

Validation is non-negotiable. I implemented three levels:

Row Count Validation

The simplest check—do the counts match?

def validate_row_counts(source_conn, target_conn, tables: list) -> dict:
    """Compare row counts between source and target."""
    results = {}
    for table in tables:
        source_count = source_conn.execute(
            f"SELECT COUNT(*) FROM {table}"
        ).scalar()
        target_count = target_conn.execute(
            f"SELECT COUNT(*) FROM {table}"
        ).scalar()

        results[table] = {
            'source': source_count,
            'target': target_count,
            'match': source_count == target_count
        }
    return results

Primary Key Integrity

Verify that all primary keys made it across and there are no duplicates:

def validate_primary_keys(source_conn, target_conn, table: str, pk_column: str):
    """Ensure all PKs exist in target and no duplicates."""
    # Check for keys missing from the target. This sketch assumes the
    # target tables are reachable from the source connection (e.g. via a
    # linked server); otherwise fetch both key sets and diff in Python.
    missing = source_conn.execute(f"""
        SELECT {pk_column} FROM {table}
        EXCEPT
        SELECT {pk_column} FROM target_db.{table}
    """).fetchall()

    # Check for duplicates in the target
    duplicates = target_conn.execute(f"""
        SELECT {pk_column}, COUNT(*)
        FROM {table}
        GROUP BY {pk_column}
        HAVING COUNT(*) > 1
    """).fetchall()

    return {'missing': missing, 'duplicates': duplicates}

Referential Integrity

After migration, verify foreign key relationships still hold:

def validate_foreign_keys(target_conn, fk_definitions: list):
    """Check all foreign key relationships are valid."""
    orphans = {}
    for fk in fk_definitions:
        query = f"""
            SELECT COUNT(*) FROM {fk['child_table']} c
            LEFT JOIN {fk['parent_table']} p
                ON c.{fk['child_column']} = p.{fk['parent_column']}
            WHERE c.{fk['child_column']} IS NOT NULL
            AND p.{fk['parent_column']} IS NULL
        """
        orphan_count = target_conn.execute(query).scalar()
        if orphan_count > 0:
            orphans[fk['constraint_name']] = orphan_count
    return orphans

Structured Logging: Debug Without the Headache

Print statements don't cut it for production pipelines. I used Python's structlog for structured, JSON-formatted logs:

import structlog

logger = structlog.get_logger()

logger.info(
    "chunk_migrated",
    table=table_name,
    chunk_number=chunk_num,
    rows_in_chunk=len(rows),
    total_migrated=total_migrated,
    elapsed_seconds=elapsed
)

Output:

{
    "event": "chunk_migrated",
    "table": "orders",
    "chunk_number": 42,
    "rows_in_chunk": 10000,
    "total_migrated": 420000,
    "elapsed_seconds": 2.34,
    "timestamp": "2025-01-15T14:32:01Z"}

These logs ship to CloudWatch (or your logging platform of choice) and make debugging straightforward. When a validation fails, you can trace back through the exact sequence of operations.

Lessons Learned and Common Pitfalls

After completing the migration, here's what I'd emphasise:

Test on realistic data volumes. A pipeline that works on 1,000 rows might fall apart at 10 million. Test with production-scale data early.

Handle NULLs explicitly. SQL Server and PostgreSQL handle NULLs in comparisons and string concatenation differently. Audit your application code.

Sequence reset is easy to forget. After loading data with explicit IDs, reset PostgreSQL sequences to avoid primary key collisions on new inserts:

SELECT setval('table_name_id_seq', (SELECT MAX(id) FROM table_name));
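If many tables carry identity columns, the resets can be generated from the schema metadata instead of written by hand; a sketch assuming PostgreSQL's default `<table>_<column>_seq` sequence naming:

```python
def sequence_reset_statements(tables: dict) -> list:
    """Generate a setval statement per table's identity column.

    `tables` maps table name to identity column, e.g. {'orders': 'id'}.
    Assumes the default <table>_<column>_seq sequence naming; COALESCE
    guards against empty tables, where MAX() returns NULL and setval
    would raise an error.
    """
    return [
        f"SELECT setval('{table}_{column}_seq', "
        f"COALESCE((SELECT MAX({column}) FROM {table}), 1));"
        for table, column in tables.items()
    ]
```

Running these as the last step of every load keeps the pipeline idempotent across repeated test runs.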

Collation differences matter. String sorting and comparison can behave differently. If your application relies on specific sort orders, test thoroughly.

Plan for rollback. Keep your source database intact and have a clear rollback procedure until you've validated the migration in production.

Automate everything. If you run the migration once, you'll run it ten times during testing. Make it fully automated and idempotent.

What's Next

The pipeline I've described handles the core data migration. Depending on your workload, you may also need to address:

  • Stored procedure conversion (this often requires manual rewriting)

  • Application connection string updates and testing

  • Performance tuning on the new PostgreSQL instance

  • Monitoring and alerting setup

Migration projects like this reinforce why building reliable, observable data pipelines matters. The goal isn't just moving data—it's moving it with confidence.

Frequently Asked Questions

How long does a SQL Server to PostgreSQL migration take?

Timeline depends heavily on data volume, schema complexity, and how many stored procedures need rewriting. A straightforward migration of a 100GB database with simple schemas might take 2–4 weeks including testing. Complex migrations with extensive business logic in stored procedures can take months.

Can I migrate SQL Server to PostgreSQL without downtime?

Near-zero downtime migration is possible using change data capture (CDC) and replication tools. You run the bulk migration, then continuously sync changes until cutover. Tools like AWS DMS or Debezium can help, though they add complexity.

What tools can I use for SQL Server to PostgreSQL migration?

Options include AWS Database Migration Service (DMS), pgLoader (which supports MS SQL sources), and custom Python pipelines like the one described here. The right choice depends on your specific requirements for control, validation, and customisation.

Should I migrate stored procedures or rewrite application logic?

This is a case-by-case decision. Simple procedures can often be converted to PL/pgSQL. Complex business logic is sometimes better moved into application code where it's more testable and maintainable. Evaluate each procedure individually.

Building data pipelines and tackling database migrations is what I do. If you're working on similar challenges or have Data Engineering opportunities in the UK, I'd love to connect.
