Audit Tables in PostgreSQL with Debian
Posted: | 2007-10-09 12:09 |
---|---|
Tags: | Debian, PostgreSQL |
In many computer systems it is very important to be able to log every change made to the database for audit purposes. One way of doing this is in PostgreSQL is with an audit table, an audit function and some triggers.
The solution described here was originally proposed by Lorenzo Alberton so all credit for solving this problem goes to him. This entry is nothing more than an extension to his ideas. One drawback with the approcah described is that tables that are logged can only have one primary key whose name must be 30 charcters or less and whose value must be 40 characters or less. You can change the size limits by altering the audit table definition. As long as you can live with these constraints the solution described here works really well. Lorenzo's solution also crashes if you try to save any value in any table which uses an akward character like ', this version doesn't because it properly quotes the values it logs.
First of all you'll need to install the PL/TCL language used by PostgreSQL. We need this because apparantly it is not possible to create the trigger to get the old and new values of a field using PostgreSQL's PL/SQL language (see email thread here although I'm not really convinced):
sudo apt-get install postgresql-pltcl-8.1
This will also install TCL 8.4 which you will need. Next set up the PL/TCL language for the database you are using:
$ createlang -U username pltcl yourdatabase
Note
Incidentally there are also some windows binaries provided by ActiveState and Lorenzo Alberton points out that to configure TCL on Windows you need to remember to set these two system environment variables:
TCL_LIBRARY = /path/to/Tcl/lib/tcl8.4 TCLLIBPATH = /path/to/Tcl/lib
otherwise you'll get this nasty and clueless error: ERROR: could not create "normal" interpreter.. You don't have to do this on Debian though.
What we want is a record of every change made to every field of the database along with the username of the user who made the change, the timestamp of the change, the old value and the new value. To identify the record correctly we'll need to store the table name and the primary key of the changed record as well.
Here's a table which meets our needs:
CREATE TABLE audit_table ( ts TIMESTAMP WITH TIME ZONE, usr VARCHAR(30), tbl VARCHAR(30), fld VARCHAR(30), pk_name VARCHAR(30), pk_value VARCHAR(40), mod_type CHAR(6), old_val TEXT, new_val TEXT );
Let's also create two tables to test the auditing on. We've given each a different primary key name so that you can see the audit table working later on:
CREATE TABLE test1 ( id INTEGER NOT NULL PRIMARY KEY, somename VARCHAR(50), somevalue INTEGER, somets TIMESTAMP WITH TIME ZONE ); CREATE TABLE test2 ( pkid INTEGER NOT NULL PRIMARY KEY, testname TEXT, testvalue REAL, testdate DATE );
Next you need to create a function to log the changes. Here's the function we need but make sure when you create it you keep the line ending characters intact, if they get lost the function won't work and you will get an error which starts ERROR: can't read "tgname": no such variable. Also, this code ensures changes to the audit_table table are not captured, otherwise you would get an infinite loop of triggers logging to the audit table triggering more triggers logging themselves. The function also ignores a table called base_audit, we'll come to that later.
Update This code really is horrible but it does seem to work. A re-written version in pl/pgsql would be nice
CREATE OR REPLACE FUNCTION "public"."audit_log" () RETURNS trigger AS $body$ spi_exec "SELECT CURRENT_USER AS tguser" spi_exec "SELECT relname AS tgname FROM pg_class WHERE relfilenode = $TG_relid" #skip changes on audit_table if {[string equal -nocase $tgname audit_table]} { return OK } #get PK name set pk_name "" spi_exec "SELECT a.attname AS pk_name FROM pg_class c, pg_attribute a, pg_index i WHERE c.relname = '$tgname' AND c.oid=i.indrelid AND a.attnum > 0 AND a.attrelid = i.indexrelid AND i.indisprimary='t'" switch $TG_op { INSERT { set pk_value "" #get PK value foreach field $TG_relatts { if {[string equal -nocase [lindex [array get NEW $field] 0] $pk_name]} { set pk_value [lindex [array get NEW $field] 1] break; } } #log inserted row values foreach field $TG_relatts { if {! [string equal -nocase [lindex [array get NEW $field] 0] $pk_name]} { set modified_field [lindex [array get NEW $field] 0] if {[string compare $modified_field ""] != 0} { set current_value [lindex [array get NEW $field] 1] spi_exec -array C "INSERT INTO audit_table(ts, usr, tbl, fld, pk_name, pk_value, mod_type, old_val, new_val) VALUES (CURRENT_TIMESTAMP, '[ quote $tguser ]', '[ quote $tgname ]', '[ quote $modified_field ]', '[ quote $pk_name ]', '[ quote $pk_value ]', '$TG_op', NULL, '[ quote $current_value ]')" } } } } UPDATE { set pk_value "" #get PK value foreach field $TG_relatts { if {[string equal -nocase [lindex [array get NEW $field] 0] $pk_name]} { set pk_value [lindex [array get NEW $field] 1] break; } } #log inserted row values foreach field $TG_relatts { #check changed fields if {[string equal -nocase [array get NEW $field] [array get OLD $field]] == 0} { set modified_field [lindex [array get OLD $field] 0] if {[string compare $modified_field ""] == 0} { set modified_field [lindex [array get NEW $field] 0] } set previous_value [lindex [array get OLD $field] 1] set current_value [lindex [array get NEW $field] 1] spi_exec -array C "INSERT INTO audit_table(ts, usr, tbl, fld, pk_name, pk_value, mod_type, old_val, new_val) VALUES (CURRENT_TIMESTAMP, '[ quote $tguser ]', '[ quote $tgname ]', '[ quote $modified_field ]', '[ quote $pk_name ]', '[ quote $pk_value ]', '$TG_op', '[ quote $previous_value ]', '[ quote $current_value ]')" } } } DELETE { set pk_value "" #get PK value foreach field $TG_relatts { if {[string equal -nocase [lindex [array get OLD $field] 0] $pk_name]} { set pk_value [lindex [array get OLD $field] 1] break; } } #log inserted row values foreach field $TG_relatts { if {! [string equal -nocase [lindex [array get OLD $field] 0] $pk_name]} { set modified_field [lindex [array get OLD $field] 0] set previous_value [lindex [array get OLD $field] 1] spi_exec -array C "INSERT INTO audit_table(ts, usr, tbl, fld, pk_name, pk_value, mod_type, old_val, new_val) VALUES (CURRENT_TIMESTAMP, '[ quote $tguser ]', '[ quote $tgname ]', '[ quote $modified_field ]', '[ quote $pk_name ]', '[ quote $pk_value ]', '$TG_op', '[ quote $previous_value ]', NULL)" } } } } return OK $body$ LANGUAGE 'pltcl' VOLATILE CALLED ON NULL INPUT SECURITY INVOKER;
Now that the function is in place all we need to do is apply the trigger to every table we want to log. In this case it is just our test tables:
-- DROP TRIGGER tg_audit_test1 ON test1; CREATE TRIGGER "tg_audit_test1" AFTER INSERT OR DELETE OR UPDATE ON "test1" FOR EACH ROW EXECUTE PROCEDURE "audit_log" (); -- DROP TRIGGER tg_audit_test2 ON test2; CREATE TRIGGER "tg_audit_test2" AFTER INSERT OR DELETE OR UPDATE ON "test2" FOR EACH ROW EXECUTE PROCEDURE "audit_log" ();
We are adding the trigger after the row is updated.
Now let's test the solution we have:
yourdatabase=# INSERT INTO test1 (id, somename, somevalue, somets) VALUES (1, 'php', 5, CURRENT_TIMESTAMP); INSERT 0 1 yourdatabase=# SELECT * FROM audit_table; ts | usr | tbl | fld | pk_name | pk_value | mod_type | old_val | new_val -------------------------------+-------+-------+-----------+---------+----------+----------+---------+------------------------------- 2007-10-09 10:15:55.934479+00 | james | test1 | | id | 1 | INSERT | | 2007-10-09 10:15:55.934479+00 | james | test1 | somename | id | 1 | INSERT | | php 2007-10-09 10:15:55.934479+00 | james | test1 | somevalue | id | 1 | INSERT | | 5 2007-10-09 10:15:55.934479+00 | james | test1 | somets | id | 1 | INSERT | | 2007-10-09 10:15:55.934479+00 (4 rows) yourdatabase=# INSERT INTO test2 (pkid, testname, testvalue, testdate) VALUES (1, 'php', 5.21, CURRENT_DATE); INSERT 0 1 yourdatabase=# SELECT * FROM audit_table; ts | usr | tbl | fld | pk_name | pk_value | mod_type | old_val | new_val -------------------------------+-------+-------+-----------+---------+----------+----------+---------+------------------------------- 2007-10-09 10:15:55.934479+00 | james | test1 | | id | 1 | INSERT | | 2007-10-09 10:15:55.934479+00 | james | test1 | somename | id | 1 | INSERT | | php 2007-10-09 10:15:55.934479+00 | james | test1 | somevalue | id | 1 | INSERT | | 5 2007-10-09 10:15:55.934479+00 | james | test1 | somets | id | 1 | INSERT | | 2007-10-09 10:15:55.934479+00 2007-10-09 10:17:44.135132+00 | james | test2 | | pkid | 1 | INSERT | | 2007-10-09 10:17:44.135132+00 | james | test2 | testname | pkid | 1 | INSERT | | php 2007-10-09 10:17:44.135132+00 | james | test2 | testvalue | pkid | 1 | INSERT | | 5.21 2007-10-09 10:17:44.135132+00 | james | test2 | testdate | pkid | 1 | INSERT | | 2007-10-09 (8 rows) yourdatabase=#
It is a bit of a pain to manually create and drop all the required triggers for each table so here are some functions to do that. The first creates all the triggers, the second drops them all and the third creates any that are missing but doesn't change the deferred or other properties of the trigger:
CREATE OR REPLACE FUNCTION create_audit_log_triggers() RETURNS integer AS $$ DECLARE statement RECORD; BEGIN FOR statement IN SELECT 'CREATE TRIGGER "trg_audit_log" AFTER INSERT OR DELETE OR UPDATE ON "' || tablename || '" FOR EACH ROW EXECUTE PROCEDURE "audit_log" ();' as command from pg_tables where schemaname='public' and tablename not in ('audit_table', 'base_audit') LOOP RAISE NOTICE 'Adding trigger: %', statement.command; EXECUTE statement.command; END LOOP; RETURN 1; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION drop_audit_log_triggers() RETURNS integer AS $$ DECLARE statement RECORD; BEGIN FOR statement IN SELECT 'DROP TRIGGER "trg_audit_log" ON "' || tablename || '";' as command from pg_tables where schemaname='public' and tablename not in ('audit_table', 'base_audit') LOOP RAISE NOTICE 'Removing trigger: %', statement.command; EXECUTE statement.command; END LOOP; RETURN 1; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION add_missing_audit_log_triggers() RETURNS integer AS $$ DECLARE statement RECORD; BEGIN FOR statement IN SELECT 'CREATE TRIGGER "trg_audit_log" AFTER INSERT OR DELETE OR UPDATE ON "' || tablename || '" FOR EACH ROW EXECUTE PROCEDURE "audit_log" ();' AS command FROM pg_tables WHERE schemaname='public' AND tablename NOT IN (SELECT relname FROM pg_class INNER JOIN pg_trigger ON (tgrelid = relfilenode) WHERE tgname = 'trg_audit_log') AND tablename NOT IN ('audit_table', 'base_audit') LOOP RAISE NOTICE 'Adding trigger: %', statement.command; EXECUTE statement.command; END LOOP; RETURN 1; END; $$ LANGUAGE plpgsql;
In order to use these functions you will need to add the PL/pgSQL language to your database:
createlang plpgsql yourdatabase
You can then execute these functions like this:
yourdatabase=# SELECT create_audit_log_triggers(); yourdatabase=# SELECT drop_audit_log_triggers(); yourdatabase=# SELECT add_missing_audit_log_triggers();
Unfortunately as of PostgreSQL 8.1 there is no way to automatically add the triggers when you create a new table so you'll have to manually execute the SELECT add_missing_audit_log_triggers(); statement each time you create a new table you want logged. Alternatively you could create a nightly cron job to run this function every night in case you forget.
Caution!
Remember, this technique only works with tables with a prmiary key. If your table doesn't have a primary key, the primary key name and value can't be logged. To alter a table so it has a primary key you should can use a command simialr to this, replacing tablename with the name of the table and uid with the name of the primary key column:
ALTER TABLE "tablename" ADD PRIMARY KEY ("uid");
You can find out which tables don't have primary keys like this:
SELECT n.nspname AS "Schema", c.relname AS "Table Name", c.relhaspkey AS "Has PK" FROM pg_catalog.pg_class c JOIN pg_namespace n ON ( c.relnamespace = n.oid AND n.nspname NOT IN ('information_schema', 'pg_catalog') AND c.relkind='r' ) WHERE NOT c.relhaspkey ORDER BY c.relhaspkey, c.relname;
Timestamping
Although the audit_log described above is very useful in keeping a complete audit trail of every change to your database there are often key pieces of information you would like logged for each record, for example:
- The user who created the record
- The date the record was created
- The last user to update the record
- The date the record was last updated
It is actually perfectly possible to get this information from the audit log created above but over time it is possible the audit log might grow rather large so you might decide to do a pg_dump of the audit_table table and then remove old records from the version in the database. As a result you would lose the key information listed above.
One way around this is to store this information for each record, much like a cached version of the full audit log. You can do this most easily using table inheritance.
Here's what the base_audit table might look like:
CREATE TABLE base_audit ( created_user character varying(255), created_datetime timestamp without time zone, updated_user character varying(255), updated_datetime timestamp without time zone, );
As you can see the creation date and the user who created the record are recorded for us by using the default values mechanism. Recording the updates is a bit trickier but this function will do the job. It also checks that the application isn't over-writing the values for the created or updated data:
CREATE OR REPLACE FUNCTION audit_stamp() RETURNS trigger AS $$ BEGIN IF TG_OP='INSERT' THEN RAISE NOTICE 'BEFORE INSERT trigger firing to update the created_user and created_datetime fields'; IF NEW.created_user IS NOT NULL THEN RAISE EXCEPTION 'created_user is assigned automatically and should not be updated in a query'; END IF; IF NEW.created_datetime IS NOT NULL THEN RAISE EXCEPTION 'created_datetime is assigned automatically and should not be updated in a query'; END IF; NEW.created_datetime := now(); NEW.created_user := current_user; END IF; IF TG_OP='UPDATE' THEN RAISE NOTICE 'BEFORE UPDATE trigger firing to update the updated_user and updated_datetime fields'; IF NEW.updated_user != OLD.updated_user THEN RAISE EXCEPTION 'updated_user is assigned automatically and should not be updated in a query. NEW %, OLD: %', NEW.updated_user, OLD.updated_user; END IF; IF NEW.updated_datetime != OLD.updated_datetime THEN RAISE EXCEPTION 'updated_datetime is assigned automatically and should not be updated in a query. NEW %, OLD: %', NEW.updated_datetime, OLD.updated_datetime; END IF; IF NEW.created_user != OLD.created_user THEN RAISE EXCEPTION 'created_user is assigned automatically and should not be updated in a query'; END IF; IF NEW.created_datetime != OLD.created_datetime THEN RAISE EXCEPTION 'created_datetime is assigned automatically and should not be updated in a query'; END IF; NEW.updated_datetime := now(); NEW.updated_user := current_user; END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
You might imagine you could set the creation date and user by specifying default values of now() and CURRENT_USER respectively in the base_audit table but if you do this you have no way of knowing whether the application has set them any you want to ensure it doesn't so you set the values in the trigger instead as shown above.
Whenever you create a table you want to have these audit stamps you would inherit it from base_audit and then set a trigger. Lets do this with two more test tables:
CREATE TABLE test3 ( id INTEGER NOT NULL PRIMARY KEY, somename VARCHAR(50), somevalue INTEGER, somets TIMESTAMP WITH TIME ZONE ) INHERITS (base_audit); CREATE TRIGGER "trg_audit_stamp" BEFORE INSERT OR UPDATE ON "test3" FOR EACH ROW EXECUTE PROCEDURE audit_stamp(); CREATE TABLE test4 ( pkid INTEGER NOT NULL PRIMARY KEY, testname TEXT, testvalue REAL, testdate DATE ) INHERITS (base_audit); CREATE TRIGGER "trg_audit_stamp" BEFORE INSERT OR UPDATE ON "test4" FOR EACH ROW EXECUTE PROCEDURE audit_stamp();
Here are some functions to add, drop and add any new audit stamp triggers. They are the same as the earlier ones but with the trigger name changed and modified to trigger before update or insert statements:
CREATE OR REPLACE FUNCTION create_audit_stamp_triggers() RETURNS integer AS $$ DECLARE statement RECORD; BEGIN FOR statement IN SELECT 'CREATE TRIGGER "trg_audit_stamp" BEFORE INSERT OR UPDATE ON "' || tablename || '" FOR EACH ROW EXECUTE PROCEDURE "audit_stamp" ();' as command from pg_tables where schemaname='public' and tablename not in ('audit_table', 'base_audit') LOOP RAISE NOTICE 'Adding trigger: %', statement.command; EXECUTE statement.command; END LOOP; RETURN 1; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION drop_audit_stamp_triggers() RETURNS integer AS $$ DECLARE statement RECORD; BEGIN FOR statement IN SELECT 'DROP TRIGGER "trg_audit_stamp" ON "' || tablename || '";' as command from pg_tables where schemaname='public' and tablename not in ('audit_table', 'base_audit') LOOP RAISE NOTICE 'Removing trigger: %', statement.command; EXECUTE statement.command; END LOOP; RETURN 1; END; $$ LANGUAGE plpgsql; CREATE OR REPLACE FUNCTION add_missing_audit_stamp_triggers() RETURNS integer AS $$ DECLARE statement RECORD; BEGIN FOR statement IN SELECT 'CREATE TRIGGER "trg_audit_stamp" BEFORE INSERT OR UPDATE ON "' || tablename || '" FOR EACH ROW EXECUTE PROCEDURE "audit_stamp" ();' AS command FROM pg_tables WHERE schemaname='public' AND tablename NOT IN (SELECT relname FROM pg_class INNER JOIN pg_trigger ON (tgrelid = relfilenode) WHERE tgname = 'trg_audit_stamp') AND tablename NOT IN ('audit_table', 'base_audit') LOOP RAISE NOTICE 'Adding trigger: %', statement.command; EXECUTE statement.command; END LOOP; RETURN 1; END; $$ LANGUAGE plpgsql;
You can then execute these functions like this:
yourdatabase=# SELECT create_audit_stamp_triggers(); yourdatabase=# SELECT drop_audit_stamp_triggers(); yourdatabase=# SELECT add_missing_audit_stamp_triggers();
You should now find all your INSERTS and UPDATES are being time and user stamped as well as being audited.
ToDo:
- Log primary key changes
- Use more sensible names